"""Generic entry point for scheduled collection jobs.

Periodically pulls one pending task from the scheduling database, runs the
platform spider on it, and (when enabled) reports the task status back to
the schedule API.
"""

import json
import time

import requests
import schedule

from commons.Logger import logger
from commons.conn_mysql import MySQLPoolOnline
from commons.feishu_webhook import send_text

RESULT_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"

# Selects a single allocated task with status=1 for the given platform.
_TASK_SELECT = (
    "select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,"
    "`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,"
    "`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,"
    "`collect_equipment_id`,`collect_round` "
    "from `retrieve_collect_task_allocate` where `platform`=%s and `status`=1 limit 1"
)


class CollectScheduleRunner:
    """Fetch one pending task for a platform ID, run the spider, report the result."""

    def __init__(self, platform_name, platform_id, spider_cls):
        """Store the platform identity and spider class and open the DB pool.

        Args:
            platform_name: Human-readable platform name, used in log messages.
            platform_id: Value bound to the ``platform`` column in the task query.
            spider_cls: Callable invoked as ``spider_cls(task_dict)``; the
                resulting object must expose ``run()`` returning a
                ``(crawl_count, is_success)`` pair.
        """
        self.platform_name = platform_name
        self.platform_id = platform_id
        self.spider_cls = spider_cls
        self.db_online = MySQLPoolOnline()
        self.crawl_count = ""
        self.task_id = ""
        self.task_dict = None

    def _report_status(self, status):
        """Send the task status to ``RESULT_REPORT_URL`` (best effort).

        Accepted values are 2, 3 and 4; anything else is logged and skipped.
        Status 2 is sent with ``start_time``; 3 with ``real_count`` and
        ``end_time`` and ``finish_status=1``; 4 with ``end_time`` only.
        Request failures are logged as warnings and never raised.

        Args:
            status: Integer status code to report.
        """
        if status not in (2, 3, 4):
            logger.warning("未知状态值: %s, 跳过状态上报", status)
            return

        now = int(time.time())
        # Keys are inserted in the same order as the per-status payloads so
        # the generated query string is unchanged.
        params = {
            "collect_task_allocate_id": self.task_id,
            "status": status,
            "finish_status": 1 if status == 3 else 0,
        }
        if status == 2:
            params["start_time"] = now
        else:
            if status == 3:
                params["real_count"] = self.crawl_count
            params["end_time"] = now

        try:
            res = requests.get(RESULT_REPORT_URL, params=params, timeout=20)
            res.raise_for_status()
            logger.debug("状态上报: %s", res.text[:500])
        except Exception as e:
            # Reporting is deliberately best-effort: a failed report must not
            # abort the crawl.
            logger.warning("状态上报失败: %s", e)

    def get_task(self):
        """Return the first pending task row for this platform, or ``{}``.

        Also stores the row's ``id`` in ``self.task_id``.
        Assumes ``select_data`` returns a sequence of dict-like rows — TODO confirm.
        """
        task_list = self.db_online.select_data(_TASK_SELECT, (self.platform_id,))
        if not task_list:
            return {}
        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        # Logging only: default=str keeps a non-JSON-native column value
        # (e.g. datetime/Decimal) from raising and aborting task pickup;
        # ensure_ascii=False keeps non-ASCII text readable in the log.
        logger.info(json.dumps(task_dict, ensure_ascii=False, default=str))
        return task_dict

    def run(self):
        """Fetch one task and run the spider on it; do nothing if no task exists.

        NOTE: status reporting and the Feishu notification are currently
        disabled (commented out below), so ``is_success`` is not consumed.
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info("%s暂无任务", self.platform_name)
            return
        # self._report_status(2)
        self.crawl_count, is_success = self.spider_cls(self.task_dict).run()
        # send_text(
        #     f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n"
        #     f"平台: {self.platform_name}, 药品: {self.task_dict.get('product_name')}, "
        #     f"爬取数据: {self.crawl_count}条"
        # )
        # self._report_status(3 if is_success else 4)


def run_scheduled_loop(
    platform_name,
    platform_id,
    spider_cls,
    *,
    interval_minutes=5,
    sleep_seconds=3,
):
    """Run one round immediately, then repeat every ``interval_minutes``.

    Every round, including the first, goes through the same guarded job: a
    fresh ``CollectScheduleRunner`` is built, and any exception is logged
    with its traceback instead of terminating the loop. This function does
    not return.

    Args:
        platform_name: Platform name used in log messages.
        platform_id: Platform ID passed to the runner's task query.
        spider_cls: Spider class passed to the runner.
        interval_minutes: Minutes between scheduled rounds.
        sleep_seconds: Seconds to sleep between ``schedule.run_pending()`` polls.
    """

    def scheduled_job():
        try:
            logger.info("开始执行%s爬虫任务", platform_name)
            CollectScheduleRunner(platform_name, platform_id, spider_cls).run()
            logger.info("%s爬虫任务执行完成", platform_name)
        except Exception as e:
            logger.error("%s爬虫任务执行失败: %s", platform_name, e, exc_info=True)

    # The first round uses the guarded job too, so a transient failure at
    # startup cannot kill the process before the schedule is registered.
    scheduled_job()

    schedule.every(interval_minutes).minutes.do(scheduled_job)
    logger.info("定时任务已启动,每%s分钟执行一次%s爬虫", interval_minutes, platform_name)

    while True:
        schedule.run_pending()
        time.sleep(sleep_seconds)