| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- """
- 京东滑块验证码:打码识别 + 轨迹生成 + 拖动。
- 多处复用:from spiders.jd.jd_captcha import handle_jd_slider_captcha, JdCaptchaHandler
- """
- import base64
- import math
- import random
- import time
- from contextlib import contextmanager
- import requests
- from PIL import Image
# Fallback token for the captcha-solving API; JdCaptchaHandler uses it when the
# caller does not pass ``token``.
# NOTE(review): this is a credential committed to source control — consider
# loading it from configuration or the environment and rotating the value.
DEFAULT_CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
# Where the captcha background screenshot is written when the caller does not
# pass ``screenshot_path`` (relative to the current working directory).
DEFAULT_SCREENSHOT_PATH = "./element_screenshot.png"
# Endpoint that JdCaptchaHandler.verify() POSTs the base64-encoded image to.
# NOTE(review): plain http — the token and image travel unencrypted.
JFBYM_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
# Element locators passed to ``page.ele``.
# NOTE(review): the modal locator uses the ``xpath=`` prefix while the other two
# use ``xpath:`` — presumably both forms are accepted by DrissionPage; confirm.
CAPTCHA_MODAL_XPATH = "xpath=//div[@id='captcha_modal']"
CAPTCHA_IMG_XPATH = 'xpath://img[@id="main_img"]'
SLIDER_IMG_XPATH = "xpath://img[@class='move-img']"
@contextmanager
def pause_page_listen(page, clear=True):
    """Suspend ``page.listen`` for the duration of the ``with`` body.

    Intended for captcha handling so the network listener does not compete
    with the slider drag (auto_crawl scenario).

    Args:
        page: Object that may expose a ``listen`` attribute with ``listening``,
            ``pause(clear=...)`` and ``resume()``.
        clear: Forwarded to ``listen.pause``.

    The listener is only paused (and later resumed) when it exists and reports
    that it is currently listening; otherwise the context manager is a no-op.
    """
    listener = getattr(page, "listen", None)
    if not (listener and getattr(listener, "listening", False)):
        # Nothing is being captured, so there is nothing to pause or resume.
        yield
        return

    listener.pause(clear=clear)
    try:
        yield
    finally:
        # Resume even when the body raised, so capture is never left paused.
        listener.resume()
def simulate(target_x, seed=None):
    """Build a complete slider trajectory as a list of ``[x, y, t]`` samples.

    The x positions come from ``simulate_x``; the per-sample time values and
    y offsets are derived from them by ``_generate_t`` and ``_generate_y``.

    Args:
        target_x: Final x position the trajectory must reach.
        seed: Optional numeric seed for reproducible output. ``None`` uses the
            current state of the global ``random`` generator.

    Returns:
        A list of ``[x, y, t]`` lists, one per sample.

    Trajectories with fewer than 50 samples are rejected when ``target_x`` is
    above 150, and a new one is generated.
    """
    attempt = 0
    while True:
        # Re-seeding with the same value would replay the exact sequence that
        # was just rejected and spin forever, so each retry uses a derived
        # seed. The first attempt uses ``seed`` unchanged.
        attempt_seed = None if seed is None else seed + attempt
        x_seq = simulate_x(target_x, attempt_seed)
        if len(x_seq) < 50 and target_x > 150:
            attempt += 1
            continue
        t_seq = _generate_t(x_seq, attempt_seed)
        y_seq = _generate_y(x_seq, t_seq, attempt_seed)
        return [[x, y, t] for x, y, t in zip(x_seq, y_seq, t_seq)]
