import json
import time
import requests
from commons.Logger import logger
from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
from spiders.jd.jd_auto_crawl import JdCrawlerV2
import schedule
from commons.feishu_webhook import send_text
import random

platform_name = "京东"

# Scheduler endpoint that receives task status changes (started / succeeded / failed).
# Test environment alternative:
#   "http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report"
STATUS_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
# Scheduler endpoint that receives heartbeats.
# NOTE(review): this uses https while STATUS_REPORT_URL uses http — confirm which is intended.
HEARTBEAT_URL = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
# Timeout (seconds) for every scheduler API request.
REQUEST_TIMEOUT = 20
# Bounds (seconds) of the random sleep between polling rounds: 20-30 minutes.
MIN_INTERVAL_SECONDS = 1200
MAX_INTERVAL_SECONDS = 1800


class JdMain:
    """Fetch one pending JD collection task, crawl it, and report the result.

    Tasks come from `retrieve_collect_task_allocate` rows with platform=2 and
    status=1. Progress is reported to the scheduler API via `get_status`.
    """

    def __init__(self):
        # Alternative (test) pool: self.db_online = MySQLPool39()
        self.db_online = MySQLPoolOnline()
        # Number of records the crawler collected for the current task; set by `run`.
        self.crawl_count = ""
        # `id` of the current allocation row, sent to the scheduler API.
        self.task_id = ""
        # NOTE(review): `run` fetches the task again, so this query is redundant
        # for the `__main__` loop; kept so the constructor's behaviour is unchanged.
        self.task_dict = self.get_task()

    def get_status(self, status: int) -> None:
        """Report a task status change to the scheduler (best-effort).

        Despite its name, this method *sends* a status report; it does not fetch one.

        Args:
            status: 2 = crawl started (sends start_time);
                3 = crawl succeeded (sends finish_status=1, real_count, end_time);
                4 = crawl failed (sends finish_status=0, end_time).
                Any other value is logged and ignored.

        Network and HTTP errors are logged as warnings and never raised.
        """
        now = int(time.time())
        base = {"collect_task_allocate_id": self.task_id, "status": status}
        if status == 2:
            params = {**base, "finish_status": 0, "start_time": now}
        elif status == 3:
            params = {**base, "finish_status": 1, "real_count": self.crawl_count, "end_time": now}
        elif status == 4:
            params = {**base, "finish_status": 0, "end_time": now}
        else:
            logger.warning(f"未知状态值: {status}, 跳过状态上报")
            return
        try:
            res = requests.get(STATUS_REPORT_URL, params=params, timeout=REQUEST_TIMEOUT)
            res.raise_for_status()
            logger.info("状态上报: %s", res.text)
        except requests.RequestException as e:
            logger.warning(f"状态上报失败: {e}")

    def get_task(self) -> dict:
        """Fetch one pending JD task (platform=2, status=1).

        Returns:
            The first matching row, or ``{}`` when there is none. When a row is
            found, ``self.task_id`` is set to its ``id``.
        """
        # Plain string: the query has no interpolated values.
        sql_task = """
            select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,
                   `product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,
                   `sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,
                   `collect_equipment_id`,`collect_round`
            from `retrieve_collect_task_allocate`
            where `platform`=2 and `status`=1
            limit 1
        """
        task_list = self.db_online.select_data(sql_task)
        if not task_list:
            return {}
        # Rows are indexed by column name below, so select_data presumably
        # returns dict-like rows — confirm against commons.conn_mysql.
        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        return task_dict

    def heartbeat_task(self) -> None:
        """Send a heartbeat for the current task to the scheduler (best-effort).

        Network and HTTP errors are logged and never raised.
        """
        params = {"collect_task_allocate_id": self.task_id}
        try:
            res = requests.get(HEARTBEAT_URL, params=params, timeout=REQUEST_TIMEOUT)
            # Without this, an HTTP 4xx/5xx response was logged as a success.
            res.raise_for_status()
            logger.info("心跳任务上报成功")
        except requests.RequestException as e:
            logger.warning(f"心跳任务上报失败: {e}")

    def _notify(self) -> None:
        """Send the Feishu crawl-summary message; failures are logged, not raised."""
        # `or ""` guards against NULL columns; plain string concatenation of a
        # None field used to raise TypeError here.
        product = " ".join(
            str(self.task_dict.get(key) or "")
            for key in ("product_brand", "product_name", "product_specs")
        )
        message = (
            f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n"
            f"平台: {platform_name}, 药品: {product}, 爬取数据: {self.crawl_count}条"
        )
        try:
            send_text(message)
        except Exception as e:
            # A notification failure must not prevent the final status report.
            logger.warning(f"通知发送失败: {e}")

    def run(self) -> None:
        """Process at most one pending task end to end.

        Steps:
            1. Fetch the task.
            2. Report status 2.
            3. Run JdCrawlerV2.
            4. Send a heartbeat.
            5. Send the Feishu notification.
            6. Report status 3 on success, otherwise 4.

        Raises:
            Whatever JdCrawlerV2 raises. Status 4 is reported first, so the task
            is not left in the "started" state.
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info(f"{platform_name}暂无任务")
            return
        # default=str: task rows may hold non-JSON values (e.g. datetime columns);
        # logging them must not abort the round.
        logger.info(json.dumps(self.task_dict, ensure_ascii=False, default=str))
        self.get_status(2)
        try:
            self.crawl_count, is_success = JdCrawlerV2(self.task_dict).run()
        except Exception:
            self.get_status(4)
            raise
        self.heartbeat_task()
        self._notify()
        self.get_status(3 if is_success else 4)


if __name__ == '__main__':
    # Poll for a new task, then sleep a random 20-30 minutes between rounds.
    while True:
        try:
            JdMain().run()
        except Exception:
            # Keep the poller alive: a DB, crawler or network failure in one
            # round must not terminate the whole process.
            logger.exception(f"{platform_name}任务执行异常")
        interval_time = random.randint(MIN_INTERVAL_SECONDS, MAX_INTERVAL_SECONDS)
        logger.info(f"程序睡眠{interval_time}秒后继续执行")
        time.sleep(interval_time)