zhuoyuncheng vor 1 Woche
Ursprung
Commit
df6c8a3852
1 geänderte Dateien mit 500 neuen und 0 gelöschten Zeilen
  1. 500 0
      spiders/jd/jd_captcha.py

+ 500 - 0
spiders/jd/jd_captcha.py

@@ -0,0 +1,500 @@
+"""
+京东滑块验证码:打码识别 + 轨迹生成 + 拖动。
+多处复用:from spiders.jd.jd_captcha import handle_jd_slider_captcha, JdCaptchaHandler
+"""
+
+import base64
+import math
+import random
+import time
+from contextlib import contextmanager
+
+import requests
+from PIL import Image
+
+DEFAULT_CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
+DEFAULT_SCREENSHOT_PATH = "./element_screenshot.png"
+JFBYM_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
+
+CAPTCHA_MODAL_XPATH = "xpath=//div[@id='captcha_modal']"
+CAPTCHA_IMG_XPATH = 'xpath://img[@id="main_img"]'
+SLIDER_IMG_XPATH = "xpath://img[@class='move-img']"
+
+
+@contextmanager
+def pause_page_listen(page, clear=True):
+    """处理验证码时暂停网络监听,避免与滑块拖动抢 CDP 资源(auto_crawl 场景)。"""
+    listen = getattr(page, "listen", None)
+    was_listening = bool(listen and getattr(listen, "listening", False))
+    if was_listening:
+        listen.pause(clear=clear)
+    try:
+        yield
+    finally:
+        if was_listening:
+            listen.resume()
+
+
+def simulate(target_x, seed=None):
+    while 1:
+        x_seq = simulate_x(target_x, seed)
+        if len(x_seq) < 50 and target_x > 150:
+            continue
+        t_seq = _generate_t(x_seq, seed)
+        y_seq = _generate_y(x_seq, t_seq, seed)
+        result = []
+        for x, y, t in zip(x_seq, y_seq, t_seq):
+            result.append([x, y, t])
+        return result
+
+
+def _generate_t(x_seq, seed=None):
+    if seed is not None:
+        random.seed(seed + 9999)
+
+    n = len(x_seq)
+    t_seq = [0] * n
+
+    for i in range(1, n):
+        dx = x_seq[i] - x_seq[i - 1]
+        is_pause = dx == 0
+
+        if i == 1:
+            t_seq[i] = random.randint(50, 95)
+        elif is_pause:
+            if random.random() < 0.20:
+                t_seq[i] = random.choice([16, 24, 33, 40, 58, 71, 74, 90, 96, 150, 200, 264])
+            else:
+                t_seq[i] = random.choices([6, 7, 8, 9, 10], weights=[3, 25, 45, 22, 5])[0]
+        else:
+            r = random.random()
+            if r < 0.90:
+                t_seq[i] = random.choices([6, 7, 8, 9, 10], weights=[3, 25, 45, 22, 5])[0]
+            elif r < 0.95:
+                t_seq[i] = random.choice([6, 10])
+            else:
+                t_seq[i] = random.choice([16, 24, 25, 33, 40, 58, 71, 74, 90, 96])
+
+    return t_seq
+
+
+def _generate_y(x_seq, t_seq, seed=None):
+    if seed is not None:
+        random.seed(seed + 19999)
+
+    n = len(x_seq)
+    y_seq = [0] * n
+    current_y = 0
+    direction = 0
+    dir_remaining = 0
+    cooldown = 0
+
+    for i in range(1, n):
+        is_abnormal_t = t_seq[i] > 10
+
+        if dir_remaining > 0:
+            dir_remaining -= 1
+            if dir_remaining == 0:
+                direction = 0
+                cooldown = random.randint(4, 8)
+        elif cooldown > 0:
+            cooldown -= 1
+        else:
+            triggered = False
+            if is_abnormal_t and random.random() < 0.40:
+                triggered = True
+            elif random.random() < 0.025:
+                triggered = True
+
+            if triggered:
+                if current_y >= 4:
+                    direction = random.choices([-1, 1], weights=[85, 15])[0]
+                elif current_y <= -4:
+                    direction = random.choices([-1, 1], weights=[15, 85])[0]
+                else:
+                    direction = random.choice([-1, 1])
+                dir_remaining = random.choices([1, 2, 3, 4], weights=[35, 35, 20, 10])[0]
+
+        current_y += direction
+        y_seq[i] = current_y
+
+    return y_seq
+
+
+def simulate_x(target_x, seed=None):
+    if seed is not None:
+        random.seed(seed)
+
+    seq = [0]
+    step = 1
+    x = 0
+    phase = "accelerating"
+
+    accel_threshold = target_x * 0.2
+    cruise_threshold = target_x * 0.5
+
+    while x < target_x:
+        remaining = target_x - x
+
+        if phase == "accelerating":
+            if step >= 9 or x >= accel_threshold:
+                phase = "cruising"
+                continue
+        elif phase == "cruising":
+            if x >= cruise_threshold:
+                phase = "decelerating"
+                continue
+        elif phase == "decelerating":
+            if remaining <= 6:
+                phase = "fine_tuning"
+                continue
+
+        if phase == "accelerating":
+            delta = random.choices([-1, 0, 1, 2, 3], weights=[5, 10, 30, 35, 20])[0]
+            step = _clamp(step + delta, 1, 8)
+        elif phase == "cruising":
+            if step <= 1:
+                delta = random.choices([0, 1, 2], weights=[12, 55, 33])[0]
+            elif step >= 8:
+                delta = random.choices([-2, -1, 0], weights=[20, 50, 30])[0]
+            else:
+                delta = random.choices([-2, -1, 0, 1, 2], weights=[5, 22, 50, 18, 5])[0]
+            step = _clamp(step + delta, 0, 7)
+        elif phase == "decelerating":
+            remaining_ratio = remaining / target_x
+            max_step = max(3, int(2.5 + 5.5 * remaining_ratio / 0.35))
+
+            if remaining_ratio > 0.18:
+                if step <= 1:
+                    delta = random.choices([0, 1, 2], weights=[12, 50, 38])[0]
+                elif step >= max_step:
+                    delta = random.choices([-2, -1, 0], weights=[25, 45, 30])[0]
+                else:
+                    delta = random.choices([-2, -1, 0, 1, 2], weights=[8, 22, 46, 19, 5])[0]
+            else:
+                if step <= 0:
+                    delta = random.choices([1, 2], weights=[65, 35])[0]
+                elif step == 1:
+                    delta = random.choices([-1, 0, 1], weights=[18, 52, 30])[0]
+                elif step >= max_step:
+                    delta = random.choices([-2, -1, 0], weights=[25, 45, 30])[0]
+                else:
+                    delta = random.choices([-2, -1, 0, 1], weights=[10, 30, 45, 15])[0]
+
+            step = _clamp(step + delta, 0, max_step)
+            if step == 0 and len(seq) >= 2 and seq[-1] == seq[-2]:
+                step = 1
+        elif phase == "fine_tuning":
+            if remaining <= 0:
+                break
+            step = random.choices([0, 1, 2], weights=[10, 70, 20])[0]
+            step = min(step, remaining)
+            if step == 0 and len(seq) >= 2 and seq[-1] == seq[-2]:
+                step = 1 if remaining >= 1 else 0
+
+        x += step
+        if x > target_x:
+            x = target_x
+        seq.append(x)
+
+    return seq
+
+
+def _clamp(v, lo, hi):
+    return max(lo, min(hi, v))
+
+
+class JdCaptchaHandler:
+    """京东滑块验证码处理器,绑定 DrissionPage 的 ChromiumPage / Tab。"""
+
+    def __init__(self, page, token=None, screenshot_path=None):
+        self.page = page
+        self.token = token or DEFAULT_CAPTCHA_TOKEN
+        self.screenshot_path = screenshot_path or DEFAULT_SCREENSHOT_PATH
+
+    @staticmethod
+    def _safe_float(value, default=0.0):
+        try:
+            return float(value)
+        except (TypeError, ValueError):
+            return default
+
+    def _run_js_safe(self, target, script, default=None):
+        try:
+            if hasattr(target, "run_js"):
+                return target.run_js(script)
+            if hasattr(target, "run_script"):
+                return target.run_script(script)
+        except Exception:
+            return default
+        return default
+
+    def _get_device_pixel_ratio(self):
+        ratio = self._run_js_safe(self.page, "return window.devicePixelRatio || 1;", default=1)
+        ratio = self._safe_float(ratio, 1.0)
+        return ratio if ratio > 0 else 1.0
+
+    def _get_image_width(self, image_path):
+        try:
+            with Image.open(image_path) as img:
+                return float(img.width)
+        except Exception:
+            return 0.0
+
+    def _get_ele_css_width(self, ele):
+        width = self._run_js_safe(ele, "return this.getBoundingClientRect().width || 0;", default=0)
+        width = self._safe_float(width, 0.0)
+        if width > 0:
+            return width
+        try:
+            size = ele.rect.size
+            if isinstance(size, (tuple, list)) and len(size) >= 1:
+                return self._safe_float(size[0], 0.0)
+        except Exception:
+            pass
+        return 0.0
+
+    def _normalize_slider_distance(self, raw_distance, capt_ele, slider_ele, screenshot_path):
+        distance = max(0.0, self._safe_float(raw_distance, 0.0))
+        capt_css_width = self._get_ele_css_width(capt_ele)
+        screenshot_width = self._get_image_width(screenshot_path)
+        natural_width = self._safe_float(
+            self._run_js_safe(capt_ele, "return this.naturalWidth || 0;", default=0),
+            0.0,
+        )
+
+        if capt_css_width > 0 and screenshot_width > 0:
+            return distance * (capt_css_width / screenshot_width)
+        if capt_css_width > 0 and natural_width > 0:
+            return distance * (capt_css_width / natural_width)
+
+        dpr = self._get_device_pixel_ratio()
+        if dpr > 1.0:
+            return distance / dpr
+        return distance
+
+    def generate_human_track(self, distance):
+        try:
+            distance = float(distance)
+        except (TypeError, ValueError):
+            return []
+        if distance <= 0 or not math.isfinite(distance):
+            return []
+
+        tracks = []
+        current = 0
+        mid = distance * 0.7
+        t = 0.2
+        v = 0
+        move_points = []
+
+        while current < mid:
+            a = random.uniform(2, 4)
+            v0 = v
+            v = v0 + a * t
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        while current < distance:
+            a = -random.uniform(0.5, 1.5)
+            v0 = v
+            v = v0 + a * t
+            if v < 0.5:
+                v = 0.5
+            move = v0 * t + 0.5 * a * t * t
+            current += move
+            move_points.append(move)
+
+        total_points = len(move_points)
+        for i, move in enumerate(move_points):
+            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
+            if i < total_points * 0.3:
+                duration = random.uniform(0.01, 0.03)
+            elif i > total_points * 0.7:
+                duration = random.uniform(0.03, 0.08)
+            else:
+                duration = random.uniform(0.02, 0.05)
+            if random.random() < 0.05:
+                duration += random.uniform(0.05, 0.1)
+            tracks.append((move, y_offset, duration))
+
+        if random.random() < 0.7:
+            tracks.append((-random.randint(1, 3), 0, 0.05))
+        return tracks
+
+    def simulate_slider_drag(self, slider_element, target_distance):
+        if target_distance <= 0:
+            return
+        self.page.actions.move_to(slider_element).hold()
+        for offset_x, offset_y, duration in self.generate_human_track(target_distance):
+            self.page.actions.move(offset_x, offset_y, duration=duration / 1000)
+        self.page.actions.release()
+
+    def verify(self, type_num, image_path=None):
+        """调用云码平台:type_num=1 坐标点选,2 滑块距离。"""
+        image_path = image_path or self.screenshot_path
+        with open(image_path, "rb") as f:
+            image_b64 = base64.b64encode(f.read()).decode()
+
+        if type_num == 1:
+            data = {
+                "token": self.token,
+                "type": "30332",
+                "direction": "top",
+                "click_num": 3,
+                "image": image_b64,
+            }
+        else:
+            data = {
+                "token": self.token,
+                "type": "22222",
+                "image": image_b64,
+            }
+
+        response = requests.post(
+            JFBYM_API_URL,
+            headers={"Content-Type": "application/json"},
+            json=data,
+            timeout=30,
+        ).json()
+        print(response)
+        return response["data"]["data"]
+
+    def handle_slider(
+            self,
+            capt_ele=None,
+            slider_ele=None,
+            drag_offset=1.5,
+            inject_track_js=True,
+    ):
+        """
+        完整滑块流程:截图 -> 打码 -> 注入轨迹 -> 拖动。
+        成功返回 True,失败返回 False。
+        """
+        capt_ele = capt_ele or self.page.ele(CAPTCHA_IMG_XPATH, timeout=2)
+        if not capt_ele:
+            print("未找到验证码背景图")
+            return False
+
+        capt_ele.get_screenshot(self.screenshot_path)
+
+        distance = self.verify(2)
+        try:
+            distance = float(distance)
+        except (TypeError, ValueError):
+            print(f"滑块距离格式异常:{distance}")
+            return False
+        print(f"滑块距离(接口原始值):{distance}")
+
+        slider_ele = slider_ele or self.page.ele(SLIDER_IMG_XPATH, timeout=2)
+        if not slider_ele:
+            print("未找到滑块")
+            return False
+
+        drag_distance = self._normalize_slider_distance(
+            distance,
+            capt_ele=capt_ele,
+            slider_ele=slider_ele,
+            screenshot_path=self.screenshot_path,
+        )
+
+        drag_px = max(0.0, float(drag_distance) - drag_offset)
+
+        if inject_track_js:
+            result = simulate(math.ceil(int(drag_distance)))
+            self.page.run_js("window.xxxll = {};".format(result))
+            time.sleep(3)
+
+        self.simulate_slider_drag(slider_ele, drag_px)
+        return True
+
+    def has_captcha_modal(self):
+        return bool(self.page.ele(CAPTCHA_MODAL_XPATH, timeout=1))
+
+    def has_moveslide_modal(self):
+        capt_cha = "xpath://img[@class='move-img']"
+        return bool(self.page.ele(capt_cha, timeout=1))
+
+    def _wait_for_slider(self, rounds=5):
+        if self.has_moveslide_modal():
+            return True
+        for _ in range(rounds):
+            time.sleep(1)
+            if self.has_moveslide_modal():
+                return True
+        return False
+
+    def handle_slider_until_gone(self, max_attempts=3, wait_after=2, slider_wait_rounds=5, **handle_kwargs):
+        """
+        处理滑块并在每次处理后检查验证码是否仍在页面。
+        验证码消失返回 True;达到 max_attempts 仍存在返回 False。
+        """
+        if not self.has_captcha_modal():
+            return True
+
+        for attempt in range(1, max_attempts + 1):
+            print(f"验证码处理 第 {attempt}/{max_attempts} 次")
+            if not self._wait_for_slider(slider_wait_rounds):
+                print("验证码弹窗在,但滑块元素未出现(可能非滑块类型)")
+                if attempt >= max_attempts:
+                    return False
+                time.sleep(wait_after)
+                continue
+
+            ok = self.handle_slider(**handle_kwargs)
+            if not ok:
+                print("本次滑块处理失败")
+            else:
+                time.sleep(wait_after)
+                if not self.has_captcha_modal():
+                    print("验证码已消失")
+                    return True
+                print("验证码仍在页面")
+
+            if attempt >= max_attempts:
+                break
+            time.sleep(wait_after)
+
+        if self.has_captcha_modal():
+            print(f"验证码处理失败,已尝试 {max_attempts} 次,弹窗仍在")
+            return False
+        return True
+
+
+def handle_jd_slider_captcha(
+    page,
+    token=None,
+    screenshot_path=None,
+    max_attempts=3,
+    wait_after=2,
+    slider_wait_rounds=5,
+    pause_listen=True,
+    pause_listen_clear=False,
+    **kwargs,
+):
+    """
+    便捷入口:处理当前页面的京东滑块验证码,最多重试 max_attempts 次。
+    返回 True:无需验证码或已成功通过;False:处理失败或验证码仍在。
+    pause_listen:auto_crawl 等已开启 listen 的场景建议 True。
+    pause_listen_clear:暂停时是否清空监听队列;采集中应为 False,避免丢掉首屏 wareList。
+    """
+    handler = JdCaptchaHandler(page, token=token, screenshot_path=screenshot_path)
+    if not handler.has_captcha_modal():
+        return True
+
+    if pause_listen:
+        with pause_page_listen(page, clear=pause_listen_clear):
+            return handler.handle_slider_until_gone(
+                max_attempts=max_attempts,
+                wait_after=wait_after,
+                slider_wait_rounds=slider_wait_rounds,
+                **kwargs,
+            )
+
+    return handler.handle_slider_until_gone(
+        max_attempts=max_attempts,
+        wait_after=wait_after,
+        slider_wait_rounds=slider_wait_rounds,
+        **kwargs,
+    )