- def _generate_t(x_seq, seed=None):
- if seed is not None:
- random.seed(seed + 9999)
- n = len(x_seq)
- t_seq = [0] * n
- for i in range(1, n):
- dx = x_seq[i] - x_seq[i - 1]
- is_pause = dx == 0
- if i == 1:
- t_seq[i] = random.randint(50, 95)
- elif is_pause:
- if random.random() < 0.20:
- t_seq[i] = random.choice([16, 24, 33, 40, 58, 71, 74, 90, 96, 150, 200, 264])
- else:
- t_seq[i] = random.choices([6, 7, 8, 9, 10], weights=[3, 25, 45, 22, 5])[0]
- else:
- r = random.random()
- if r < 0.90:
- t_seq[i] = random.choices([6, 7, 8, 9, 10], weights=[3, 25, 45, 22, 5])[0]
- elif r < 0.95:
- t_seq[i] = random.choice([6, 10])
- else:
- t_seq[i] = random.choice([16, 24, 25, 33, 40, 58, 71, 74, 90, 96])
- return t_seq
def _generate_y(x_seq, t_seq, seed=None):
    """Generate a y offset for every sample as a bounded-looking random drift.

    Args:
        x_seq: Sequence of x positions; only its length is used here.
        t_seq: Per-sample time values; entries greater than 10 make a drift
            much more likely to start at that sample.
        seed: Optional numeric seed; when given, the global ``random``
            generator is seeded with ``seed + 19999`` first.

    Returns:
        A list the same length as ``x_seq`` whose first entry is 0. Each later
        entry is the previous one plus -1, 0 or +1.
    """
    if seed is not None:
        random.seed(seed + 19999)
    n = len(x_seq)
    y_seq = [0] * n
    current_y = 0
    direction = 0  # -1 / 0 / +1 added to current_y on every sample
    dir_remaining = 0  # countdown for the drift that is currently active
    cooldown = 0  # samples to wait after a drift before another may start
    for i in range(1, n):
        is_abnormal_t = t_seq[i] > 10
        if dir_remaining > 0:
            # A drift is active: count it down, and when it expires stop
            # moving and draw a cooldown of 4..8 samples.
            dir_remaining -= 1
            if dir_remaining == 0:
                direction = 0
                cooldown = random.randint(4, 8)
        elif cooldown > 0:
            cooldown -= 1
        else:
            # Idle: maybe start a new drift. A large time value gets a 40%
            # roll; if that roll is not taken or fails, a 2.5% roll is made.
            triggered = False
            if is_abnormal_t and random.random() < 0.40:
                triggered = True
            elif random.random() < 0.025:
                triggered = True
            if triggered:
                # Once |current_y| reaches 4 the direction is biased 85/15
                # back towards zero; otherwise it is a fair coin flip.
                if current_y >= 4:
                    direction = random.choices([-1, 1], weights=[85, 15])[0]
                elif current_y <= -4:
                    direction = random.choices([-1, 1], weights=[15, 85])[0]
                else:
                    direction = random.choice([-1, 1])
                dir_remaining = random.choices([1, 2, 3, 4], weights=[35, 35, 20, 10])[0]
        # Applied after the state update, so the sample on which a drift
        # expires does not move.
        current_y += direction
        y_seq[i] = current_y
    return y_seq
