Przeglądaj źródła

京东淘宝修改增加读取设备调度

zhuoyuncheng 3 dni temu
rodzic
commit
27925f42e7

+ 323 - 156
spiders/jd/jd_auto_crawl.py

@@ -9,9 +9,10 @@ from urllib.parse import quote
 from DrissionPage import ChromiumPage, ChromiumOptions
 import json
 from commons.Logger import get_spider_logger
-from commons.conn_mysql import  MySQLPoolOnline
+from commons.conn_mysql import MySQLPoolOnline
 from pipelines.drug_pipelines import DrugPipeline
 from commons.feishu_webhook import send_text
+from spiders.jd.jd_captcha import handle_jd_slider_captcha
 
 logger = get_spider_logger("jd")
 
@@ -40,10 +41,14 @@ class JdCrawlerV2:
         self.db = MySQLPoolOnline()
         self.ip = None
         self.account_name = None
+        self.login_username = None
+        self.login_password = None
         self.platform = 2
         self.pipeline = DrugPipeline("jd")
         self.task_dict = drug_dict or {}
 
+        self.start_page = 1
+        self.end_page = 1
         if self.task_dict:
             self.get_product_data()
         self.success = True
@@ -64,6 +69,16 @@ class JdCrawlerV2:
         self.account_id = self.task_dict.get("collect_equipment_account_id", "")
         self.collect_region_id = self.task_dict.get("collect_region_id", "")
         self.collect_round = self.task_dict.get("collect_round", 1)
+        self.start_page = self._parse_page(self.task_dict.get("start_page"), 1)
+        self.end_page = self._parse_page(self.task_dict.get("end_page"), 15)
+
+    @staticmethod
+    def _parse_page(value, default=1):
+        """Coerce a task-supplied page number to a positive int.
+
+        Args:
+            value: Raw page value taken from the task dict; may be None,
+                a string or a number.
+            default: Page number returned when ``value`` cannot be
+                converted to int or is smaller than 1.
+
+        Returns:
+            ``int(value)`` when that is >= 1, otherwise ``default``.
+        """
+        try:
+            page = int(value)
+        except (TypeError, ValueError, OverflowError):
+            # OverflowError covers int() of an infinite float/Decimal.
+            return default
+        return page if page >= 1 else default
 
     @staticmethod
     def _get_free_port():
@@ -92,7 +107,15 @@ class JdCrawlerV2:
             co.set_argument(f"--proxy-server={proxy}")
         logger.info("启动浏览器: account=%s, debug_port=%s", self.account_name, debug_port)
         self.driver = ChromiumPage(co)
+        self._listen_started = False
+
+    def _start_listen(self):
+        """Start the search-API listener once login has finished.
+
+        Listening is deferred until after login so that it does not
+        interfere with the login page or the slider-captcha drag.  Does
+        nothing when the listener was already started or when there is no
+        browser instance.
+        """
+        if self._listen_started or not self.driver:
+            return
         self.driver.listen.start("api?appid=search-pc-java")
+        self._listen_started = True
+        logger.info("已启动搜索接口监听")
 
     def register_signal_handler(self):
         def handler(signum, frame):
@@ -184,118 +207,125 @@ class JdCrawlerV2:
             return fp.get("estimatedPrice", "") or ""
         return ""
 
-    def parse(self, data):
-        ware_list = data.get("data", {}).get("wareList", [])
-        if not ware_list:
-            return
-        try:
-            for w in ware_list:
-                title = w.get("wareName", "")
+    def parse(self, ware_list):
 
-                title = re.sub(r"<[^>]*>", "", title).strip()
-                logger.info(title)
-                if self.product not in title:
-                    self.is_no_prodcut += 1
-                    continue
-                if self.brand not in title:
-                    self.is_no_prodcut += 1
-                    continue
-                # if self.product_desc not in title:
-                #     continue
-                if "+[" in title:
-                    self.is_no_prodcut += 1
-                    continue
-                self.is_no_prodcut = 0
-                status = 1
-                if self.product_keyword:
-                    search_keyword_list = self.product_keyword.split(",")
-                    for search_keyword in search_keyword_list:
-                        if search_keyword.strip() not in title:
-                            status = 0
-                if status == 0:
-                    continue
+        for w in ware_list:
+            title = w.get("wareName", "")
+
+            title = re.sub(r"<[^>]*>", "", title).strip()
+            color = w.get("color", "")
+            full_title = title + " " + color
+
+            logger.info(full_title)
 
