# start_run_taobao.py (3.7 KB)
  1. import json
  2. import random
  3. import time
  4. import requests
  5. from commons.Logger import logger
  6. from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
  7. from spiders.taobao.taobao_crawl import TaobaoCrawl
  8. import schedule
  9. from commons.feishu_webhook import send_text
  10. from commons.config import TB_DEVICE_ID
  11. platform_name = "淘宝"
  12. class TaobaoMain:
  13. def __init__(self):
  14. # self.db_online = MySQLPool39()
  15. self.db_online = MySQLPoolOnline()
  16. self.crawl_count = ""
  17. self.task_id = ""
  18. self.task_dict = None
  19. def get_status(self, status):
  20. if status not in (2, 3, 4):
  21. logger.warning(f"未知状态值: {status}, 跳过状态上报")
  22. return
  23. if status == 2:
  24. parmas = {
  25. "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0,
  26. "start_time": int(time.time())
  27. }
  28. if status == 3:
  29. parmas = {
  30. "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 1,
  31. "real_count": self.crawl_count, "end_time": int(time.time()),
  32. }
  33. if status == 4:
  34. parmas = {
  35. "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0,
  36. "end_time": int(time.time())}
  37. # url = "http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report"
  38. url = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
  39. try:
  40. res = requests.get(url, params=parmas, timeout=20)
  41. res.raise_for_status()
  42. logger.info("状态上报: %s", res.text[:500])
  43. except Exception as e:
  44. logger.warning(f"状态上报失败: {e}")
  45. def heartbeat_task(self):
  46. url = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
  47. params = {
  48. "collect_task_allocate_id": self.task_id,
  49. }
  50. try:
  51. res = requests.get(url, params=params, timeout=20)
  52. logger.info("心跳任务上报成功")
  53. except Exception as e:
  54. logger.info(f"心跳任务上报失败{str(e)}")
  55. def get_task(self):
  56. """获取当前设备绑定的京东待执行任务。"""
  57. sql = """
  58. SELECT t.*
  59. FROM `retrieve_collect_task_allocate` t
  60. INNER JOIN `retrieve_collect_equipment_account` a
  61. ON t.`collect_equipment_account_id` = a.`id`
  62. WHERE a.`device_id` = %s
  63. AND t.`platform` = 1
  64. AND t.`status` = 1
  65. LIMIT 1
  66. """
  67. task_list = self.db_online.select_data(sql, (TB_DEVICE_ID,))
  68. if not task_list:
  69. return {}
  70. task_dict = task_list[0]
  71. self.task_id = task_dict["id"]
  72. return task_dict
  73. def run(self):
  74. self.task_dict = self.get_task()
  75. if not self.task_dict:
  76. logger.info(f"{platform_name}暂无任务")
  77. return
  78. # logger.info(self.task_dict)
  79. self.get_status(2)
  80. self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
  81. self.heartbeat_task()
  82. # send_text(
  83. # f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
  84. if is_success:
  85. self.get_status(3)
  86. else:
  87. self.get_status(4)
  88. if __name__ == '__main__':
  89. # 每10分钟执行一次
  90. while True:
  91. TaobaoMain().run()
  92. interval_time = random.randint(1200, 1800)
  93. logger.info(f"程序睡眠{interval_time}秒后继续执行")
  94. time.sleep(interval_time)