start_run_jd.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import json
  2. import time
  3. import requests
  4. from commons.Logger import logger
  5. from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
  6. from spiders.jd.jd_auto_crawl import JdCrawlerV2
  7. import schedule
  8. from commons.feishu_webhook import send_text
  9. platform_name = "京东"
  10. class JdMain:
  11. def __init__(self):
  12. # self.db_online = MySQLPool39()
  13. self.db_online = MySQLPoolOnline()
  14. self.crawl_count = ""
  15. self.task_id = ""
  16. self.task_dict = self.get_task()
  17. def get_status(self, status):
  18. if status not in (2, 3, 4):
  19. logger.warning(f"未知状态值: {status}, 跳过状态上报")
  20. return
  21. if status == 2:
  22. parmas = {
  23. "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0,
  24. "start_time": int(time.time())
  25. }
  26. if status == 3:
  27. parmas = {
  28. "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 1,
  29. "real_count": self.crawl_count, "end_time": int(time.time()),
  30. }
  31. if status == 4:
  32. parmas = {
  33. "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0,
  34. "end_time": int(time.time())}
  35. # url = "http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report"
  36. url = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
  37. try:
  38. res = requests.get(url, params=parmas, timeout=20)
  39. res.raise_for_status()
  40. logger.info("状态上报: %s", res.text)
  41. except Exception as e:
  42. logger.warning(f"状态上报失败: {e}")
  43. def get_task(self):
  44. """ 获取任务 """
  45. sql_task = f""" select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,`collect_equipment_id`,`collect_round` from `retrieve_collect_task_allocate` where `platform`=2 and `status`=1 limit 1 """
  46. task_list = self.db_online.select_data(sql_task)
  47. if not task_list:
  48. return {}
  49. task_dict = task_list[0]
  50. self.task_id = task_dict["id"]
  51. return task_dict
  52. def run(self):
  53. self.task_dict = self.get_task()
  54. if not self.task_dict:
  55. logger.info(f"{platform_name}暂无任务")
  56. return
  57. print(self.task_dict)
  58. self.get_status(2)
  59. self.crawl_count, is_success = JdCrawlerV2(self.task_dict).run()
  60. send_text(
  61. f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_brand") + " " + self.task_dict.get("product_name") + " " + self.task_dict.get("product_specs")}, 爬取数据: {self.crawl_count}条""")
  62. if is_success:
  63. self.get_status(3)
  64. else:
  65. self.get_status(4)
  66. def run_schedule_task():
  67. """执行淘宝爬虫任务"""
  68. H = time.strftime("%H")
  69. if int(H) < 7:
  70. return
  71. try:
  72. logger.info(f"开始执行{platform_name}爬虫任务")
  73. JdMain().run()
  74. logger.info(f"{platform_name}爬虫任务执行完成")
  75. except Exception as e:
  76. logger.error(f"{platform_name}爬虫任务执行失败: {e}", exc_info=True)
  77. if __name__ == '__main__':
  78. # 每10分钟执行一次
  79. JdMain().run()
  80. schedule.every(20).minutes.do(run_schedule_task)
  81. logger.info(f"定时任务已启动,每10分钟执行一次{platform_name}爬虫")
  82. while True:
  83. schedule.run_pending()
  84. time.sleep(3)