zhuoyuncheng hace 1 semana
padre
commit
2fcd7a3e6b

+ 9 - 2
commons/config.py

@@ -1,9 +1,9 @@
 # 京东设备id,对应数据库账号表的id
 
-JD_DEVICE_ID = 1
+JD_DEVICE_ID = 4
 
 #淘宝设备id,对应数据库账号表的id
-TB_DEVICE_ID= 1
+TB_DEVICE_ID= 4
 
 # 药师帮账号
 YSB_ACCOUNT = {
@@ -11,3 +11,10 @@ YSB_ACCOUNT = {
     "password":"ljm123456"
 }
 
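+# Yaoex (壹药城) account
+# NOTE(review): the password and token below are committed in plain text; consider
+# loading them from environment variables or an untracked local config file.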
+YYC_ACCOUNT = {
+    "username":"18193030281",
+    "password":"a666666",
+    "token":"Sm45MzRmREtiaThGc1gvLzZaVnRFWXFjTlg4bVBZaytxZXoyRjR5OXB1NmFJMk84VDhadzI4NUdNUmhFZjFtZCszSnFGV1JUdThVTWtwUi9SVG9jUk01T0JNZ1RyUVZCNUsyNms0Z0F3OUhRc3JKRzZ1S0dFOWNLUVAwNFdweWRtaDkyS25iOUVwU3dnUC9FcVpnZWdSbS9QQm1qSHhsOS84RHlOc1czUWgxRTZkSEh4d2NUOCtkNzA1dTlLWjkzP2FwcElkPTEyNTAma2V5SWQ9MTI1MA==",
+    "user_id":387715
+}

+ 28 - 0
snapshot_start_run_yaoex.py

@@ -0,0 +1,28 @@
+from spiders.yaoex.yaoex_snapshot_crawl import YaoexSnapshotCrawl
+from commons.collect_schedule_runner import run_scheduled_loop
+
+PLATFORM_NAME = "壹药城"
+PLATFORM_ID = 6
+
+if __name__ == "__main__":
+    # Manual single-task debugging: uncomment the sample task and the direct
+    # YaoexSnapshotCrawl(...).run() call below instead of the scheduler loop.
+    # task_dict = {
+    #     "id": 1622,
+    #     "collect_task_id": 4596,
+    #     "company_id": 8,
+    #     "product_name": "依马打正红花油",
+    #     "product_specs": "",
+    #     "product_keyword": "",
+    #     "product_brand": "金活",
+    #     "sampling_cycle": 1,
+    #     "sampling_start_time": 1778083200,
+    #     "sampling_end_time": 1778342399,
+    #     "collect_equipment_account_id": 15,
+    #     "collect_region_id": 0,
+    #     "collect_equipment_id": 25,
+    #     "collect_round": 2,
+    #     "start_page": 1,
+    #     "end_page": 10,
+    # }
+    #
+    # YaoexSnapshotCrawl(task_dict).run()
+    # Normal entry point: hand the crawler class to the shared scheduler loop.
+    run_scheduled_loop(PLATFORM_NAME, PLATFORM_ID, YaoexSnapshotCrawl)

+ 28 - 0
snapshot_start_run_ysbang.py

@@ -0,0 +1,28 @@
+from spiders.yaoshibang.ysb_snapshot_crawl import YaoShiBangSnapshot
+from commons.collect_schedule_runner import run_scheduled_loop
+
+PLATFORM_NAME = "药师帮"
+PLATFORM_ID = 5
+
+if __name__ == "__main__":
+    # Manual single-task debugging: uncomment the sample task and the direct
+    # YaoShiBangSnapshot(...).run() call below instead of the scheduler loop.
+    # task_dict = {
+    #     "id": 1622,
+    #     "collect_task_id": 4596,
+    #     "company_id": 8,
+    #     "product_name": "依马打正红花油",
+    #     "product_specs": "",
+    #     "product_keyword": "",
+    #     "product_brand": "金活",
+    #     "sampling_cycle": 1,
+    #     "sampling_start_time": 1778083200,
+    #     "sampling_end_time": 1778342399,
+    #     "collect_equipment_account_id": 15,
+    #     "collect_region_id": 0,
+    #     "collect_equipment_id": 25,
+    #     "collect_round": 2,
+    #     "start_page": 1,
+    #     "end_page": 10,
+    # }
+
+    # YaoShiBangSnapshot(task_dict).run()
+    # Normal entry point: hand the crawler class to the shared scheduler loop.
+    run_scheduled_loop(PLATFORM_NAME, PLATFORM_ID, YaoShiBangSnapshot)

+ 30 - 3
spiders/jd/jd_auto_crawl.py

@@ -8,12 +8,13 @@ from decimal import Decimal, InvalidOperation
 from urllib.parse import quote
 from DrissionPage import ChromiumPage, ChromiumOptions
 import json
+import hashlib
 from commons.Logger import get_spider_logger
 from commons.conn_mysql import MySQLPoolOnline
 from pipelines.drug_pipelines import DrugPipeline
 from commons.feishu_webhook import send_text
 from spiders.jd.jd_captcha import handle_jd_slider_captcha
-
+from oss_upload.oss_upload import AliyunOSSUploader
 logger = get_spider_logger("jd")
 
 chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
@@ -46,7 +47,7 @@ class JdCrawlerV2:
         self.platform = 2
         self.pipeline = DrugPipeline("jd")
         self.task_dict = drug_dict or {}
-
+        self.ossuploader = AliyunOSSUploader()
         self.start_page = 1
         self.end_page = 1
         if self.task_dict:
@@ -222,6 +223,25 @@ class JdCrawlerV2:
         else:
             return 1
 
+    def _take_snapshot(self, upload_key, ele):
+        """Screenshot ``ele`` as JPEG, upload it to OSS and return the image URL.
+
+        Args:
+            upload_key: Object key handed (as ``str``) to the OSS uploader.
+            ele: Page element to capture; may be a falsy "not found" result.
+
+        Returns:
+            The URL returned by the uploader, or ``""`` when the element is
+            missing, the screenshot is empty, or the screenshot/upload fails.
+        """
+        if not ele:
+            # The element lookup found nothing: skip the wait and the
+            # screenshot call that would only fail and log a traceback.
+            logger.warning("未找到截图元素 upload_key=%s", upload_key)
+            return ""
+        time.sleep(1)
+        try:
+            jpg_bytes = ele.get_screenshot(as_bytes="jpg")
+            if not jpg_bytes:
+                logger.warning("截图为空 upload_key=%s", upload_key)
+                return ""
+            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
+        except Exception:
+            # Best effort: a failed snapshot must not abort the crawl.
+            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
+            return ""
+        if not img_url:
+            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
+            return ""
+        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
+        # Random pause between snapshots.
+        time.sleep(random.uniform(1, 2))
+        return img_url
+
     def get_heshu(self,full_title):
         last_box = None
         last_bottle = None
@@ -285,8 +305,15 @@ class JdCrawlerV2:
             heshu_count = self.get_heshu(full_title)
             final_price = self._estimated_price(w)
             jd_price = w.get("jdPrice", "")
+            item_url = f"https://item.jd.com/{sku_id}.html"
             low_price = final_price if final_price else jd_price
 
+            # 获取列表页快照
+            ele_xpath = "//div[@id='main_search_conter']//div[contains(@class,'_goodsContainer_')]/div[@data-sku=" + "'" + sku_id + "'" + "]"
+            ele_screen = self.driver.ele("xpath="+ele_xpath)
+            upload_key = hashlib.md5(item_url.encode("utf-8")).hexdigest()
+            snap_url = self._take_snapshot(upload_key,ele_screen)
+
             try:
                 price = Decimal(str(low_price)).quantize(Decimal("0.00"))
             except (InvalidOperation, ValueError):
@@ -322,7 +349,7 @@ class JdCrawlerV2:
                 "price": price,
                 "sales": sales,
                 "stock_count": "",
-                "snapshot_url": "",
+                "snapshot_url": snap_url,
                 "approval_num": "",
                 "produced_time": "",
                 "deadline": "",

+ 642 - 0
spiders/yaoex/yaoex_snapshot_crawl.py

@@ -0,0 +1,642 @@
+import base64
+import hashlib
+import json
+import random
+import signal
+import socket
+import sys
+import time
+from pathlib import Path
+from urllib.parse import quote
+import requests
+from Crypto.Cipher import AES
+from DrissionPage import ChromiumPage, ChromiumOptions
+from commons.Logger import logger
+from oss_upload.oss_upload import AliyunOSSUploader
+from pipelines.drug_pipelines import DrugPipeline
+from area_info.city_name_to_id import get_city
+from commons.config import YYC_ACCOUNT
+from Crypto.Util.Padding import unpad
+
+# Credentials and endpoint of the captcha-recognition service used by _call_captcha_api.
+# NOTE(review): the token is hard-coded in source; consider moving it to configuration.
+CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
+CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
+
+chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
+# Project root -> spiders/yaoex (independent of where the script is launched from)
+PROJECT_ROOT = Path(__file__).resolve().parents[2]
+YAOEX_SPIDER_DIR = PROJECT_ROOT / "spiders" / "yaoex"
+BROWSER_PROFILE_SUBDIR = "chrome_profile"
+# Added to the distance returned by the captcha API before dragging the slider.
+SLIDER_OFFSET_FIX = 10
+# Timeout passed to driver.get() when opening a detail page.
+DETAIL_GET_TIMEOUT = 15
+# Default timeout of _wait_detail_ready().
+DETAIL_URL_WAIT = 10
+# NOTE(review): not referenced by the crawler class below — confirm whether it is still needed.
+DETAIL_DOM_WAIT = 8
+# Number of navigation attempts in _goto_detail_page().
+DETAIL_NAV_RETRIES = 3
+DETAIL_CONTENT_XPATH = "xpath://div[contains(@class,'yaoex-product-detail__content')]"
+# Defaults for _post_with_retry().
+REQUEST_RETRY_COUNT = 3
+REQUEST_TIMEOUT_SEC = 20
+# search() stops paging once the non-matching-item counter exceeds this value.
+NOT_PRODUCT_BREAK = 15
+
+# Headers sent with every API request made by _post_with_retry().
+headers = {
+    "Accept": "application/json, text/plain, */*",
+    "Accept-Language": "zh-CN,zh;q=0.9",
+    "Connection": "keep-alive",
+    "Content-Type": "application/x-www-form-urlencoded",
+    "Origin": "https://mall.yaoex.com",
+    "Referer": "https://mall.yaoex.com/",
+    "Sec-Fetch-Dest": "empty",
+    "Sec-Fetch-Mode": "cors",
+    "Sec-Fetch-Site": "cross-site",
+    "User-Agent": (
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+        "AppleWebKit/537.36 (KHTML, like Gecko) "
+        "Chrome/146.0.0.0 Safari/537.36"
+    ),
+    "X-Request-Agent": "Axios",
+    "X-Requested-With": "XMLHttpRequest",
+    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
+    "sec-ch-ua-mobile": "?0",
+    "sec-ch-ua-platform": '"Windows"',
+}
+
+
+class YaoexSnapshotCrawl:
+    def __init__(self, drug_dict=None):
+        """Set up crawler state and, when a task is given, load its fields.
+
+        Args:
+            drug_dict: Optional task dictionary; when non-empty its fields are
+                copied onto the instance by ``get_product_data``.
+        """
+        self.driver = None  # browser page, created later by init_browser()
+        self.platform = 6
+        self.pipeline = DrugPipeline("yaoex")
+        self.task_dict = drug_dict or {}
+        self.ossuploader = AliyunOSSUploader()
+        self.start_page = 1
+        self.end_page = 1
+        self.account_name = YYC_ACCOUNT.get("username", "yyc_default")
+        self._shop_cache = {}  # seller_code -> (address, enterprise name)
+        self._register_signal_handler()
+        # NOTE(review): with an empty task the task-derived attributes (product,
+        # brand, company_id, ...) are never set, yet search()/parse_product() read them.
+        if self.task_dict:
+            self.get_product_data()
+        self.success = True
+        self.is_not_product = 0  # counter of non-matching list items, reset on a match
+        self.user_id = YYC_ACCOUNT["user_id"]
+        self.token = YYC_ACCOUNT["token"]
+
+    def get_product_data(self):
+        """Copy the task dictionary's fields onto instance attributes.
+
+        ``id``, ``company_id`` and ``product_name`` are required (``KeyError``
+        when absent); every other field falls back to a default. The page
+        range is normalised so ``end_page`` is never below ``start_page``.
+        """
+        task = self.task_dict
+
+        # Mandatory fields.
+        self.task_id = task["id"]
+        self.company_id = task["company_id"]
+        self.product = task["product_name"]
+
+        # Optional fields as (attribute name, task key, default).
+        optional_fields = (
+            ("product_desc", "product_specs", ""),
+            ("brand", "product_brand", ""),
+            ("product_keyword", "product_keyword", ""),
+            ("collect_task_id", "collect_task_id", ""),
+            ("sampling_cycle", "sampling_cycle", ""),
+            ("sampling_start_time", "sampling_start_time", ""),
+            ("sampling_end_time", "sampling_end_time", ""),
+            ("collect_equipment_id", "collect_equipment_id", ""),
+            ("account_id", "collect_equipment_account_id", ""),
+            ("collect_region_id", "collect_region_id", ""),
+            ("collect_round", "collect_round", 1),
+        )
+        for attr_name, task_key, fallback in optional_fields:
+            setattr(self, attr_name, task.get(task_key, fallback))
+
+        # Page range: an invalid end page falls back to the start page.
+        first_page = self._parse_page(task.get("start_page"), 1)
+        last_page = self._parse_page(task.get("end_page"), first_page)
+        self.start_page = first_page
+        self.end_page = max(first_page, last_page)
+
+    @staticmethod
+    def _parse_page(value, default=1):
+        """Convert ``value`` to a page number, returning ``default`` when it
+        cannot be converted to ``int`` or is smaller than 1."""
+        try:
+            parsed = int(value)
+        except (TypeError, ValueError):
+            return default
+        if parsed < 1:
+            return default
+        return parsed
+
+    def _register_signal_handler(self):
+        """Install a handler for SIGINT (and SIGTERM where the platform defines
+        it) that closes the browser and exits the process with status 0."""
+
+        def _shutdown(signum, frame):
+            logger.info("收到退出信号,正在关闭浏览器...")
+            self._quit_browser()
+            sys.exit(0)
+
+        signal.signal(signal.SIGINT, _shutdown)
+        sigterm = getattr(signal, "SIGTERM", None)
+        if sigterm is not None:
+            signal.signal(sigterm, _shutdown)
+
+    @staticmethod
+    def _timestamp_ms() -> str:
+        """Return the current Unix time in whole milliseconds as a string."""
+        millis = int(time.time() * 1000)
+        return f"{millis}"
+
+    def _quit_browser(self):
+        """Close the browser if one is open and clear ``self.driver``.
+
+        Errors raised while quitting are deliberately ignored.
+        """
+        browser = self.driver
+        if not browser:
+            return
+        try:
+            browser.quit()
+        except Exception:
+            pass
+        self.driver = None
+
+    @staticmethod
+    def _get_free_port():
+        """Bind a temporary TCP socket on 127.0.0.1 to port 0 and return the
+        port number the OS assigned to it."""
+        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            probe.bind(("127.0.0.1", 0))
+            _host, port = probe.getsockname()
+            return port
+        finally:
+            probe.close()
+
+    def _resolve_browser_profile_dir(self):
+        """
+        Choose the browser user-data directory under <project root>/spiders/yaoex/.
+
+        chrome_profile/<account> is preferred. If an older layout
+        (yaoex/<account> or yaoex/spiders/yaoex/<account>) already contains a
+        browser profile, that directory is reused instead.
+        """
+        spider_dir = YAOEX_SPIDER_DIR
+        account = self.account_name
+        default_dir = spider_dir / BROWSER_PROFILE_SUBDIR / account
+        search_order = (
+            default_dir,
+            spider_dir / account,
+            spider_dir / "spiders" / "yaoex" / account,
+        )
+
+        for path in search_order:
+            # A "Default" folder or a "Local State" file marks an existing profile.
+            has_profile = (path / "Default").is_dir() or (path / "Local State").is_file()
+            if not has_profile:
+                continue
+            logger.info("使用已有浏览器配置目录: %s", path)
+            return path
+
+        default_dir.parent.mkdir(parents=True, exist_ok=True)
+        logger.info("新建浏览器配置目录: %s", default_dir)
+        return default_dir
+
+    def init_browser(self):
+        """Launch Chrome on a free local debugging port with the resolved
+        user-data directory and store the page object in ``self.driver``."""
+        options = ChromiumOptions().set_browser_path(chrome_path)
+        port = self._get_free_port()
+        user_data_dir = self._resolve_browser_profile_dir()
+        user_data_dir.mkdir(parents=True, exist_ok=True)
+        options.set_user_data_path(str(user_data_dir))
+        logger.info("浏览器用户目录(绝对路径): %s", user_data_dir.resolve())
+        options.set_local_port(port)
+
+        # Command-line switches, applied in this order.
+        chrome_flags = (
+            f"--remote-debugging-port={port}",
+            "--remote-debugging-address=127.0.0.1",
+            "--disable-dev-shm-usage",
+            "--start-maximized",
+            "--no-first-run",
+            "--no-default-browser-check",
+        )
+        for flag in chrome_flags:
+            options.set_argument(flag)
+
+        self.driver = ChromiumPage(options)
+
+    def _is_logged_in(self):
+        """Return True when the page shows the logout link (looked up for up to 5 s)."""
+        logout_link = self.driver.ele("xpath=//a[@id='logout']", timeout=5)
+        return True if logout_link else False
+
+    def _call_captcha_api(self, image_bytes):
+        """Send the captcha image to the recognition API.
+
+        Returns the ``data.data`` field of the JSON reply, or ``None`` when
+        the request or the parsing of its reply fails (the error is logged).
+        """
+        try:
+            encoded_image = base64.b64encode(image_bytes).decode()
+            request_body = {
+                "token": CAPTCHA_TOKEN,
+                "type": "22222",
+                "image": encoded_image,
+            }
+            http_resp = requests.post(
+                CAPTCHA_API_URL,
+                json=request_body,
+                headers={"Content-Type": "application/json"},
+                timeout=15,
+            )
+            resp = http_resp.json()
+            logger.info("验证码 API 返回: %s", resp)
+            return resp["data"]["data"]
+        except Exception as e:
+            logger.exception("验证码识别失败: %s", e)
+            return None
+
+    @staticmethod
+    def _generate_human_track(distance):
+        """Build a randomised drag track covering roughly ``distance``.
+
+        Returns a list of ``(x_move, y_offset, duration)`` tuples: the moves
+        accelerate over the first 70% of the distance and decelerate over the
+        rest, and a small backward step is appended most of the time.
+        """
+        tracks = []
+        current = 0
+        mid = distance * 0.7  # switch from acceleration to deceleration here
+        t = 0.2  # time step used in the motion formulas
+        v = 0
+        move_points = []
+
+        # Acceleration phase: random positive acceleration until 70% is covered.
+        while current < mid:
+            a = random.uniform(2, 4)
+            v0 = v
+            v = v0 + a * t
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        # Deceleration phase: random negative acceleration until the distance is covered.
+        while current < distance:
+            a = -random.uniform(0.5, 1.5)
+            v0 = v
+            v = v0 + a * t
+            # Keep a minimum speed so every step still advances.
+            if v < 0.5:
+                v = 0.5
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        total_points = len(move_points)
+        for i, move in enumerate(move_points):
+            # Occasional vertical jitter of up to 2 in either direction.
+            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
+            # Shorter durations in the first 30% of steps, longer in the last 30%.
+            if i < total_points * 0.3:
+                duration = random.uniform(0.01, 0.03)
+            elif i > total_points * 0.7:
+                duration = random.uniform(0.03, 0.08)
+            else:
+                duration = random.uniform(0.02, 0.05)
+            # 5% chance of an extra pause on a step.
+            if random.random() < 0.05:
+                duration += random.uniform(0.05, 0.1)
+            tracks.append((move, y_offset, duration))
+
+        # 70% chance of a final backward step of 1-3.
+        if random.random() < 0.7:
+            tracks.append((-random.randint(1, 3), 0, 0.05))
+        return tracks
+
+    def _simulate_slider_drag(self, slider_element, target_distance):
+        """Press the slider, replay a generated track step by step, then release."""
+        self.driver.actions.move_to(slider_element).hold()
+        track = self._generate_human_track(target_distance)
+        for step in track:
+            dx, dy, pause = step
+            # NOTE(review): the generated durations are already fractions of a
+            # unit (0.01-0.18); dividing by 1000 makes them near zero — confirm
+            # the intended unit against the DrissionPage Actions.move API.
+            self.driver.actions.move(dx, dy, duration=pause / 1000)
+        self.driver.actions.release()
+
+    def _solve_slider_if_present(self):
+        """Handle the slider captcha modal when one is shown.
+
+        Returns True when no modal is present or after the drag was performed
+        (the outcome of the drag is not checked here), and False when the
+        recognition API gives no distance or the slider handle is not found.
+        """
+        modal = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
+        if not modal:
+            return True
+
+        logger.info("检测到滑块验证码,开始处理")
+        # The modal screenshot is sent to the recognition API to get the drag distance.
+        jpg_bytes = modal.get_screenshot(as_bytes="jpg")
+        distance = self._call_captcha_api(jpg_bytes)
+        if distance is None:
+            return False
+
+        slider = self.driver.ele(
+            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
+        )
+        if not slider:
+            logger.error("未找到滑块元素")
+            return False
+        # A fixed correction is added to the recognised distance.
+        self._simulate_slider_drag(slider, float(distance) + SLIDER_OFFSET_FIX)
+        time.sleep(3)
+        return True
+
+    def login(self):
+        """Log in through the web login form with the configured account.
+
+        Returns False when a form element is missing or the slider captcha
+        cannot be handled; otherwise returns the result of ``_is_logged_in``.
+        """
+        self.driver.get("https://mall.yaoex.com/login", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+
+        input_name = self.driver.ele("xpath://input[@name='username']", timeout=5)
+        if not input_name:
+            logger.error("未找到用户名输入框")
+            return False
+        input_name.input(YYC_ACCOUNT["username"])
+        # Random pause between form actions.
+        time.sleep(random.uniform(1.2, 2.0))
+
+        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
+        if not input_pass:
+            logger.error("未找到密码输入框")
+            return False
+        input_pass.input(YYC_ACCOUNT["password"])
+        time.sleep(random.uniform(1.2, 2.0))
+
+        # Optional click-to-verify widget: clicked only when it is present.
+        geetest_click = self.driver.ele(
+            "xpath=//div[contains(@class,'geetest_btn_click')]", timeout=3
+        )
+        if geetest_click:
+            geetest_click.click()
+            time.sleep(1.5)
+
+        login_button = self.driver.ele("xpath://input[@id='login-btn']", timeout=5)
+        if not login_button:
+            logger.error("未找到登录按钮")
+            return False
+        login_button.click()
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+
+        # A slider captcha may appear after submitting the form.
+        if not self._solve_slider_if_present():
+            return False
+
+        return self._is_logged_in()
+
+    def decrypt_price(self, ciphertext_b64):
+        """Decrypt a base64-encoded AES-ECB price string and return the UTF-8 text.
+
+        Empty or blank input yields ``""``. The key is the fixed 16-character
+        string, or its first 10 characters followed by the user id (cut to 6
+        characters and left-padded with zeros) when ``self.user_id`` is set.
+        """
+        if not ciphertext_b64:
+            return ""
+        if not str(ciphertext_b64).strip():
+            return ""
+
+        base_key = "GDLSAUO1KUMIIBCE"
+        if self.user_id:
+            suffix = str(self.user_id)[:6].rjust(6, "0")
+            key_text = base_key[:10] + suffix
+        else:
+            key_text = base_key
+
+        encrypted = base64.b64decode(ciphertext_b64.strip())
+        decrypted = AES.new(key_text.encode("utf-8"), AES.MODE_ECB).decrypt(encrypted)
+        return unpad(decrypted, AES.block_size).decode("utf-8")
+
+    def _post_with_retry(self, url, payload, retries=REQUEST_RETRY_COUNT, timeout=REQUEST_TIMEOUT_SEC):
+        """POST ``payload`` as form data to ``url``, retrying on any failure.
+
+        Returns the response of the first attempt that succeeds with a
+        non-error HTTP status. After ``retries`` failed attempts the last
+        exception is re-raised. The pause before a retry grows with the
+        attempt number and is capped at 5 seconds.
+        """
+        failure = None
+        for attempt in range(1, retries + 1):
+            try:
+                response = requests.post(
+                    url,
+                    headers=headers,
+                    data=payload,
+                    timeout=timeout,
+                )
+                response.raise_for_status()
+            except Exception as exc:
+                failure = exc
+                if attempt >= retries:
+                    logger.error("请求失败,已达最大重试次数(%s): %s", retries, exc)
+                    continue
+                logger.warning("请求失败,第%s/%s次重试: %s", attempt, retries, exc)
+                time.sleep(min(2 * attempt, 5))
+            else:
+                return response
+        raise failure
+
+    def _shop_payload(self, enterprise_id):
+        """Build the form fields for the enterprise-qualification request."""
+        payload = dict(
+            traderName="yaoex_pc",
+            trader="pc",
+            closesignature="yes",
+            signature_method="md5",
+            signature="****",
+        )
+        payload["timestamp"] = self._timestamp_ms()
+        # The same token is sent under both names.
+        payload["token"] = payload["userToken"] = self.token
+        payload["enterpriseId"] = enterprise_id
+        return payload
+
+    def _list_payload(self, keyword, page):
+        """Build the form fields for one page of the keyword search request."""
+        # Fields identifying the client and the logged-in account.
+        auth_fields = {
+            "traderName": "yaoex_pc",
+            "trader": "pc",
+            "closesignature": "yes",
+            "signature_method": "md5",
+            "signature": "****",
+            "timestamp": self._timestamp_ms(),
+            "token": self.token,
+            "userToken": self.token,
+            "userId": self.user_id,
+            "roleId": "101",
+            "userType": "下游客户",
+            "buyerCode": self.user_id,
+        }
+        # Paging, keyword and (empty) filter fields.
+        query_fields = {
+            "nowPage": str(page),
+            "per": "20",
+            "keyword": keyword,
+            "catSearchId": "",
+            "specs": "",
+            "factoryIds": "",
+            "sellerCodes": "",
+            "sellerFilterMode": "0",
+            "sortColumn": "default",
+            "sortMode": "default",
+            "ver": "1",
+            "stock_mode": "1",
+            "showExtendCard": "true",
+            "needDinnerPrice": "true",
+            "limitStart": "",
+            "limitEnd": "",
+            "deadLineStart": "",
+            "deadLineEnd": "",
+            "filterDtos": "",
+            "showWholePurchase": "true",
+        }
+        return {**auth_fields, **query_fields}
+
+    def fetch_list_page(self, keyword, page):
+        """Fetch one page of search results for ``keyword``.
+
+        Returns the ``data.shopProducts`` list of the JSON reply, or ``[]``
+        when the reply is not an object or either field is missing or null.
+        Request errors from ``_post_with_retry`` propagate to the caller.
+        """
+        list_url = "https://gateway-b2b.fangkuaiyi.com/home/search/homeSearchList"
+        resp = self._post_with_retry(list_url, self._list_payload(keyword, page))
+        body = resp.json()
+        if not isinstance(body, dict):
+            return []
+        # "data" may be present but null; dict.get's default would not cover that.
+        data = body.get("data")
+        if not isinstance(data, dict):
+            return []
+        return data.get("shopProducts") or []
+
+    def fetch_shop(self, seller_code):
+        """Fetch a seller's qualification record.
+
+        Returns ``(address, enterpriseName)`` taken from ``data.baseInfo`` of
+        the JSON reply; each falls back to ``""`` when the field is absent or
+        when ``data`` / ``baseInfo`` is missing or null. Request errors from
+        ``_post_with_retry`` propagate to the caller.
+        """
+        detail_url = "https://gateway-b2b.fangkuaiyi.com/ycapp/shop/enterpriseQualification"
+        resp = self._post_with_retry(detail_url, self._shop_payload(seller_code))
+        body = resp.json()
+        # Any level may be null in the reply, so check each one explicitly.
+        shop_res = body.get("data") if isinstance(body, dict) else None
+        base_info = shop_res.get("baseInfo") if isinstance(shop_res, dict) else None
+        if not isinstance(base_info, dict):
+            return "", ""
+        return base_info.get("address", ""), base_info.get("enterpriseName", "")
+
+    def _get_shop_info(self, seller_code):
+        """Return the shop info for ``seller_code``, fetching it at most once.
+
+        A failed fetch is logged and cached as ``("", "")``, so it is not
+        retried for the same seller.
+        """
+        cache = self._shop_cache
+        if seller_code not in cache:
+            try:
+                info = self.fetch_shop(seller_code)
+            except Exception as exc:
+                logger.warning("fetch_shop 失败 seller_code=%s: %s", seller_code, exc)
+                info = ("", "")
+            cache[seller_code] = info
+        return cache[seller_code]
+
+    def _current_url(self):
+        """Return the browser's current URL, or ``""`` when it is empty or
+        reading it raises."""
+        try:
+            current = self.driver.url
+        except Exception:
+            return ""
+        return current or ""
+
+    def _url_has_product(self, spu_code, seller_code):
+        """Tell whether the current URL contains the product codes.
+
+        The SPU code must be non-empty and present in the URL; the seller
+        code is also required to be present when one is given.
+        """
+        page_url = self._current_url()
+        spu = str(spu_code or "")
+        seller = str(seller_code or "")
+        if not spu:
+            return False
+        if seller:
+            return spu in page_url and seller in page_url
+        return spu in page_url
+
+    def _wait_detail_ready(self, spu_code, seller_code, timeout=DETAIL_URL_WAIT):
+        """Poll until the URL matches the product and the detail container is
+        found; return True on success, False once ``timeout`` seconds pass."""
+        stop_at = time.time() + timeout
+        while time.time() < stop_at:
+            on_product_url = self._url_has_product(spu_code, seller_code)
+            if on_product_url and self.driver.ele(DETAIL_CONTENT_XPATH, timeout=1):
+                # Short settle delay once the container is present.
+                time.sleep(0.3)
+                return True
+            time.sleep(0.4)
+        return False
+
+    def _build_detail_url(self, item):
+        """Work out the detail-page URL for a search-result item.
+
+        An item without ``productId`` that carries ``groupBuyProductDto`` is
+        replaced by that nested record and gets a group-buying URL; any other
+        item gets the regular product URL.
+
+        Returns:
+            ``(item, detail_url, spu_code, seller_code)`` where ``item`` is
+            the record the codes were read from.
+        """
+        group_item = None if item.get("productId") else item.get("groupBuyProductDto")
+
+        if not group_item:
+            seller = item.get("sellerCode")
+            spu = item.get("spuCode")
+            url = f"https://mall.yaoex.com/v2/product/#/spuCode/{spu}/sellerCode/{seller}"
+            return item, url, spu, seller
+
+        spu = group_item.get("spuCode", "")
+        seller = group_item.get("sellerCode", "")
+        params = {"id": group_item.get("groupBuyingId", ""), "s": seller, "sp": spu}
+        # Compact JSON, URL-quoted into the "p" query parameter.
+        encoded = quote(json.dumps(params, separators=(",", ":")))
+        url = f"https://mall.yaoex.com/groupBuying/#/productDetail?p={encoded}"
+        return group_item, url, spu, seller
+
+    def _goto_detail_page(self, detail_url, spu_code, seller_code):
+        """Open ``detail_url`` and refresh once so the SPA re-renders the detail
+        view for the current URL.
+
+        Returns True as soon as one get + refresh completes without raising,
+        False after ``DETAIL_NAV_RETRIES`` failed attempts. ``spu_code`` and
+        ``seller_code`` are used only in the warning log.
+
+        NOTE(review): the rendered page is not verified here;
+        ``_wait_detail_ready`` exists but is not called — confirm whether that
+        is intended.
+        """
+        for attempt in range(1, DETAIL_NAV_RETRIES + 1):
+            try:
+                self.driver.get(detail_url, timeout=DETAIL_GET_TIMEOUT)
+                time.sleep(0.5)
+                self.driver.refresh()
+                time.sleep(2)
+                return True
+            except Exception as e:
+                logger.warning(
+                    "跳转详情异常 spu=%s seller=%s attempt=%s: %s",
+                    spu_code, seller_code, attempt, e,
+                )
+                # Random pause before the next attempt.
+                time.sleep(random.uniform(0.8, 1.5))
+        return False
+
+    def _take_snapshot(self, upload_key):
+        """Screenshot the detail container (or the whole page when it is not
+        found), upload the JPEG to OSS and return its URL.
+
+        Returns ``""`` when the screenshot is empty, the screenshot or upload
+        raises, or the uploader returns no URL.
+        """
+        time.sleep(1)
+        try:
+            # Prefer the detail container; fall back to the page itself.
+            target = self.driver.ele(DETAIL_CONTENT_XPATH, timeout=2) or self.driver
+            image = target.get_screenshot(as_bytes="jpg")
+            if not image:
+                logger.warning("截图为空 upload_key=%s", upload_key)
+                return ""
+            uploaded_url = self.ossuploader.upload_from_bytes(image, str(upload_key))
+        except Exception:
+            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
+            return ""
+
+        if uploaded_url:
+            logger.info("截图上传完成 upload_key=%s url=%s", upload_key, uploaded_url)
+            time.sleep(random.uniform(1, 2))
+            return uploaded_url
+
+        logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
+        return ""
+
+    def parse_product(self, item, detail_url, snap_url):
+        """Turn one search-result item into the record handed to the pipeline.
+
+        Args:
+            item: Product dictionary from the list API.
+            detail_url: Detail-page URL built for the item.
+            snap_url: Snapshot URL for the item (may be ``""``).
+
+        Returns:
+            A dict combining item fields, shop info, the decrypted price and
+            task metadata stored on the instance.
+        """
+        seller_code = item.get("sellerCode")
+        spu_code = item.get("spuCode")
+        name_part = (item.get("productName") or "").strip()
+        short_part = (item.get("shortName") or "").strip()
+        product_name = f"{name_part} {short_part}".strip()
+
+        shop_url = f"https://mall.yaoex.com/v2/store/#/detail/{seller_code}/home"
+        company_address, company_name = self._get_shop_info(seller_code)
+
+        address = item.get("cityName", "")
+        city_id = province_id = city = province = ""
+        if address:
+            # Only the text before the first "市" is passed to the city lookup.
+            city_id, province_id, city, province = get_city(address.split("市")[0])
+
+        price = self.decrypt_price(item.get("price"))
+        # item_id is the MD5 of seller code, SPU code and decrypted price.
+        hash_text = f"{seller_code}_{spu_code}_{price}"
+        item_id = hashlib.md5(hash_text.encode("utf-8")).hexdigest()
+
+        is_sold_out = 1 if "商品已售罄" in (item.get("statusDescription") or "") else 0
+
+        shop_name = item.get("storeName") or item.get("shopName")
+        anonymous_store_name = ""
+        # For this specific store name the supplier name is recorded separately.
+        if shop_name == "预约配送中心":
+            anonymous_store_name = item.get("supplyName", "")
+
+        inventory = item.get("currentInventory") or item.get("stockCount")
+        now = time.strftime("%Y-%m-%d %H:%M:%S")
+
+        return {
+            "platform": self.platform,
+            "item_id": item_id,
+            "enterprise_id": self.company_id,
+            "product_name": product_name,
+            "spec": item.get("spec"),
+            "one_price": "",
+            "detail_url": detail_url,
+            "shop_name": shop_name,
+            "anonymous_store_name": anonymous_store_name,
+            "shop_url": shop_url,
+            "city_name": city,
+            "city_id": city_id,
+            "province_name": province,
+            "province_id": province_id,
+            "shipment_city_name": "",
+            "shipment_city_id": "",
+            "shipment_province_name": "",
+            "shipment_province_id": "",
+            "area_info": company_address or "",
+            "factory_name": item.get("factoryName"),
+            "scrape_date": time.strftime("%Y-%m-%d"),
+            "price": price,
+            "sales": "",
+            "stock_count": inventory,
+            "snapshot_url": snap_url,
+            "approval_num": item.get("approvalNum"),
+            "produced_time": item.get("productionTime"),
+            "deadline": item.get("deadLine"),
+            "update_time": now,
+            "insert_time": now,
+            "number": 1,
+            "product_brand": self.brand or "",
+            "collect_task_id": self.collect_task_id,
+            "search_name": self.product,
+            "company_name": company_name,
+            "collect_config_info": json.dumps(
+                {
+                    "sampling_cycle": self.sampling_cycle,
+                    "sampling_start_time": self.sampling_start_time,
+                    "sampling_end_time": self.sampling_end_time,
+                }
+            ),
+            "account_id": self.account_id,
+            "collect_region_id": self.collect_region_id,
+            "collect_round": self.collect_round,
+            "is_sold_out": is_sold_out,
+        }
+
+    def search(self):
+        """Crawl the configured page range for the task's product.
+
+        Logs in when needed, pages through the list API, and for every item
+        whose name contains both the product name and the brand: opens its
+        detail page, takes a snapshot and stores the parsed record.
+
+        Returns:
+            False when login fails, True otherwise.
+        """
+        self.driver.get("https://mall.yaoex.com/", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+
+        if not self._is_logged_in():
+            if not self.login():
+                logger.error("登录失败")
+                return False
+
+        # Search keyword: "<brand> <product> <specs>", skipping empty parts.
+        keyword = self.product
+        if self.brand:
+            keyword = (self.brand + " " + self.product).strip()
+        if self.product_desc:
+            keyword = (keyword + " " + self.product_desc).strip()
+
+        for page in range(self.start_page, self.end_page + 1):
+            logger.info("正在爬取 %s %s,第%s页", self.brand, self.product, page)
+            page_items = self.fetch_list_page(keyword=keyword, page=page)
+            if not page_items:
+                logger.info("第%s页无数据,停止", page)
+                break
+
+            for item in page_items:
+                # For group-buy entries `item` is replaced by the nested record.
+                item, detail_url, spu_code, seller_code = self._build_detail_url(item)
+
+                name_part = (item.get("productName") or "").strip()
+                short_part = (item.get("shortName") or "").strip()
+                product_name = f"{name_part} {short_part}".strip()
+
+                # Skip items whose name lacks the product name or the brand,
+                # counting consecutive misses.
+                if self.product not in product_name:
+                    self.is_not_product += 1
+                    continue
+                if self.brand not in product_name:
+                    self.is_not_product += 1
+                    continue
+                self.is_not_product = 0
+
+                if not self._goto_detail_page(detail_url, spu_code, seller_code):
+                    logger.warning(
+                        "详情页跳转失败,跳过 spu=%s seller=%s url=%s",
+                        spu_code, seller_code, detail_url,
+                    )
+                    continue
+
+                # The snapshot is stored under the MD5 of the detail URL.
+                upload_key = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
+                snap_url = self._take_snapshot(upload_key)
+                product = self.parse_product(item, detail_url, snap_url)
+
+                if not product.get("item_id"):
+                    continue
+
+                try:
+                    self.pipeline.storge_data(product)
+                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
+                except Exception as e:
+                    # A storage failure is logged and the crawl continues.
+                    logger.exception("写入数据库失败: %s", e)
+
+                time.sleep(random.uniform(1, 2))
+
+            # The miss counter is only checked once per page, after all its items.
+            if self.is_not_product > NOT_PRODUCT_BREAK:
+                logger.info("连续不匹配商品过多,停止搜索")
+                break
+            time.sleep(random.uniform(1, 3))
+
+        return True
+
+    def run(self):
+        """Run one crawl: start the browser, search, and always close the browser.
+
+        ``self.success`` is updated to reflect the outcome: the truthiness of
+        ``search()`` on normal completion, ``False`` when an exception escapes
+        the crawl. Exceptions are logged, not re-raised.
+        """
+        try:
+            self.init_browser()
+            # search() returns False on login failure; record it rather than
+            # leaving the flag at its initial True.
+            self.success = bool(self.search())
+        except Exception as e:
+            self.success = False
+            logger.exception("运行异常: %s", e)
+        finally:
+            self._quit_browser()

+ 440 - 73
spiders/yaoshibang/ysb_snapshot_crawl.py

@@ -1,33 +1,146 @@
+import base64
+import hashlib
+import json
+import math
 import random
+import re
 import signal
 import socket
 import sys
 import time
-import base64
-import math
+import zlib
+from pathlib import Path
 import requests
+import secrets
+import string
+from Crypto.Cipher import AES
 from commons.conn_mysql import MySQLPoolOnline
 from DrissionPage import ChromiumPage, ChromiumOptions
 from commons.Logger import logger
 from oss_upload.oss_upload import AliyunOSSUploader
-from commons.config import YSB_ACCOUNT, YSB_PASSWORD
+from commons.config import YSB_ACCOUNT
+from pipelines.drug_pipelines import DrugPipeline
+from datetime import datetime, timedelta
+from area_info.city_name_to_id import get_city
+
 CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
 CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
 
 SLIDER_OFFSET_FIX = 10
+DETAIL_GET_TIMEOUT = 15
+DETAIL_URL_WAIT = 10
+DETAIL_DOM_WAIT = 8
+DETAIL_NAV_RETRIES = 3
+DETAIL_APPROVAL_XPATH = (
+    'xpath://div[@class="drug-info"]//span[contains(text(),"批准文号")]'
+)
 
 chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
+PROJECT_ROOT = Path(__file__).resolve().parents[2]
+YSB_SPIDER_DIR = PROJECT_ROOT / "spiders" / "yaoshibang"
+BROWSER_PROFILE_SUBDIR = "chrome_profile"
+
+headers = {
+    "Accept": "*/*",
+    "Accept-Language": "zh-CN,zh;q=0.9",
+    "Connection": "keep-alive",
+    "Content-Type": "application/json",
+    "Origin": "https://dian.ysbang.cn",
+    "Referer": "https://dian.ysbang.cn/",
+    "Sec-Fetch-Dest": "empty",
+    "Sec-Fetch-Mode": "cors",
+    "Sec-Fetch-Site": "same-origin",
+    "User-Agent": (
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
+        "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
+    ),
+    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
+    "sec-ch-ua-mobile": "?0",
+    "sec-ch-ua-platform": '"Windows"',
+}
+
+
+def pkcs7_unpad(data):
+    """Return ``data`` with its PKCS#7 padding removed.
+
+    The last byte gives the padding length (1..16); the trailing bytes must
+    all equal that length. Raises ``ValueError`` for empty input, an
+    out-of-range length, or inconsistent padding bytes.
+    """
+    if not data:
+        raise ValueError("Empty data for PKCS7 unpad")
+    count = data[-1]
+    if not 1 <= count <= 16:
+        raise ValueError("Invalid PKCS7 padding length")
+    expected_tail = bytes([count]) * count
+    if not data.endswith(expected_tail):
+        raise ValueError("Invalid PKCS7 padding bytes")
+    return data[:-count]
+
+
+def derive_ysb_key():
+    """Return the 16-byte AES key: the first 16 hex characters of the MD5 of a
+    fixed seed string, upper-cased and UTF-8 encoded."""
+    seed = "BhCLxFfFhd12K4qRGPfy".encode("utf-8")
+    hex_prefix = hashlib.md5(seed).hexdigest()[:16]
+    return hex_prefix.upper().encode("utf-8")
+
+
+def decrypt_ysb_payload(cipher_text_b64):
+    """Decrypt the ``data.o`` field of the Yaoshibang list API into a JSON object.
+
+    Steps: Base64-decode, AES-ECB decrypt with ``derive_ysb_key()``, strip the
+    PKCS#7 padding, decompress, then parse the UTF-8 text as JSON. Errors from
+    any step (for example ``ValueError`` from ``pkcs7_unpad``) propagate.
+    """
+    key = derive_ysb_key()
+    cipher_bytes = base64.b64decode(cipher_text_b64)
+    cipher = AES.new(key, AES.MODE_ECB)
+    decrypted = cipher.decrypt(cipher_bytes)
+    unpadded = pkcs7_unpad(decrypted)
+    # wbits = MAX_WBITS | 16 makes zlib expect a gzip header and trailer.
+    json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16)
+    return json.loads(json_bytes.decode("utf-8"))
 
 
 class YaoShiBangSnapshot:
-    def __init__(self, product=None):
-        self.product = product
+    def __init__(self, drug_dict=None):
+        """Create the DB, pipeline and OSS helpers and load the task if one is given.
+
+        drug_dict: optional task dictionary; when non-empty its fields are
+        copied onto the instance by ``get_product_data``.
+        """
         self.driver = None
-        self.account_name = "ysbang_1"
+
+        self.db = MySQLPoolOnline()
+        self.ip = None
+        self.login_username = None
+        self.login_password = None
         self.platform = 5
-        self.db_online = MySQLPoolOnline()
+        self.pipeline = DrugPipeline("ysb")
+        self.task_dict = drug_dict or {}
         self.ossuploader = AliyunOSSUploader()
+        self.start_page = 1
+        self.end_page = 1
+        self.account_name = YSB_ACCOUNT.get("username", "ysb_default")
         self._register_signal_handler()
+        # NOTE(review): without a task dict, attributes such as self.product and
+        # self.brand are never assigned although search() reads them — confirm
+        # that callers always pass a task.
+        if self.task_dict:
+            self.get_product_data()
+        self.success = True
+        # NOTE(review): "is_no_prodcut" looks like a typo and is not read in the
+        # code visible here; search() uses is_product_count.
+        self.is_no_prodcut = 0
+        self.is_product_count = 0
+        self.token = ""
+        self._state_value = ""
+        # Timestamp 500 minutes in the past; becomes the first part of the "ex"
+        # field assembled in build_base_payload.
+        self.start_date = (datetime.now() - timedelta(minutes=500)).strftime("%Y-%m-%d %H:%M")
+
+    def get_product_data(self):
+        """Copy the fields of ``self.task_dict`` onto instance attributes.
+
+        ``id``, ``company_id`` and ``product_name`` are required (a missing key
+        raises ``KeyError``); every other field falls back to a default.
+        """
+        self.task_id = self.task_dict["id"]
+        self.company_id = self.task_dict["company_id"]
+        self.product = self.task_dict["product_name"]
+        self.product_desc = self.task_dict.get("product_specs", "")
+        self.brand = self.task_dict.get("product_brand", "")
+        self.product_keyword = self.task_dict.get("product_keyword", "")
+        self.collect_task_id = self.task_dict.get("collect_task_id", "")
+        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
+        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
+        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
+        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
+        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
+        self.collect_region_id = self.task_dict.get("collect_region_id", "")
+        self.collect_round = self.task_dict.get("collect_round", 1)
+        # Page bounds are sanitised to integers >= 1, and end_page is never
+        # allowed to be smaller than start_page.
+        self.start_page = self._parse_page(self.task_dict.get("start_page"), 1)
+        self.end_page = max(
+            self.start_page,
+            self._parse_page(self.task_dict.get("end_page"), self.start_page),
+        )
+
+    @staticmethod
+    def _parse_page(value, default=1):
+        """Convert ``value`` to a page number.
+
+        Returns ``int(value)`` when it is at least 1; returns ``default`` when
+        the conversion fails with ``TypeError``/``ValueError`` or the number
+        is below 1.
+        """
+        try:
+            number = int(value)
+        except (TypeError, ValueError):
+            return default
+        if number < 1:
+            return default
+        return number
 
     def _register_signal_handler(self):
         def handler(signum, frame):
@@ -54,16 +167,38 @@ class YaoShiBangSnapshot:
             s.bind(("127.0.0.1", 0))
             return s.getsockname()[1]
 
+    def _resolve_browser_profile_dir(self):
+        """
+        Choose the Chrome user-data directory under <project root>/spiders/yaoshibang/.
+
+        Prefers chrome_profile/<account>; if a legacy directory already holds
+        browser data (a "Default" folder or a "Local State" file) that one
+        keeps being used. Returns a ``Path``; only the parent of the preferred
+        directory is created here.
+        """
+        preferred = YSB_SPIDER_DIR / BROWSER_PROFILE_SUBDIR / self.account_name
+        legacy_flat = YSB_SPIDER_DIR / self.account_name
+        legacy_nested = YSB_SPIDER_DIR / "spiders" / "yaoshibang" / self.account_name
+
+        # First candidate that already contains browser data wins, in this order.
+        for candidate in (preferred, legacy_flat, legacy_nested):
+            if (candidate / "Default").is_dir() or (candidate / "Local State").is_file():
+                logger.info("使用已有浏览器配置目录: %s", candidate)
+                return candidate
+
+        preferred.parent.mkdir(parents=True, exist_ok=True)
+        logger.info("新建浏览器配置目录: %s", preferred)
+        return preferred
+
     def init_browser(self):
+        """Launch Chrome on a free local debugging port using the resolved profile directory."""
         co = ChromiumOptions().set_browser_path(chrome_path)
         debug_port = self._get_free_port()
-        co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
+        profile_dir = self._resolve_browser_profile_dir()
+        profile_dir.mkdir(parents=True, exist_ok=True)
+        co.set_user_data_path(str(profile_dir))
+        logger.info("浏览器用户目录(绝对路径): %s", profile_dir.resolve())
 
         co.set_local_port(debug_port)
         co.set_argument(f"--remote-debugging-port={debug_port}")
         co.set_argument("--remote-debugging-address=127.0.0.1")
         # co.set_argument("--disable-blink-features=AutomationControlled")
         co.set_argument("--disable-dev-shm-usage")
+        co.set_argument("--start-maximized")
         co.set_argument("--no-first-run")  # 避免首次运行弹窗
         co.set_argument("--no-default-browser-check")  # 避免默认浏览器检查
         self.driver = ChromiumPage(co)
@@ -102,7 +237,7 @@ class YaoShiBangSnapshot:
         if not math.isfinite(drag_distance) or drag_distance <= 0:
             logger.error("滑块距离无效: %s", drag_distance)
             return False
-        self._simulate_slider_drag(slider, drag_distance)
+        self._simulate_slider_drag(slider, drag_distance - 5)
         time.sleep(3)
         return True
 
@@ -206,11 +341,41 @@ class YaoShiBangSnapshot:
     def _is_logged_in(self):
-        # 与当前账号店铺展示文案一致;换店后需同步修改或改为配置项
+        # Logged in when a span with class "logout" is found within 5 seconds.
         title = self.driver.ele(
-            "xpath=//*[contains(text(),'广西好药师大药房连锁有限公司天峨远大药店')]",
+            "xpath=//span[@class='logout']",
             timeout=5,
         )
         return bool(title)
 
+    def _current_url(self):
+        """Return the browser's current URL, or "" when it is empty or cannot be read."""
+        try:
+            address = self.driver.url
+        except Exception:
+            return ""
+        return address or ""
+
+    def _goto_detail_page(self, item_id, detail_url):
+        """Open ``detail_url`` and confirm the browser ended up on that item.
+
+        Tries up to ``DETAIL_NAV_RETRIES`` times. When a dialog with exactly
+        two confirm buttons appears, the second one is clicked and the page is
+        refreshed. Returns True as soon as ``item_id`` occurs in the current
+        URL, otherwise False after the last attempt.
+        """
+        for attempt in range(1, DETAIL_NAV_RETRIES + 1):
+            try:
+                self.driver.get(detail_url, timeout=5)
+                time.sleep(1.5)
+                eles = self.driver.eles("xpath=//div[@class='y-dialog']//button[contains(text(),'确认')]", timeout=3)
+                if len(eles) == 2:
+                    eles[1].click()
+                    time.sleep(1)
+                    self.driver.refresh()
+
+                time.sleep(1.5)
+                if str(item_id) in self.driver.url:
+                    return True
+
+            except Exception as e:
+                logger.warning(
+                    "跳转详情异常 item_id=%s attempt=%s: %s",
+                    item_id, attempt, e,
+                )
+                # Short random pause before retrying after an error.
+                time.sleep(random.uniform(0.8, 1.5))
+        return False
+
     def login(self):
         logger.info("开始登录药师帮")
         self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
@@ -221,14 +386,14 @@ class YaoShiBangSnapshot:
         if not input_name:
             logger.error("未找到账号输入框")
             return False
-        input_name.input(YSB_ACCOUNT)
+        input_name.input(YSB_ACCOUNT["username"])
         time.sleep(random.uniform(1.5, 2.5))
 
         input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
         if not input_pass:
             logger.error("未找到密码输入框")
             return False
-        input_pass.input(YSB_PASSWORD)
+        input_pass.input(YSB_ACCOUNT["password"])
         time.sleep(random.uniform(1.5, 2.5))
 
         login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
@@ -249,35 +414,83 @@ class YaoShiBangSnapshot:
         logger.error("登录后未检测到目标店铺名,登录可能失败")
         return False
 
-    def get_snapshot(self, detail_url, row_id):
-        self.driver.get(detail_url, timeout=15)
-        self.driver.wait.doc_loaded(timeout=10)
-        time.sleep(2)
+    def _take_snapshot(self, upload_key):
+        """Screenshot the page that is currently open and upload it; no navigation.
+
+        Returns the uploaded image URL, or "" when the screenshot is empty,
+        the upload raises, or the uploader returns no URL.
+        """
+        time.sleep(1)
         self._dismiss_popup_before_screenshot()
-
-        ele = self.driver.ele("xpath=//div[@class='drug-shopping-wrap']", timeout=8)
-        if not ele:
-            ele = self.driver.ele("xpath=//div[@class='drug-info']", timeout=5)
-        if not ele:
-            logger.warning("未找到详情区域元素,跳过截图 row_id=%s", row_id)
-            return ""
-
         try:
-            jpg_bytes = ele.get_screenshot(as_bytes="jpg")
+            jpg_bytes = self.driver.get_screenshot(as_bytes="jpg")
             if not jpg_bytes:
-                logger.warning("截图为空 row_id=%s", row_id)
+                logger.warning("截图为空 upload_key=%s", upload_key)
                 return ""
-            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(row_id))
+            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
         except Exception:
-            logger.exception("截图或 OSS 上传失败 row_id=%s url=%s", row_id, detail_url)
+            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
             return ""
         if not img_url:
-            logger.warning("OSS 未返回有效地址 row_id=%s", row_id)
+            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
             return ""
-        logger.info("截图上传完成 row_id=%s url=%s", row_id, img_url)
-        time.sleep(random.uniform(0.5, 1.5))
+        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
+        time.sleep(random.uniform(1, 2))
         return img_url
 
+    def gen_pair(self, ex1_len=9, o_raw_len=16):
+        """Generate a random ``{"ex1", "o"}`` pair for a request payload.
+
+        ``ex1`` is ``ex1_len`` random lowercase letters/digits; ``o`` is the
+        Base64 text of ``o_raw_len`` random bytes.
+        """
+        charset = string.ascii_lowercase + string.digits
+        picked = []
+        for _ in range(ex1_len):
+            picked.append(secrets.choice(charset))
+        raw = secrets.token_bytes(o_raw_len)
+        encoded = base64.b64encode(raw).decode("ascii")
+        return {"ex1": "".join(picked), "o": encoded}
+
+    def build_base_payload(self, keyword, page, first_search):
+        """Build the JSON body for the wholesale list request.
+
+        keyword: search text placed in ``searchkey``.
+        page: page number to request.
+        first_search: value sent as ``firstSearch``.
+        ``ex1`` and ``o`` are left empty for the caller to fill in; ``token``
+        and ``stateValue`` come from the instance.
+        """
+        date_str = time.strftime("%Y-%m-%d %H:%M:%S")
+        return {
+            "platform": "pc",
+            "version": "6.0.0",
+            "ua": "Chrome146",
+            # "<start_date> drugInfo <now> <now>", with start_date set in __init__.
+            'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
+            "trafficType": 1,
+            "ex1": "",
+            "o": "",
+            "lastClick": -1,
+            "page": page,
+            "pagesize": "60",
+            "classify_id": "",
+            "searchkey": keyword,
+            "onlyTcm": 0,
+            "operationtype": 1,
+            "qualifiedLoanee": 0,
+            "drugId": -1,
+            "tagId": "",
+            "showRecentlyPurchasedFlag": True,
+            "onlySimpleLoan": 0,
+            "sn": "",
+            "buttons": [],
+            "buttonList": [],
+            "synonymId": 0,
+            "activityTypes": [],
+            "provider_filter": "",
+            "factoryNames": "",
+            "tcmGradeNames": [],
+            "tcmExeStandardIds": [],
+            "specs": "",
+            "deliverFloor": 0,
+            "purchaseLimitFloor": 0,
+            "nextRequestKey": "",
+            "adConfigId": 0,
+            "stateValue": self._state_value,
+            "firstSearch": first_search,
+            "token": self.token,
+        }
+
+    @staticmethod
+    def _extract_state_value(json_data, data_block):
+        """Return the first truthy ``stateValue``/``state_value`` as a string.
+
+        ``json_data`` is checked before ``data_block``; arguments that are not
+        dicts are ignored. Returns None when neither holds a truthy value.
+        """
+        containers = [c for c in (json_data, data_block) if isinstance(c, dict)]
+        for container in containers:
+            found = container.get("stateValue") or container.get("state_value")
+            if found:
+                return str(found)
+        return None
+
     def _dismiss_popup_before_screenshot(self):
         """截图前关闭或隐藏营销弹窗,避免遮挡。"""
         close_locs = [
@@ -324,19 +537,106 @@ class YaoShiBangSnapshot:
         except Exception:
             pass
 
-    def _save_snapshot_url(self, row_id, img_url):
-        """上传成功后回写库,避免下次任务重复拉取同一批。"""
-        if row_id is None or not img_url:
-            return
-        sql = (
-            "UPDATE `retrieve_process_lowprice_product` "
-            "SET `snapshot_url` = %s WHERE `id` = %s AND `platform` = %s"
+    def to_product(self, item):
+        """Convert one list-API item dict into the product record that gets stored.
+
+        Price falls back from ``disPrice`` to ``minprice`` to ``price``; shop
+        name falls back from ``provider_name`` to ``abbreviation``. Snapshot
+        URL and approval number start empty.
+        """
+        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
+        item_id = item.get("wholesaleid", "")
+        provider_id = item.get("providerId", "")
+
+        # NOTE(review): the warehouse city is read but not used; the city and
+        # province fields below are always empty strings.
+        city_str = item.get("warehouseCity", "")
+        city_id = province_id = city = province = ""
+
+        price = item.get("disPrice", "") or item.get("minprice", "") or item.get("price", "")
+        shop_name = item.get("provider_name", "") or item.get("abbreviation", "")
+
+        detail_url = f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1"
+        shop_url = f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4"
+
+        return {
+            "platform": self.platform,
+            "item_id": item_id,
+            "enterprise_id": self.company_id,
+            "product_name": item.get("drugname", ""),
+            "spec": item.get("specification", ""),
+            "one_price": '',
+            "detail_url": detail_url,
+            "shop_name": shop_name,
+            "anonymous_store_name": "",
+            "shop_url": shop_url,
+            "city_name": city,
+            "city_id": city_id,
+            "province_name": province,
+            "province_id": province_id,
+            "area_info": "",
+            "factory_name": item.get("manufacturer", ""),
+            "scrape_date": time.strftime("%Y-%m-%d"),
+            "price": price,
+            "sales": "",
+            "stock_count": item.get("stockAvailable", ""),
+            "snapshot_url": "",
+            "approval_num": "",
+            "produced_time": item.get("prodDate", ""),
+            "deadline": item.get("valid_date", ""),
+            "update_time": timestamp,
+            "insert_time": timestamp,
+            "number": 1,
+            "product_brand": self.brand or "",
+            "collect_task_id": self.collect_task_id,
+            "search_name": self.product,
+            "company_name": "",
+            "collect_config_info": json.dumps(
+                {"sampling_cycle": self.sampling_cycle, "sampling_start_time": self.sampling_start_time,
+                 "sampling_end_time": self.sampling_end_time}),
+            "account_id": self.account_id,
+            "collect_region_id": self.collect_region_id,
+            "collect_round": self.collect_round,
+            "is_sold_out": 0
+        }
+
+    def parse_detail(self, product):
+        """Fill ``price`` and ``approval_num`` of ``product`` from the open detail page.
+
+        The price found in the discount tooltip (first "¥<number>") is
+        preferred over the current-price element; when neither yields a value
+        the list price already in ``product`` is kept. Returns the same dict.
+        """
+        appvolnum_ele = self.driver.ele(
+            'xpath://div[@class="drug-info"]//span[contains(text(),"批准文号")]/following-sibling::span[1]')
+        appvolnum_value = appvolnum_ele.text if appvolnum_ele else ""
+        price = ""
+        discount_ele = self.driver.ele(
+            'xpath://div[@class="sale-info-wrap"]//div[@class="tooltip-content"]',
+            timeout=2,
+        )
+        discount_value = discount_ele.text if discount_ele else ""
+        if not price and discount_value:
+            price_re = re.search(r"¥([0-9.]+)", discount_value)
+            if price_re:
+                price = price_re.group(1).strip()
+
+        current_ele = self.driver.ele(
+            'xpath://div[@class="sale-info-wrap"]//span[contains(@class,"current-price")]',
+            timeout=3,
+        )
+
+        # Only used when the tooltip produced no price.
+        if current_ele and not price:
+            price = (current_ele.text or "").replace("¥", "").strip()
+
+        # Remember the list-API price for the log line before overwriting it.
+        list_price = product.get("price", "")
+        if price:
+            product["price"] = price
+
+        if appvolnum_value:
+            product["approval_num"] = appvolnum_value
+        logger.info(
+            "详情解析 wholesaleid=%s list_price=%s dom_price=%s url=%s",
+            product.get("item_id"),
+            list_price,
+            product.get("price"),
+            self._current_url(),
         )
-        n = self.db_online.execute(sql, (img_url, row_id, self.platform))
-        if n <= 0:
-            logger.warning("snapshot_url 回写未影响行数 id=%s platform=%s", row_id, self.platform)
+        return product
 
-    def search(self, data_list):
+    def search(self):
+        """Search the wholesale list API and snapshot every matching product.
+
+        Ensures the browser is logged in, takes the token from the browser
+        cookies, then requests list pages one by one. For each item whose name
+        contains both the product and the brand, the detail page is opened,
+        parsed, screenshotted and the record is stored.
+
+        Returns False when login fails, a page cannot be fetched or parsed, the
+        API asks for login, or 20 consecutive items do not match; True
+        otherwise.
+
+        NOTE(review): pages always run from 1 to 99; the start_page/end_page
+        values parsed from the task are not applied here — confirm whether
+        they should be.
+        """
         self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
         self.driver.wait.doc_loaded(timeout=10)
         time.sleep(2)
@@ -344,44 +644,111 @@ class YaoShiBangSnapshot:
         if not self._is_logged_in():
             if not self.login():
                 return False
+        cookies_list = self.driver.cookies()
+        cookies_dict = {c['name']: c['value'] for c in cookies_list}
+        self.token = cookies_dict.get("Token") or cookies_dict.get("token")
+
+        keyword = self.product
+        if self.brand:
+            keyword = (self.brand + " " + self.product).strip()
+        if self.product_desc:
+            keyword = (keyword + " " + self.product_desc).strip()
+
+        self._state_value = ""
+        for page in range(1, 100):
+            first_search = page == 1
+            logger.info("药师帮爬取第%s页 firstSearch=%s stateValue=%s", page, first_search,
+                        self._state_value or "(空)")
+            pair = self.gen_pair()
+            payload = self.build_base_payload(keyword, page=page, first_search=first_search)
+            payload["ex1"] = pair["ex1"]
+            payload["o"] = pair["o"]
+
+            response = None
+            for attempt in range(3):
+                try:
+                    response = requests.post(
+                        "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270", headers=headers,
+                        json=payload, timeout=30
+                    )
+                    if response.status_code == 200:
+                        break
+                except Exception as e:
+                    logger.error("第%s页请求失败 (%s/3): %s", page, attempt + 1, e)
+                    response = None
+                    time.sleep(10)
+            if not response or response.status_code != 200:
+                logger.error("第%s页请求失败,停止爬取", page)
+                return False
 
-        ok, fail = 0, 0
-        for data in data_list:
-            row_id = data.get("id")
-            link_url = data.get("link_url")
-            if not link_url:
-                logger.warning("缺少 link_url,跳过 id=%s", row_id)
-                fail += 1
+            try:
+                data_json = response.json()
+            except json.JSONDecodeError:
+                logger.exception("第%s页响应不是合法 JSON", page)
+                return False
+            data_block = data_json.get("data") or {}
+            if str(data_json.get("message", "")) == "该操作需要登录":
+                logger.warning("第%s页需要登录,请检查浏览器登录态", page)
+                return False
+
+            encrypted_o = data_block.get("o")
+            if not encrypted_o:
+                logger.warning("第%s页返回无加密 data.o: %s", page, data_json)
+                break
+
+            try:
+                json_data = decrypt_ysb_payload(encrypted_o)
+            except Exception as e:
+                logger.exception("第%s页解密失败: %s", page, e)
                 continue
-            img_url = self.get_snapshot(link_url, row_id)
-            if img_url:
-                self._save_snapshot_url(row_id, img_url)
-                ok += 1
-            else:
-                fail += 1
-        logger.info("快照任务结束 成功=%s 失败=%s 总计=%s", ok, fail, len(data_list))
-        return ok > 0
 
-    def run(self):
-        date_str = time.strftime("%Y-%m-%d")
-        sql = """
-             SELECT `id`,`link_url` FROM `retrieve_process_lowprice_product` 
-            WHERE `platform`=%s AND `snapshot_url` IS NULL AND `scrape_date`=%s 
-            LIMIT 100 """
-
-        data_list = self.db_online.select_data(sql, (self.platform, date_str))
-        if not data_list:
-            logger.info("当前不需要更新快照")
-            return
+            state_val = self._extract_state_value(json_data, data_block)
+            if state_val:
+                self._state_value = state_val
+
+            wholesales = json_data.get("wholesales", [])
+            if not wholesales:
+                logger.info(f"第{page}页无数据,停止")
+                break
+
+            for item in wholesales:
+                item_id = item.get("wholesaleid", "")
+                if not item_id:
+                    continue
+
+                detail_url = (
+                    f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1"
+                )
+                product = self.to_product(item)
+                # A null drug name must not break the substring tests below.
+                title = product.get("product_name") or ""
+                if self.brand not in title or self.product not in title:
+                    # The limit has to be checked here: mismatching items never
+                    # reach the code below, and a match resets the counter.
+                    self.is_product_count += 1
+                    if self.is_product_count >= 20:
+                        logger.info("连续不匹配商品过多,停止搜索")
+                        return False
+                    continue
+                self.is_product_count = 0
+
+                # Skip the item when the detail page could not be reached, so
+                # another page is never parsed or screenshotted in its place.
+                if not self._goto_detail_page(item_id, detail_url):
+                    logger.warning(
+                        "详情页跳转失败,跳过 wholesaleid=%s url=%s",
+                        item_id, detail_url,
+                    )
+                    continue
+
+                product = self.parse_detail(product)
+                upload_key = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
+                product["snapshot_url"] = self._take_snapshot(upload_key)
+
+                try:
+                    self.pipeline.storge_data(product)
+                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
+                except Exception as e:
+                    logger.exception("写入数据库失败: %s", e)
+
+        return True
 
+    def run(self):
+        """Start the browser, run ``search``, and always close the browser.
+
+        Exceptions are logged and swallowed rather than propagated.
+        """
         try:
             self.init_browser()
-            self.search(data_list)
+            self.search()
         except Exception as e:
             logger.exception("运行异常: %s", e)
         finally:
             self._quit_browser()
-
-
-if __name__ == "__main__":
-    YaoShiBangSnapshot().run()

+ 386 - 0
spiders/yaoshibang/ysb_snapshot_crawl_bak.py

@@ -0,0 +1,386 @@
+import random
+import signal
+import socket
+import sys
+import time
+import base64
+import math
+import requests
+from commons.conn_mysql import MySQLPoolOnline
+from DrissionPage import ChromiumPage, ChromiumOptions
+from commons.Logger import logger
+from oss_upload.oss_upload import AliyunOSSUploader
+from commons.config import YSB_ACCOUNT
+
+CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
+CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
+
+SLIDER_OFFSET_FIX = 10
+
+chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
+
+
+class YaoShiBangSnapshot:
+    def __init__(self, product=None):
+        """Store ``product`` and create the DB and OSS helpers; the browser is started later."""
+        self.product = product
+        self.driver = None
+        # Also used as the browser profile directory name in init_browser.
+        self.account_name = "ysbang_1"
+        self.platform = 5
+        self.db_online = MySQLPoolOnline()
+        self.ossuploader = AliyunOSSUploader()
+        self._register_signal_handler()
+
+    def _register_signal_handler(self):
+        """Install a SIGINT (and SIGTERM, when available) handler that closes the browser and exits."""
+        def handler(signum, frame):
+            logger.info("收到退出信号,正在关闭浏览器...")
+            self._quit_browser()
+            sys.exit(0)
+
+        signal.signal(signal.SIGINT, handler)
+        if hasattr(signal, "SIGTERM"):
+            signal.signal(signal.SIGTERM, handler)
+
+    def _quit_browser(self):
+        """Close the browser if one is open and clear ``self.driver``; errors from quit are ignored."""
+        if not self.driver:
+            return
+        try:
+            self.driver.quit()
+        except Exception:
+            pass
+        self.driver = None
+
+    @staticmethod
+    def _get_free_port():
+        """Return a local port that is free right now, for Chrome remote debugging."""
+        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            # Port 0 lets the OS choose an unused port.
+            probe.bind(("127.0.0.1", 0))
+            _host, port = probe.getsockname()
+            return port
+        finally:
+            probe.close()
+
+    def init_browser(self):
+        """Launch Chrome on a free local debugging port with a per-account profile directory."""
+        co = ChromiumOptions().set_browser_path(chrome_path)
+        debug_port = self._get_free_port()
+        # Relative path: resolved against the process working directory.
+        co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
+
+        co.set_local_port(debug_port)
+        co.set_argument(f"--remote-debugging-port={debug_port}")
+        co.set_argument("--remote-debugging-address=127.0.0.1")
+        # co.set_argument("--disable-blink-features=AutomationControlled")
+        co.set_argument("--disable-dev-shm-usage")
+        co.set_argument("--start-maximized")
+        co.set_argument("--no-first-run")  # avoid the first-run popup
+        co.set_argument("--no-default-browser-check")  # avoid the default-browser check
+        self.driver = ChromiumPage(co)
+
+    def _solve_slider_captcha(self):
+        """Detect and handle the Yidun slider captcha.
+
+        Returns True when no captcha modal is present or after the drag has
+        been performed (the outcome of the drag is not checked here); returns
+        False when recognition fails, the slider is missing, or the distance
+        is unusable.
+        """
+        self.driver.wait.doc_loaded()
+        time.sleep(2)
+
+        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
+        if not yidun:
+            return True
+
+        logger.info("检测到滑块验证码,开始处理")
+        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
+
+        distance = self._call_captcha_api(jpg_bytes)
+        if distance is None:
+            logger.error("验证码识别失败")
+            return False
+
+        logger.info("滑块距离: %s", distance)
+        slider = self.driver.ele(
+            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
+        )
+        if not slider:
+            logger.error("未找到滑块元素")
+            return False
+
+        try:
+            # Recognised distance plus the fixed SLIDER_OFFSET_FIX correction.
+            drag_distance = float(distance) + SLIDER_OFFSET_FIX
+        except (TypeError, ValueError):
+            logger.error("滑块距离非数字: %r", distance)
+            return False
+
+        if not math.isfinite(drag_distance) or drag_distance <= 0:
+            logger.error("滑块距离无效: %s", drag_distance)
+            return False
+        self._simulate_slider_drag(slider, drag_distance)
+        time.sleep(3)
+        return True
+
+    def _call_captcha_api(self, image_bytes):
+        """Ask the captcha recognition service for the slider distance.
+
+        Sends the image Base64-encoded to ``CAPTCHA_API_URL`` and returns the
+        distance as a finite float, or None on any failure.
+        """
+        try:
+            b64 = base64.b64encode(image_bytes).decode()
+            resp = requests.post(
+                CAPTCHA_API_URL,
+                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
+                headers={"Content-Type": "application/json"},
+                timeout=15,
+            ).json()
+            logger.info("验证码 API 返回: %s", resp)
+            if not isinstance(resp, dict):
+                return None
+            data = resp.get("data")
+            # The distance is read from data["data"] when "data" is a dict,
+            # otherwise from "data" itself.
+            if isinstance(data, dict):
+                dist = data.get("data")
+            else:
+                dist = data
+            if dist is None:
+                logger.error("验证码 API 未返回距离字段: %s", resp)
+                return None
+            try:
+                d = float(dist)
+            except (TypeError, ValueError):
+                logger.error("验证码距离无法解析为数字: %r", dist)
+                return None
+            if not math.isfinite(d):
+                logger.error("验证码距离非有限数值: %r", dist)
+                return None
+            return d
+        except Exception as e:
+            logger.exception("验证码 API 调用失败: %s", e)
+            return None
+
+    @staticmethod
+    def _generate_human_track(distance):
+        """Build a randomised drag track covering about ``distance`` horizontally.
+
+        Returns a list of ``(dx, dy, duration)`` tuples: accelerating steps up
+        to 70% of the distance, then decelerating steps, optionally followed
+        by a small backward step. Returns [] for a non-numeric, non-positive
+        or non-finite distance.
+        """
+        try:
+            distance = float(distance)
+        except (TypeError, ValueError):
+            return []
+        if distance <= 0 or not math.isfinite(distance):
+            return []
+        tracks = []
+        current = 0
+        mid = distance * 0.7
+        t = 0.2
+        v = 0
+        move_points = []
+
+        # Phase 1: positive random acceleration until 70% of the distance.
+        while current < mid:
+            a = random.uniform(2, 4)
+            v0 = v
+            v = v0 + a * t
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        # Phase 2: negative acceleration; speed is clamped to at least 0.5.
+        while current < distance:
+            a = -random.uniform(0.5, 1.5)
+            v0 = v
+            v = v0 + a * t
+            if v < 0.5:
+                v = 0.5
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        total_points = len(move_points)
+        for i, move in enumerate(move_points):
+            # Occasional vertical jitter of up to 2 units.
+            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
+
+            # Shorter step durations in the first 30%, longer in the last 30%.
+            if i < total_points * 0.3:
+                duration = random.uniform(0.01, 0.03)
+            elif i > total_points * 0.7:
+                duration = random.uniform(0.03, 0.08)
+            else:
+                duration = random.uniform(0.02, 0.05)
+
+            # 5% chance of an extra pause on a step.
+            if random.random() < 0.05:
+                duration += random.uniform(0.05, 0.1)
+
+            tracks.append((move, y_offset, duration))
+
+        # 70% chance of a final backward step of 1 to 3 units.
+        if random.random() < 0.7:
+            tracks.append((-random.randint(1, 3), 0, 0.05))
+
+        return tracks
+
+    def _simulate_slider_drag(self, slider_element, target_distance):
+        """Hold the slider, replay a generated track step by step, then release.
+
+        Does nothing except log a warning when ``target_distance`` is not positive.
+        """
+        if target_distance <= 0:
+            logger.warning("滑块目标距离无效: %s", target_distance)
+            return
+        self.driver.actions.move_to(slider_element).hold()
+        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
+            # NOTE(review): track durations are roughly 0.01-0.18 and are divided
+            # by 1000 here; confirm which unit ``move(duration=...)`` expects.
+            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
+        self.driver.actions.release()
+
+    def _is_logged_in(self):
+        """Return True when a span with class "logout" is found within 5 seconds."""
+        title = self.driver.ele(
+            "xpath=//span[@class='logout']",
+            timeout=5,
+        )
+        return bool(title)
+
+    def login(self):
+        """Log in through the web form, solving the slider captcha if it appears.
+
+        Returns True once the logged-in marker is detected (checked up to
+        three times after submitting); returns False when a form element is
+        missing or the marker never appears.
+        """
+        logger.info("开始登录药师帮")
+        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+
+        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
+        if not input_name:
+            logger.error("未找到账号输入框")
+            return False
+        # The primary crawler reads the account under "username"; fall back to
+        # the legacy "account" key so either config layout works.
+        input_name.input(YSB_ACCOUNT.get("username") or YSB_ACCOUNT["account"])
+        time.sleep(random.uniform(1.5, 2.5))
+
+        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
+        if not input_pass:
+            logger.error("未找到密码输入框")
+            return False
+        input_pass.input(YSB_ACCOUNT["password"])
+        time.sleep(random.uniform(1.5, 2.5))
+
+        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
+        if not login_btn:
+            logger.error("未找到登录按钮")
+            return False
+
+        login_btn.click()
+        time.sleep(3)
+        for _ in range(3):
+            # The captcha result is deliberately not checked: the login state
+            # probe below decides success.
+            self._solve_slider_captcha()
+            time.sleep(3)
+
+            if self._is_logged_in():
+                logger.info("登录成功")
+                return True
+
+        logger.error("登录后未检测到目标店铺名,登录可能失败")
+        return False
+
+    def get_snapshot(self, detail_url, row_id):
+        """Open a detail page, screenshot it and upload the image.
+
+        Args:
+            detail_url: URL of the page to capture.
+            row_id: Identifier of the source row; its string form is passed
+                to the uploader alongside the image bytes and it is used in
+                log messages.
+
+        Returns:
+            str: The value returned by the uploader on success, or ``""`` when
+            the screenshot is empty, the capture/upload raises, or the
+            uploader returns a falsy value.
+        """
+        self.driver.get(detail_url, timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+        self._dismiss_popup_before_screenshot()
+
+        uploaded_url = ""
+        try:
+            # Capture through the driver itself; the element-level variant
+            # (ele.get_screenshot) is left disabled.
+            image_bytes = self.driver.get_screenshot(as_bytes="jpg")
+            if image_bytes:
+                uploaded_url = self.ossuploader.upload_from_bytes(image_bytes, str(row_id))
+            else:
+                logger.warning("截图为空 row_id=%s", row_id)
+                return ""
+        except Exception:
+            logger.exception("截图或 OSS 上传失败 row_id=%s url=%s", row_id, detail_url)
+            return ""
+
+        if uploaded_url:
+            logger.info("截图上传完成 row_id=%s url=%s", row_id, uploaded_url)
+            # Randomised pause after a successful upload.
+            time.sleep(random.uniform(2, 3))
+            return uploaded_url
+
+        logger.warning("OSS 未返回有效地址 row_id=%s", row_id)
+        return ""
+
+    def _dismiss_popup_before_screenshot(self):
+        """Close or hide marketing pop-ups before a screenshot is taken.
+
+        Two best-effort passes are made:
+
+        1. Each known close-button locator is probed (0.5 s timeout) and
+           clicked when found.
+        2. A script hides elements whose class contains modal/popup/dialog/
+           mask/overlay and whose computed z-index is >= 999, then resets the
+           body overflow to ``auto``.
+
+        Failures in either pass never propagate; they are logged at debug
+        level so that a broken locator or script can still be diagnosed.
+
+        Returns:
+            None.
+        """
+        close_locs = [
+            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
+            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
+            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
+            "xpath=//button[contains(@class,'close')]",
+            "xpath=//span[text()='×']",
+            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
+        ]
+        for loc in close_locs:
+            try:
+                btn = self.driver.ele(loc, timeout=0.5)
+                if btn:
+                    btn.click()
+                    time.sleep(0.2)
+            except Exception:
+                # Best-effort: a locator that cannot be clicked must not stop
+                # the remaining locators or the screenshot itself.
+                logger.debug("关闭弹窗失败,已忽略 loc=%s", loc, exc_info=True)
+
+        try:
+            # Fallback: hide common high z-index pop-ups and masks.
+            self.driver.run_js(
+                """
+                const sels = [
+                  '[class*="modal"]',
+                  '[class*="popup"]',
+                  '[class*="dialog"]',
+                  '[class*="mask"]',
+                  '[class*="overlay"]'
+                ];
+                for (const s of sels) {
+                  document.querySelectorAll(s).forEach(el => {
+                    const style = getComputedStyle(el);
+                    const z = parseInt(style.zIndex || '0', 10);
+                    if (z >= 999 && style.display !== 'none') {
+                      el.style.display = 'none';
+                    }
+                  });
+                }
+                document.body.style.overflow = 'auto';
+                """
+            )
+            time.sleep(0.2)
+        except Exception:
+            # Best-effort: the screenshot proceeds even if the script fails.
+            logger.debug("隐藏弹窗遮罩脚本执行失败,已忽略", exc_info=True)
+
+    def _save_snapshot_url(self, row_id, img_url):
+        """Write the uploaded snapshot URL back to the product row.
+
+        Persisting the URL keeps later tasks from fetching the same batch
+        again. Nothing is executed when ``row_id`` is None or ``img_url`` is
+        empty.
+
+        Args:
+            row_id: Value matched against the ``id`` column.
+            img_url: Value stored in the ``snapshot_url`` column.
+
+        Returns:
+            None.
+        """
+        if row_id is not None and img_url:
+            update_sql = (
+                "UPDATE `retrieve_process_lowprice_product` SET `snapshot_url` = %s"
+                " WHERE `id` = %s AND `platform` = %s"
+            )
+            affected = self.db_online.execute(update_sql, (img_url, row_id, self.platform))
+            # A non-positive result is logged as "no rows affected".
+            if affected <= 0:
+                logger.warning("snapshot_url 回写未影响行数 id=%s platform=%s", row_id, self.platform)
+
+    def search(self, data_list):
+        """Capture and persist a snapshot for every row in ``data_list``.
+
+        Opens the home page, logs in when no session is detected, then for
+        each row takes a snapshot of its ``link_url`` and writes the uploaded
+        URL back through ``_save_snapshot_url``.
+
+        Args:
+            data_list: Iterable of dict-like rows providing ``id`` and
+                ``link_url`` via ``.get``.
+
+        Returns:
+            bool: False when login fails; otherwise True if at least one row
+            was captured and written back.
+        """
+        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+
+        if not self._is_logged_in() and not self.login():
+            return False
+
+        ok, fail = 0, 0
+        for data in data_list:
+            row_id = data.get("id")
+            link_url = data.get("link_url")
+            if not link_url:
+                logger.warning("缺少 link_url,跳过 id=%s", row_id)
+                fail += 1
+                continue
+
+            img_url = self.get_snapshot(link_url, row_id)
+            if img_url:
+                # The write-back was previously commented out, which left
+                # snapshot_url empty, so rows selected on an empty
+                # snapshot_url were picked up again on every run.
+                # NOTE(review): confirm the write-back was not disabled on
+                # purpose before deploying.
+                try:
+                    self._save_snapshot_url(row_id, img_url)
+                except Exception:
+                    # Keep one failed write from aborting the whole batch.
+                    logger.exception("snapshot_url 回写失败 id=%s", row_id)
+                    fail += 1
+                else:
+                    ok += 1
+            else:
+                fail += 1
+            # Fixed pause between rows.
+            time.sleep(2)
+
+        logger.info("快照任务结束 成功=%s 失败=%s 总计=%s", ok, fail, len(data_list))
+        return ok > 0
+
+    def run(self):
+        """Fetch today's rows that lack a snapshot and process them.
+
+        Selects up to 100 rows for this platform whose ``snapshot_url`` is
+        empty and whose ``scrape_date`` equals today's date (formatted with
+        ``time.strftime``). When rows exist, the browser is initialised, the
+        rows are handed to ``search``, and the browser is always shut down
+        afterwards; exceptions are logged rather than raised.
+
+        Returns:
+            None.
+        """
+        sql = """
+              SELECT `id`, `link_url`
+              FROM `retrieve_process_lowprice_product`
+              WHERE `platform` = %s
+                AND `snapshot_url` = ""
+                AND `scrape_date` = %s LIMIT 100 """
+        today = time.strftime("%Y-%m-%d")
+
+        pending_rows = self.db_online.select_data(sql, (self.platform, today))
+        if not pending_rows:
+            logger.info("当前不需要更新快照")
+            return None
+
+        try:
+            self.init_browser()
+            self.search(pending_rows)
+        except Exception as e:
+            logger.exception("运行异常: %s", e)
+        finally:
+            # Shut the browser down on both the success and failure paths.
+            self._quit_browser()
+        return None
+
+
+if __name__ == "__main__":
+    # Script entry point: build the crawler with its defaults and run one pass.
+    snapshot_job = YaoShiBangSnapshot()
+    snapshot_job.run()