# collect_schedule_runner.py (4.0 KB)
  1. """定时从调度库拉取任务、执行爬虫并上报状态的通用入口。"""
  2. import time
  3. import json
  4. import requests
  5. import schedule
  6. from commons.Logger import logger
  7. from commons.conn_mysql import MySQLPoolOnline
  8. from commons.feishu_webhook import send_text
  # Endpoint that receives task status reports; it is called with GET and
  # query parameters by CollectScheduleRunner._report_status.
  RESULT_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"

  # Parameterized query that selects at most one row of
  # `retrieve_collect_task_allocate` with `status`=1 for the platform bound to
  # the single %s placeholder.
  _TASK_SELECT = (
      "select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,"
      "`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,"
      "`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,"
      "`collect_equipment_id`,`collect_round` "
      "from `retrieve_collect_task_allocate` where `platform`=%s and `status`=1 limit 1"
  )
  class CollectScheduleRunner:
      """Fetch one pending task for a platform and run the spider on it.

      A task row is selected with the module-level ``_TASK_SELECT`` query,
      handed to ``spider_cls`` and crawled. Status reporting to
      ``RESULT_REPORT_URL`` is implemented in ``_report_status``, but the calls
      to it in ``run`` are currently commented out.

      Attributes:
          platform_name: Name used in log messages.
          platform_id: Value bound to the ``platform`` placeholder of the query.
          spider_cls: Callable invoked as ``spider_cls(task_dict)``; the
              returned object's ``run()`` must return a
              ``(crawl_count, is_success)`` pair.
          db_online: ``MySQLPoolOnline`` instance used to query tasks.
          crawl_count: Count returned by the last spider run ("" before any).
          task_id: ``id`` of the last fetched task row ("" before any fetch).
          task_dict: Result of the last ``get_task`` call made by ``run``
              (``{}`` when no task was found, ``None`` before any run).
      """

      def __init__(self, platform_name, platform_id, spider_cls):
          """Store the platform identity and spider class and open the DB pool."""
          self.platform_name = platform_name
          self.platform_id = platform_id
          self.spider_cls = spider_cls
          self.db_online = MySQLPoolOnline()
          self.crawl_count = ""
          self.task_id = ""
          self.task_dict = None

      def _build_report_params(self, status, now):
          """Return the query parameters for a status report.

          Args:
              status: Status code to report. 2 carries ``start_time``; 3
                  carries ``finish_status`` 1, ``real_count`` and ``end_time``;
                  4 carries ``end_time`` only.
              now: Unix timestamp (seconds) used for the time field.

          Returns:
              The parameter dict, or ``None`` when ``status`` is not 2, 3 or 4.
          """
          if status not in (2, 3, 4):
              return None
          params = {
              "collect_task_allocate_id": self.task_id,
              "status": status,
              "finish_status": 1 if status == 3 else 0,
          }
          if status == 2:
              params["start_time"] = now
          else:
              if status == 3:
                  params["real_count"] = self.crawl_count
              params["end_time"] = now
          return params

      def _report_status(self, status):
          """Send a status report for the current task to ``RESULT_REPORT_URL``.

          Unknown status codes are logged and skipped. The request is
          best-effort: any failure is logged as a warning and never raised.
          """
          params = self._build_report_params(status, int(time.time()))
          if params is None:
              logger.warning("未知状态值: %s, 跳过状态上报", status)
              return
          try:
              res = requests.get(RESULT_REPORT_URL, params=params, timeout=20)
              res.raise_for_status()
              logger.debug("状态上报: %s", res.text[:500])
          except Exception as e:
              # Deliberately broad: a failed report must not abort the crawl.
              logger.warning("状态上报失败: %s", e)

      @staticmethod
      def _task_log_text(task_dict):
          """Serialize a task row to JSON text for logging."""
          # ensure_ascii=False keeps non-ASCII text readable in the log.
          # default=str stringifies values json cannot encode (e.g. a datetime,
          # should the DB layer return one) so that logging never raises
          # TypeError and aborts task pickup.
          return json.dumps(task_dict, ensure_ascii=False, default=str)

      def get_task(self):
          """Fetch one task row for this platform and remember its id.

          Returns:
              The first row returned by the query, or ``{}`` when there is none.
          """
          task_list = self.db_online.select_data(_TASK_SELECT, (self.platform_id,))
          if not task_list:
              return {}
          task_dict = task_list[0]
          self.task_id = task_dict["id"]
          logger.info(self._task_log_text(task_dict))
          return task_dict

      def run(self):
          """Fetch one task and crawl it; only log when no task is available."""
          self.task_dict = self.get_task()
          if not self.task_dict:
              logger.info("%s暂无任务", self.platform_name)
              return
          # NOTE: the status report before the crawl, the send_text
          # notification and the final status report are currently disabled;
          # the original calls are kept below, commented out.
          # self._report_status(2)
          self.crawl_count, is_success = self.spider_cls(self.task_dict).run()
          # send_text(
          #     f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n"
          #     f"平台: {self.platform_name}, 药品: {self.task_dict.get('product_name')}, "
          #     f"爬取数据: {self.crawl_count}条"
          # )
          # self._report_status(3 if is_success else 4)
  def run_scheduled_loop(
      platform_name,
      platform_id,
      spider_cls,
      *,
      interval_minutes=5,
      sleep_seconds=3,
  ):
      """Run one crawl round immediately, then repeat it on a fixed interval.

      Args:
          platform_name: Name used in log messages.
          platform_id: Platform id passed to ``CollectScheduleRunner``.
          spider_cls: Spider class passed to ``CollectScheduleRunner``.
          interval_minutes: Minutes between scheduled rounds.
          sleep_seconds: Seconds to sleep between polls of the scheduler.

      This function does not return; it blocks in the polling loop.
      """

      def scheduled_job():
          # Boundary for a single round: log and swallow any failure so that
          # one bad round cannot stop the loop.
          try:
              logger.info("开始执行%s爬虫任务", platform_name)
              CollectScheduleRunner(platform_name, platform_id, spider_cls).run()
              logger.info("%s爬虫任务执行完成", platform_name)
          except Exception as e:
              logger.error("%s爬虫任务执行失败: %s", platform_name, e, exc_info=True)

      # The first round goes through scheduled_job too. Calling the runner
      # directly here would let one failure on startup terminate the process
      # before anything is scheduled.
      scheduled_job()
      schedule.every(interval_minutes).minutes.do(scheduled_job)
      logger.info("定时任务已启动,每%s分钟执行一次%s爬虫", interval_minutes, platform_name)
      while True:
          schedule.run_pending()
          time.sleep(sleep_seconds)
  91. while True:
  92. schedule.run_pending()
  93. time.sleep(sleep_seconds)