import time from logger_config import logger from config import get_search_keywords_from_db, has_running_task import main as main_module # 无任务时的轮询间隔(秒)。仅在当前轮没有触发采集时才会休眠。 POLL_SECONDS = 600 # 有任务正在执行时的等待间隔(秒)。用于“等待执行结束后尽快续跑”。 RUNNING_WAIT_SECONDS = 5 PLATFORM = 9 def tick(platform: int = PLATFORM) -> str: """ 执行一轮调度检查。 设计边界: - 仅负责“是否触发采集”的判定,不负责具体采集流程。 - 采集执行入口固定为 `main.main()`,本函数不拆分任务。 参数约束: - platform: 平台标识,需与任务表中的 platform 字段一致。 返回约定: - "executed": 本轮发现待执行任务并已触发采集入口。 - "running": 检测到执行中任务,本轮不触发新采集。 - "idle": 无待执行任务或调度层异常。 """ try: # 先检查是否已有执行中任务,避免同平台并发触发重复采集。 if has_running_task(platform=platform): logger.info( f"[调度器] 检测到执行中任务(status=2, platform={platform}),本轮不触发采集。" ) return "running" # 读取当前平台待执行任务,仅用数量判断“是否需要触发本轮采集”。 tasks = get_search_keywords_from_db(platform=platform) task_count = len(tasks) logger.info(f"[调度器] 当前待执行任务数: {task_count}") if task_count == 0: return "idle" # 调用主采集入口。具体按任务逐个执行的控制由 main.py 内部负责。 logger.info(f"[调度器] 发现待执行任务(platform={platform}),触发 main.main()") main_module.main() return "executed" except Exception as e: # 调度层异常统一降级为“本轮未触发采集”,交由外层循环继续下一轮。 logger.error(f"[调度器] 本轮调度执行失败: {str(e)}") return "idle" def run_scheduler(interval_seconds: int = POLL_SECONDS, platform: int = PLATFORM) -> None: """ 启动持续调度循环。 调度策略: - 若本轮触发了采集(tick 返回 "executed"),立即进入下一轮,不做固定休眠。 - 若检测到执行中任务(tick 返回 "running"),短间隔等待后重试,直到执行中任务结束。 - 若无待执行任务(tick 返回 "idle"),休眠 `interval_seconds` 后再轮询。 可见副作用: - 会持续写调度日志。 - 触发采集时会进入 main.py 的浏览器与数据库流程。 """ logger.info( f"[调度器] 已启动,platform={platform},无任务时轮询间隔={interval_seconds}秒。" ) while True: tick_state = tick(platform=platform) if tick_state == "executed": logger.info("[调度器] 本轮已执行任务,立即进入下一轮。") continue if tick_state == "running": logger.info(f"[调度器] 存在执行中任务,{RUNNING_WAIT_SECONDS}秒后再次检查。") time.sleep(RUNNING_WAIT_SECONDS) continue logger.info(f"[调度器] 无任务,{interval_seconds}秒后再次轮询。") time.sleep(interval_seconds) if __name__ == "__main__": run_scheduler()