-                logger.info(f"商品名:{title}")
-                sku_id = w.get("skuId", "")
-                sales = w.get("totalSales", "")
-                shop_id = w.get("shopId", "")
-                shop_name = w.get("shopName", "")
-                heshu_m = re.search(r"(\d+)盒", title)
-                if heshu_m:
-                    heshu_count = int(heshu_m.group(1))
+            if self.product not in full_title:
+                self.is_no_prodcut += 1
+                continue
+            if self.brand not in full_title:
+                self.is_no_prodcut += 1
+                continue
+            if self.product_desc:
+                if self.product_desc in full_title:
+                    crawl_product_desc = self.product_desc
                 else:
-                    heshu_count = 1
-                final_price = self._estimated_price(w)
-                jd_price = w.get("jdPrice", "")
-                low_price = final_price if final_price else jd_price
-
-                try:
-                    price = Decimal(str(low_price)).quantize(Decimal("0.00"))
-                except (InvalidOperation, ValueError):
-                    price = Decimal("0.00")
-
-                item_url = f"https://item.jd.com/{sku_id}.html"
-                mall_url = f"https://mall.jd.com/index-{shop_id}.html?from=pc"
-
-                # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
-                now_ts = time.strftime("%Y-%m-%d %H:%M:%S")
-                product = {
-                    "platform": self.platform,
-                    "item_id": sku_id,
-                    "enterprise_id": self.company_id,
-                    "product_name": title,
-                    "spec": self.product_desc,
-                    "one_price": "",
-                    "detail_url": item_url,
-                    "shop_name": shop_name,
-                    "anonymous_store_name": "",
-                    "shop_url": mall_url,
-                    "city_name": "",
-                    "city_id": "",
-                    "province_name": "",
-                    "province_id": "",
-                    "shipment_city_name": "",
-                    "shipment_city_id": "",
-                    "shipment_province_name": "",
-                    "shipment_province_id": "",
-                    "area_info": "",
-                    "factory_name": "",
-                    "scrape_date": time.strftime("%Y-%m-%d"),
-                    "price": price,
-                    "sales": sales,
-                    "stock_count": "",
-                    "snapshot_url": "",
-                    "approval_num": "",
-                    "produced_time": "",
-                    "deadline": "",
-                    "update_time": now_ts,
-                    "insert_time": now_ts,
-                    "number": heshu_count,
-                    "product_brand": self.brand or "",
-                    "collect_task_id": self.collect_task_id,
-                    "search_name": self.product,
-                    "company_name": "",
-                    "collect_config_info": json.dumps(
-                        {
-                            "sampling_cycle": self.sampling_cycle,
-                            "sampling_start_time": self.sampling_start_time,
-                            "sampling_end_time": self.sampling_end_time,
-                        }
-                    ),
-                    "account_id": self.account_id,
-                    "collect_region_id": self.collect_region_id,
-                    "collect_round": self.collect_round,
-                    "is_sold_out": 1
-
-                }
-
-                try:
-                    self.pipeline.storge_data(product)
-                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
-                except Exception as e:
-                    logger.exception("写入数据库失败: %s", e)
-        except Exception as e:
-            logger.error("写入数据库失败: %s", e)
+                    crawl_product_desc = ""
+                    title = full_title
+            else:
+                crawl_product_desc = ""
+                title = full_title
+
+            if "+[" in title:
+                continue
+
+            self.is_no_prodcut = 0
+            status = 1
+            if self.product_keyword:
+                search_keyword_list = self.product_keyword.split(",")
+                for search_keyword in search_keyword_list:
+                    if search_keyword.strip() not in title:
+                        status = 0
+            if status == 0:
+                continue
+
+            logger.info(f"商品名:{title}")
+            sku_id = w.get("skuId", "")
+            sales = w.get("totalSales", "")
+            shop_id = w.get("shopId", "")
+            shop_name = w.get("shopName", "")
+            heshu_m = re.search(r"(\d+)(盒|瓶)", full_title)
+            if heshu_m:
+                heshu_count = int(heshu_m.group(1))
+            else:
+                heshu_count = 1
+            final_price = self._estimated_price(w)
+            jd_price = w.get("jdPrice", "")
+            low_price = final_price if final_price else jd_price
+
+            try:
+                price = Decimal(str(low_price)).quantize(Decimal("0.00"))
+            except (InvalidOperation, ValueError):
+                price = Decimal("0.00")
+
+            item_url = f"https://item.jd.com/{sku_id}.html"
+            mall_url = f"https://mall.jd.com/index-{shop_id}.html?from=pc"
+
+            # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
+            now_ts = time.strftime("%Y-%m-%d %H:%M:%S")
+            product = {
+                "platform": self.platform,
+                "item_id": sku_id,
+                "enterprise_id": self.company_id,
+                "product_name": title,
+                "spec": crawl_product_desc,
+                "one_price": "",
+                "detail_url": item_url,
+                "shop_name": shop_name,
+                "anonymous_store_name": "",
+                "shop_url": mall_url,
+                "city_name": "",
+                "city_id": "",
+                "province_name": "",
+                "province_id": "",
+                "shipment_city_name": "",
+                "shipment_city_id": "",
+                "shipment_province_name": "",
+                "shipment_province_id": "",
+                "area_info": "",
+                "factory_name": "",
+                "scrape_date": time.strftime("%Y-%m-%d"),
+                "price": price,
+                "sales": sales,
+                "stock_count": "",
+                "snapshot_url": "",
+                "approval_num": "",
+                "produced_time": "",
+                "deadline": "",
+                "update_time": now_ts,
+                "insert_time": now_ts,
+                "number": heshu_count,
+                "product_brand": self.brand or "",
+                "collect_task_id": self.collect_task_id,
+                "search_name": self.product,
+                "company_name": "",
+                "collect_config_info": json.dumps(
+                    {
+                        "sampling_cycle": self.sampling_cycle,
+                        "sampling_start_time": self.sampling_start_time,
+                        "sampling_end_time": self.sampling_end_time,
+                    }
+                ),
+                "account_id": self.account_id,
+                "collect_region_id": self.collect_region_id,
+                "collect_round": self.collect_round,
+                "is_sold_out": 0
+
+            }
+
+            try:
+                self.pipeline.storge_data(product)
+                logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
+            except Exception as e:
+                logger.exception("写入数据库失败: %s", e)
 
     @staticmethod
     def _response_has_ware_list(data):
