소스 검색

药师帮列表页快照

zhuoyuncheng 1 주 전
부모
커밋
e6e07a73ba
1개의 변경된 파일667개의 추가작업 그리고 0개의 파일을 삭제
  1. 667 0
      spiders/yaoshibang/ysb_snapshot_list_crawl.py

+ 667 - 0
spiders/yaoshibang/ysb_snapshot_list_crawl.py

@@ -0,0 +1,667 @@
+import base64
+import hashlib
+import json
+import math
+import random
+import signal
+import socket
+import sys
+import time
+import zlib
+from pathlib import Path
+from urllib.parse import quote
+import requests
+from Crypto.Cipher import AES
+from commons.conn_mysql import MySQLPoolOnline
+from DrissionPage import ChromiumPage, ChromiumOptions
+from commons.Logger import logger
+from oss_upload.oss_upload import AliyunOSSUploader
+from commons.config import YSB_ACCOUNT
+from pipelines.drug_pipelines import DrugPipeline
+from area_info.city_name_to_id import get_city
+
+CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
+CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
+
+SLIDER_OFFSET_FIX = 10
+LISTEN_CLEAR_ROUNDS = 3
+LISTEN_CLEAR_TIMEOUT = 0.3
+
+chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
+PROJECT_ROOT = Path(__file__).resolve().parents[2]
+YSB_SPIDER_DIR = PROJECT_ROOT / "spiders" / "yaoshibang"
+BROWSER_PROFILE_SUBDIR = "chrome_profile"
+
+
+def pkcs7_unpad(data):
+    if not data:
+        raise ValueError("Empty data for PKCS7 unpad")
+    pad_len = data[-1]
+    if pad_len < 1 or pad_len > 16:
+        raise ValueError("Invalid PKCS7 padding length")
+    if data[-pad_len:] != bytes([pad_len]) * pad_len:
+        raise ValueError("Invalid PKCS7 padding bytes")
+    return data[:-pad_len]
+
+
+def derive_ysb_key():
+    base = "BhCLxFfFhd12K4qRGPfy"
+    md5_hex = hashlib.md5(base.encode("utf-8")).hexdigest()
+    return md5_hex[:16].upper().encode("utf-8")
+
+
+def decrypt_ysb_payload(cipher_text_b64):
+    """解密药师帮列表接口 data.o 字段,返回 JSON 对象。"""
+    key = derive_ysb_key()
+    cipher_bytes = base64.b64decode(cipher_text_b64)
+    cipher = AES.new(key, AES.MODE_ECB)
+    decrypted = cipher.decrypt(cipher_bytes)
+    unpadded = pkcs7_unpad(decrypted)
+    json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16)
+    return json.loads(json_bytes.decode("utf-8"))
+
+
+class YaoShiBangSnapshot:
+    def __init__(self, drug_dict=None):
+        self.driver = None
+
+        self.db = MySQLPoolOnline()
+        self.ip = None
+        self.login_username = None
+        self.login_password = None
+        self.platform = 5
+        self.pipeline = DrugPipeline("ysb")
+        self.task_dict = drug_dict or {}
+        self.ossuploader = AliyunOSSUploader()
+        self.start_page = 1
+        self.end_page = 1
+        self.account_name = YSB_ACCOUNT.get("username", "ysb_default")
+        self._register_signal_handler()
+        if self.task_dict:
+            self.get_product_data()
+        self.success = True
+        self.is_no_prodcut = 0
+        self.is_product_count = 0
+        self._listen_started = False
+
+    def get_product_data(self):
+        self.task_id = self.task_dict["id"]
+        self.company_id = self.task_dict["company_id"]
+        self.product = self.task_dict["product_name"]
+        self.product_desc = self.task_dict.get("product_specs", "")
+        self.brand = self.task_dict.get("product_brand", "")
+        self.product_keyword = self.task_dict.get("product_keyword", "")
+        self.collect_task_id = self.task_dict.get("collect_task_id", "")
+        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
+        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
+        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
+        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
+        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
+        self.collect_region_id = self.task_dict.get("collect_region_id", "")
+        self.collect_round = self.task_dict.get("collect_round", 1)
+        self.start_page = self._parse_page(self.task_dict.get("start_page"), 1)
+        self.end_page = max(
+            self.start_page,
+            self._parse_page(self.task_dict.get("end_page"), self.start_page),
+        )
+
+    @staticmethod
+    def _parse_page(value, default=1):
+        try:
+            page = int(value)
+            return page if page >= 1 else default
+        except (TypeError, ValueError):
+            return default
+
+    def _register_signal_handler(self):
+        def handler(signum, frame):
+            logger.info("收到退出信号,正在关闭浏览器...")
+            self._quit_browser()
+            sys.exit(0)
+
+        signal.signal(signal.SIGINT, handler)
+        if hasattr(signal, "SIGTERM"):
+            signal.signal(signal.SIGTERM, handler)
+
+    def _quit_browser(self):
+        if self.driver:
+            try:
+                self.driver.quit()
+            except Exception:
+                pass
+            self.driver = None
+
+    @staticmethod
+    def _get_free_port():
+        """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.bind(("127.0.0.1", 0))
+            return s.getsockname()[1]
+
+    def _resolve_browser_profile_dir(self):
+        """
+        浏览器数据固定落在 <项目根>/spiders/yaoshibang/ 下。
+        优先 chrome_profile/<账号>;若旧版目录已有登录态则继续沿用。
+        """
+        preferred = YSB_SPIDER_DIR / BROWSER_PROFILE_SUBDIR / self.account_name
+        legacy_flat = YSB_SPIDER_DIR / self.account_name
+        legacy_nested = YSB_SPIDER_DIR / "spiders" / "yaoshibang" / self.account_name
+
+        for candidate in (preferred, legacy_flat, legacy_nested):
+            if (candidate / "Default").is_dir() or (candidate / "Local State").is_file():
+                logger.info("使用已有浏览器配置目录: %s", candidate)
+                return candidate
+
+        preferred.parent.mkdir(parents=True, exist_ok=True)
+        logger.info("新建浏览器配置目录: %s", preferred)
+        return preferred
+
+    def init_browser(self):
+        co = ChromiumOptions().set_browser_path(chrome_path)
+        debug_port = self._get_free_port()
+        profile_dir = self._resolve_browser_profile_dir()
+        profile_dir.mkdir(parents=True, exist_ok=True)
+        co.set_user_data_path(str(profile_dir))
+        logger.info("浏览器用户目录(绝对路径): %s", profile_dir.resolve())
+
+        co.set_local_port(debug_port)
+        co.set_argument(f"--remote-debugging-port={debug_port}")
+        co.set_argument("--remote-debugging-address=127.0.0.1")
+        # co.set_argument("--disable-blink-features=AutomationControlled")
+        co.set_argument("--disable-dev-shm-usage")
+        co.set_argument("--start-maximized")
+        co.set_argument("--no-first-run")  # 避免首次运行弹窗
+        co.set_argument("--no-default-browser-check")  # 避免默认浏览器检查
+        self.driver = ChromiumPage(co)
+
+    def _solve_slider_captcha(self):
+        """检测并处理易盾滑块验证码,成功返回 True。"""
+        self.driver.wait.doc_loaded()
+        time.sleep(2)
+
+        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
+        if not yidun:
+            return True
+
+        logger.info("检测到滑块验证码,开始处理")
+        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
+
+        distance = self._call_captcha_api(jpg_bytes)
+        if distance is None:
+            logger.error("验证码识别失败")
+            return False
+
+        logger.info("滑块距离: %s", distance)
+        slider = self.driver.ele(
+            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
+        )
+        if not slider:
+            logger.error("未找到滑块元素")
+            return False
+
+        try:
+            drag_distance = float(distance) + SLIDER_OFFSET_FIX
+        except (TypeError, ValueError):
+            logger.error("滑块距离非数字: %r", distance)
+            return False
+
+        if not math.isfinite(drag_distance) or drag_distance <= 0:
+            logger.error("滑块距离无效: %s", drag_distance)
+            return False
+        self._simulate_slider_drag(slider, drag_distance - 5)
+        time.sleep(3)
+        return True
+
+    def _call_captcha_api(self, image_bytes):
+        """调用云码平台识别滑块距离,失败返回 None。"""
+        try:
+            b64 = base64.b64encode(image_bytes).decode()
+            resp = requests.post(
+                CAPTCHA_API_URL,
+                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
+                headers={"Content-Type": "application/json"},
+                timeout=15,
+            ).json()
+            logger.info("验证码 API 返回: %s", resp)
+            if not isinstance(resp, dict):
+                return None
+            data = resp.get("data")
+            if isinstance(data, dict):
+                dist = data.get("data")
+            else:
+                dist = data
+            if dist is None:
+                logger.error("验证码 API 未返回距离字段: %s", resp)
+                return None
+            try:
+                d = float(dist)
+            except (TypeError, ValueError):
+                logger.error("验证码距离无法解析为数字: %r", dist)
+                return None
+            if not math.isfinite(d):
+                logger.error("验证码距离非有限数值: %r", dist)
+                return None
+            return d
+        except Exception as e:
+            logger.exception("验证码 API 调用失败: %s", e)
+            return None
+
+    @staticmethod
+    def _generate_human_track(distance):
+        try:
+            distance = float(distance)
+        except (TypeError, ValueError):
+            return []
+        if distance <= 0 or not math.isfinite(distance):
+            return []
+        tracks = []
+        current = 0
+        mid = distance * 0.7
+        t = 0.2
+        v = 0
+        move_points = []
+
+        while current < mid:
+            a = random.uniform(2, 4)
+            v0 = v
+            v = v0 + a * t
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        while current < distance:
+            a = -random.uniform(0.5, 1.5)
+            v0 = v
+            v = v0 + a * t
+            if v < 0.5:
+                v = 0.5
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        total_points = len(move_points)
+        for i, move in enumerate(move_points):
+            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
+
+            if i < total_points * 0.3:
+                duration = random.uniform(0.01, 0.03)
+            elif i > total_points * 0.7:
+                duration = random.uniform(0.03, 0.08)
+            else:
+                duration = random.uniform(0.02, 0.05)
+
+            if random.random() < 0.05:
+                duration += random.uniform(0.05, 0.1)
+
+            tracks.append((move, y_offset, duration))
+
+        if random.random() < 0.7:
+            tracks.append((-random.randint(1, 3), 0, 0.05))
+
+        return tracks
+
+    def _simulate_slider_drag(self, slider_element, target_distance):
+        if target_distance <= 0:
+            logger.warning("滑块目标距离无效: %s", target_distance)
+            return
+        self.driver.actions.move_to(slider_element).hold()
+        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
+            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
+        self.driver.actions.release()
+
+    def _is_logged_in(self):
+        # 与当前账号店铺展示文案一致;换店后需同步修改或改为配置项
+        title = self.driver.ele(
+            "xpath=//span[@class='logout']",
+            timeout=5,
+        )
+        return bool(title)
+
+    def _start_listen(self):
+        """监听列表接口 getWholesaleList。"""
+        target = "wholesale-drug/sales/getWholesaleList/v4270"
+        if self._listen_started and getattr(self.driver.listen, "listening", False):
+            self.driver.listen.stop()
+        self.driver.listen.start(target)
+        self._listen_started = True
+        logger.info("已启动监听: %s", target)
+
+    def clear_listen_buffer(self, rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT):
+        if not self.driver:
+            return
+        try:
+            for _ in range(rounds):
+                resps = list(self.driver.listen.steps(timeout=timeout))
+                if not resps:
+                    break
+        except Exception as e:
+            logger.debug("清空监听缓冲失败: %s", e)
+
+    @staticmethod
+    def _parse_listen_body(resp):
+        body = resp.response.body
+        if isinstance(body, str):
+            body = json.loads(body)
+        if not isinstance(body, dict):
+            return None
+        return body
+
+    @staticmethod
+    def _extract_encrypted_o(body):
+        data_block = (body or {}).get("data") or {}
+        if isinstance(data_block, dict):
+            return data_block.get("o")
+        return None
+
+    def _consume_list_listen(self, page, timeout=10):
+        """消费列表接口响应,返回解密后的 json_data。"""
+        for resp in self.driver.listen.steps(timeout=timeout):
+            try:
+                body = self._parse_listen_body(resp)
+                if not body:
+                    continue
+                message = str(body.get("message", ""))
+                if message and "成功" not in message:
+                    logger.warning("第%s页 message=%s", page, message)
+                    continue
+                encrypted_o = self._extract_encrypted_o(body)
+                if not encrypted_o:
+                    continue
+                json_data = decrypt_ysb_payload(encrypted_o)
+                logger.info("第%s页列表解密成功 wholesales=%s", page, len(json_data.get("wholesales", [])))
+                return json_data
+            except Exception as e:
+                logger.warning("第%s页解析列表监听失败: %s", page, e)
+        return None
+
+    def login(self):
+        logger.info("开始登录药师帮")
+        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+
+        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
+        if not input_name:
+            logger.error("未找到账号输入框")
+            return False
+        input_name.input(YSB_ACCOUNT["username"])
+        time.sleep(random.uniform(1.5, 2.5))
+
+        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
+        if not input_pass:
+            logger.error("未找到密码输入框")
+            return False
+        input_pass.input(YSB_ACCOUNT["password"])
+        time.sleep(random.uniform(1.5, 2.5))
+
+        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
+        if not login_btn:
+            logger.error("未找到登录按钮")
+            return False
+
+        login_btn.click()
+        time.sleep(3)
+        for i in range(3):
+            self._solve_slider_captcha()
+            time.sleep(3)
+
+            if self._is_logged_in():
+                logger.info("登录成功")
+                return True
+
+        logger.error("登录后未检测到目标店铺名,登录可能失败")
+        return False
+
+    def _take_snapshot(self, upload_key, image_ele):
+        """在当前页面截图并上传,不再重复跳转。"""
+        time.sleep(1)
+        self._dismiss_popup_before_screenshot()
+        try:
+            jpg_bytes = image_ele.get_screenshot(as_bytes="jpg")
+            if not jpg_bytes:
+                logger.warning("截图为空 upload_key=%s", upload_key)
+                return ""
+            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
+        except Exception:
+            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
+            return ""
+        if not img_url:
+            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
+            return ""
+        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
+        time.sleep(random.uniform(1, 2))
+        return img_url
+
+    def _human_click(self, element):
+        """在目标节点上触发 click,避免 move_to + 无目标 actions.click() 因布局位移点到商品链接触发详情页。"""
+        if not element:
+            return False
+        try:
+            time.sleep(random.uniform(0.8, 2.0))
+            try:
+                self.driver.run_js(
+                    "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
+                    element,
+                )
+            except Exception:
+                pass
+            time.sleep(random.uniform(0.3, 1))
+            self.driver.run_js("arguments[0].click();", element)
+            return True
+        except Exception as e:
+            logger.warning("点击失败: %s", e)
+            try:
+                element.click()
+                return True
+            except Exception:
+                return False
+
+    def _dismiss_popup_before_screenshot(self):
+        """截图前关闭或隐藏营销弹窗,避免遮挡。"""
+        close_locs = [
+            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
+            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
+            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
+            "xpath=//button[contains(@class,'close')]",
+            "xpath=//span[text()='×']",
+            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
+        ]
+        for loc in close_locs:
+            try:
+                btn = self.driver.ele(loc, timeout=0.5)
+                if btn:
+                    btn.click()
+                    time.sleep(0.2)
+            except Exception:
+                pass
+
+        try:
+            # 兜底:隐藏常见高层弹窗和遮罩
+            self.driver.run_js(
+                """
+                const sels = [
+                  '[class*="modal"]',
+                  '[class*="popup"]',
+                  '[class*="dialog"]',
+                  '[class*="mask"]',
+                  '[class*="overlay"]'
+                ];
+                for (const s of sels) {
+                  document.querySelectorAll(s).forEach(el => {
+                    const style = getComputedStyle(el);
+                    const z = parseInt(style.zIndex || '0', 10);
+                    if (z >= 999 && style.display !== 'none') {
+                      el.style.display = 'none';
+                    }
+                  });
+                }
+                document.body.style.overflow = 'auto';
+                """
+            )
+            time.sleep(0.2)
+        except Exception:
+            pass
+
+    def to_product(self, item):
+
+        now = time.strftime("%Y-%m-%d %H:%M:%S")
+        item_id = item.get("wholesaleid", "")
+        provider_id = item.get("providerId", "")
+
+        city_id = province_id = city = province = ""
+
+        city_str = item.get("warehouseCity", "")
+        if city_str:
+            city_id, province_id, city, province = get_city(city_str)
+        price = item.get("disPrice", "")
+
+        if not price:
+            price = item.get("minprice", "")
+        if not price:
+            price = item.get("price", "")
+
+        shop_name = item.get("provider_name", "")
+        if not shop_name:
+            shop_name = item.get("abbreviation", "")
+
+        product = {
+            "platform": self.platform,
+            "item_id": item_id,
+            "enterprise_id": self.company_id,
+            "product_name": item.get("drugname", ""),
+            "spec": item.get("specification", ""),
+            "one_price": '',
+            "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
+            "shop_name": shop_name,
+            "anonymous_store_name": "",
+            "shop_url": f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
+            "city_name": city,
+            "city_id": city_id,
+            "province_name": province,
+            "province_id": province_id,
+            "area_info": "",
+            "factory_name": item.get("manufacturer", ""),
+            "scrape_date": time.strftime("%Y-%m-%d"),
+            "price": price,
+            "sales": "",
+            "stock_count": item.get("stockAvailable", ""),
+            "snapshot_url": "",
+            "approval_num": "",
+            "produced_time": item.get("prodDate", ""),
+            "deadline": item.get("valid_date", ""),
+            "update_time": now,
+            "insert_time": now,
+            "number": 1,
+            "product_brand": self.brand or "",
+            "collect_task_id": self.collect_task_id,
+            "search_name": self.product,
+            "company_name": "",
+            "collect_config_info": json.dumps(
+                {"sampling_cycle": self.sampling_cycle, "sampling_start_time": self.sampling_start_time,
+                 "sampling_end_time": self.sampling_end_time}),
+            "account_id": self.account_id,
+            "collect_region_id": self.collect_region_id,
+            "collect_round": self.collect_round,
+            "is_sold_out": 0
+        }
+        return product
+
+    def search(self):
+        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
+        self.driver.wait.doc_loaded(timeout=10)
+        time.sleep(2)
+
+        if not self._is_logged_in():
+            if not self.login():
+                return False
+        keyword = self.product
+        if self.brand:
+            keyword = (self.brand + " " + self.product).strip()
+
+        if self.product_desc:
+            keyword = (keyword + " " + self.product_desc).strip()
+
+        search_key = quote(keyword)
+        page = self.start_page
+        url = (
+            f"https://dian.ysbang.cn/#/indexContent?lastClick=-1&page={page}"
+            f"&pagesize=60&classify_id=&searchkey={search_key}"
+        )
+        self._start_listen()
+        self.driver.get(url)
+        for page in range(1, 100):
+            self.driver.wait.doc_loaded(timeout=10)
+            time.sleep(1.5)
+
+            json_data = self._consume_list_listen(page)
+
+            if not json_data:
+                logger.warning("第%s页未收到列表监听数据", page)
+                break
+
+            wholesales = json_data.get("wholesales", [])
+            if not wholesales:
+                logger.info("第%s页无数据,停止", page)
+                break
+
+            list_items = wholesales[0:5]
+            goods_wrappers = self.driver.eles(
+                "xpath=//div[@class='drugListPage']//div[@class='drug-list']/div[contains(@class,'all-goods-wrapper')]"
+            )
+            for list_idx, item in enumerate(list_items, start=1):
+                item_id = item.get("wholesaleid", "")
+                logger.info(
+                    "第%s页 列表第%s/%s条 wholesaleid=%s",
+                    page,
+                    list_idx,
+                    len(list_items),
+                    item_id,
+                )
+                if not item_id:
+                    continue
+
+                detail_url = (
+                    f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1"
+                )
+                product = self.to_product(item)
+                title = product.get("product_name", "")
+                if self.brand not in title:
+                    self.is_product_count += 1
+                if self.product not in title:
+                    self.is_product_count += 1
+                    continue
+
+                if self.product in title and self.brand in title:
+                    self.is_product_count = 0
+                if self.is_product_count >= 20:
+                    return
+
+                dom_idx = list_idx - 1
+                image_ele = goods_wrappers[dom_idx]
+                upload_key = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
+                product["snapshot_url"] = self._take_snapshot(upload_key, image_ele)
+
+                try:
+                    self.pipeline.storge_data(product)
+                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
+                except Exception as e:
+                    logger.exception("写入数据库失败: %s", e)
+
+            # 检测下一页
+            self.clear_listen_buffer()
+            next_button = self.driver.ele("xpath=//div[@class='condition']//div[@class='btn next']")
+            if not next_button:
+                logger.info("没有下一页,停止")
+                break
+            else:
+                self._human_click(next_button)
+
+    def run(self):
+        try:
+            self.init_browser()
+            self.search()
+        except Exception as e:
+            logger.exception("运行异常: %s", e)
+        finally:
+            self._quit_browser()
+        return self.pipeline.crawl_count, self.success