import random import re import signal import socket import sys import time import base64 from DrissionPage import ChromiumPage, ChromiumOptions import json import math import requests from DrissionPage.common import Actions from PIL import Image token = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco" chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe" class JdCrawlerV2: def __init__(self, drug_dict=None): self.driver = None self.register_signal_handler() self.ip = None self.account_name = None self.platform = 2 self.task_dict = drug_dict or {} if self.task_dict: self.get_product_data() self.success = True self.is_no_prodcut = 0 def get_product_data(self): self.task_id = self.task_dict["id"] self.company_id = self.task_dict["company_id"] self.product = self.task_dict["product_name"] self.product_desc = self.task_dict.get("product_specs", "") self.brand = self.task_dict.get("product_brand", "") self.product_keyword = self.task_dict.get("product_keyword", "") self.collect_task_id = self.task_dict.get("collect_task_id", "") @staticmethod def _get_free_port(): """获取一个当前可用的本地端口,供 Chrome 调试使用。""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] def init_browser(self): co = ChromiumOptions().set_browser_path(chrome_path) debug_port = self._get_free_port() co.set_user_data_path(f"./{self.account_name}") co.set_local_port(debug_port) co.set_argument(f"--remote-debugging-port={debug_port}") co.set_argument("--remote-debugging-address=127.0.0.1") # co.set_argument("--disable-blink-features=AutomationControlled") co.set_argument("--disable-dev-shm-usage") co.set_argument("--no-first-run") # 避免首次运行弹窗 co.set_argument("--no-default-browser-check") # 避免默认浏览器检查 if self.ip: proxy = self.ip.strip() if not proxy.startswith(("http://", "https://")): proxy = f"http://{proxy}" co.set_argument(f"--proxy-server={proxy}") self.driver = ChromiumPage(co) self.driver.listen.start("api?appid=search-pc-java") def register_signal_handler(self): def handler(signum, frame): print("\n⚠️ 程序退出") if self.driver: self.driver.quit() sys.exit(0) signal.signal(signal.SIGINT, handler) if hasattr(signal, "SIGTERM"): signal.signal(signal.SIGTERM, handler) def get_shop(self): url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html" self.driver.get(url, timeout=10) time.sleep(2) print("为滑块验证码") for i in range(3): capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2) capt_ele.get_screenshot('./element_screenshot.png') distance = self.verify(2) print(f"滑块距离:{distance}") slider_element = self.driver.ele( "xpath://img[@class='move-img']") # 获取滑块元素 slider = self.driver.ele("xpath://img[@class='move-img']") start_x = slider.mid_x + random.uniform(-4, 4) start_y = slider.mid_y + random.uniform(-3, 3) end_x = start_x + distance + random.uniform(-3, 3) end_y = start_y + random.uniform(-1, 1) # self.swipe(start_x, start_y, end_x, end_y, # duration=random.uniform(1.2, 2.0), # deviation=random.randint(20, 40)) self.human_slide(start_x, start_y, end_x, end_y) time.sleep(2) # self.simulate_slider_drag(slider_element, float(distance)-1.5) # # 滑块验证处理 # # time.sleep(5) # capt_ele = self.driver.ele('xpath://*[@id="captcha_modal"]', timeout=2) # if not capt_ele: # break # time.sleep(5) def verify(self, type_num): """调用云码平台服务""" with open('element_screenshot.png', 'rb') as f: b = base64.b64encode(f.read()).decode() url = "http://api.jfbym.com/api/YmServer/customApi" if type_num == 1: # 坐标类型 data = { "token": token, "type": "30332", "direction": "top", "click_num": 3, "image": b, } else: # 滑块类型 data = { "token": token, "type": "22222", "image": b, } _headers = { "Content-Type": "application/json" } response = requests.request("POST", url, headers=_headers, json=data).json() print(response) return response["data"]["data"] def human_slide(self,start_x, start_y, end_x, end_y, hold_time=0): """模拟真实人类滑动轨迹 - 连续变化的速度曲线,微小偏差""" points = [] # 随机参数 total_steps = random.randint(60, 85) # 更多步数使曲线更平滑 # 计算滑动距离 distance_x = end_x - start_x distance_y = end_y - start_y total_distance = math.sqrt(distance_x ** 2 + distance_y ** 2) # 微小偏差设置 - 人类不完美的对齐 # X方向偏差:1-6像素,70%概率过冲,30%欠冲 if random.random() < 0.7: offset_x = random.randint(1, min(5, int(total_distance * 0.01))) else: offset_x = -random.randint(1, min(3, int(total_distance * 0.02))) # # Y方向微小偏差:±0-2像素 # offset_y = random.randint(-2, 2) # 实际停止位置 stop_x = end_x + offset_x stop_y = end_y # 物理参数:模拟手指滑动的物理过程 # 使用加速度、最大速度、减速度模型 accel_time_ratio = random.uniform(0.25, 0.35) # 加速阶段占总时间的比例 decel_time_ratio = random.uniform(0.25, 0.35) # 减速阶段占总时间的比例 max_speed = random.uniform(1.5, 2.2) # 最大速度倍数 # 生成轨迹 for i in range(total_steps): t = i / (total_steps - 1) # 时间进度 0-1 # 物理速度曲线:连续变化的加速度过程 if t < accel_time_ratio: # 加速阶段:从0加速到最大速度 phase_t = t / accel_time_ratio # 使用平滑的加速曲线(二次函数) speed_factor = max_speed * phase_t * phase_t elif t < 1 - decel_time_ratio: # 匀速阶段:保持最大速度 speed_factor = max_speed # 加入轻微的随机波动,模拟人类手部自然抖动 speed_factor += random.uniform(-0.05, 0.05) else: # 减速阶段:从最大速度减速到0 phase_t = (t - (1 - decel_time_ratio)) / decel_time_ratio # 使用平滑的减速曲线(二次函数,末尾更平缓) speed_factor = max_speed * (1 - phase_t * phase_t) # 计算位移(积分速度得到位置) # 使用贝塞尔曲线计算位置,让运动更自然 if t < accel_time_ratio: # 加速阶段的位置 phase_t = t / accel_time_ratio progress = (max_speed / 3) * phase_t * phase_t * phase_t elif t < 1 - decel_time_ratio: # 匀速阶段的位置 phase_t = (t - accel_time_ratio) / (1 - accel_time_ratio - decel_time_ratio) # 匀速阶段的位移加上加速阶段完成的位移 accel_distance = (max_speed / 3) # 加速阶段完成的位移 progress = accel_distance + (1 - 2 * accel_distance) * phase_t else: # 减速阶段的位置 phase_t = (t - (1 - decel_time_ratio)) / decel_time_ratio # 从减速起点平滑过渡到终点 progress = 1 - (max_speed / 3) * (1 - phase_t) * (1 - phase_t) * (1 - phase_t) # 限制进度在0-1之间 progress = max(0, min(1, progress)) # 添加自然的手部抖动 if t < 0.1 or t > 0.9: # 开始和结束:非常小的抖动 jitter_x = random.randint(-1, 1) jitter_y = random.randint(-1, 1) elif t < 0.3 or t > 0.7: # 过渡阶段:小抖动 jitter_x = random.randint(-2, 2) jitter_y = random.randint(-2, 2) else: # 中间快速阶段:稍大抖动 jitter_x = random.randint(-2, 2) if random.random() < 0.3 else 0 jitter_y = random.randint(-2, 2) if random.random() < 0.3 else 0 # 计算当前位置 current_x = start_x + (stop_x - start_x) * progress + jitter_x current_y = start_y + (stop_y - start_y) * progress + jitter_y # 确保轨迹单调性(不会回退) if points: if distance_x > 0: # 向右滑动 current_x = max(points[-1][0], current_x) elif distance_x < 0: # 向左滑动 current_x = min(points[-1][0], current_x) # 时间延迟 - 基于当前速度计算 # 速度越快,延迟越短 if t < 0.1: # 开始阶段 delay = random.uniform(0.002, 0.008) elif t < 0.9: # 中间阶段 # 延迟与速度成反比 base_delay = 0.008 speed_delay_factor = 1.0 / (speed_factor + 0.5) delay = base_delay * speed_delay_factor + random.uniform(-0.002, 0.002) delay = max(0.005, min(delay, 0.015)) else: # 结束阶段 # 逐渐增加延迟 slow_factor = 1.0 + (t - 0.9) * 10 delay = random.uniform(0.015, 0.025) * slow_factor points.append((current_x, current_y, delay)) # 确保最后一点是实际停止位置 if points: points[-1] = (stop_x, stop_y, 0) # 执行滑动 if points: # 按下起点 ac = Actions(self.driver) ac.hold() time.sleep(random.uniform(0.002, 0.006)) # 移动轨迹 for i, point in enumerate(points[1:]): ac.move(point[0], point[1]) # 最后阶段可能的微小停顿(人类犹豫) # progress = (i + 1) / len(points[1:]) # if progress > 0.98: # time.sleep(random.uniform(0.001, 0.003)) time.sleep(point[2]) # 抬起手指 ac.release() # 4. 执行所有动作 ac.perform() # 滑动后的随机延迟 hold_time = random.uniform(1, 2) time.sleep(hold_time) return points def run(self): try: self.init_browser() self.get_shop() except Exception as e: self.success = False finally: if self.driver: self.driver.quit() self.driver = None if __name__ == '__main__': JdCrawlerV2().run()