Forráskód Böngészése

京东快照优化

zhuoyuncheng 1 hete
szülő
commit
d235607eb3
2 módosított fájl, 719 hozzáadás és 19 törlés
  1. 1 19
      spiders/jd/jd_auto_crawl.py
  2. 718 0
      spiders/jd/jd_auto_crawl_snap.py

+ 1 - 19
spiders/jd/jd_auto_crawl.py

@@ -223,24 +223,6 @@ class JdCrawlerV2:
         else:
             return 1
 
-    def _take_snapshot(self, upload_key, ele):
-        """在指定标签页截图并上传。"""
-        time.sleep(1)
-        try:
-            jpg_bytes = ele.get_screenshot(as_bytes="jpg")
-            if not jpg_bytes:
-                logger.warning("截图为空 upload_key=%s", upload_key)
-                return ""
-            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
-        except Exception:
-            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
-            return ""
-        if not img_url:
-            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
-            return ""
-        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
-        time.sleep(random.uniform(1, 2))
-        return img_url
 
     def get_heshu(self,full_title):
         last_box = None
@@ -312,7 +294,7 @@ class JdCrawlerV2:
             ele_xpath = "//div[@id='main_search_conter']//div[contains(@class,'_goodsContainer_')]/div[@data-sku=" + "'" + sku_id + "'" + "]"
             ele_screen = self.driver.ele("xpath="+ele_xpath)
             upload_key = hashlib.md5(item_url.encode("utf-8")).hexdigest()
-            snap_url = self._take_snapshot(upload_key,ele_screen)
+            snap_url = ""
 
             try:
                 price = Decimal(str(low_price)).quantize(Decimal("0.00"))

+ 718 - 0
spiders/jd/jd_auto_crawl_snap.py

