| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import json
- import time
- import requests
- from commons.Logger import logger
- from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
- from spiders.taobao.taobao_crawl import TaobaoCrawl
- import schedule
- from commons.feishu_webhook import send_text
# Platform display name ("淘宝" = Taobao); interpolated into the log messages below.
platform_name = "淘宝"
class TaobaoMain:
    """Fetch one pending Taobao collection task and report its status.

    Tasks are read from ``retrieve_collect_task_allocate`` (``platform=1``,
    ``status=1``).  Status changes are reported to the schedule API with
    :meth:`get_status`.
    """

    # Status-report endpoint (the test-environment URL is kept for reference).
    # REPORT_URL = "http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report"
    REPORT_URL = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
    # Timeout in seconds for the status-report request.
    REPORT_TIMEOUT = 20

    # Selects a single pending task.
    # NOTE(review): no ORDER BY and no row locking, so the chosen row is
    # unspecified and concurrent workers could pick the same task -- confirm.
    TASK_SQL = """ select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,`collect_equipment_id`,`collect_round` from `retrieve_collect_task_allocate` where `platform`=1 and `status`=1 limit 1 """

    def __init__(self):
        # self.db_online = MySQLPool39()
        self.db_online = MySQLPoolOnline()
        # Number of crawled records; sent as ``real_count`` with status 3.
        self.crawl_count = ""
        # ``id`` of the current task row; set by get_task().
        self.task_id = ""
        # Task row fetched by run(); ``{}`` when no task was pending.
        self.task_dict = None

    def _build_report_params(self, status):
        """Return the query parameters for a status report.

        Args:
            status: 2 (reported before crawling, sends ``start_time``),
                3 (success, sends ``real_count`` and ``end_time``) or
                4 (failure, sends ``end_time``).

        Returns:
            The parameter dict, or ``None`` if *status* is not 2, 3 or 4.
        """
        now = int(time.time())
        params = {"collect_task_allocate_id": self.task_id, "status": status}
        if status == 2:
            params.update(finish_status=0, start_time=now)
        elif status == 3:
            params.update(finish_status=1, real_count=self.crawl_count, end_time=now)
        elif status == 4:
            params.update(finish_status=0, end_time=now)
        else:
            return None
        return params

    def get_status(self, status):
        """Report *status* for the current task to the schedule API.

        Unknown status values are logged and skipped.  Reporting is
        best-effort: request errors and non-2xx responses are logged as
        warnings and never raised.
        """
        params = self._build_report_params(status)
        if params is None:
            logger.warning(f"未知状态值: {status}, 跳过状态上报")
            return
        try:
            res = requests.get(self.REPORT_URL, params=params, timeout=self.REPORT_TIMEOUT)
            res.raise_for_status()
            # f-string like every other log call in this file; lazy %-args are
            # not formatted by every logger implementation.
            logger.info(f"状态上报: {res.text[:500]}")
        except Exception as e:
            logger.warning(f"状态上报失败: {e}")

    def get_task(self):
        """Fetch one pending task and remember its id in ``self.task_id``.

        Returns:
            The first row returned by ``select_data`` (accessed by column
            name, e.g. ``["id"]``), or ``{}`` when no task is pending.
        """
        task_list = self.db_online.select_data(self.TASK_SQL)
        if not task_list:
            return {}
        task_dict = task_list[0]
        self.task_id = task_dict["id"]
        return task_dict

    def run(self):
        """Fetch one pending task and report it as started (status 2).

        NOTE(review): the crawl and the final status 3/4 report below are
        commented out, so a fetched task is reported as started but never as
        finished or failed.  Confirm this is intentional before deploying.
        """
        self.task_dict = self.get_task()
        if not self.task_dict:
            logger.info(f"{platform_name}暂无任务")
            return
        logger.info(self.task_dict)
        self.get_status(2)
        # self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
        # send_text(
        #     f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
        # if is_success:
        #     self.get_status(3)
        # else:
        #     self.get_status(4)
def run_schedule_task():
    """Scheduled job: run one Taobao crawl pass, except before 07:00 local time.

    Any exception raised by the pass is logged with its traceback and
    swallowed, so the scheduler loop that calls this keeps running.
    """
    hour_now = time.localtime().tm_hour
    if hour_now >= 7:
        try:
            logger.info(f"开始执行{platform_name}爬虫任务")
            TaobaoMain().run()
            logger.info(f"{platform_name}爬虫任务执行完成")
        except Exception as exc:
            logger.error(f"{platform_name}爬虫任务执行失败: {exc}", exc_info=True)
if __name__ == '__main__':
    # Run one pass immediately on startup.  Guarded like the scheduled runs so
    # that an exception here is logged instead of killing the process before
    # the scheduler below is ever registered.
    try:
        TaobaoMain().run()
    except Exception as e:
        logger.error(f"{platform_name}爬虫任务执行失败: {e}", exc_info=True)

    # Then run every 30 minutes (run_schedule_task skips hours before 07:00).
    schedule.every(30).minutes.do(run_schedule_task)
    logger.info(f"定时任务已启动,每30分钟执行一次{platform_name}爬虫")
    # Poll for due jobs every 3 seconds.
    while True:
        schedule.run_pending()
        time.sleep(3)
|