zhuoyuncheng 1 долоо хоног өмнө
parent
commit
90da3314f0

+ 37 - 19
commons/collect_schedule_runner.py

@@ -1,9 +1,8 @@
 """定时从调度库拉取任务、执行爬虫并上报状态的通用入口。"""
-
+import random
 import time
 import json
 import requests
-import schedule
 
 from commons.Logger import logger
 from commons.conn_mysql import MySQLPoolOnline
@@ -74,35 +73,54 @@ class CollectScheduleRunner:
         logger.info(json.dumps(task_dict))
         return task_dict
 
+    def heartbeat_task(self):
+        """Send a best-effort heartbeat for the current task to the schedule API.
+
+        Issues a GET to the heartbeat endpoint with ``self.task_id`` passed as
+        ``collect_task_allocate_id``. Any failure (network error, timeout or a
+        non-2xx response) is logged and swallowed so the caller keeps running.
+        """
+        url = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
+        params = {"collect_task_allocate_id": self.task_id}
+        try:
+            # Fix: the request used to go to RESULT_REPORT_URL, leaving `url`
+            # unused, so the heartbeat endpoint was never actually called.
+            response = requests.get(url, params=params, timeout=20)
+            # Only report success when the server accepted the request.
+            response.raise_for_status()
+            logger.info("心跳任务上报成功")
+        except Exception as e:
+            # Include the cause; the bare message hid why the heartbeat failed.
+            logger.warning("心跳任务上报失败: %s", e)
+
     def run(self):
+        """Fetch one task, run the spider on it, and report the outcome.
+
+        Returns early (after logging) when get_task() yields nothing.
+        Otherwise reports status 2, runs the spider, sends a heartbeat and a
+        text notification, then reports status 3 on success or 4 otherwise.
+        """
         self.task_dict = self.get_task()
         if not self.task_dict:
             logger.info("%s暂无任务", self.platform_name)
             return
-        # self._report_status(2)
+        # NOTE(review): status 2 presumably means "in progress" -- confirm
+        # against the schedule API.
+        self._report_status(2)
         self.crawl_count, is_success = self.spider_cls(self.task_dict).run()
-        # send_text(
-        #     f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n"
-        #     f"平台: {self.platform_name}, 药品: {self.task_dict.get('product_name')}, "
-        #     f"爬取数据: {self.crawl_count}条"
-        # )
-        # self._report_status(3 if is_success else 4)
 
+        # The heartbeat is sent once, after the crawl has returned.
+        self.heartbeat_task()
+        send_text(
+            f"{time.strftime('%Y-%m-%d %H:%M:%S')} 通知:\n"
+            f"平台: {self.platform_name}, 药品: {self.task_dict.get('product_name')}, "
+            f"爬取数据: {self.crawl_count}条"
+        )
+        self._report_status(3 if is_success else 4)
 
-def run_scheduled_loop(platform_name, platform_id, spider_cls, *, interval_minutes=5, sleep_seconds=3, ):
-    """先立即跑一轮,再按间隔定时执行。"""
 
-    def scheduled_job():
+def run_scheduled_loop(
+    platform_name,
+    platform_id,
+    spider_cls,
+    *,
+    interval_minutes=5,
+    sleep_seconds=3,
+):
+    """Fetch and run crawl tasks in an endless loop, sleeping between rounds.
+
+    Each round builds a fresh CollectScheduleRunner and calls run() on it;
+    exceptions are logged and the loop continues. The pause between rounds
+    is a random 180-300 second value chosen once at startup.
+
+    NOTE(review): interval_minutes and sleep_seconds are accepted but not
+    used by this implementation -- confirm whether callers rely on them.
+    """
+    # Drawn once, so every round in this process uses the same pause.
+    idle_seconds = random.randint(180,300)
+    logger.info(
+        "循环任务已启动,平台=%s,每轮间隔 %s 秒",
+        platform_name,
+        idle_seconds,
+    )
+    while True:
         try:
             logger.info("开始执行%s爬虫任务", platform_name)
             CollectScheduleRunner(platform_name, platform_id, spider_cls).run()
             logger.info("%s爬虫任务执行完成", platform_name)
         except Exception as e:
             logger.error("%s爬虫任务执行失败: %s", platform_name, e, exc_info=True)
-
-    CollectScheduleRunner(platform_name, platform_id, spider_cls).run()
-    schedule.every(interval_minutes).minutes.do(scheduled_job)
-    logger.info("定时任务已启动,每%s分钟执行一次%s爬虫", interval_minutes, platform_name)
-    while True:
-        schedule.run_pending()
-        time.sleep(sleep_seconds)
+        # Sleep after every round, whether it succeeded or raised.
+        time.sleep(idle_seconds)

+ 20 - 20
start_run_jd.py

@@ -6,6 +6,7 @@ from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
 from spiders.jd.jd_auto_crawl import JdCrawlerV2
 import schedule
 from commons.feishu_webhook import send_text
+import random
 
 platform_name = "京东"
 
@@ -58,14 +59,28 @@ class JdMain:
         self.task_id = task_dict["id"]
         return task_dict
 