def simulate_x(target_x, seed=None):
    """Generate a non-decreasing sequence of x positions from 0 to ``target_x``.

    The per-sample step size follows a four-phase profile: accelerating,
    cruising, decelerating and fine-tuning. Each phase perturbs the step with
    weighted random deltas.

    Args:
        target_x: Final x position. For values <= 0 the loop never runs and
            ``[0]`` is returned.
        seed: Optional seed; when given, the global ``random`` generator is
            seeded with it first.

    Returns:
        A list of x positions starting at 0 and, for positive ``target_x``,
        ending at ``target_x``.
    """
    if seed is not None:
        random.seed(seed)
    seq = [0]
    step = 1
    x = 0
    phase = "accelerating"
    accel_threshold = target_x * 0.2  # acceleration ends after 20% of the way
    cruise_threshold = target_x * 0.5  # cruising ends at the halfway point
    while x < target_x:
        remaining = target_x - x
        # Phase transitions: ``continue`` re-enters the loop with the new phase
        # without emitting a sample.
        if phase == "accelerating":
            # NOTE: step is clamped to at most 8 in this phase, so in practice
            # only the distance test can fire.
            if step >= 9 or x >= accel_threshold:
                phase = "cruising"
                continue
        elif phase == "cruising":
            if x >= cruise_threshold:
                phase = "decelerating"
                continue
        elif phase == "decelerating":
            if remaining <= 6:
                phase = "fine_tuning"
                continue
        # Step update for the current phase.
        if phase == "accelerating":
            delta = random.choices([-1, 0, 1, 2, 3], weights=[5, 10, 30, 35, 20])[0]
            step = _clamp(step + delta, 1, 8)
        elif phase == "cruising":
            # Delta choices are one-sided at the edges of the step range.
            if step <= 1:
                delta = random.choices([0, 1, 2], weights=[12, 55, 33])[0]
            elif step >= 8:
                delta = random.choices([-2, -1, 0], weights=[20, 50, 30])[0]
            else:
                delta = random.choices([-2, -1, 0, 1, 2], weights=[5, 22, 50, 18, 5])[0]
            step = _clamp(step + delta, 0, 7)
        elif phase == "decelerating":
            remaining_ratio = remaining / target_x
            # The step ceiling shrinks with the remaining distance, never below 3.
            max_step = max(3, int(2.5 + 5.5 * remaining_ratio / 0.35))
            if remaining_ratio > 0.18:
                if step <= 1:
                    delta = random.choices([0, 1, 2], weights=[12, 50, 38])[0]
                elif step >= max_step:
                    delta = random.choices([-2, -1, 0], weights=[25, 45, 30])[0]
                else:
                    delta = random.choices([-2, -1, 0, 1, 2], weights=[8, 22, 46, 19, 5])[0]
            else:
                if step <= 0:
                    delta = random.choices([1, 2], weights=[65, 35])[0]
                elif step == 1:
                    delta = random.choices([-1, 0, 1], weights=[18, 52, 30])[0]
                elif step >= max_step:
                    delta = random.choices([-2, -1, 0], weights=[25, 45, 30])[0]
                else:
                    delta = random.choices([-2, -1, 0, 1], weights=[10, 30, 45, 15])[0]
            step = _clamp(step + delta, 0, max_step)
            # Do not allow a second zero step right after a repeated position.
            if step == 0 and len(seq) >= 2 and seq[-1] == seq[-2]:
                step = 1
        elif phase == "fine_tuning":
            if remaining <= 0:
                break
            step = random.choices([0, 1, 2], weights=[10, 70, 20])[0]
            step = min(step, remaining)  # never step past the target
            if step == 0 and len(seq) >= 2 and seq[-1] == seq[-2]:
                step = 1 if remaining >= 1 else 0
        x += step
        if x > target_x:
            x = target_x
        seq.append(x)
    return seq
