collect_schedule_runner.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. """定时从调度库拉取任务、执行爬虫并上报状态的通用入口。"""
  2. import random
  3. import time
  4. import json
  5. import requests
  6. from commons.Logger import logger
  7. from commons.conn_mysql import MySQLPoolOnline
  8. from commons.feishu_webhook import send_text
  9. RESULT_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
  10. _TASK_SELECT = (
  11. "select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,"
  12. "`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,`collect_equipment_id`,`collect_round` "
  13. "from `retrieve_collect_task_allocate` where `platform`=%s and `status`=1 limit 1"
  14. )
  15. class CollectScheduleRunner:
  16. """按平台 ID 拉取一条待执行任务,运行爬虫并上报结果。"""
  17. def __init__(self, platform_name, platform_id, spider_cls):
  18. self.platform_name = platform_name
  19. self.platform_id = platform_id
  20. self.spider_cls = spider_cls
  21. self.db_online = MySQLPoolOnline()
  22. self.crawl_count = ""
  23. self.task_id = ""
  24. self.task_dict = None
  25. def _report_status(self, status):
  26. if status not in (2, 3, 4):
  27. logger.warning("未知状态值: %s, 跳过状态上报", status)
  28. return
  29. now = int(time.time())
  30. if status == 2:
  31. params = {
  32. "collect_task_allocate_id": self.task_id,
  33. "status": status,
  34. "finish_status": 0,
  35. "start_time": now,
  36. }
  37. elif status == 3:
  38. params = {
  39. "collect_task_allocate_id": self.task_id,
  40. "status": status,
  41. "finish_status": 1,
  42. "real_count": self.crawl_count,
  43. "end_time": now,
  44. }
  45. else:
  46. params = {
  47. "collect_task_allocate_id": self.task_id,
  48. "status": status,
  49. "finish_status": 0,
  50. "end_time": now,
  51. }
  52. try:
  53. res = requests.get(RESULT_REPORT_URL, params=params, timeout=20)
  54. res.raise_for_status()
  55. logger.debug("状态上报: %s", res.text[:500])
  56. except Exception as e:
  57. logger.warning("状态上报失败: %s", e)
  58. def get_task(self):
  59. task_list = self.db_online.select_data(_TASK_SELECT, (self.platform_id,))
  60. if not task_list:
  61. return {}
  62. task_dict = task_list[0]
  63. self.task_id = task_dict["id"]
  64. logger.info(json.dumps(task_dict))
  65. return task_dict
  66. def heartbeat_task(self):
  67. url = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
  68. params = {
  69. "collect_task_allocate_id": self.task_id,
  70. }
  71. try:
  72. res = requests.get(url, params=params, timeout=20)
  73. logger.info("心跳任务上报成功")
  74. except Exception as e:
  75. logger.info("心跳任务上报失败")
  76. def run(self):
  77. self.task_dict = self.get_task()
  78. if not self.task_dict:
  79. logger.info("%s暂无任务", self.platform_name)
  80. return
  81. self._report_status(2)
  82. self.crawl_count, is_success = self.spider_cls(self.task_dict).run()
  83. self.heartbeat_task()
  84. send_text(
  85. f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n"
  86. f"平台: {self.platform_name}, 药品: {self.task_dict.get('product_name')}, "
  87. f"爬取数据: {self.crawl_count}条"
  88. )
  89. self._report_status(3 if is_success else 4)
  90. def run_scheduled_loop(
  91. platform_name,
  92. platform_id,
  93. spider_cls,
  94. *,
  95. interval_minutes=5,
  96. sleep_seconds=3,
  97. ):
  98. """循环拉取任务并执行爬虫;每轮结束后休眠 interval_minutes 分钟。"""
  99. idle_seconds = random.randint(180,300)
  100. logger.info(
  101. "循环任务已启动,平台=%s,每轮间隔 %s 秒",
  102. platform_name,
  103. idle_seconds,
  104. )
  105. while True:
  106. try:
  107. logger.info("开始执行%s爬虫任务", platform_name)
  108. CollectScheduleRunner(platform_name, platform_id, spider_cls).run()
  109. logger.info("%s爬虫任务执行完成", platform_name)
  110. except Exception as e:
  111. logger.error("%s爬虫任务执行失败: %s", platform_name, e, exc_info=True)
  112. time.sleep(idle_seconds)