| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- """定时从调度库拉取任务、执行爬虫并上报状态的通用入口。"""
- import time
- import json
- import requests
- import schedule
- from commons.Logger import logger
- from commons.conn_mysql import MySQLPoolOnline
- from commons.feishu_webhook import send_text
# Endpoint that receives task status reports; CollectScheduleRunner sends them
# as GET query parameters.
RESULT_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"

# Columns read from `retrieve_collect_task_allocate`, in SELECT order.
_TASK_COLUMNS = (
    "id",
    "collect_task_id",
    "company_id",
    "product_name",
    "product_specs",
    "product_keyword",
    "product_brand",
    "sampling_cycle",
    "sampling_start_time",
    "sampling_end_time",
    "collect_equipment_account_id",
    "collect_region_id",
    "collect_equipment_id",
    "collect_round",
)

# Select a single pending task (`status`=1) for one platform. The platform ID
# is the only bound parameter (%s). There is no ORDER BY, so which matching
# row comes back is left to the database.
_TASK_SELECT = (
    "select "
    + ",".join(f"`{column}`" for column in _TASK_COLUMNS)
    + " from `retrieve_collect_task_allocate` where `platform`=%s and `status`=1 limit 1"
)
class CollectScheduleRunner:
    """Fetch one pending task for a platform ID, run the spider on it, and report the result.

    Status reporting and Feishu notification are opt-in and both disabled by
    default, which matches the earlier behaviour where those calls were
    commented out.
    """

    def __init__(self, platform_name, platform_id, spider_cls, *, report_status=False, notify=False):
        """
        Args:
            platform_name: Human-readable platform name, used in logs and notifications.
            platform_id: Value bound to the `platform` column when selecting a task.
            spider_cls: Spider class. It is instantiated with the task dict, and its
                ``run()`` must return ``(crawl_count, is_success)``.
            report_status: When true, send start/finish status to RESULT_REPORT_URL.
            notify: When true, send a Feishu text notification after a task finishes.
        """
        self.platform_name = platform_name
        self.platform_id = platform_id
        self.spider_cls = spider_cls
        self.report_enabled = report_status
        self.notify_enabled = notify
        self.db_online = MySQLPoolOnline()
        # Filled in by run() / get_task(); "" until a task has been fetched.
        self.crawl_count = ""
        self.task_id = ""
        self.task_dict = None

    def _report_status(self, status):
        """Best-effort report of the current task's status to RESULT_REPORT_URL.

        Args:
            status: 2 is sent before the spider runs, 3 after a successful run,
                and 4 after a failed run. Any other value is logged and ignored.

        Network and HTTP errors are logged as warnings and never raised.
        """
        if status not in (2, 3, 4):
            logger.warning("未知状态值: %s, 跳过状态上报", status)
            return
        now = int(time.time())
        params = {"collect_task_allocate_id": self.task_id, "status": status}
        if status == 2:
            params.update(finish_status=0, start_time=now)
        elif status == 3:
            params.update(finish_status=1, real_count=self.crawl_count, end_time=now)
        else:
            params.update(finish_status=0, end_time=now)
        try:
            res = requests.get(RESULT_REPORT_URL, params=params, timeout=20)
            res.raise_for_status()
            logger.debug("状态上报: %s", res.text[:500])
        except requests.RequestException as e:
            # Reporting is best-effort: a failed report must not abort the task.
            logger.warning("状态上报失败: %s", e)

    def get_task(self):
        """Fetch one pending task row for this platform.

        Returns:
            The first row returned by ``select_data`` (indexed by column name),
            or ``{}`` when no task is pending. When a row is found, its ``id``
            is also stored on ``self.task_id``.
        """
        task_list = self.db_online.select_data(_TASK_SELECT, (self.platform_id,))
        if not task_list:
            return {}
        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        # default=str: the DB driver may return datetime/Decimal values that
        # json cannot serialize, and a log line must not abort the task.
        # ensure_ascii=False keeps Chinese product names readable in the log.
        logger.info(json.dumps(task_dict, ensure_ascii=False, default=str))
        return task_dict

    def _notify(self):
        """Best-effort Feishu text notification summarising the finished task."""
        try:
            send_text(
                f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n"
                f"平台: {self.platform_name}, 药品: {self.task_dict.get('product_name')}, "
                f"爬取数据: {self.crawl_count}条"
            )
        except Exception as e:
            # A notification failure should not turn a finished crawl into an error.
            logger.warning("飞书通知发送失败: %s", e)

    def run(self):
        """Fetch a task and run the spider on it. Does nothing when no task is pending.

        Sets ``task_dict``, ``task_id`` and ``crawl_count``. Exceptions raised by
        the spider propagate to the caller (after a status-4 report when
        reporting is enabled).
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info("%s暂无任务", self.platform_name)
            return
        if self.report_enabled:
            self._report_status(2)
        try:
            self.crawl_count, is_success = self.spider_cls(self.task_dict).run()
        except Exception:
            # Send a terminal status so the scheduler is not left holding only
            # the status-2 report for this task.
            if self.report_enabled:
                self._report_status(4)
            raise
        if self.report_enabled:
            self._report_status(3 if is_success else 4)
        if self.notify_enabled:
            self._notify()
def run_scheduled_loop(platform_name, platform_id, spider_cls, *, interval_minutes=5, sleep_seconds=3):
    """Run one crawl round immediately, then repeat it every ``interval_minutes``, forever.

    Each round builds a fresh CollectScheduleRunner and runs it. Any exception
    a round raises is logged (with traceback) and swallowed, so the loop keeps
    going. This applies to the first round as well.

    Args:
        platform_name: Platform name passed to the runner and used in log messages.
        platform_id: Platform ID passed to the runner.
        spider_cls: Spider class passed to the runner.
        interval_minutes: Minutes between scheduled rounds.
        sleep_seconds: Seconds to sleep between checks for due jobs.

    This function never returns.
    """

    def scheduled_job():
        try:
            logger.info("开始执行%s爬虫任务", platform_name)
            CollectScheduleRunner(platform_name, platform_id, spider_cls).run()
            logger.info("%s爬虫任务执行完成", platform_name)
        except Exception as e:
            logger.error("%s爬虫任务执行失败: %s", platform_name, e, exc_info=True)

    # The first round goes through the same guarded job. Previously it called
    # the runner directly, so any startup error escaped and killed the process
    # before the periodic schedule was ever registered.
    scheduled_job()
    schedule.every(interval_minutes).minutes.do(scheduled_job)
    logger.info("定时任务已启动,每%s分钟执行一次%s爬虫", interval_minutes, platform_name)
    while True:
        schedule.run_pending()
        time.sleep(sleep_seconds)
|