""" 京东滑块验证码:打码识别 + 轨迹生成 + 拖动。 多处复用:from spiders.jd.jd_captcha import handle_jd_slider_captcha, JdCaptchaHandler """ import base64 import math import random import time from contextlib import contextmanager import requests from PIL import Image DEFAULT_CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco" DEFAULT_SCREENSHOT_PATH = "./element_screenshot.png" JFBYM_API_URL = "http://api.jfbym.com/api/YmServer/customApi" CAPTCHA_MODAL_XPATH = "xpath=//div[@id='captcha_modal']" CAPTCHA_IMG_XPATH = 'xpath://img[@id="main_img"]' SLIDER_IMG_XPATH = "xpath://img[@class='move-img']" @contextmanager def pause_page_listen(page, clear=True): """处理验证码时暂停网络监听,避免与滑块拖动抢 CDP 资源(auto_crawl 场景)。""" listen = getattr(page, "listen", None) was_listening = bool(listen and getattr(listen, "listening", False)) if was_listening: listen.pause(clear=clear) try: yield finally: if was_listening: listen.resume() def simulate(target_x, seed=None): while 1: x_seq = simulate_x(target_x, seed) if len(x_seq) < 50 and target_x > 150: continue t_seq = _generate_t(x_seq, seed) y_seq = _generate_y(x_seq, t_seq, seed) result = [] for x, y, t in zip(x_seq, y_seq, t_seq): result.append([x, y, t]) return result def _generate_t(x_seq, seed=None): if seed is not None: random.seed(seed + 9999) n = len(x_seq) t_seq = [0] * n for i in range(1, n): dx = x_seq[i] - x_seq[i - 1] is_pause = dx == 0 if i == 1: t_seq[i] = random.randint(50, 95) elif is_pause: if random.random() < 0.20: t_seq[i] = random.choice([16, 24, 33, 40, 58, 71, 74, 90, 96, 150, 200, 264]) else: t_seq[i] = random.choices([6, 7, 8, 9, 10], weights=[3, 25, 45, 22, 5])[0] else: r = random.random() if r < 0.90: t_seq[i] = random.choices([6, 7, 8, 9, 10], weights=[3, 25, 45, 22, 5])[0] elif r < 0.95: t_seq[i] = random.choice([6, 10]) else: t_seq[i] = random.choice([16, 24, 25, 33, 40, 58, 71, 74, 90, 96]) return t_seq def _generate_y(x_seq, t_seq, seed=None): if seed is not None: random.seed(seed + 19999) n = len(x_seq) y_seq = [0] * n current_y = 0 direction = 0 dir_remaining = 0 cooldown = 0 for i in range(1, n): is_abnormal_t = t_seq[i] > 10 if dir_remaining > 0: dir_remaining -= 1 if dir_remaining == 0: direction = 0 cooldown = random.randint(4, 8) elif cooldown > 0: cooldown -= 1 else: triggered = False if is_abnormal_t and random.random() < 0.40: triggered = True elif random.random() < 0.025: triggered = True if triggered: if current_y >= 4: direction = random.choices([-1, 1], weights=[85, 15])[0] elif current_y <= -4: direction = random.choices([-1, 1], weights=[15, 85])[0] else: direction = random.choice([-1, 1]) dir_remaining = random.choices([1, 2, 3, 4], weights=[35, 35, 20, 10])[0] current_y += direction y_seq[i] = current_y return y_seq def simulate_x(target_x, seed=None): if seed is not None: random.seed(seed) seq = [0] step = 1 x = 0 phase = "accelerating" accel_threshold = target_x * 0.2 cruise_threshold = target_x * 0.5 while x < target_x: remaining = target_x - x if phase == "accelerating": if step >= 9 or x >= accel_threshold: phase = "cruising" continue elif phase == "cruising": if x >= cruise_threshold: phase = "decelerating" continue elif phase == "decelerating": if remaining <= 6: phase = "fine_tuning" continue if phase == "accelerating": delta = random.choices([-1, 0, 1, 2, 3], weights=[5, 10, 30, 35, 20])[0] step = _clamp(step + delta, 1, 8) elif phase == "cruising": if step <= 1: delta = random.choices([0, 1, 2], weights=[12, 55, 33])[0] elif step >= 8: delta = random.choices([-2, -1, 0], weights=[20, 50, 30])[0] else: delta = random.choices([-2, -1, 0, 1, 2], weights=[5, 22, 50, 18, 5])[0] step = _clamp(step + delta, 0, 7) elif phase == "decelerating": remaining_ratio = remaining / target_x max_step = max(3, int(2.5 + 5.5 * remaining_ratio / 0.35)) if remaining_ratio > 0.18: if step <= 1: delta = random.choices([0, 1, 2], weights=[12, 50, 38])[0] elif step >= max_step: delta = random.choices([-2, -1, 0], weights=[25, 45, 30])[0] else: delta = random.choices([-2, -1, 0, 1, 2], weights=[8, 22, 46, 19, 5])[0] else: if step <= 0: delta = random.choices([1, 2], weights=[65, 35])[0] elif step == 1: delta = random.choices([-1, 0, 1], weights=[18, 52, 30])[0] elif step >= max_step: delta = random.choices([-2, -1, 0], weights=[25, 45, 30])[0] else: delta = random.choices([-2, -1, 0, 1], weights=[10, 30, 45, 15])[0] step = _clamp(step + delta, 0, max_step) if step == 0 and len(seq) >= 2 and seq[-1] == seq[-2]: step = 1 elif phase == "fine_tuning": if remaining <= 0: break step = random.choices([0, 1, 2], weights=[10, 70, 20])[0] step = min(step, remaining) if step == 0 and len(seq) >= 2 and seq[-1] == seq[-2]: step = 1 if remaining >= 1 else 0 x += step if x > target_x: x = target_x seq.append(x) return seq def _clamp(v, lo, hi): return max(lo, min(hi, v)) class JdCaptchaHandler: """京东滑块验证码处理器,绑定 DrissionPage 的 ChromiumPage / Tab。""" def __init__(self, page, token=None, screenshot_path=None): self.page = page self.token = token or DEFAULT_CAPTCHA_TOKEN self.screenshot_path = screenshot_path or DEFAULT_SCREENSHOT_PATH @staticmethod def _safe_float(value, default=0.0): try: return float(value) except (TypeError, ValueError): return default def _run_js_safe(self, target, script, default=None): try: if hasattr(target, "run_js"): return target.run_js(script) if hasattr(target, "run_script"): return target.run_script(script) except Exception: return default return default def _get_device_pixel_ratio(self): ratio = self._run_js_safe(self.page, "return window.devicePixelRatio || 1;", default=1) ratio = self._safe_float(ratio, 1.0) return ratio if ratio > 0 else 1.0 def _get_image_width(self, image_path): try: with Image.open(image_path) as img: return float(img.width) except Exception: return 0.0 def _get_ele_css_width(self, ele): width = self._run_js_safe(ele, "return this.getBoundingClientRect().width || 0;", default=0) width = self._safe_float(width, 0.0) if width > 0: return width try: size = ele.rect.size if isinstance(size, (tuple, list)) and len(size) >= 1: return self._safe_float(size[0], 0.0) except Exception: pass return 0.0 def _normalize_slider_distance(self, raw_distance, capt_ele, slider_ele, screenshot_path): distance = max(0.0, self._safe_float(raw_distance, 0.0)) capt_css_width = self._get_ele_css_width(capt_ele) screenshot_width = self._get_image_width(screenshot_path) natural_width = self._safe_float( self._run_js_safe(capt_ele, "return this.naturalWidth || 0;", default=0), 0.0, ) if capt_css_width > 0 and screenshot_width > 0: return distance * (capt_css_width / screenshot_width) if capt_css_width > 0 and natural_width > 0: return distance * (capt_css_width / natural_width) dpr = self._get_device_pixel_ratio() if dpr > 1.0: return distance / dpr return distance def generate_human_track(self, distance): try: distance = float(distance) except (TypeError, ValueError): return [] if distance <= 0 or not math.isfinite(distance): return [] tracks = [] current = 0 mid = distance * 0.7 t = 0.2 v = 0 move_points = [] while current < mid: a = random.uniform(2, 4) v0 = v v = v0 + a * t move = v0 * t + 0.5 * a * t * t current += move move_points.append(move) while current < distance: a = -random.uniform(0.5, 1.5) v0 = v v = v0 + a * t if v < 0.5: v = 0.5 move = v0 * t + 0.5 * a * t * t current += move move_points.append(move) total_points = len(move_points) for i, move in enumerate(move_points): y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0 if i < total_points * 0.3: duration = random.uniform(0.01, 0.03) elif i > total_points * 0.7: duration = random.uniform(0.03, 0.08) else: duration = random.uniform(0.02, 0.05) if random.random() < 0.05: duration += random.uniform(0.05, 0.1) tracks.append((move, y_offset, duration)) if random.random() < 0.7: tracks.append((-random.randint(1, 3), 0, 0.05)) return tracks def simulate_slider_drag(self, slider_element, target_distance): if target_distance <= 0: return self.page.actions.move_to(slider_element).hold() for offset_x, offset_y, duration in self.generate_human_track(target_distance): self.page.actions.move(offset_x, offset_y, duration=duration / 1000) self.page.actions.release() def verify(self, type_num, image_path=None): """调用云码平台:type_num=1 坐标点选,2 滑块距离。""" image_path = image_path or self.screenshot_path with open(image_path, "rb") as f: image_b64 = base64.b64encode(f.read()).decode() if type_num == 1: data = { "token": self.token, "type": "30332", "direction": "top", "click_num": 3, "image": image_b64, } else: data = { "token": self.token, "type": "22222", "image": image_b64, } response = requests.post( JFBYM_API_URL, headers={"Content-Type": "application/json"}, json=data, timeout=30, ).json() print(response) return response["data"]["data"] def handle_slider( self, capt_ele=None, slider_ele=None, drag_offset=1.5, inject_track_js=True, ): """ 完整滑块流程:截图 -> 打码 -> 注入轨迹 -> 拖动。 成功返回 True,失败返回 False。 """ capt_ele = capt_ele or self.page.ele(CAPTCHA_IMG_XPATH, timeout=2) if not capt_ele: print("未找到验证码背景图") return False capt_ele.get_screenshot(self.screenshot_path) distance = self.verify(2) try: distance = float(distance) except (TypeError, ValueError): print(f"滑块距离格式异常:{distance}") return False print(f"滑块距离(接口原始值):{distance}") slider_ele = slider_ele or self.page.ele(SLIDER_IMG_XPATH, timeout=2) if not slider_ele: print("未找到滑块") return False drag_distance = self._normalize_slider_distance( distance, capt_ele=capt_ele, slider_ele=slider_ele, screenshot_path=self.screenshot_path, ) drag_px = max(0.0, float(drag_distance) - drag_offset) if inject_track_js: result = simulate(math.ceil(int(drag_distance))) self.page.run_js("window.xxxll = {};".format(result)) time.sleep(3) self.simulate_slider_drag(slider_ele, drag_px) return True def has_captcha_modal(self): return bool(self.page.ele(CAPTCHA_MODAL_XPATH, timeout=1)) def has_moveslide_modal(self): capt_cha = "xpath://img[@class='move-img']" return bool(self.page.ele(capt_cha, timeout=1)) def _wait_for_slider(self, rounds=5): if self.has_moveslide_modal(): return True for _ in range(rounds): time.sleep(1) if self.has_moveslide_modal(): return True return False def handle_slider_until_gone(self, max_attempts=3, wait_after=2, slider_wait_rounds=5, **handle_kwargs): """ 处理滑块并在每次处理后检查验证码是否仍在页面。 验证码消失返回 True;达到 max_attempts 仍存在返回 False。 """ if not self.has_captcha_modal(): return True for attempt in range(1, max_attempts + 1): print(f"验证码处理 第 {attempt}/{max_attempts} 次") if not self._wait_for_slider(slider_wait_rounds): print("验证码弹窗在,但滑块元素未出现(可能非滑块类型)") if attempt >= max_attempts: return False time.sleep(wait_after) continue ok = self.handle_slider(**handle_kwargs) if not ok: print("本次滑块处理失败") else: time.sleep(wait_after) if not self.has_captcha_modal(): print("验证码已消失") return True print("验证码仍在页面") if attempt >= max_attempts: break time.sleep(wait_after) if self.has_captcha_modal(): print(f"验证码处理失败,已尝试 {max_attempts} 次,弹窗仍在") return False return True def handle_jd_slider_captcha( page, token=None, screenshot_path=None, max_attempts=3, wait_after=2, slider_wait_rounds=5, pause_listen=True, pause_listen_clear=False, **kwargs, ): """ 便捷入口:处理当前页面的京东滑块验证码,最多重试 max_attempts 次。 返回 True:无需验证码或已成功通过;False:处理失败或验证码仍在。 pause_listen:auto_crawl 等已开启 listen 的场景建议 True。 pause_listen_clear:暂停时是否清空监听队列;采集中应为 False,避免丢掉首屏 wareList。 """ handler = JdCaptchaHandler(page, token=token, screenshot_path=screenshot_path) if not handler.has_captcha_modal(): return True if pause_listen: with pause_page_listen(page, clear=pause_listen_clear): return handler.handle_slider_until_gone( max_attempts=max_attempts, wait_after=wait_after, slider_wait_rounds=slider_wait_rounds, **kwargs, ) return handler.handle_slider_until_gone( max_attempts=max_attempts, wait_after=wait_after, slider_wait_rounds=slider_wait_rounds, **kwargs, )