"""定时从调度库拉取任务、执行爬虫并上报状态的通用入口。""" import random import time import json import requests from commons.Logger import logger from commons.conn_mysql import MySQLPoolOnline from commons.feishu_webhook import send_text RESULT_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report" _TASK_SELECT = ( "select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`," "`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,`collect_equipment_id`,`collect_round` " "from `retrieve_collect_task_allocate` where `platform`=%s and `status`=1 limit 1" ) class CollectScheduleRunner: """按平台 ID 拉取一条待执行任务,运行爬虫并上报结果。""" def __init__(self, platform_name, platform_id, spider_cls): self.platform_name = platform_name self.platform_id = platform_id self.spider_cls = spider_cls self.db_online = MySQLPoolOnline() self.crawl_count = "" self.task_id = "" self.task_dict = None def _report_status(self, status): if status not in (2, 3, 4): logger.warning("未知状态值: %s, 跳过状态上报", status) return now = int(time.time()) if status == 2: params = { "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0, "start_time": now, } elif status == 3: params = { "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 1, "real_count": self.crawl_count, "end_time": now, } else: params = { "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0, "end_time": now, } try: res = requests.get(RESULT_REPORT_URL, params=params, timeout=20) res.raise_for_status() logger.debug("状态上报: %s", res.text[:500]) except Exception as e: logger.warning("状态上报失败: %s", e) def get_task(self): task_list = self.db_online.select_data(_TASK_SELECT, (self.platform_id,)) if not task_list: return {} task_dict = task_list[0] self.task_id = task_dict["id"] logger.info(json.dumps(task_dict)) return task_dict def heartbeat_task(self): url = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat" params = { "collect_task_allocate_id": self.task_id, } try: res = requests.get(RESULT_REPORT_URL, params=params, timeout=20) logger.info("心跳任务上报成功") except Exception as e: logger.info("心跳任务上报失败") def run(self): self.task_dict = self.get_task() if not self.task_dict: logger.info("%s暂无任务", self.platform_name) return self._report_status(2) self.crawl_count, is_success = self.spider_cls(self.task_dict).run() self.heartbeat_task() send_text( f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n" f"平台: {self.platform_name}, 药品: {self.task_dict.get('product_name')}, " f"爬取数据: {self.crawl_count}条" ) self._report_status(3 if is_success else 4) def run_scheduled_loop( platform_name, platform_id, spider_cls, *, interval_minutes=5, sleep_seconds=3, ): """循环拉取任务并执行爬虫;每轮结束后休眠 interval_minutes 分钟。""" idle_seconds = random.randint(180,300) logger.info( "循环任务已启动,平台=%s,每轮间隔 %s 秒", platform_name, idle_seconds, ) while True: try: logger.info("开始执行%s爬虫任务", platform_name) CollectScheduleRunner(platform_name, platform_id, spider_cls).run() logger.info("%s爬虫任务执行完成", platform_name) except Exception as e: logger.error("%s爬虫任务执行失败: %s", platform_name, e, exc_info=True) time.sleep(idle_seconds)