@@ -312,7 +342,7 @@ class JdCrawlerV2:
                 if not self._response_has_ware_list(data):
                     continue
                 ware_list = data["data"]["wareList"]
-                self.parse(data)
+                self.parse(ware_list)
                 n += len(ware_list)
             except Exception as e:
                 logger.warning("解析监听响应失败: %s", e)
@@ -328,7 +358,7 @@ class JdCrawlerV2:
         except Exception as e:
             logger.debug("清空监听缓冲失败: %s", e)
 
-    def collect_full_page_items(self, max_steps=20):
+    def collect_full_page_items(self, max_steps=10):
         """单次循环:边滑动边收数据,到底 / 看见「下一页」即停。"""
         n = self.fetch_items_once(timeout=FETCH_TIMEOUT_FIRST)
 
@@ -367,7 +397,7 @@ class JdCrawlerV2:
             if random.random() < 0.15:
                 self.driver.run_js(f"window.scrollBy(0, -{random.randint(60, 140)})")
 
-            self.sleep(3, 5)
+            self.sleep(0.5, 1.5)
 
             if step % 3 == 2:
                 n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
@@ -380,37 +410,142 @@ class JdCrawlerV2:
         return n, next_btn
 
     def get_account(self):
-        sql_account = f""" select `id`, `name`, `ip`, `cookie_timestamp`, `cookie_str` from `accounts_platform` where `platform` = 2 and `status` = 1 and `equipment_id` = 1 order by `cookie_timestamp` asc limit 1 """
-        account_list = self.db.select_data(sql_account)
+        """Load the account bound to this task into instance state.
+
+        Looks up the row of `retrieve_collect_equipment_account` whose
+        `id` equals ``self.account_id`` and whose `status` is 0, then
+        copies its proxy ip, username and login credentials onto ``self``.
+
+        Returns:
+            True when a matching row was found, False otherwise.
+        """
+        sql_account = """
+                      SELECT *
+                      FROM `retrieve_collect_equipment_account`
+                      WHERE `id` = %s
+                        and `status` = 0
+                      """
+        account_list = self.db.select_data(sql_account, self.account_id)
         if not account_list:
             return False
 
         account_dict = account_list[0]
-        self.ip = account_dict["ip"]
-        self.account_name = account_dict["name"]
+        # The row includes the login password, so it is deliberately not
+        # printed or logged as a whole; only name and ip are logged below.
+        self.ip = account_dict.get("ip")
+        self.account_name = account_dict.get("username")
+        self.login_username = account_dict.get("phone", "")
+        self.login_password = account_dict.get("password", "")
         logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
         return True
 
     def disable_account(self):
