| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- import json
- import time
- import requests
- from commons.Logger import logger
- from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
- from spiders.jd.jd_auto_crawl import JdCrawlerV2
- import schedule
- from commons.feishu_webhook import send_text
- import random
- from commons.config import JD_DEVICE_ID
- platform_name = "京东"
class JdMain:
    """Fetch the pending JD crawl task for this device, run it, and report progress.

    Status codes sent to the scheduling API by this class:
      2 - task started (sent before the crawler runs)
      3 - task finished successfully (includes the crawled row count)
      4 - task failed
    """

    # Test endpoint, kept for reference:
    # "http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report"
    _REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
    _HEARTBEAT_URL = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
    # Timeout, in seconds, applied to every scheduling-API request.
    _REQUEST_TIMEOUT = 20

    def __init__(self):
        """Open the database pool and load the current pending task, if any."""
        # self.db_online = MySQLPool39()
        self.db_online = MySQLPoolOnline()
        self.crawl_count = ""
        self.task_id = ""
        # NOTE: run() fetches the task again; this call is kept so that
        # task_dict / task_id are populated right after construction.
        self.task_dict = self.get_task()

    def _build_report_params(self, status):
        """Build the query parameters for a status report.

        Args:
            status: Status code to report (2, 3 or 4).

        Returns:
            A dict of query parameters, or None when ``status`` is not one of
            the known codes.
        """
        if status not in (2, 3, 4):
            return None

        now = int(time.time())
        params = {
            "collect_task_allocate_id": self.task_id,
            "status": status,
            # Only a successful finish (3) is flagged as finished.
            "finish_status": 1 if status == 3 else 0,
        }
        if status == 2:
            params["start_time"] = now
        elif status == 3:
            params["real_count"] = self.crawl_count
            params["end_time"] = now
        else:
            params["end_time"] = now
        return params

    def get_status(self, status):
        """Report the task status to the scheduling API (best effort).

        Unknown status codes are logged and skipped. Request failures are
        logged as warnings and never raised to the caller.

        Args:
            status: Status code to report (2, 3 or 4).
        """
        params = self._build_report_params(status)
        if params is None:
            logger.warning(f"未知状态值: {status}, 跳过状态上报")
            return

        try:
            res = requests.get(
                self._REPORT_URL, params=params, timeout=self._REQUEST_TIMEOUT
            )
            res.raise_for_status()
        except requests.RequestException as e:
            logger.warning(f"状态上报失败: {e}")
        else:
            logger.info("状态上报: %s", res.text)

    def get_task(self):
        """Get the pending JD task bound to the current device.

        Side effect: sets ``self.task_id`` to the task's ``id``, or resets it
        to ``""`` when there is no task.

        Returns:
            The first matching task row as returned by ``select_data``, or an
            empty dict when no task is pending.
        """
        sql = """
            SELECT t.*
            FROM `retrieve_collect_task_allocate` t
            INNER JOIN `retrieve_collect_equipment_account` a
                ON t.`collect_equipment_account_id` = a.`id`
            WHERE a.`device_id` = %s
                AND t.`platform` = 2
                AND t.`status` = 1
            LIMIT 1
        """
        task_list = self.db_online.select_data(sql, (JD_DEVICE_ID,))
        if not task_list:
            # Clear any id left over from a previous task so it cannot be
            # reported against by mistake.
            self.task_id = ""
            return {}

        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        logger.info(f"获取到任务: {task_dict}")
        return task_dict

    def heartbeat_task(self):
        """Send a heartbeat for the current task (best effort).

        Failures, including HTTP error responses, are logged as warnings and
        never raised to the caller.
        """
        params = {
            "collect_task_allocate_id": self.task_id,
        }
        try:
            res = requests.get(
                self._HEARTBEAT_URL, params=params, timeout=self._REQUEST_TIMEOUT
            )
            # Without this an HTTP 4xx/5xx answer would be logged as a success.
            res.raise_for_status()
        except requests.RequestException as e:
            logger.warning(f"心跳任务上报失败: {e}")
        else:
            logger.info("心跳任务上报成功")

    def run(self):
        """Fetch the pending task, crawl it, and report the outcome.

        Returns immediately when no task is pending. Any exception raised by
        the crawler is re-raised after the task has been reported as failed.
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info(f"{platform_name}暂无任务")
            return

        self.get_status(2)
        try:
            self.crawl_count, is_success = JdCrawlerV2(self.task_dict).run()
        except Exception:
            # The task was already reported as started; report the failure so
            # it does not stay in the "started" state, then let the caller
            # see the error.
            self.get_status(4)
            raise

        self.heartbeat_task()
        # Disabled Feishu notification, kept for reference:
        # send_text(
        # f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_brand") + " " + self.task_dict.get("product_name") + " " + self.task_dict.get("product_specs")}, 爬取数据: {self.crawl_count}条""")
        self.get_status(3 if is_success else 4)
if __name__ == '__main__':
    # Example of running the crawler directly with a hand-written task:
    # task_dict= {"id": 1622, 'collect_task_id': 4596, 'company_id': 8, 'product_name': '小儿氨酚烷胺颗粒',
    # 'product_specs': '', 'product_keyword': '', 'product_brand': '可复美', 'sampling_cycle': 1,
    # 'sampling_start_time': 1778083200, 'sampling_end_time': 1778342399, 'collect_equipment_account_id': 15,
    # 'collect_region_id': 0, 'collect_equipment_id': 25, 'collect_round': 2,"start_page":4,"end_page":10}
    # JdCrawlerV2(task_dict).run()

    # Poll for a task, then sleep a random 20-30 minutes (1200-1800 s)
    # before polling again.
    while True:
        try:
            JdMain().run()
        except Exception as e:
            # Top-level boundary: one failed round must not stop the polling
            # loop. KeyboardInterrupt is not caught, so Ctrl-C still exits.
            logger.warning(f"{platform_name}任务执行异常: {e}")
        interval_time = random.randint(1200, 1800)
        logger.info(f"程序睡眠{interval_time}秒后继续执行")
        time.sleep(interval_time)
|