| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import time
- from logger_config import logger
- from config import get_search_keywords_from_db, has_running_task
- import main as main_module
- # 无任务时的轮询间隔(秒)。仅在当前轮没有触发采集时才会休眠。
- POLL_SECONDS = 600
- # 有任务正在执行时的等待间隔(秒)。用于“等待执行结束后尽快续跑”。
- RUNNING_WAIT_SECONDS = 5
- PLATFORM = 9
- def tick(platform: int = PLATFORM) -> str:
- """
- 执行一轮调度检查。
- 设计边界:
- - 仅负责“是否触发采集”的判定,不负责具体采集流程。
- - 采集执行入口固定为 `main.main()`,本函数不拆分任务。
- 参数约束:
- - platform: 平台标识,需与任务表中的 platform 字段一致。
- 返回约定:
- - "executed": 本轮发现待执行任务并已触发采集入口。
- - "running": 检测到执行中任务,本轮不触发新采集。
- - "idle": 无待执行任务或调度层异常。
- """
- try:
- # 先检查是否已有执行中任务,避免同平台并发触发重复采集。
- if has_running_task(platform=platform):
- logger.info(
- f"[调度器] 检测到执行中任务(status=2, platform={platform}),本轮不触发采集。"
- )
- return "running"
- # 读取当前平台待执行任务,仅用数量判断“是否需要触发本轮采集”。
- tasks = get_search_keywords_from_db(platform=platform)
- task_count = len(tasks)
- logger.info(f"[调度器] 当前待执行任务数: {task_count}")
- if task_count == 0:
- return "idle"
- # 调用主采集入口。具体按任务逐个执行的控制由 main.py 内部负责。
- logger.info(f"[调度器] 发现待执行任务(platform={platform}),触发 main.main()")
- main_module.main()
- return "executed"
- except Exception as e:
- # 调度层异常统一降级为“本轮未触发采集”,交由外层循环继续下一轮。
- logger.error(f"[调度器] 本轮调度执行失败: {str(e)}")
- return "idle"
- def run_scheduler(interval_seconds: int = POLL_SECONDS, platform: int = PLATFORM) -> None:
- """
- 启动持续调度循环。
- 调度策略:
- - 若本轮触发了采集(tick 返回 "executed"),立即进入下一轮,不做固定休眠。
- - 若检测到执行中任务(tick 返回 "running"),短间隔等待后重试,直到执行中任务结束。
- - 若无待执行任务(tick 返回 "idle"),休眠 `interval_seconds` 后再轮询。
- 可见副作用:
- - 会持续写调度日志。
- - 触发采集时会进入 main.py 的浏览器与数据库流程。
- """
- logger.info(
- f"[调度器] 已启动,platform={platform},无任务时轮询间隔={interval_seconds}秒。"
- )
- while True:
- tick_state = tick(platform=platform)
- if tick_state == "executed":
- logger.info("[调度器] 本轮已执行任务,立即进入下一轮。")
- continue
- if tick_state == "running":
- logger.info(f"[调度器] 存在执行中任务,{RUNNING_WAIT_SECONDS}秒后再次检查。")
- time.sleep(RUNNING_WAIT_SECONDS)
- continue
- logger.info(f"[调度器] 无任务,{interval_seconds}秒后再次轮询。")
- time.sleep(interval_seconds)
- if __name__ == "__main__":
- run_scheduler()
|