-        update_sql = f""" UPDATE `accounts_platform` SET `status`= %s WHERE `name` = %s; """
-        self.db.execute(update_sql, (0, self.account_name))
+        """Set the current account's `status` to 1 so it is no longer picked.
+
+        ``get_account`` only selects rows with `status` = 0.  The row is
+        addressed by `id` (``self.account_id``), the same key
+        ``get_account`` reads it by; ``self.account_name`` comes from the
+        `username` column and is not a reliable match for a `name` filter.
+        """
+        update_sql = """ UPDATE `retrieve_collect_equipment_account` SET `status`= %s WHERE `id` = %s; """
+        self.db.execute(update_sql, (1, self.account_id))
+
+    def _build_search_keyword(self):
+        """Build the search keyword from brand, product name and spec.
+
+        Empty or missing parts are skipped and the rest are joined with a
+        single space; falls back to ``self.product`` when nothing remains.
+        """
+        candidates = (self.brand, self.product, self.product_desc)
+        keyword = " ".join(filter(None, candidates)).strip()
+        return keyword or self.product
+
+    def _is_logged_out(self):
+        """Return True when the page shows an element with class ``link-login``."""
+        login_link = self.driver.ele("xpath=//*[@class='link-login']", timeout=2)
+        return bool(login_link)
+
+    def perform_jd_login(self):
+        """Log in to JD with the account's username and password.
+
+        Drives the existing browser instance through the passport login
+        form, then hands over to ``handle_jd_slider_captcha``.
+
+        Returns:
+            True when the form was submitted and the slider-captcha handler
+            reported success; False when credentials are missing, a form
+            element could not be found, or the captcha handler failed.
+        """
+        username = self.login_username
+        password = self.login_password
+        if not username or not password:
+            # The in-loop re-login path has no credential guard of its own.
+            logger.error("缺少登录账号或密码,无法自动登录")
+            return False
+
+        login_url = "https://passport.jd.com/new/login.aspx"
+        self.driver.get(login_url)
+        input_name = self.driver.ele("xpath=//input[@id='loginname']", timeout=15)
+        if not input_name:
+            logger.error("未找到用户名输入框")
+            return False
+
+        input_name.input(username)
+        time.sleep(random.uniform(1.5, 2.5))
+
+        input_pass = self.driver.ele("xpath://input[@name='nloginpwd']", timeout=5)
+        if not input_pass:
+            logger.error("未找到密码输入框")
+            return False
+
+        input_pass.input(password)
+        time.sleep(random.uniform(1.5, 2.5))
+
+        login_btn = self.driver.ele("xpath://a[@id='loginsubmit']", timeout=5)
+        if not login_btn:
+            logger.error("未找到登录按钮")
+            return False
+        login_btn.click()
+
+        # Give the page time to react to the click before the captcha check.
+        time.sleep(random.uniform(3, 5))
+
+        if not handle_jd_slider_captcha(self.driver):
+            logger.error("滑块验证码未通过")
+            return False
+
+        return True
+
+    def _ensure_logged_in(self):
+        """Run the password + slider login flow when the session is logged out.
+
+        Returns:
+            True when already logged in, or when ``perform_jd_login``
+            succeeded and the page no longer looks logged out; False
+            otherwise.
+        """
+        if not self._is_logged_out():
+            return True
+
+        logger.info("检测到未登录,开始自动登录: %s", self.account_name)
+        logged_in = self.perform_jd_login() and not self._is_logged_out()
+        if not logged_in:
+            logger.error("自动登录失败: %s", self.account_name)
+            return False
+
+        logger.info("自动登录成功: %s", self.account_name)
+        return True
+
+    def _check_page_blocked(self):
+        """Detect the "searching too frequently" block page.
+
+        Returns:
+            True when the page HTML contains the block message, in which
+            case ``self.success`` is also set to False; False otherwise.
+        """
+        blocked = "抱歉由于访问频繁导致无法搜索" in (self.driver.html or "")
+        if blocked:
+            logger.error("账号无法搜索(访问频繁)")
+            self.success = False
+        return blocked
+
+    def _jump_to_page(self, target_page):
+        """Jump to ``target_page`` through the pager's page-number box.
+
+        The listen buffer is cleared before typing and again after the
+        jump, so responses captured before the jump are discarded.
+
+        Returns:
+            True after the jump was issued, False when the page-number
+            input could not be found.
+        """
+        locator = "xpath=//div[contains(@class,'_pagination_toPageNum_')]//input[@type='text']"
+        page_box = self.driver.ele(locator, timeout=3)
+        if page_box:
+            self.clear_listen_buffer()
+            page_box.input(str(target_page))
+            self.sleep(1, 2)
+            self.driver.actions.key_down("enter").key_up("enter")
+            self.sleep(3, 5)
+            self.clear_listen_buffer()
+            logger.info("已跳转到第 %s 页", target_page)
+            return True
+
+        logger.warning("未找到跳页输入框,无法跳转到第 %s 页", target_page)
+        return False
+
+    def _go_next_page(self, next_btn):
+        """Click the "next page" button, clearing the listen buffer around it.
+
+        Returns:
+            True when ``_human_click`` reported success, False otherwise.
+        """
+        self.clear_listen_buffer()
+        clicked = self._human_click(next_btn)
+        if clicked:
+            self.sleep(2, 4)
+            self.clear_listen_buffer()
+            return True
+
+        logger.warning("点击下一页失败")
+        return False
 
     def crawl(self):
         total = 0
