"""Taobao collection runner.

Polls the scheduling database for a pending task bound to this device, runs
``TaobaoCrawl`` on it, and reports start/heartbeat/finish status to the
scheduling API. When executed as a script it repeats this forever with a
random pause between rounds.
"""
import json
import random
import time

import requests
import schedule

from commons.Logger import logger
from commons.config import TB_DEVICE_ID
from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
from commons.feishu_webhook import send_text
from spiders.taobao.taobao_crawl import TaobaoCrawl

platform_name = "淘宝"

# Scheduling API endpoints.
# Test environment for the result report:
#   http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report
# NOTE(review): the result report uses http while the heartbeat uses https;
# kept exactly as in the original -- confirm whether that is intentional.
STATUS_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
HEARTBEAT_URL = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"

# Timeout (seconds) applied to every scheduling API request.
REQUEST_TIMEOUT = 20


class TaobaoMain:
    """Fetch one pending task, crawl it, and report the outcome."""

    def __init__(self):
        # Alternative pool used during development: MySQLPool39()
        self.db_online = MySQLPoolOnline()
        # Number of records returned by the crawler; sent as "real_count".
        self.crawl_count = ""
        # Primary key of the allocated task row; set by get_task().
        self.task_id = ""
        # Full task row as returned by get_task(), or None before run().
        self.task_dict = None

    def get_status(self, status):
        """Report a task status change to the scheduling API.

        Args:
            status: 2 when the task starts (sends ``start_time``), 3 when it
                finished successfully (sends ``real_count`` and ``end_time``),
                4 when it finished unsuccessfully (sends ``end_time``).
                Any other value is logged and ignored.

        Reporting is best-effort: request errors are logged, never raised.
        """
        now = int(time.time())
        if status == 2:
            extra = {"finish_status": 0, "start_time": now}
        elif status == 3:
            extra = {
                "finish_status": 1,
                "real_count": self.crawl_count,
                "end_time": now,
            }
        elif status == 4:
            extra = {"finish_status": 0, "end_time": now}
        else:
            logger.warning(f"未知状态值: {status}, 跳过状态上报")
            return

        # Key order matches the original payloads so the query string is
        # unchanged.
        params = {
            "collect_task_allocate_id": self.task_id,
            "status": status,
            **extra,
        }
        try:
            res = requests.get(STATUS_REPORT_URL, params=params, timeout=REQUEST_TIMEOUT)
            res.raise_for_status()
            logger.info("状态上报: %s", res.text[:500])
        except Exception as e:
            # Deliberately broad: a failed report must not abort the run.
            logger.warning(f"状态上报失败: {e}")

    def heartbeat_task(self):
        """Send a heartbeat for the current task to the scheduling API.

        Best-effort: errors are logged, never raised. Success is only logged
        when the server answers with a non-error HTTP status.
        """
        params = {
            "collect_task_allocate_id": self.task_id,
        }
        try:
            res = requests.get(HEARTBEAT_URL, params=params, timeout=REQUEST_TIMEOUT)
            # Without this check a 4xx/5xx answer would be logged as success.
            res.raise_for_status()
            logger.info("心跳任务上报成功")
        except Exception as e:
            # Deliberately broad: a failed heartbeat must not abort the run.
            logger.warning(f"心跳任务上报失败{e}")

    def get_task(self):
        """Fetch one pending task allocated to this device.

        Selects a single row with ``platform = 1`` and ``status = 1`` whose
        equipment account has ``device_id == TB_DEVICE_ID``.
        NOTE(review): platform 1 is presumably Taobao and status 1 presumably
        "pending" -- confirm against the scheduler schema.

        Returns:
            The task row (as returned by ``select_data``), or ``{}`` when no
            task is available. On success ``self.task_id`` is set to the
            row's ``id``.
        """
        # Adjacent literals concatenate to the original statement text.
        sql = (
            " SELECT t.* FROM `retrieve_collect_task_allocate` t"
            " INNER JOIN `retrieve_collect_equipment_account` a"
            " ON t.`collect_equipment_account_id` = a.`id`"
            " WHERE a.`device_id` = %s AND t.`platform` = 1 AND t.`status` = 1"
            " LIMIT 1 "
        )
        task_list = self.db_online.select_data(sql, (TB_DEVICE_ID,))
        if not task_list:
            return {}
        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        return task_dict

    def run(self):
        """Run one round: fetch a task, crawl it, and report the result.

        Returns immediately when no task is pending. If the crawler raises,
        the task is reported with status 4 before the exception propagates,
        so the scheduler is not left with a task stuck in the started state.
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info(f"{platform_name}暂无任务")
            return

        self.get_status(2)
        try:
            self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
        except Exception as e:
            logger.warning(f"{platform_name}爬取异常: {e}")
            self.get_status(4)
            raise

        self.heartbeat_task()
        # A Feishu notification (send_text) with the platform, the task's
        # "product_name" and the crawl count used to be sent here; it is
        # currently disabled.
        self.get_status(3 if is_success else 4)


def main():
    """Run rounds forever, pausing a random 20-30 minutes between them."""
    while True:
        TaobaoMain().run()
        interval_time = random.randint(1200, 1800)
        logger.info(f"程序睡眠{interval_time}秒后继续执行")
        time.sleep(interval_time)


if __name__ == '__main__':
    main()