@@ -0,0 +1,718 @@
+import random
+import re
+import signal
+import socket
+import sys
+import time
+from decimal import Decimal, InvalidOperation
+from urllib.parse import quote
+from DrissionPage import ChromiumPage, ChromiumOptions
+import json
+import hashlib
+from commons.Logger import get_spider_logger
+from commons.conn_mysql import MySQLPoolOnline
+from pipelines.drug_pipelines import DrugPipeline
+from commons.feishu_webhook import send_text
+from spiders.jd.jd_captcha import handle_jd_slider_captcha
+from oss_upload.oss_upload import AliyunOSSUploader
+
+logger = get_spider_logger("jd")
+
+chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
+
+FETCH_TIMEOUT_FIRST = 5
+FETCH_TIMEOUT_SCROLL = 6
+LISTEN_CLEAR_ROUNDS = 3
+LISTEN_CLEAR_TIMEOUT = 0.45
+
+# 「下一页」是否在视口内(条件略宽)
+_JS_NEXT_BTN_IN_VIEWPORT = """
+var el = arguments[0];
+if (!el) return false;
+var r = el.getBoundingClientRect();
+var h = window.innerHeight || document.documentElement.clientHeight || 800;
+var w = window.innerWidth || document.documentElement.clientWidth || 1200;
+return r.bottom > 80 && r.top < h - 40 && r.right > 0 && r.left < w;
+"""
+
+
+class JdCrawlerV2:
+    def __init__(self, drug_dict=None):
+        self.driver = None
+        self.register_signal_handler()
+        self.db = MySQLPoolOnline()
+        self.ip = None
+        self.account_name = None
+        self.login_username = None
+        self.login_password = None
+        self.platform = 2
+        self.pipeline = DrugPipeline("jd")
+        self.task_dict = drug_dict or {}
+        self.ossuploader = AliyunOSSUploader()
+        self.start_page = 1
+        self.end_page = 1
+        if self.task_dict:
+            self.get_product_data()
+        self.success = True
+        self.is_no_prodcut = 0
+
+    def get_product_data(self):
+        self.task_id = self.task_dict["id"]
+        self.company_id = self.task_dict["company_id"]
+        self.product = self.task_dict["product_name"]
+        self.product_desc = self.task_dict.get("product_specs", "")
+        self.brand = self.task_dict.get("product_brand", "")
+        self.product_keyword = self.task_dict.get("product_keyword", "")
+        self.collect_task_id = self.task_dict.get("collect_task_id", "")
+        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
+        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
+        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
+        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
+        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
+        self.collect_region_id = self.task_dict.get("collect_region_id", "")
+        self.collect_round = self.task_dict.get("collect_round", 1)
+        self.start_page = self._parse_page(self.task_dict.get("start_page"), 1)
+        self.end_page = self._parse_page(self.task_dict.get("end_page"), 20)
+
+    @staticmethod
+    def _parse_page(value, default=1):
+        try:
+            page = int(value)
+            return page if page >= 1 else default
+        except (TypeError, ValueError):
+            return default
+
+    @staticmethod
+    def _get_free_port():
+        """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.bind(("127.0.0.1", 0))
+            return s.getsockname()[1]
+
+    def init_browser(self):
+        co = ChromiumOptions().set_browser_path(chrome_path)
+        debug_port = self._get_free_port()
+        co.set_user_data_path(f"./spiders/jd/{self.account_name}")
+
+        co.set_local_port(debug_port)
+        co.set_argument(f"--remote-debugging-port={debug_port}")
+        co.set_argument("--remote-debugging-address=127.0.0.1")
+        # co.set_argument("--disable-blink-features=AutomationControlled")
+        co.set_argument("--disable-dev-shm-usage")
+        co.set_argument("--no-first-run")  # 避免首次运行弹窗
+        co.set_argument("--no-default-browser-check")  # 避免默认浏览器检查
+
+        if self.ip:
+            proxy = self.ip.strip()
+            if not proxy.startswith(("http://", "https://")):
+                proxy = f"http://{proxy}"
+            co.set_argument(f"--proxy-server={proxy}")
+        logger.info("启动浏览器: account=%s, debug_port=%s", self.account_name, debug_port)
+        self.driver = ChromiumPage(co)
+        self._listen_started = False
+
+    def _start_listen(self):
+        """登录完成后再开监听,避免干扰登录页/验证码拖动。"""
+        if self._listen_started or not self.driver:
+            return
+        self.driver.listen.start("api?appid=search-pc-java")
+        self._listen_started = True
+        logger.info("已启动搜索接口监听")
+
+    def register_signal_handler(self):
+        def handler(signum, frame):
+            print("\n⚠️ 程序退出")
+            if self.driver:
+                self.driver.quit()
+            sys.exit(0)
+
+        signal.signal(signal.SIGINT, handler)
+        if hasattr(signal, "SIGTERM"):
+            signal.signal(signal.SIGTERM, handler)
+
+    def sleep(self, a, b):
+        time.sleep(random.uniform(a, b))
+
+    def _scroll_page_down(self, delta=900):
+        self.driver.run_js(f"window.scrollBy(0, {int(delta)});")
+        time.sleep(random.uniform(0.3, 0.6))
+
+    def _scroll_next_into_view(self, el):
+        if not el:
+            return
+        try:
+            self.driver.run_js(
+                "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
+                el,
+            )
+            self.sleep(1, 2)
+        except Exception as e:
+            logger.warning("滚动到下一页按钮失败: %s", e)
+            try:
+                el.scroll.to_see()
+            except Exception:
+                pass
+
+    def _get_scroll_info(self):
+        return self.driver.run_js("""
+            return {
+                scrollY: window.scrollY || window.pageYOffset || 0,
+                docH: Math.max(document.body.scrollHeight,
+                               document.documentElement.scrollHeight,
+                               document.body.offsetHeight),
+                viewH: window.innerHeight || document.documentElement.clientHeight || 800
+            };
+        """)
+
+    def _find_next_btn(self, timeout=0.3):
+        try:
+            return self.driver.ele("text=下一页", timeout=timeout)
+        except Exception:
+            return None
+
+    def _is_next_btn_visible(self, btn):
+        if not btn:
+            return False
+        try:
+            return bool(self.driver.run_js(_JS_NEXT_BTN_IN_VIEWPORT, btn))
+        except Exception:
+            return False
+
+    def _human_click(self, element):
+        """在目标节点上触发 click,避免 move_to + 无目标 actions.click() 因布局位移点到商品链接触发详情页。"""
+        if not element:
+            return False
+        try:
+            self.sleep(0.8, 2.0)
+            try:
+                self.driver.run_js(
+                    "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
+                    element,
+                )
+            except Exception:
+                pass
+            self.sleep(0.2, 0.6)
+            self.driver.run_js("arguments[0].click();", element)
+            return True
+        except Exception as e:
+            logger.warning("点击失败: %s", e)
+            try:
+                element.click()
+                return True
+            except Exception:
+                return False
+
+    @staticmethod
+    def _estimated_price(json_data):
+        fp = json_data.get("finalPrice")
+        if isinstance(fp, dict):
+            return fp.get("estimatedPrice", "") or ""
+        return ""
+
+    def get_heshu(self, full_title):
+        last_box = None
+        last_bottle = None
+        for match in re.finditer(r"(\d+)(盒|瓶)", full_title):
+            if match.group(2) == '盒':
+                last_box = match
+            else:  # 瓶
+                last_bottle = match
+        if last_box:
+            return int(last_box.group(1))
+        elif last_bottle:
+            return int(last_bottle.group(1))
+        else:
+            return 1
+
+    def _take_snapshot(self, upload_key, image_ele, max_retries=3):
+        """在指定标签页截图并上传。"""
+        for attempt in range(1, max_retries + 1):
+            time.sleep(1)
+            try:
+                jpg_bytes = image_ele.get_screenshot(as_bytes="jpg")
+                if not jpg_bytes:
+                    logger.warning(
+                        "截图为空 upload_key=%s attempt=%s/%s",
+                        upload_key, attempt, max_retries, )
+                    continue
+                img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
+            except Exception:
+                logger.exception(
+                    "截图或 OSS 上传失败 upload_key=%s attempt=%s/%s",
+                    upload_key, attempt, max_retries,
+                )
+                continue
+            if not img_url:
+                logger.warning(
+                    "OSS 未返回有效地址 upload_key=%s attempt=%s/%s",
+                    upload_key, attempt, max_retries,
+                )
+                continue
+            logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
+            time.sleep(random.uniform(1, 2))
+            return img_url
+        logger.warning("截图失败,已达最大重试次数 upload_key=%s", upload_key)
+        return ""
+
+    def get_heshu(self, full_title):
+        last_box = None
+        last_bottle = None
+        for match in re.finditer(r"(\d+)(盒|瓶)", full_title):
+            if match.group(2) == '盒':
+                last_box = match
+            else:  # 瓶
+                last_bottle = match
+        if last_box:
+            return int(last_box.group(1))
+        elif last_bottle:
+            return int(last_bottle.group(1))
+        else:
+            return 1
+
+    def parse(self, ware_list):
+
+        for w in ware_list:
+            title = w.get("wareName", "")
+
+            title = re.sub(r"<[^>]*>", "", title).strip()
+            color = w.get("color", "")
+            full_title = title + " " + color
+
+            logger.info(full_title)
+
+            if self.product not in full_title:
+                self.is_no_prodcut += 1
+                continue
+            if self.brand not in full_title:
+                self.is_no_prodcut += 1
+                continue
+            if self.product_desc:
+                if self.product_desc in full_title:
+                    crawl_product_desc = self.product_desc
+                else:
+                    crawl_product_desc = ""
+                    title = full_title
+            else:
+                crawl_product_desc = ""
+                title = full_title
+
+            if "+[" in title:
+                continue
+
+            self.is_no_prodcut = 0
+            status = 1
+            if self.product_keyword:
+                search_keyword_list = self.product_keyword.split(",")
+                for search_keyword in search_keyword_list:
+                    if search_keyword.strip() not in title:
+                        status = 0
+            if status == 0:
+                continue
+
+            logger.info(f"商品名:{title}")
+            sku_id = w.get("skuId", "")
+            sales = w.get("totalSales", "")
+            shop_id = w.get("shopId", "")
+            shop_name = w.get("shopName", "")
+            heshu_count = self.get_heshu(full_title)
+            final_price = self._estimated_price(w)
+            jd_price = w.get("jdPrice", "")
+            item_url = f"https://item.jd.com/{sku_id}.html"
+            low_price = final_price if final_price else jd_price
+
+            # 获取列表页快照
+            ele_xpath = "//div[@id='main_search_conter']//div[contains(@class,'_goodsContainer_')]/div[@data-sku=" + "'" + sku_id + "'" + "]"
+            ele_screen = self.driver.ele("xpath=" + ele_xpath)
+            upload_key = hashlib.md5(item_url.encode("utf-8")).hexdigest()
+            snap_url = self._take_snapshot(upload_key, ele_screen)
+
+            try:
+                price = Decimal(str(low_price)).quantize(Decimal("0.00"))
+            except (InvalidOperation, ValueError):
+                price = Decimal("0.00")
+
+            item_url = f"https://item.jd.com/{sku_id}.html"
+            mall_url = f"https://mall.jd.com/index-{shop_id}.html?from=pc"
+
+            # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
+            now_ts = time.strftime("%Y-%m-%d %H:%M:%S")
+            product = {
+                "platform": self.platform,
+                "item_id": sku_id,
+                "enterprise_id": self.company_id,
+                "product_name": title,
+                "spec": crawl_product_desc,
+                "one_price": "",
+                "detail_url": item_url,
+                "shop_name": shop_name,
+                "anonymous_store_name": "",
+                "shop_url": mall_url,
+                "city_name": "",
+                "city_id": "",
+                "province_name": "",
+                "province_id": "",
+                "shipment_city_name": "",
+                "shipment_city_id": "",
+                "shipment_province_name": "",
+                "shipment_province_id": "",
+                "area_info": "",
+                "factory_name": "",
+                "scrape_date": time.strftime("%Y-%m-%d"),
+                "price": price,
+                "sales": sales,
+                "stock_count": "",
+                "snapshot_url": snap_url,
+                "approval_num": "",
+                "produced_time": "",
+                "deadline": "",
+                "update_time": now_ts,
+                "insert_time": now_ts,
+                "number": heshu_count,
+                "product_brand": self.brand or "",
+                "collect_task_id": self.collect_task_id,
+                "search_name": self.product,
+                "company_name": "",
+                "collect_config_info": json.dumps(
+                    {
+                        "sampling_cycle": self.sampling_cycle,
+                        "sampling_start_time": self.sampling_start_time,
+                        "sampling_end_time": self.sampling_end_time,
+                    }
+                ),
+                "account_id": self.account_id,
+                "collect_region_id": self.collect_region_id,
+                "collect_round": self.collect_round,
+                "is_sold_out": 0
+
+            }
+
+            try:
+                self.pipeline.storge_data(product)
+                logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
+            except Exception as e:
+                logger.exception("写入数据库失败: %s", e)
+
+    @staticmethod
+    def _response_has_ware_list(data):
+        if not isinstance(data, dict):
+            return False
+        wl = data.get("data", {}).get("wareList")
+        return bool(wl)
+
+    def fetch_items_once(self, timeout=FETCH_TIMEOUT_FIRST):
+        n = 0
+        for resp in self.driver.listen.steps(timeout=timeout):
+            try:
+                data = resp.response.body
+                if not self._response_has_ware_list(data):
+                    continue
+                ware_list = data["data"]["wareList"]
+                self.parse(ware_list)
+                n += len(ware_list)
+            except Exception as e:
+                logger.warning("解析监听响应失败: %s", e)
+        return n
+
+    def clear_listen_buffer(self, rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT):
+        try:
+            for _ in range(rounds):
+                resps = list(self.driver.listen.steps(timeout=timeout))
+                if not resps:
+                    break
+            logger.debug("监听缓冲已清空")
+        except Exception as e:
+            logger.debug("清空监听缓冲失败: %s", e)
+
+    def collect_full_page_items(self, max_steps=10):
+        """单次循环:边滑动边收数据,到底 / 看见「下一页」即停。"""
+        n = self.fetch_items_once(timeout=FETCH_TIMEOUT_FIRST)
+
+        stagnant = 0
+        last_scroll_y = None
+
+        for step in range(max_steps):
+            next_btn = self._find_next_btn(timeout=0.3)
+            if self._is_next_btn_visible(next_btn):
+                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
+                return n, next_btn
+
+            info = self._get_scroll_info()
+            scroll_y = info["scrollY"]
+            doc_h = info["docH"]
+            view_h = info["viewH"]
+
+            at_bottom = (scroll_y + view_h >= doc_h - 20)
+            if last_scroll_y is not None and abs(scroll_y - last_scroll_y) < 8:
+                stagnant += 1
+            else:
+                stagnant = 0
+            last_scroll_y = scroll_y
+
+            if at_bottom and stagnant >= 2:
+                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
+                next_btn = self._find_next_btn(timeout=2)
+                if next_btn:
+                    self._scroll_next_into_view(next_btn)
+                    return n, next_btn
+                logger.info("已到页面底部且未发现下一页,停止滑动")
+                return n, None
+
+            self._scroll_page_down(random.randint(400, 800))
+
+            if random.random() < 0.15:
+                self.driver.run_js(f"window.scrollBy(0, -{random.randint(60, 140)})")
+
+            self.sleep(0.5, 1.5)
+
+            if step % 3 == 2:
+                n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
+
+        n += self.fetch_items_once(timeout=FETCH_TIMEOUT_SCROLL)
+        next_btn = self._find_next_btn(timeout=3)
+        if next_btn and not self._is_next_btn_visible(next_btn):
+            self._scroll_next_into_view(next_btn)
+
+        return n, next_btn
+
+    def get_account(self):
+        sql_account = """
+                      SELECT *
+                      FROM `retrieve_collect_equipment_account`
+                      WHERE `id` = %s
+                        and `status` = 0
+                      """
+        account_list = self.db.select_data(sql_account, self.account_id)
+        if not account_list:
+            return False
+
+        account_dict = account_list[0]
+        print(account_dict)
+        self.ip = account_dict.get("ip")
+        self.account_name = account_dict.get("username")
+        self.login_username = account_dict.get("phone", "")
+        self.login_password = account_dict.get("password", "")
+        logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
+        return True
+
+    def disable_account(self):
+        update_sql = f""" UPDATE `retrieve_collect_equipment_account` SET `status`= %s WHERE `name` = %s; """
+        self.db.execute(update_sql, (1, self.account_name))
+
+    def _build_search_keyword(self):
+        parts = [p for p in (self.brand, self.product, self.product_desc) if p]
+        return " ".join(parts).strip() or self.product
+
+    def _is_logged_out(self):
+        return bool(self.driver.ele("xpath=//*[@class='link-login']", timeout=2))
+
+    def perform_jd_login(self):
+        """
+        使用已有浏览器实例执行京东账号密码登录(含滑块验证码)。
+        成功返回 True,失败返回 False。
+        """
+        username = self.login_username
+        password = self.login_password
+        login_url = "https://passport.jd.com/new/login.aspx"
+        self.driver.get(login_url)
+        input_name = self.driver.ele("xpath=//input[@id='loginname']", timeout=15)
+        if not input_name:
+            print("未找到用户名输入框")
+            return False
+
+        input_name.input(username)
+        time.sleep(random.uniform(1.5, 2.5))
+
+        input_pass = self.driver.ele("xpath://input[@name='nloginpwd']", timeout=5)
+        if not input_pass:
+            print("未找到密码输入框")
+            return False
+
+        input_pass.input(password)
+        time.sleep(random.uniform(1.5, 2.5))
+
+        login_btn = self.driver.ele("xpath://a[@id='loginsubmit']", timeout=5)
+        if not login_btn:
+            print("未找到登录按钮")
+            return False
+        login_btn.click()
+
+        time.sleep(random.uniform(3, 5))
+
+        if not handle_jd_slider_captcha(self.driver):
+            print("滑块验证码未通过")
+            return False
+
+        return True
+
+    def _ensure_logged_in(self):
+        """未登录时自动走登录流程(账号密码 + 滑块)。"""
+        if not self._is_logged_out():
+            return True
+
+        logger.info("检测到未登录,开始自动登录: %s", self.account_name)
+        ok = self.perform_jd_login()
+        if ok and not self._is_logged_out():
+            logger.info("自动登录成功: %s", self.account_name)
+            return True
+
+        logger.error("自动登录失败: %s", self.account_name)
+        return False
+
+    def _check_page_blocked(self):
+        html = self.driver.html or ""
+        if "抱歉由于访问频繁导致无法搜索" in html:
+            logger.error("账号无法搜索(访问频繁)")
+            self.success = False
+            return True
+        return False
+
+    def _jump_to_page(self, target_page):
+        """跳转到指定页码,并清空跳转前的监听残留。"""
+        to_page_input = self.driver.ele(
+            "xpath=//div[contains(@class,'_pagination_toPageNum_')]//input[@type='text']",
+            timeout=3,
+        )
+        if not to_page_input:
+            logger.warning("未找到跳页输入框,无法跳转到第 %s 页", target_page)
+            return False
+
+        self.clear_listen_buffer()
+        to_page_input.input(str(target_page))
+        self.sleep(1, 2)
+        self.driver.actions.key_down("enter").key_up("enter")
+        self.sleep(3, 5)
+        self.clear_listen_buffer()
+        logger.info("已跳转到第 %s 页", target_page)
+        return True
+
+    def _go_next_page(self, next_btn):
+        self.clear_listen_buffer()
+        if not self._human_click(next_btn):
+            logger.warning("点击下一页失败")
+            return False
+        self.sleep(2, 4)
+        return True
+
+    def crawl(self):
+        total = 0
+        keyword = self._build_search_keyword()
+
+        self.driver.get("https://www.jd.com/", timeout=15)
+        time.sleep(15)
+
+        if self._is_logged_out():
+            if not self.login_password or not self.login_username:
+                return
+            if not self._ensure_logged_in():
+                self.disable_account()
+                send_text(f"京东:{self.account_name}账号登录失败")
+                self.success = False
+                return
+            self.driver.get("https://www.jd.com/", timeout=15)
+            self.sleep(3, 5)
+
+        kw = quote(str(keyword or ""), safe="")
+        self._search_kw = kw
+        # 必须先监听再打开搜索页,否则首屏 wareList(前约 30 条)在监听开启前就返回了
+        self._start_listen()
+        self.driver.get(
+            f"https://search.jd.com/Search?keyword={kw}&enc=utf-8&wq={kw}", timeout=15
+        )
+        self.sleep(5, 8)
+
+        if self._check_page_blocked():
+            return
+
+        if not handle_jd_slider_captcha(self.driver, pause_listen=False):
+            logger.warning("进入搜索页后滑块验证码处理失败")
+            self.success = False
+            return
+
+        if self.start_page > 1:
+            if not self._jump_to_page(self.start_page):
+                logger.warning("跳页失败,将从第 1 页开始采集")
+                self.start_page = 1
+
+        logger.info(
+            "采集页码范围: %s ~ %s(共 %s 页)",
+            self.start_page,
+            self.end_page,
+            self.end_page - self.start_page + 1,
+        )
+
+        for page_no in range(self.start_page, self.end_page + 1):
+            if self._is_logged_out():
+                if not self._ensure_logged_in():
+                    self.success = False
+                    break
+                self.driver.get(
+                    f"https://search.jd.com/Search?keyword={kw}&enc=utf-8&wq={kw}",
+                    timeout=15,
+                )
+                self.sleep(3, 5)
+                if page_no > 1:
+                    self._jump_to_page(page_no)
+
+            if not handle_jd_slider_captcha(self.driver, pause_listen=True):
+                logger.warning("滑块验证码处理失败,停止采集")
+                self.success = False
+                break
+
+            if self._check_page_blocked():
+                break
+
+            logger.info("===== 正在爬取第 %s 页 =====", page_no)
+            search_ele = self.driver.ele("xpath=//div[@id='search-condition']", timeout=10)
+            if not search_ele:
+                logger.warning("未找到搜索结果区域,停止采集")
+                break
+
+            page_n, _ = self.collect_full_page_items()
+            logger.info("本页监听商品条数(含可能重复): %s", page_n)
+            total += page_n
+            logger.info("累计监听条数: %s", total)
+
+            if self.is_no_prodcut > 20:
+                logger.info("连续无匹配商品过多,停止采集")
+                break
+
+            if page_no >= self.end_page:
+                break
+
+            next_btn = self.driver.ele("text=下一页", timeout=2)
+            if not next_btn:
+                logger.info("没有下一页(未找到)")
+                break
+            cls_str = next_btn.attr("class") or ""
+            if "disabled" in cls_str:
+                logger.info("没有下一页(已禁用)")
+                break
+
+            if not self._go_next_page(next_btn):
+                break
+
+    def run(self):
+        # 检测账号
+        if not self.get_account():
+            logger.info("==================当前无账号可用==================")
+            self.success = False
+            return self.pipeline.crawl_count, self.success
+        logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
+
+        # # # 每次选取账号,立马账号使用时间
+        update_sql = f""" UPDATE `retrieve_collect_equipment_account` SET `status`= %s, `update_time`= %s WHERE `username` = %s; """
+        self.db.execute(update_sql, (0, int(time.time()), self.account_name))
+
+        try:
+            self.init_browser()
+            self.crawl()
+        except Exception as e:
+            self.success = False
+            logger.exception("爬取异常: %s", e)
+            self.sleep(3, 5)
+
+        finally:
+            if self.driver:
+                self.driver.quit()
+                self.driver = None
+        return self.pipeline.crawl_count, self.success