-        keyword = self.product
-        if self.brand:
-            keyword = self.brand + "" + self.product
-        if self.product_desc:
-            keyword = keyword + " " + self.product_desc
+        keyword = self._build_search_keyword()
+
         self.driver.get("https://www.jd.com/", timeout=15)
         time.sleep(15)
-        # 判端是否登录
-        link_login = self.driver.ele("xpath=//*[@class='link-login']")
-        if link_login:
-            self.disable_account()
-            send_text(f"京东:{self.account_name}账号非登录状态")
-            self.is_success = False
-            logger.error(f"{self.account_name}账号非登录状态")
+
+        if self._is_logged_out():
+            if not self.login_password or not self.login_username:
+                return
+            if not self._ensure_logged_in():
+                self.disable_account()
+                send_text(f"京东:{self.account_name}账号登录失败")
+                self.success = False
+                return
+            self.driver.get("https://www.jd.com/", timeout=15)
+            self.sleep(3, 5)
 
         kw = quote(str(keyword or ""), safe="")
         self.driver.get(
@@ -418,47 +553,79 @@ class JdCrawlerV2:
         )
         self.sleep(5, 8)
 
-        for page in range(1, 11):
-            if "抱歉由于访问频繁导致无法搜索" in self.driver.html:
-                logger.error("账号无法搜索")
+        if self._check_page_blocked():
+            return
+
+        if not handle_jd_slider_captcha(self.driver, pause_listen=False):
+            logger.warning("进入搜索页后滑块验证码处理失败")
+            self.success = False
+            return
+
+        self._start_listen()
+
+        if self.start_page > 1:
+            if not self._jump_to_page(self.start_page):
+                logger.warning("跳页失败,将从第 1 页开始采集")
+                self.start_page = 1
+
+        logger.info(
+            "采集页码范围: %s ~ %s(共 %s 页)",
+            self.start_page,
+            self.end_page,
+            self.end_page - self.start_page + 1,
+        )
+
+        for page_no in range(self.start_page, self.end_page + 1):
+            if self._is_logged_out():
+                if not self._ensure_logged_in():
+                    self.success = False
+                    break
+                self.driver.get(
+                    f"https://search.jd.com/Search?keyword={kw}&enc=utf-8&wq={kw}",
+                    timeout=15,
+                )
+                self.sleep(3, 5)
+                if page_no > 1:
+                    self._jump_to_page(page_no)
+
+            if not handle_jd_slider_captcha(self.driver, pause_listen=True):
+                logger.warning("滑块验证码处理失败,停止采集")
                 self.success = False
                 break
 
-            if "cfe.m.jd.com/privatedomain" in self.driver.url:
-                self.disable_account()
-                logger.error("账号出现验证码,暂时禁用")
-                self.success = False
+            if self._check_page_blocked():
                 break
 
-            logger.info(f"===== 第 {page} 页 =====")
-            time.sleep(random.uniform(3, 5))
+            logger.info("===== 正在爬取第 %s 页 =====", page_no)
+            search_ele = self.driver.ele("xpath=//div[@id='search-condition']", timeout=10)
+            if not search_ele:
+                logger.warning("未找到搜索结果区域,停止采集")
+                break
 
-            page_n, next_btn = self.collect_full_page_items()
-            self.sleep(3, 5)
-            logger.info(f"本页监听商品条数(含可能重复): {page_n}")
+            page_n, _ = self.collect_full_page_items()
+            logger.info("本页监听商品条数(含可能重复): %s", page_n)
             total += page_n
-            logger.info(f"累计监听条数: {total}")
+            logger.info("累计监听条数: %s", total)
 
-            if not next_btn:
-                next_btn = self.driver.ele("text=下一页")
+            if self.is_no_prodcut > 20:
+                logger.info("连续无匹配商品过多,停止采集")
+                break
+
+            if page_no >= self.end_page:
+                break
 
+            next_btn = self.driver.ele("text=下一页", timeout=2)
             if not next_btn:
                 logger.info("没有下一页(未找到)")
                 break
-
             cls_str = next_btn.attr("class") or ""
             if "disabled" in cls_str:
                 logger.info("没有下一页(已禁用)")
                 break
 
-            self.clear_listen_buffer(
-                rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT
-            )
-            if self.is_no_prodcut > 20:
+            if not self._go_next_page(next_btn):
                 break
 
