import json
import time
import requests
from commons.Logger import logger
from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
from spiders.jd.jd_auto_crawl import JdCrawlerV2
import schedule
from commons.feishu_webhook import send_text
import random
from commons.config import JD_DEVICE_ID

platform_name = "京东"

# Status codes sent to the scheduler's result_report endpoint.
# run() reports 2 before crawling, 3 when the crawler signals success,
# and 4 otherwise.
_STATUS_STARTED = 2
_STATUS_SUCCEEDED = 3
_STATUS_FAILED = 4

# A test endpoint exists at
# http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report
# NOTE(review): this URL uses http while the heartbeat URL uses https on the
# same host -- confirm whether that difference is intentional.
_RESULT_REPORT_URL = (
    "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
)
_HEARTBEAT_URL = (
    "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
)
_REQUEST_TIMEOUT = 20  # seconds


class JdMain:
    """Fetch the JD task bound to this device, crawl it, and report progress.

    Attributes set by this class:
        db_online: MySQL pool used to look up the pending task.
        crawl_count: first value returned by ``JdCrawlerV2.run()``; sent as
            ``real_count`` in the success report ("" until a crawl has run).
        task_id: ``id`` of the task row currently being processed.
        task_dict: the task row itself, or ``{}`` when there is no task.
    """

    def __init__(self):
        # MySQLPool39() is an alternative pool that can be swapped in here.
        self.db_online = MySQLPoolOnline()
        self.crawl_count = ""
        self.task_id = ""
        # Kept for backward compatibility: callers may read ``task_dict``
        # right after construction. run() fetches the task again itself.
        self.task_dict = self.get_task()

    def _build_status_payload(self, status):
        """Return the query parameters for a status report.

        Args:
            status: one of 2 (started), 3 (succeeded) or 4 (failed).

        Returns:
            dict of parameters for the result_report endpoint.
        """
        payload = {
            "collect_task_allocate_id": self.task_id,
            "status": status,
            "finish_status": 1 if status == _STATUS_SUCCEEDED else 0,
        }
        now = int(time.time())
        if status == _STATUS_STARTED:
            payload["start_time"] = now
            return payload
        if status == _STATUS_SUCCEEDED:
            payload["real_count"] = self.crawl_count
        payload["end_time"] = now
        return payload

    def get_status(self, status) -> None:
        """Report the task's status to the scheduler (best effort).

        Unknown status values are logged and skipped. Any request failure,
        including a non-2xx response, is logged as a warning and not raised.

        Args:
            status: 2 (started), 3 (succeeded) or 4 (failed).
        """
        if status not in (_STATUS_STARTED, _STATUS_SUCCEEDED, _STATUS_FAILED):
            logger.warning(f"未知状态值: {status}, 跳过状态上报")
            return

        payload = self._build_status_payload(status)
        try:
            res = requests.get(
                _RESULT_REPORT_URL, params=payload, timeout=_REQUEST_TIMEOUT
            )
            res.raise_for_status()
            logger.info(f"状态上报: {res.text}")
        except Exception as e:
            logger.warning(f"状态上报失败: {e}")

    def get_task(self):
        """Fetch the pending JD task bound to the current device.

        Selects at most one row from ``retrieve_collect_task_allocate`` whose
        equipment account has ``device_id == JD_DEVICE_ID`` and whose
        ``platform`` is 2 and ``status`` is 1. On a hit, ``self.task_id`` is
        set to the row's ``id``.

        Returns:
            The task row as returned by ``select_data``, or ``{}`` when no
            row matches.
        """
        sql = """
            SELECT t.*
            FROM `retrieve_collect_task_allocate` t
            INNER JOIN `retrieve_collect_equipment_account` a
                ON t.`collect_equipment_account_id` = a.`id`
            WHERE a.`device_id` = %s
              AND t.`platform` = 2
              AND t.`status` = 1
            LIMIT 1
        """
        task_list = self.db_online.select_data(sql, (JD_DEVICE_ID,))
        if not task_list:
            return {}

        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        logger.info(f"获取到任务: {task_dict}")
        return task_dict

    def heartbeat_task(self) -> None:
        """Send a heartbeat for the current task to the scheduler (best effort).

        A non-2xx response counts as a failure. Failures are logged as a
        warning with the error detail and are not raised.
        """
        params = {"collect_task_allocate_id": self.task_id}
        try:
            res = requests.get(
                _HEARTBEAT_URL, params=params, timeout=_REQUEST_TIMEOUT
            )
            res.raise_for_status()
            logger.info("心跳任务上报成功")
        except Exception as e:
            logger.warning(f"心跳任务上报失败: {e}")

    def run(self):
        """Process one task: fetch it, crawl it, and report the outcome.

        Returns immediately when there is no pending task. If the crawler
        raises, the task is reported as failed (status 4) before the
        exception is re-raised, so it is not left in the "started" state.
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info(f"{platform_name}暂无任务")
            return

        self.get_status(_STATUS_STARTED)
        try:
            self.crawl_count, is_success = JdCrawlerV2(self.task_dict).run()
        except Exception:
            logger.exception(f"{platform_name}爬取任务异常, 上报失败状态")
            self.get_status(_STATUS_FAILED)
            raise

        self.heartbeat_task()
        # NOTE: a Feishu notification via send_text(...) with the platform,
        # product brand/name/specs and crawl count used to be sent here; it
        # is currently disabled.
        self.get_status(_STATUS_SUCCEEDED if is_success else _STATUS_FAILED)


if __name__ == '__main__':
    # For local debugging the crawler can be run directly with a hand-built
    # task dict, e.g. JdCrawlerV2(task_dict).run(), where task_dict carries
    # keys such as id, collect_task_id, company_id, product_name,
    # product_specs, product_keyword, product_brand, sampling_cycle,
    # sampling_start_time, sampling_end_time, collect_equipment_account_id,
    # collect_region_id, collect_equipment_id, collect_round, start_page
    # and end_page.

    # Poll for a task, then sleep a random 1200-1800 seconds (20-30 minutes).
    while True:
        JdMain().run()
        interval_time = random.randint(1200, 1800)
        logger.info(f"程序睡眠{interval_time}秒后继续执行")
        time.sleep(interval_time)