- def _clamp(v, lo, hi):
- return max(lo, min(hi, v))
class JdCaptchaHandler:
    """JD slider-captcha handler bound to a DrissionPage ChromiumPage / Tab.

    Workflow: screenshot the captcha background, ask the solving API for the
    gap distance, convert that distance to CSS pixels, then drag the slider
    along a generated track.
    """

    def __init__(self, page, token=None, screenshot_path=None):
        """Store the page and fall back to module defaults for the rest.

        Args:
            page: Page/tab object used for element lookup, JS and mouse actions.
            token: Solving-API token; falsy values use DEFAULT_CAPTCHA_TOKEN.
            screenshot_path: Where the captcha screenshot is written; falsy
                values use DEFAULT_SCREENSHOT_PATH.
        """
        self.page = page
        self.token = token or DEFAULT_CAPTCHA_TOKEN
        self.screenshot_path = screenshot_path or DEFAULT_SCREENSHOT_PATH

    @staticmethod
    def _safe_float(value, default=0.0):
        """Return ``float(value)``, or ``default`` when it cannot be converted."""
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: integers too large to be represented as a float.
            return default

    def _run_js_safe(self, target, script, default=None):
        """Run ``script`` on ``target`` and return ``default`` on any failure.

        Uses ``target.run_js`` when present, otherwise ``target.run_script``.
        Errors are swallowed on purpose: every caller treats the value as
        optional and has its own fallback.
        """
        try:
            if hasattr(target, "run_js"):
                return target.run_js(script)
            if hasattr(target, "run_script"):
                return target.run_script(script)
        except Exception:
            return default
        return default

    def _get_device_pixel_ratio(self):
        """Return the page's ``window.devicePixelRatio``, or 1.0 if unusable."""
        ratio = self._run_js_safe(self.page, "return window.devicePixelRatio || 1;", default=1)
        ratio = self._safe_float(ratio, 1.0)
        return ratio if ratio > 0 else 1.0

    def _get_image_width(self, image_path):
        """Return the pixel width of the image file, or 0.0 if it cannot be read."""
        try:
            with Image.open(image_path) as img:
                return float(img.width)
        except Exception:
            return 0.0

    def _get_ele_css_width(self, ele):
        """Return the element's rendered width, or 0.0 when it is unavailable.

        Tries ``getBoundingClientRect().width`` first and falls back to
        ``ele.rect.size[0]``.
        """
        width = self._run_js_safe(ele, "return this.getBoundingClientRect().width || 0;", default=0)
        width = self._safe_float(width, 0.0)
        if width > 0:
            return width
        try:
            size = ele.rect.size
            if isinstance(size, (tuple, list)) and len(size) >= 1:
                return self._safe_float(size[0], 0.0)
        except Exception:
            pass
        return 0.0

    def _normalize_slider_distance(self, raw_distance, capt_ele, slider_ele, screenshot_path):
        """Scale the API distance to the width at which the captcha is rendered.

        Args:
            raw_distance: Distance reported by the solving API.
            capt_ele: Captcha background image element.
            slider_ele: Slider element (currently unused; kept for callers).
            screenshot_path: Path of the screenshot that was sent to the API.

        Returns:
            ``raw_distance`` (negative or unparsable values become 0.0) scaled
            by the first ratio that can be computed:
            rendered width / screenshot width, then rendered width /
            ``naturalWidth``, then 1 / devicePixelRatio when that is above 1.
            If none applies the distance is returned unscaled.
        """
        distance = max(0.0, self._safe_float(raw_distance, 0.0))
        capt_css_width = self._get_ele_css_width(capt_ele)
        if capt_css_width > 0:
            screenshot_width = self._get_image_width(screenshot_path)
            if screenshot_width > 0:
                return distance * (capt_css_width / screenshot_width)
            # Only query naturalWidth when the screenshot could not be measured.
            natural_width = self._safe_float(
                self._run_js_safe(capt_ele, "return this.naturalWidth || 0;", default=0),
                0.0,
            )
            if natural_width > 0:
                return distance * (capt_css_width / natural_width)
        dpr = self._get_device_pixel_ratio()
        if dpr > 1.0:
            return distance / dpr
        return distance

    def generate_human_track(self, distance):
        """Generate a drag track of ``(dx, dy, duration)`` tuples.

        The track accelerates over the first 70% of ``distance`` and then
        decelerates (speed floored at 0.5) until the accumulated movement
        reaches ``distance``. The total can therefore overshoot slightly.
        With 70% probability a final pull-back of 1-3 px is appended.

        Args:
            distance: Target horizontal distance.

        Returns:
            A list of tuples; empty when ``distance`` is not a positive finite
            number.
        """
        try:
            distance = float(distance)
        except (TypeError, ValueError):
            return []
        if distance <= 0 or not math.isfinite(distance):
            return []

        mid = distance * 0.7
        t = 0.2  # integration time step for the kinematic model
        current = 0
        v = 0
        move_points = []
        # Acceleration phase.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        # Deceleration phase; the speed floor keeps every move positive so the
        # loop always terminates.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        tracks = []
        total_points = len(move_points)
        for i, move in enumerate(move_points):
            # Occasional vertical jitter.
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            if random.random() < 0.05:
                # Rare extra hesitation.
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def simulate_slider_drag(self, slider_element, target_distance):
        """Press the slider, move it along a generated track, and release.

        Does nothing when ``target_distance`` is not positive.
        """
        if target_distance <= 0:
            return
        self.page.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self.generate_human_track(target_distance):
            # NOTE(review): the track durations look like seconds (0.01-0.18),
            # so dividing by 1000 makes every move effectively instantaneous.
            # Left as-is in case it is intentional; confirm the unit that
            # ``actions.move(duration=...)`` expects.
            self.page.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.page.actions.release()

    def verify(self, type_num, image_path=None):
        """Send an image to the solving platform and return its answer.

        Args:
            type_num: 1 for coordinate click captchas, anything else for the
                slider-distance request.
            image_path: Image to upload; defaults to ``self.screenshot_path``.

        Returns:
            The value at ``response["data"]["data"]``.

        Raises:
            OSError: The image cannot be read, or the HTTP request fails
                (requests' exceptions derive from ``IOError``).
            ValueError: The response body is not valid JSON.
            KeyError, TypeError: The response lacks the expected ``data`` field.
        """
        image_path = image_path or self.screenshot_path
        with open(image_path, "rb") as f:
            image_b64 = base64.b64encode(f.read()).decode()
        if type_num == 1:
            data = {
                "token": self.token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": image_b64,
            }
        else:
            data = {
                "token": self.token,
                "type": "22222",
                "image": image_b64,
            }
        response = requests.post(
            JFBYM_API_URL,
            headers={"Content-Type": "application/json"},
            json=data,
            timeout=30,
        ).json()
        print(response)
        return response["data"]["data"]

    def handle_slider(
        self,
        capt_ele=None,
        slider_ele=None,
        drag_offset=1.5,
        inject_track_js=True,
    ):
        """Run one slider attempt: screenshot -> solve -> inject track -> drag.

        Args:
            capt_ele: Captcha background element; looked up when omitted.
            slider_ele: Slider element; looked up when omitted.
            drag_offset: Pixels subtracted from the normalized distance before
                dragging.
            inject_track_js: When true, a simulated ``[x, y, t]`` track is
                assigned to ``window.xxxll`` in the page before dragging.

        Returns:
            True when the drag was performed, False when an element is
            missing, the solving request fails, or the distance is unusable.
            True does not mean the captcha was accepted.
        """
        capt_ele = capt_ele or self.page.ele(CAPTCHA_IMG_XPATH, timeout=2)
        if not capt_ele:
            print("未找到验证码背景图")
            return False
        capt_ele.get_screenshot(self.screenshot_path)
        try:
            distance = self.verify(2)
        except (OSError, ValueError, LookupError, TypeError) as exc:
            # Missing screenshot, network failure, bad JSON or an error payload
            # without data: report failure so the caller's retry loop can run
            # instead of crashing the whole crawl.
            print(f"打码接口调用失败:{exc}")
            return False
        try:
            distance = float(distance)
        except (TypeError, ValueError):
            print(f"滑块距离格式异常:{distance}")
            return False
        if not math.isfinite(distance):
            # inf would break the int() conversion below.
            print(f"滑块距离格式异常:{distance}")
            return False
        print(f"滑块距离(接口原始值):{distance}")
        slider_ele = slider_ele or self.page.ele(SLIDER_IMG_XPATH, timeout=2)
        if not slider_ele:
            print("未找到滑块")
            return False
        drag_distance = self._normalize_slider_distance(
            distance,
            capt_ele=capt_ele,
            slider_ele=slider_ele,
            screenshot_path=self.screenshot_path,
        )
        drag_px = max(0.0, float(drag_distance) - drag_offset)
        if inject_track_js:
            # The track is generated for the truncated integer distance.
            result = simulate(int(drag_distance))
            self.page.run_js("window.xxxll = {};".format(result))
        # NOTE(review): fixed pause before dragging; the original nesting of
        # this sleep relative to the ``if`` above could not be determined —
        # confirm it is meant to run unconditionally.
        time.sleep(3)
        self.simulate_slider_drag(slider_ele, drag_px)
        return True

    def has_captcha_modal(self):
        """Return True when the captcha modal is found within 1 second."""
        return bool(self.page.ele(CAPTCHA_MODAL_XPATH, timeout=1))

    def has_moveslide_modal(self):
        """Return True when the slider image is found within 1 second."""
        return bool(self.page.ele(SLIDER_IMG_XPATH, timeout=1))

    def _wait_for_slider(self, rounds=5):
        """Poll for the slider: one immediate check, then ``rounds`` checks 1 s apart."""
        if self.has_moveslide_modal():
            return True
        for _ in range(rounds):
            time.sleep(1)
            if self.has_moveslide_modal():
                return True
        return False

    def handle_slider_until_gone(self, max_attempts=3, wait_after=2, slider_wait_rounds=5, **handle_kwargs):
        """Retry the slider until the captcha modal disappears.

        Args:
            max_attempts: Maximum number of attempts.
            wait_after: Seconds to wait after a drag and between attempts.
            slider_wait_rounds: Passed to ``_wait_for_slider`` on each attempt.
            **handle_kwargs: Forwarded to ``handle_slider``.

        Returns:
            True when no modal is present (initially or after an attempt);
            False when it is still present after ``max_attempts``, or the
            slider element never appeared on the last attempt.
        """
        if not self.has_captcha_modal():
            return True
        for attempt in range(1, max_attempts + 1):
            print(f"验证码处理 第 {attempt}/{max_attempts} 次")
            if not self._wait_for_slider(slider_wait_rounds):
                print("验证码弹窗在,但滑块元素未出现(可能非滑块类型)")
                if attempt >= max_attempts:
                    return False
                time.sleep(wait_after)
                continue
            ok = self.handle_slider(**handle_kwargs)
            if not ok:
                print("本次滑块处理失败")
            else:
                # Give the page time to validate the drag before re-checking.
                time.sleep(wait_after)
                if not self.has_captcha_modal():
                    print("验证码已消失")
                    return True
                print("验证码仍在页面")
            if attempt >= max_attempts:
                break
            time.sleep(wait_after)
        # Final check: the modal may have closed on its own after the last try.
        if self.has_captcha_modal():
            print(f"验证码处理失败,已尝试 {max_attempts} 次,弹窗仍在")
            return False
        return True