-            self._human_click(next_btn)
-
     def run(self):
         # 检测账号
         if not self.get_account():
@@ -467,9 +634,9 @@ class JdCrawlerV2:
             return self.pipeline.crawl_count, self.success
         logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
 
-        # 每次选取账号,立马账号使用时间
-        update_sql = f""" UPDATE `accounts_platform` SET `status`= %s, `cookie_timestamp`= %s WHERE `name` = %s; """
-        self.db.execute(update_sql, (1, int(time.time()), self.account_name))
+        # Refresh the account's status and last-used time as soon as it is picked
+        update_sql = f""" UPDATE `retrieve_collect_equipment_account` SET `status`= %s, `update_time`= %s WHERE `name` = %s; """
+        self.db.execute(update_sql, (0, int(time.time()), self.account_name))
 
         try:
             self.init_browser()

+ 8 - 0
spiders/pdd/.idea/Downloads.iml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 24 - 17
spiders/taobao/taobao_crawl.py

@@ -7,7 +7,7 @@ from decimal import Decimal, InvalidOperation
 from curl_cffi import requests
 from lxml import etree
 from commons.Logger import get_spider_logger
-from commons.conn_mysql import MySQLPool
+from commons.conn_mysql import MySQLPoolOnline
 from pipelines.drug_pipelines import DrugPipeline
 from spiders.taobao.taobao_login import (TaobaoAutoCrawl)
 from area_info.city_name_to_id import get_city
@@ -16,7 +16,7 @@ logger = get_spider_logger("taobao")
 
 MTOP_APP_KEY = "12574478"
 MTOP_APP_ID = "34385"
-SEARCH_MAX_PAGE = 10
+SEARCH_MAX_PAGE = 20
 REQUEST_RETRY_COUNT = 3
 COOKIE_MAX_AGE_SEC = 1800
 
@@ -44,7 +44,7 @@ MTOP_URL = (
 class TaobaoCrawl:
     def __init__(self, drug_dict=None):
         self.cookies = None
-        self.db = MySQLPool()
+        self.db = MySQLPoolOnline()
         self.pipeline = DrugPipeline("taobao")
         self.session = None
         self.proxies = None
@@ -295,11 +295,15 @@ class TaobaoCrawl:
                     self.is_no_product += 1
                     continue
                 if "+" in item_title:
-                    self.is_no_product += 1
                     continue
 
-                if self.product_desc and self.product_desc not in item_title:
-                    continue
+                if self.product_desc:
+                    if self.product_desc in item_title:
+                        crawl_product_desc = self.product_desc
+                    else:
+                        crawl_product_desc = ""
+                else:
+                    crawl_product_desc = ""
                 self.is_no_product = 0
                 status = 1
                 if self.product_keyword:
@@ -349,12 +353,12 @@ class TaobaoCrawl:
                     "item_id": item_id,
                     "enterprise_id": self.company_id,
                     "product_name": item_title,
-                    "spec": self.product_desc,
+                    "spec": crawl_product_desc,
                     "one_price": "",
                     "detail_url": item_url,
                     "shop_name": shop_name,
                     "anonymous_store_name": "",
-                    "shop_url": "",
+                    "shop_url": shop_url,
                     "city_name": "",
                     "city_id": "",
                     "province_name": "",
@@ -390,7 +394,7 @@ class TaobaoCrawl:
                     "account_id": self.account_id,
                     "collect_region_id": self.collect_region_id,
                     "collect_round": self.collect_round,
-                    "is_sold_out": 1
+                    "is_sold_out": 0
 
                 }
 
@@ -429,8 +433,7 @@ class TaobaoCrawl:
             return False
         safe_name = self._sql_literal(self.account_name)
         sql_account = (
-            "select `id`,`name`,`ip`,`cookie_timestamp`,`cookie_str` "
-            f"from `accounts_platform` where `name`='{safe_name}'"
+            f"select * from `retrieve_collect_equipment_account` where `username`='{safe_name}'"
         )
         account_list = self.db.select_data(sql_account)
         if not account_list:
@@ -448,18 +451,22 @@ class TaobaoCrawl:
         return True
 
     def get_account(self):
-        sql_account = f""" select `id`,`name`,`ip`,`cookie_timestamp`,`cookie_str` from `accounts_platform` where `platform`=1 and `status`=1 and `equipment_id`=1 order by `cookie_timestamp` asc limit 1 """
-
-        account_list = self.db.select_data(sql_account)
+        sql_account = """
+                      SELECT * FROM `retrieve_collect_equipment_account` WHERE `id` = %s and `status` = 0
+                      """
+        account_list = self.db.select_data(sql_account,self.account_id)
         if not account_list:
             return False
 
         account_dict = account_list[0]
         self.ip = account_dict.get("ip")
         cookie_str = account_dict.get("cookie_str")
