scheduler.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import time
  2. from logger_config import logger
  3. from config import get_search_keywords_from_db, has_running_task
  4. import main as main_module
  5. # 无任务时的轮询间隔(秒)。仅在当前轮没有触发采集时才会休眠。
  6. POLL_SECONDS = 600
  7. # 有任务正在执行时的等待间隔(秒)。用于“等待执行结束后尽快续跑”。
  8. RUNNING_WAIT_SECONDS = 5
  9. PLATFORM = 9
  10. def tick(platform: int = PLATFORM) -> str:
  11. """
  12. 执行一轮调度检查。
  13. 设计边界:
  14. - 仅负责“是否触发采集”的判定,不负责具体采集流程。
  15. - 采集执行入口固定为 `main.main()`,本函数不拆分任务。
  16. 参数约束:
  17. - platform: 平台标识,需与任务表中的 platform 字段一致。
  18. 返回约定:
  19. - "executed": 本轮发现待执行任务并已触发采集入口。
  20. - "running": 检测到执行中任务,本轮不触发新采集。
  21. - "idle": 无待执行任务或调度层异常。
  22. """
  23. try:
  24. # 先检查是否已有执行中任务,避免同平台并发触发重复采集。
  25. if has_running_task(platform=platform):
  26. logger.info(
  27. f"[调度器] 检测到执行中任务(status=2, platform={platform}),本轮不触发采集。"
  28. )
  29. return "running"
  30. # 读取当前平台待执行任务,仅用数量判断“是否需要触发本轮采集”。
  31. tasks = get_search_keywords_from_db(platform=platform)
  32. task_count = len(tasks)
  33. logger.info(f"[调度器] 当前待执行任务数: {task_count}")
  34. if task_count == 0:
  35. return "idle"
  36. # 调用主采集入口。具体按任务逐个执行的控制由 main.py 内部负责。
  37. logger.info(f"[调度器] 发现待执行任务(platform={platform}),触发 main.main()")
  38. main_module.main()
  39. return "executed"
  40. except Exception as e:
  41. # 调度层异常统一降级为“本轮未触发采集”,交由外层循环继续下一轮。
  42. logger.error(f"[调度器] 本轮调度执行失败: {str(e)}")
  43. return "idle"
  44. def run_scheduler(interval_seconds: int = POLL_SECONDS, platform: int = PLATFORM) -> None:
  45. """
  46. 启动持续调度循环。
  47. 调度策略:
  48. - 若本轮触发了采集(tick 返回 "executed"),立即进入下一轮,不做固定休眠。
  49. - 若检测到执行中任务(tick 返回 "running"),短间隔等待后重试,直到执行中任务结束。
  50. - 若无待执行任务(tick 返回 "idle"),休眠 `interval_seconds` 后再轮询。
  51. 可见副作用:
  52. - 会持续写调度日志。
  53. - 触发采集时会进入 main.py 的浏览器与数据库流程。
  54. """
  55. logger.info(
  56. f"[调度器] 已启动,platform={platform},无任务时轮询间隔={interval_seconds}秒。"
  57. )
  58. while True:
  59. tick_state = tick(platform=platform)
  60. if tick_state == "executed":
  61. logger.info("[调度器] 本轮已执行任务,立即进入下一轮。")
  62. continue
  63. if tick_state == "running":
  64. logger.info(f"[调度器] 存在执行中任务,{RUNNING_WAIT_SECONDS}秒后再次检查。")
  65. time.sleep(RUNNING_WAIT_SECONDS)
  66. continue
  67. logger.info(f"[调度器] 无任务,{interval_seconds}秒后再次轮询。")
  68. time.sleep(interval_seconds)
  69. if __name__ == "__main__":
  70. run_scheduler()