def handle_jd_slider_captcha(
    page,
    token=None,
    screenshot_path=None,
    max_attempts=3,
    wait_after=2,
    slider_wait_rounds=5,
    pause_listen=True,
    pause_listen_clear=False,
    **kwargs,
):
    """Convenience entry point: solve the JD slider captcha on ``page``.

    Args:
        page: Page/tab to operate on.
        token: Solving-API token forwarded to ``JdCaptchaHandler``.
        screenshot_path: Screenshot location forwarded to ``JdCaptchaHandler``.
        max_attempts: Maximum number of slider attempts.
        wait_after: Seconds to wait after each attempt.
        slider_wait_rounds: Polling rounds while waiting for the slider.
        pause_listen: Pause ``page.listen`` while solving; recommended when a
            listener is already running (e.g. auto_crawl).
        pause_listen_clear: Whether pausing also clears the listener queue.
            Keep False during collection so already-captured packets (such as
            the first-screen wareList) are not dropped.
        **kwargs: Forwarded to ``JdCaptchaHandler.handle_slider``.

    Returns:
        True when no captcha is present or it was cleared; False when handling
        failed or the captcha is still on the page.
    """
    handler = JdCaptchaHandler(page, token=token, screenshot_path=screenshot_path)
    if not handler.has_captcha_modal():
        return True

    def solve():
        return handler.handle_slider_until_gone(
            max_attempts=max_attempts,
            wait_after=wait_after,
            slider_wait_rounds=slider_wait_rounds,
            **kwargs,
        )

    if not pause_listen:
        return solve()
    with pause_page_listen(page, clear=pause_listen_clear):
        return solve()
|