-        self.account_name = account_dict.get("name")
-        self.cookie_stamp = account_dict.get("cookie_timestamp")
 
+        self.ip = account_dict.get("ip")
+        self.account_name = account_dict.get("username")
+        self.login_username = account_dict.get("phone", "")
+        self.login_password = account_dict.get("password", "")
+        self.cookie_stamp = account_dict.get("update_time")
         if self.ip:
             account_proxy = f"http://{self.ip}"
             self.proxies = {"http": account_proxy, "https": account_proxy}
@@ -470,7 +477,7 @@ class TaobaoCrawl:
                 not cookie_str
                 or int(time.time()) - int(self.cookie_stamp or 0) > COOKIE_MAX_AGE_SEC
         )
-
+        print(account_dict)
         if need_refresh:
             if not self.update_cookie():
                 return False

+ 6 - 15
spiders/taobao/taobao_login.py

@@ -6,7 +6,7 @@ import sys
 from DrissionPage import ChromiumPage, ChromiumOptions
 import re
 import socket
-from commons.conn_mysql import MySQLPool
+from commons.conn_mysql import MySQLPoolOnline
 import hashlib
 from commons.Logger import logger
 
@@ -22,7 +22,7 @@ class TaobaoAutoCrawl:
     def __init__(self, account_name, ip, key_word):
         self.driver = None
         self.register_signal_handler()
-        self.db = MySQLPool()
+        self.db = MySQLPoolOnline()
         self.account_name = account_name
         self.ip = ip
         self.keyword = key_word
@@ -156,8 +156,8 @@ class TaobaoAutoCrawl:
 
         ele_iframe = self.driver.ele("xpath=//iframe[@id='baxia-dialog-content']")
         if ele_iframe:
-            update_sql = f""" UPDATE `accounts_platform` SET `status`= %s WHERE `nickname` = %s; """
-            self.db.execute(update_sql, (0, self.account_name))
+            update_sql = f""" UPDATE `retrieve_collect_equipment_account` SET `status`= %s WHERE `nickname` = %s; """
+            self.db.execute(update_sql, (1, self.account_name))
             return False
 
         ele = self.driver.ele('xpath=//*[contains(@class,"site-nav-login-info-nick")]', timeout=30)
@@ -166,10 +166,9 @@ class TaobaoAutoCrawl:
             cookies_list = self.driver.cookies()
             cookies_dict = {c['name']: c['value'] for c in cookies_list}
             timestamp = int(time.time())
-            next_update_time = timestamp + random.randint(3600, 7200)
             # 保存 cookie 到文件
-            update_sql = f""" UPDATE `accounts_platform` SET `cookie_timestamp` = %s, `cookie_str`= %s,`cookie_update_time` = %s, `status`= %s WHERE `nickname` = %s; """
-            self.db.execute(update_sql, (timestamp, json.dumps(cookies_dict), next_update_time, 1, self.account_name))
+            update_sql = f""" UPDATE `retrieve_collect_equipment_account` SET `update_time` = %s, `cookie_str`= %s,`status`= %s WHERE `username` = %s; """
+            self.db.execute(update_sql, (timestamp, json.dumps(cookies_dict), 0, self.account_name))
             print(f"{self.account_name},获取 cookie 成功!")
             logger.info(f"{self.account_name},获取 cookie 成功!")
 
@@ -189,11 +188,3 @@ class TaobaoAutoCrawl:
             if self.driver:
                 self.driver.quit()
         return bool(bool_login)
-
-
-if __name__ == '__main__':
-    account_name = "tb_account10"
-    ip = ""
-    keyword = "手机"
-    taobao_crawl = TaobaoAutoCrawl(account_name, ip, keyword)
-    taobao_crawl.run()

+ 24 - 8
start_run_jd.py

@@ -7,7 +7,7 @@ from spiders.jd.jd_auto_crawl import JdCrawlerV2
 import schedule
 from commons.feishu_webhook import send_text
 import random
-
+from commons.config import JD_DEVICE_ID
 platform_name = "京东"
 
 
@@ -49,14 +49,25 @@ class JdMain:
             logger.warning(f"状态上报失败: {e}")
 
     def get_task(self):
