import base64
import json
import random
import re
import signal
import socket
import sys
import time
import traceback

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from PIL import Image

# NOTE(review): this API credential is committed in source; consider loading
# it from the environment or a secrets store instead.
token = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"

# Upper bound (seconds) for the captcha-solving HTTP call so that a stalled
# connection cannot block the crawler forever.
_CAPTCHA_API_TIMEOUT = 30


class JdCrawlerV2:
    """Browser-driven JD crawler that opens a shop licence page and solves
    the slider captcha shown there through a remote recognition service."""

    def __init__(self, drug_dict=None):
        """Create the crawler.

        :param drug_dict: optional task description; when non-empty its
            fields are copied onto the instance by ``get_product_data``.
        """
        self.driver = None
        self.register_signal_handler()
        self.ip = None
        self.account_name = None
        self.platform = 2
        self.task_dict = drug_dict or {}
        if self.task_dict:
            self.get_product_data()
        self.success = True
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required (a missing
        key raises ``KeyError``); the remaining fields default to ``""``.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")

    @staticmethod
    def _get_free_port():
        """Return a currently free local TCP port for Chrome remote debugging."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS pick an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome through DrissionPage and begin listening for the
        search API requests."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # The profile directory is named after the account
        # ("./None" while ``account_name`` is unset).
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # skip the default-browser prompt
        if self.ip:
            proxy = self.ip.strip()
            # Chrome expects a scheme; default to plain HTTP when none is given.
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT (and SIGTERM, where available) handlers that close
        the browser before the process exits."""

        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            if self.driver:
                self.driver.quit()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        if hasattr(signal, "SIGTERM"):
            signal.signal(signal.SIGTERM, handler)

    def get_shop(self):
        """Open the shop licence page and try up to three times to pass the
        slider captcha.

        Each attempt screenshots the captcha image, asks the recognition
        service for the slide distance and drags the slider. The loop ends
        early when the captcha image is not present or when the captcha
        modal has disappeared after a drag.
        """
        # A previous approach (currently disabled) opened the shop home page
        # https://mall.jd.com/index-10305746.html?from=pc, hovered over the
        # shop logo and clicked the "营业执照" (business licence) link. The
        # licence page is now opened directly instead.
        url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
        self.driver.get(url, timeout=10)
        time.sleep(2)
        print("为滑块验证码")
        for _ in range(3):
            capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
            if not capt_ele:
                # No captcha image on the page: there is nothing to screenshot
                # or solve, so stop instead of calling a missing element.
                print("未检测到滑块验证码,跳过验证")
                break
            capt_ele.get_screenshot('./element_screenshot.png')
            distance = self.verify(2)
            print(f"滑块距离:{distance}")
            slider_element = self.driver.ele(
                "xpath://img[@class='move-img']")
            # Slider captcha handling: drag slightly short of the reported distance.
            self.simulate_slider_drag(slider_element, float(distance)-1.5)
            time.sleep(5)
            capt_ele = self.driver.ele('xpath://*[@id="captcha_modal"]', timeout=2)
            if not capt_ele:
                # The captcha modal is gone, so the attempt is treated as passed.
                break
        time.sleep(5)

    def verify(self, type_num):
        """Send the saved captcha screenshot to the Yunma (jfbym) recognition
        service.

        :param type_num: ``1`` selects the coordinate-click request payload;
            any other value selects the slider payload.
        :return: the ``["data"]["data"]`` field of the JSON response.
        :raises requests.RequestException: on network failure or timeout.
        :raises KeyError: when the response lacks the expected fields.
        """
        with open('element_screenshot.png', 'rb') as f:
            b = base64.b64encode(f.read()).decode()
        url = "http://api.jfbym.com/api/YmServer/customApi"
        if type_num == 1:
            # Coordinate-click captcha type.
            data = {
                "token": token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": b,
            }
        else:
            # Slider captcha type.
            data = {
                "token": token,
                "type": "22222",
                "image": b,
            }
        _headers = {
            "Content-Type": "application/json"
        }
        # A timeout keeps a stalled connection from hanging the crawler.
        response = requests.request(
            "POST", url, headers=_headers, json=data,
            timeout=_CAPTCHA_API_TIMEOUT,
        ).json()
        print(response)
        return response["data"]["data"]

    def generate_human_track(self, distance):
        """Generate a human-like drag trajectory.

        :param distance: distance to drag, in pixels.
        :return: list of ``(x_offset, y_offset, duration)`` tuples, one per
            movement step; a small backwards step is appended at random.
        """
        tracks = []
        current = 0
        mid = distance * 0.7  # start decelerating at 70% of the distance
        t = 0.2
        v = 0
        move_points = []

        # Phase 1: accelerate.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        # Phase 2: decelerate.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                # Keep a minimum speed so every step still moves forward.
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        # Add randomness and build the final trajectory.
        total_points = len(move_points)
        for i, move in enumerate(move_points):
            x_offset = move

            # Vertical jitter (imitates a shaking hand).
            if i % random.randint(2, 4) == 0:
                y_offset = random.randint(-2, 2)
            else:
                y_offset = 0

            # Per-step interval (imitates human reaction time).
            if i < total_points * 0.3:
                # Early steps are quicker.
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                # Final steps are slower.
                duration = random.uniform(0.03, 0.08)
            else:
                # Middle steps.
                duration = random.uniform(0.02, 0.05)

            # Occasionally add a tiny pause.
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)

            tracks.append((x_offset, y_offset, duration))

        # Final adjustment: a slight pull-back after reaching the end.
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))

        return tracks

    def simulate_slider_drag(self, slider_element, target_distance):
        """Drag the slider along a generated human-like trajectory.

        :param slider_element: the slider handle element to press and hold.
        :param target_distance: horizontal distance to drag, in pixels.
        """
        # Move to the slider and press the mouse button.
        self.driver.actions.move_to(slider_element).hold()

        # Build the trajectory.
        tracks = self.generate_human_track(target_distance)

        # Follow the trajectory step by step.
        # NOTE(review): the generated durations are already small fractions;
        # dividing by 1000 makes every step effectively instantaneous.
        # Confirm this scaling is intended.
        for offset_x, offset_y, duration in tracks:
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)

        time.sleep(0.8)
        # Release the mouse button.
        self.driver.actions.release()

    def run(self):
        """Start the browser, run the shop flow and always close the browser.

        Any exception marks the run as failed (``self.success = False``) and
        its traceback is printed; it is not re-raised.
        """
        try:
            self.init_browser()
            self.get_shop()
        except Exception:
            self.success = False
            # Surface the failure instead of swallowing it silently.
            traceback.print_exc()
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None


if __name__ == '__main__':
    JdCrawlerV2().run()