import json  # noqa: F401  (currently unused; kept so nothing relying on it breaks)
import time

import requests
import schedule

from commons.Logger import logger
from commons.conn_mysql import MySQLPoolOnline, MySQLPool39  # noqa: F401  (MySQLPool39: alternative pool, see TaobaoMain.__init__)
from spiders.taobao.taobao_crawl import TaobaoCrawl  # noqa: F401  (used by the disabled crawl step in TaobaoMain.run)
from commons.feishu_webhook import send_text  # noqa: F401  (used by the disabled notification in TaobaoMain.run)

platform_name = "淘宝"

# Scheduling-API endpoint that receives task status reports.
# Test environment: "http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report"
STATUS_REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
# Timeout in seconds for the status-report HTTP request.
STATUS_REPORT_TIMEOUT = 20
# Scheduled runs are skipped before this local hour (0-23).
EARLIEST_RUN_HOUR = 7
# Interval between scheduled runs, in minutes.
SCHEDULE_INTERVAL_MINUTES = 30


class TaobaoMain:
    """Fetch one pending Taobao collection task and report its status."""

    def __init__(self):
        # Alternative connection pool: self.db_online = MySQLPool39()
        self.db_online = MySQLPoolOnline()
        # Number of crawled records; sent as `real_count` when reporting status 3.
        self.crawl_count = ""
        # `id` of the current row in `retrieve_collect_task_allocate`; set by get_task().
        self.task_id = ""
        # Row fetched by get_task(); None until run() is called.
        self.task_dict = None

    def get_status(self, status):
        """Report the current task's status to the scheduling API (best effort).

        Despite its name, this method sends a status update; it does not read one.

        Args:
            status: 2 = started (sends start_time), 3 = finished successfully
                (sends real_count and end_time), 4 = failed (sends end_time).
                Any other value is logged as a warning and nothing is sent.

        Network errors and non-2xx responses are logged as warnings and not
        raised, so a reporting problem cannot abort the crawl cycle.
        """
        now = int(time.time())
        # Key order is kept identical to the original so the query string is unchanged.
        base = {"collect_task_allocate_id": self.task_id, "status": status}
        if status == 2:
            params = {**base, "finish_status": 0, "start_time": now}
        elif status == 3:
            params = {
                **base,
                "finish_status": 1,
                "real_count": self.crawl_count,
                "end_time": now,
            }
        elif status == 4:
            params = {**base, "finish_status": 0, "end_time": now}
        else:
            logger.warning(f"未知状态值: {status}, 跳过状态上报")
            return

        try:
            res = requests.get(STATUS_REPORT_URL, params=params, timeout=STATUS_REPORT_TIMEOUT)
            res.raise_for_status()
        except requests.RequestException as e:
            logger.warning(f"状态上报失败: {e}")
            return
        logger.info("状态上报: %s", res.text[:500])

    def get_task(self):
        """Fetch one pending task from `retrieve_collect_task_allocate`.

        Selects a single row with `platform`=1 and `status`=1 (presumably
        "Taobao" and "pending" -- confirm against the schema) and stores its
        `id` in ``self.task_id``.

        Returns:
            The first row returned by ``select_data`` (indexed by column
            name), or an empty dict when there is no matching task.
        """
        sql_task = """
            select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,
                   `product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,
                   `sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,
                   `collect_equipment_id`,`collect_round`
            from `retrieve_collect_task_allocate`
            where `platform`=1 and `status`=1
            limit 1
        """
        task_list = self.db_online.select_data(sql_task)
        if not task_list:
            return {}
        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        logger.debug(task_dict)
        return task_dict

    def run(self):
        """Fetch a pending task and report it as started (status 2).

        NOTE(review): the crawl, the Feishu notification and the final status
        report (3 on success / 4 on failure) are commented out below. As a
        result this code reports a fetched task as started but never reports
        it as finished. Confirm this is intended before deploying.
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info(f"{platform_name}暂无任务")
            return
        logger.info(self.task_dict)
        self.get_status(2)
        # self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
        # send_text(
        #     f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
        # if is_success:
        #     self.get_status(3)
        # else:
        #     self.get_status(4)


def run_schedule_task(*, ignore_time_window=False):
    """Run one Taobao crawl cycle, logging (not raising) any failure.

    Args:
        ignore_time_window: When False (the default, used by the scheduler),
            return without doing anything before EARLIEST_RUN_HOUR local time.
            The startup run passes True so it executes regardless of the hour.
    """
    if not ignore_time_window and time.localtime().tm_hour < EARLIEST_RUN_HOUR:
        return
    try:
        logger.info(f"开始执行{platform_name}爬虫任务")
        TaobaoMain().run()
        logger.info(f"{platform_name}爬虫任务执行完成")
    except Exception as e:
        # Top-level boundary: a failed cycle must not stop the scheduler loop.
        logger.error(f"{platform_name}爬虫任务执行失败: {e}", exc_info=True)


if __name__ == '__main__':
    # Run once immediately at startup, regardless of the hour. Going through
    # run_schedule_task means a failure here is logged instead of killing the
    # process before the scheduler below is started.
    run_schedule_task(ignore_time_window=True)

    # Then run every SCHEDULE_INTERVAL_MINUTES minutes.
    schedule.every(SCHEDULE_INTERVAL_MINUTES).minutes.do(run_schedule_task)
    logger.info(f"定时任务已启动,每{SCHEDULE_INTERVAL_MINUTES}分钟执行一次{platform_name}爬虫")
    while True:
        schedule.run_pending()
        time.sleep(3)