-        """ 获取任务 """
-        sql_task = f""" select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,`collect_equipment_id`,`collect_round` from `retrieve_collect_task_allocate` where `platform`=2 and  `status`=1 limit 1 """
-        task_list = self.db_online.select_data(sql_task)
-
+        """Fetch one pending JD task bound to this device.
+
+        Joins `retrieve_collect_task_allocate` to
+        `retrieve_collect_equipment_account` and keeps tasks with
+        `platform` = 2 and `status` = 1 whose account has `device_id`
+        equal to ``JD_DEVICE_ID``.  The task `id` is stored on
+        ``self.task_id``.
+
+        Returns:
+            The task row as a dict, or an empty dict when no task matches.
+        """
+        sql = """
+            SELECT t.*
+            FROM `retrieve_collect_task_allocate` t
+            INNER JOIN `retrieve_collect_equipment_account` a
+                ON t.`collect_equipment_account_id` = a.`id`
+            WHERE a.`device_id` = %s
+              AND t.`platform` = 2
+              AND t.`status` = 1
+            LIMIT 1
+        """
+        task_list = self.db_online.select_data(sql, (JD_DEVICE_ID,))
         if not task_list:
             return {}
+
         task_dict = task_list[0]
         self.task_id = task_dict["id"]
+        # Logged through the module logger instead of debug prints.
+        logger.info("获取到任务: %s", task_dict)
         return task_dict
 
     def heartbeat_task(self):
@@ -76,13 +87,13 @@ class JdMain:
             logger.info(f"{platform_name}暂无任务")
             return
 
-        logger.info(json.dumps(self.task_dict))
+
         self.get_status(2)
         self.crawl_count, is_success = JdCrawlerV2(self.task_dict).run()
 
         self.heartbeat_task()
-        send_text(
-            f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_brand") + " " + self.task_dict.get("product_name") + " " + self.task_dict.get("product_specs")},  爬取数据: {self.crawl_count}条""")
+        # send_text(
+        #     f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_brand") + " " + self.task_dict.get("product_name") + " " + self.task_dict.get("product_specs")},  爬取数据: {self.crawl_count}条""")
         if is_success:
             self.get_status(3)
         else:
@@ -90,6 +101,11 @@ class JdMain:
 
 
 if __name__ == '__main__':
+    # task_dict= {"id": 1622, 'collect_task_id': 4596, 'company_id': 8, 'product_name': '小儿氨酚烷胺颗粒',
+    # 'product_specs': '', 'product_keyword': '', 'product_brand': '可复美', 'sampling_cycle': 1,
+    # 'sampling_start_time': 1778083200, 'sampling_end_time': 1778342399, 'collect_equipment_account_id': 15,
+    # 'collect_region_id': 0, 'collect_equipment_id': 25, 'collect_round': 2,"start_page":4,"end_page":10}
+    # JdCrawlerV2(task_dict).run()
     # 每10分钟执行一次
     while True:
         JdMain().run()

+ 18 - 7
start_run_taobao.py

@@ -7,6 +7,7 @@ from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
 from spiders.taobao.taobao_crawl import TaobaoCrawl
 import schedule
 from commons.feishu_webhook import send_text
+from commons.config import TB_DEVICE_ID
 
 platform_name = "淘宝"
 
@@ -59,15 +60,24 @@ class TaobaoMain:
             logger.info(f"心跳任务上报失败{str(e)}")
 
     def get_task(self):
-        """ 获取任务 """
-        sql_task = f""" select `id`,`collect_task_id`,`company_id`,`product_name`,`product_specs`,`product_keyword`,`product_brand`,`sampling_cycle`,`sampling_start_time`,`sampling_end_time`,`collect_equipment_account_id`,`collect_region_id`,`collect_equipment_id`,`collect_round` from `retrieve_collect_task_allocate` where `platform`=1 and `status`=1 limit 1 """
-        task_list = self.db_online.select_data(sql_task)
+        """获取当前设备绑定的淘宝待执行任务。"""
+        sql = """
+            SELECT t.*
+            FROM `retrieve_collect_task_allocate` t
+            INNER JOIN `retrieve_collect_equipment_account` a
+                ON t.`collect_equipment_account_id` = a.`id`
+            WHERE a.`device_id` = %s
+              AND t.`platform` = 1
+              AND t.`status` = 1
+            LIMIT 1
+        """
+        task_list = self.db_online.select_data(sql, (TB_DEVICE_ID,))
 
         if not task_list:
             return {}
+
         task_dict = task_list[0]
         self.task_id = task_dict["id"]
-        print(task_dict)
         return task_dict
 
     def run(self):
@@ -75,13 +85,14 @@ class TaobaoMain:
         if not self.task_dict:
             logger.info(f"{platform_name}暂无任务")
             return
-        logger.info(self.task_dict)
+        # logger.info(self.task_dict)
         self.get_status(2)
         self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
         self.heartbeat_task()
 
-        send_text(
-            f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
+        # send_text(
+        #     f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
+
         if is_success:
             self.get_status(3)
         else: