import base64
import math
import random
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from DrissionPage.common import Actions

# NOTE(review): credential for the captcha-solving service is hard-coded in
# source; consider loading it from configuration or the environment instead.
token = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"


class JdCrawlerV2:
    """Drive a Chrome instance to open a JD shop licence page and solve its slider captcha.

    The crawler optionally carries a task description (``drug_dict``) whose
    fields are copied onto the instance by :meth:`get_product_data`.
    """

    def __init__(self, drug_dict=None):
        """Set up instance state and install the exit signal handlers.

        Args:
            drug_dict: Optional task dictionary. When non-empty it must
                contain the keys ``id``, ``company_id`` and ``product_name``;
                the other keys read by :meth:`get_product_data` are optional.
        """
        self.driver = None
        self.register_signal_handler()
        self.ip = None
        self.account_name = None
        self.platform = 2
        self.task_dict = drug_dict or {}
        if self.task_dict:
            self.get_product_data()
        self.success = True
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: If ``id``, ``company_id`` or ``product_name`` is missing.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")

    @staticmethod
    def _get_free_port() -> int:
        """Return a currently free local port for Chrome's debugging endpoint.

        The probing socket is closed before the port number is returned, so
        the port is only known to have been free at the time of the call.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS pick an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Configure and launch Chrome, then start listening for search API packets.

        Uses ``self.account_name`` for the user-data directory and, when
        ``self.ip`` is set, routes traffic through it as a proxy (an
        ``http://`` scheme is prepended if none is given).
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # skip the default-browser check
        if self.ip:
            proxy = self.ip.strip()
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT (and SIGTERM, where available) handlers that quit the browser and exit."""

        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            if self.driver:
                self.driver.quit()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        if hasattr(signal, "SIGTERM"):
            signal.signal(signal.SIGTERM, handler)

    def get_shop(self):
        """Open the shop licence page and try (up to twice) to solve the slider captcha.

        Each attempt screenshots the captcha image, asks :meth:`verify` for
        the slide distance, and drags the slider with :meth:`human_slide`.
        The method returns early if the slider element cannot be found and
        stops retrying once the captcha image is no longer present.
        """
        url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
        self.driver.get(url, timeout=10)
        time.sleep(5)
        print("为滑块验证码")
        for _attempt in range(2):
            capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
            if not capt_ele:
                print("未找到验证码主图,可能已通过验证或页面未加载完成")
                break
            capt_ele.get_screenshot('./element_screenshot.png')
            distance = self.verify(2)
            try:
                distance = float(distance)
            except (TypeError, ValueError):
                # Covers both a missing result (None) and a non-numeric string.
                print(f"滑块距离格式异常:{distance}")
                continue
            print(f"滑块距离:{distance}")

            # Locate the slider handle.
            slider = self.driver.ele("xpath://img[@class='move-img']", timeout=2)
            if not slider:
                print("未找到滑块")
                return

            # Small random offsets so the drag does not start/end on exact pixels.
            start_x, start_y = slider.rect.midpoint
            start_x += random.uniform(-1, 1)
            start_y += random.uniform(-1, 1)
            end_x = start_x + distance + random.uniform(-3, 3)
            end_y = start_y + random.uniform(-1, 1)

            self.human_slide(start_x, start_y, end_x, end_y)
            # NOTE(review): 100-second pause after every attempt; looks like a
            # debugging wait — confirm whether it is still needed.
            time.sleep(100)
            # A re-check of the '//*[@id="captcha_modal"]' element used to
            # follow here (commented out in the original) — presumably to
            # stop once the modal disappears; TODO confirm and restore if needed.

    def verify(self, type_num):
        """Send the saved captcha screenshot to the recognition service.

        Args:
            type_num: ``1`` requests the coordinate-click recogniser; any
                other value requests the slider recogniser.

        Returns:
            The value found at ``response["data"]["data"]``, or ``None`` when
            the response does not have that shape.

        Raises:
            OSError: If ``element_screenshot.png`` cannot be read.
            requests.RequestException: On network failure or timeout.
        """
        with open('element_screenshot.png', 'rb') as f:
            b = base64.b64encode(f.read()).decode()
        url = "http://api.jfbym.com/api/YmServer/customApi"
        if type_num == 1:
            # Coordinate (click) captcha type.
            data = {
                "token": token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": b,
            }
        else:
            # Slider captcha type.
            data = {
                "token": token,
                "type": "22222",
                "image": b,
            }
        _headers = {
            "Content-Type": "application/json"
        }
        response = requests.request(
            "POST", url, headers=_headers, json=data, timeout=30
        ).json()
        print(response)
        # The "data" field may be absent, null, or a non-dict on failure;
        # the previous chained .get() raised AttributeError in those cases.
        payload = response.get("data") if isinstance(response, dict) else None
        if not isinstance(payload, dict):
            return None
        return payload.get("data")

    def human_slide(self, start_x, start_y, end_x, end_y):
        """Drag the slider along a jittered accelerate/cruise/decelerate path.

        The path is built from 20–30 points between the start and a stop
        position that is offset horizontally from ``end_x`` by a few pixels,
        then replayed as relative mouse moves while the slider is held.

        Args:
            start_x: Starting x coordinate of the drag.
            start_y: Starting y coordinate of the drag.
            end_x: Target x coordinate (before the random stop offset).
            end_y: Target y coordinate.

        Raises:
            RuntimeError: If the slider element is not found on the page.
        """
        points = []
        total_steps = random.randint(20, 30)

        distance_x = end_x - start_x
        distance_y = end_y - start_y
        total_distance = math.sqrt(distance_x ** 2 + distance_y ** 2)

        # Keep the upper bound at >= 2 so random.randint(1, max_offset) is valid.
        max_offset = max(2, min(5, int(total_distance * 0.01) + 1))

        # 70% of the time stop slightly past the target, otherwise slightly short.
        if random.random() < 0.7:
            offset_x = random.randint(1, max_offset)
        else:
            offset_x = -random.randint(1, 3)
        stop_x = end_x + offset_x
        stop_y = end_y

        accel_ratio = random.uniform(0.25, 0.35)
        decel_ratio = random.uniform(0.25, 0.35)

        points.append((start_x, start_y))
        for i in range(1, total_steps):
            t = i / (total_steps - 1)
            if t < accel_ratio:
                # Cubic ease-in covering the first 30% of the distance.
                p = (t / accel_ratio) ** 3 * 0.3
            elif t < (1 - decel_ratio):
                # Linear middle section covering the next 50%.
                mid_t = (t - accel_ratio) / (1 - accel_ratio - decel_ratio)
                p = 0.3 + mid_t * 0.5
            else:
                # Cubic ease-out covering the final 20%.
                end_t = (t - (1 - decel_ratio)) / decel_ratio
                p = 0.8 + (1 - (1 - end_t) ** 3) * 0.2

            jitter_x = random.randint(-1, 1)
            jitter_y = random.randint(-1, 1)
            x = start_x + (stop_x - start_x) * p + jitter_x
            y = start_y + (stop_y - start_y) * p + jitter_y
            # Never let jitter move the pointer backwards along x.
            if x < points[-1][0]:
                x = points[-1][0]
            points.append((x, y))

        # Force the final point onto the exact stop position.
        points[-1] = (stop_x, stop_y)

        print("开始拖动")
        # Move onto the slider element first, then press and hold it.
        slider = self.driver.ele("xpath://img[@class='move-img']")
        if not slider:
            raise RuntimeError("未找到滑块")
        actions = Actions(self.driver)
        actions.move_to(slider).hold()

        last_x, last_y = points[0]
        for x, y in points[1:]:
            dx = x - last_x
            dy = y - last_y
            actions.move(dx, dy)
            time.sleep(random.uniform(0.005, 0.02))
            last_x, last_y = x, y

        actions.release()
        time.sleep(random.uniform(1, 2))

    def run(self):
        """Launch the browser, run :meth:`get_shop`, and always quit the browser.

        Any exception is reported on stdout and recorded by setting
        ``self.success`` to ``False``.
        """
        try:
            self.init_browser()
            self.get_shop()
        except Exception as e:
            self.success = False
            print(f"运行异常: {e}")
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None


if __name__ == '__main__':
    JdCrawlerV2().run()