+    def heartbeat_task(self):
+        """Send a best-effort heartbeat for the current task to the schedule API.
+
+        Issues a GET to the heartbeat endpoint with ``self.task_id`` passed as
+        ``collect_task_allocate_id``. Any failure (network error, timeout or a
+        non-2xx response) is logged and swallowed so the caller keeps running.
+        """
+        url = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
+        params = {"collect_task_allocate_id": self.task_id}
+        try:
+            response = requests.get(url, params=params, timeout=20)
+            # Only report success when the server accepted the request.
+            response.raise_for_status()
+            logger.info("心跳任务上报成功")
+        except Exception as e:
+            # Include the cause; the bare message hid why the heartbeat failed.
+            logger.warning(f"心跳任务上报失败: {e}")
+
     def run(self):
         self.task_dict = self.get_task()
         if not self.task_dict:
             logger.info(f"{platform_name}暂无任务")
             return
-        print(self.task_dict)
+
+        logger.info(json.dumps(self.task_dict))
         self.get_status(2)
         self.crawl_count, is_success = JdCrawlerV2(self.task_dict).run()
+
+        self.heartbeat_task()
         send_text(
             f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_brand") + " " + self.task_dict.get("product_name") + " " + self.task_dict.get("product_specs")},  爬取数据: {self.crawl_count}条""")
         if is_success:
@@ -74,25 +89,10 @@ class JdMain:
             self.get_status(4)
 
 
-def run_schedule_task():
-    """执行淘宝爬虫任务"""
-    H = time.strftime("%H")
-    if int(H) < 7:
-        return
-    try:
-        logger.info(f"开始执行{platform_name}爬虫任务")
-        JdMain().run()
-        logger.info(f"{platform_name}爬虫任务执行完成")
-    except Exception as e:
-        logger.error(f"{platform_name}爬虫任务执行失败: {e}", exc_info=True)
-
-
 if __name__ == '__main__':
-    # 每10分钟执行一次
-    JdMain().run()
-    schedule.every(20).minutes.do(run_schedule_task)
-
-    logger.info(f"定时任务已启动,每10分钟执行一次{platform_name}爬虫")
+    # Run one round, then sleep a random 20-30 minutes before the next one.
     while True:
-        schedule.run_pending()
-        time.sleep(3)
+        try:
+            JdMain().run()
+        except Exception as e:
+            # Keep the loop alive: an unhandled error in one round would
+            # otherwise terminate the whole process.
+            logger.error(f"{platform_name}爬虫任务执行失败: {e}", exc_info=True)
+        interval_time = random.randint(1200, 1800)
+        logger.info(f"程序睡眠{interval_time}秒后继续执行")
+        time.sleep(interval_time)

+ 24 - 24
start_run_taobao.py

@@ -1,4 +1,5 @@
 import json
+import random
 import time
 import requests
 from commons.Logger import logger
@@ -46,6 +47,17 @@ class TaobaoMain:
         except Exception as e:
             logger.warning(f"状态上报失败: {e}")
 
+    def heartbeat_task(self):
+        """Send a best-effort heartbeat for the current task to the schedule API.
+
+        Issues a GET to the heartbeat endpoint with ``self.task_id`` passed as
+        ``collect_task_allocate_id``. Any failure (network error, timeout or a
+        non-2xx response) is logged and swallowed so the caller keeps running.
+        """
+        url = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
+        params = {"collect_task_allocate_id": self.task_id}
+        try:
+            response = requests.get(url, params=params, timeout=20)
+            # Only report success when the server accepted the request.
+            response.raise_for_status()
+            logger.info("心跳任务上报成功")
+        except Exception as e:
+            # Warning level, matching how a failed status report is logged.
+            logger.warning(f"心跳任务上报失败{str(e)}")
+
     def get_task(self):
         """ 获取任务 """
         sql_task = f""" select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,`collect_equipment_id`,`collect_round` from `retrieve_collect_task_allocate` where `platform`=1 and `status`=1 limit 1 """
@@ -65,33 +77,21 @@ class TaobaoMain:
             return
         logger.info(self.task_dict)
         self.get_status(2)
-        # self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
-        # send_text(
-        #     f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
-        # if is_success:
-        #     self.get_status(3)
-        # else:
-        #     self.get_status(4)
+        self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
+        self.heartbeat_task()
 
-
-def run_schedule_task():
-    """执行淘宝爬虫任务"""
-    H = time.strftime("%H")
-    if int(H) < 7:
-        return
-    try:
-        logger.info(f"开始执行{platform_name}爬虫任务")
-        TaobaoMain().run()
-        logger.info(f"{platform_name}爬虫任务执行完成")
-    except Exception as e:
-        logger.error(f"{platform_name}爬虫任务执行失败: {e}", exc_info=True)
+        send_text(
+            f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
+        if is_success:
+            self.get_status(3)
+        else:
+            self.get_status(4)
 
 
 if __name__ == '__main__':
-    TaobaoMain().run()
-    # 每10分钟执行一次
-    schedule.every(30).minutes.do(run_schedule_task)
-    logger.info(f"定时任务已启动,每30分钟执行一次{platform_name}爬虫")
+    # Run one round, then sleep a random 20-30 minutes before the next one.
     while True:
-        schedule.run_pending()
-        time.sleep(3)
+        try:
+            TaobaoMain().run()
+        except Exception as e:
+            # Keep the loop alive: an unhandled error in one round would
+            # otherwise terminate the whole process.
+            logger.error(f"{platform_name}爬虫任务执行失败: {e}", exc_info=True)
+        interval_time = random.randint(1200, 1800)
+        logger.info(f"程序睡眠{interval_time}秒后继续执行")
+        time.sleep(interval_time)