import base64
import json
import math
import random
import re
import signal
import socket
import sys
import time
import traceback

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from DrissionPage.common import Actions
from PIL import Image

# NOTE(review): API credential committed in source. Consider loading it from
# the environment or a secrets store instead — confirm with the owner.
token = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"


class JdCrawlerV2:
    """Browser-driven crawler that opens a JD shop licence page and tries to
    pass the slider captcha shown there.

    The browser is driven through DrissionPage; captcha recognition is
    delegated to a remote HTTP recognition service (see ``verify``).
    """

    # Screenshot of the captcha image; written by ``get_shop`` and read back
    # by ``verify``.
    _CAPTCHA_IMAGE = "./element_screenshot.png"

    # Seconds to wait for the recognition service before giving up.
    _VERIFY_TIMEOUT = 30

    def __init__(self, drug_dict=None):
        """Create the crawler.

        Args:
            drug_dict: Optional task description. When non-empty, its fields
                are copied onto the instance by ``get_product_data``.
        """
        self.driver = None
        self.register_signal_handler()
        self.ip = None
        self.account_name = None
        self.platform = 2
        self.task_dict = drug_dict or {}
        if self.task_dict:
            self.get_product_data()
        self.success = True
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: If ``id``, ``company_id`` or ``product_name`` is missing.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")

    @staticmethod
    def _get_free_port():
        """Return a currently free local TCP port for Chrome remote debugging."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS pick an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Launch Chrome with a dedicated profile and debug port, then start
        listening for the search API packets.

        Uses ``self.account_name`` for the profile directory and, when set,
        ``self.ip`` as the proxy server.
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # skip default-browser prompt
        if self.ip:
            proxy = self.ip.strip()
            # Default to an HTTP proxy when no scheme is given.
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit."""

        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            try:
                if self.driver:
                    self.driver.quit()
            finally:
                # Exit even if closing the browser fails.
                sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError:
            # signal.signal() is only allowed in the main thread; when the
            # crawler is created from a worker thread, run without handlers
            # rather than failing construction.
            pass

    def get_shop(self):
        """Open the shop licence page and try (up to twice) to solve the
        slider captcha.

        A previous approach (hover over the shop logo on the mall index page,
        then click the "营业执照" link) was replaced by opening the licence
        URL directly.
        """
        url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
        self.driver.get(url, timeout=10)
        time.sleep(2)
        print("为滑块验证码")
        for _ in range(2):
            capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
            if not capt_ele:
                # No captcha image on the page: nothing to solve.
                break
            capt_ele.get_screenshot(self._CAPTCHA_IMAGE)
            distance = self.verify(2)
            print(f"滑块距离:{distance}")
            slider_element = self.driver.ele("xpath://img[@class='move-img']")
            # The 1.5 px offset is an empirical calibration kept from the
            # original implementation.
            self.simulate_slider_drag(slider_element, float(distance) - 1.5)
            time.sleep(5)
            capt_ele = self.driver.ele('xpath://*[@id="captcha_modal"]', timeout=2)
            if not capt_ele:
                # Captcha dialog is gone: solved.
                break
            time.sleep(5)

    def verify(self, type_num):
        """Send the saved captcha screenshot to the recognition service.

        Args:
            type_num: ``1`` requests a coordinate-type recognition; any other
                value requests a slider-type recognition.

        Returns:
            The ``data.data`` field of the service's JSON response.

        Raises:
            requests.RequestException: On network failure, timeout or a
                non-2xx HTTP status.
            RuntimeError: If the response JSON lacks the expected fields.
        """
        with open(self._CAPTCHA_IMAGE, "rb") as f:
            b = base64.b64encode(f.read()).decode()

        url = "http://api.jfbym.com/api/YmServer/customApi"
        if type_num == 1:
            # Coordinate type
            data = {
                "token": token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": b,
            }
        else:
            # Slider type
            data = {
                "token": token,
                "type": "22222",
                "image": b,
            }
        _headers = {"Content-Type": "application/json"}
        # A timeout keeps a stalled service from hanging the crawler forever.
        reply = requests.request(
            "POST", url, headers=_headers, json=data, timeout=self._VERIFY_TIMEOUT
        )
        reply.raise_for_status()
        response = reply.json()
        print(response)
        try:
            return response["data"]["data"]
        except (KeyError, TypeError) as err:
            raise RuntimeError(
                f"unexpected captcha API response: {response!r}"
            ) from err

    def build_track(self, distance):
        """Build a list of relative ``(dx, dy)`` mouse moves for a slider drag.

        The track accelerates over the first 60% of the distance and
        decelerates afterwards, with a small random vertical jitter on each
        step. The ``dx`` values sum to ``distance`` (up to float rounding).

        Args:
            distance: Total horizontal distance to cover.

        Returns:
            A list of ``(dx, dy)`` tuples; empty when ``distance <= 0``.
        """
        if distance <= 0:
            return []

        track = []
        current = 0
        mid = distance * 0.6
        t = 0.2
        v = 0

        while current < distance:
            if current < mid:
                a = random.uniform(2.0, 3.5)  # accelerate
            else:
                a = random.uniform(-3.0, -1.5)  # decelerate

            v0 = v
            # Keep a minimum speed so every step moves forward.
            v = max(0.5, v0 + a * t)
            move = v0 * t + 0.5 * a * t * t
            current += move

            # Clamp the last step so the track never passes the target.
            if current > distance:
                move -= current - distance

            track.append((move, random.uniform(-1, 1)))

        # Occasional human-like correction: overshoot slightly, then pull
        # back by the same amount so the net displacement stays `distance`.
        if random.random() < 0.3:
            overshoot = random.uniform(1, 3)
            track.append((overshoot, 0))
            track.append((-overshoot, 0))

        return track

    def simulate_slider_drag(self, slider_element, target_distance):
        """Drag ``slider_element`` along a track built by ``build_track``.

        Args:
            slider_element: The slider handle element to press and drag.
            target_distance: Horizontal distance to drag.
        """
        actions = Actions(self.driver)
        track = self.build_track(target_distance)

        actions.move_to(slider_element).hold()

        # NOTE(review): Actions.move has its own per-call duration; confirm
        # the default is acceptable given the number of steps in a track.
        for x, y in track:
            actions.move(x, y)
            time.sleep(random.uniform(0.01, 0.03))

        actions.release()

    def run(self):
        """Start the browser, run ``get_shop`` and always close the browser.

        On any exception ``self.success`` is set to ``False`` and the
        traceback is printed; the exception is not re-raised.
        """
        try:
            self.init_browser()
            self.get_shop()
        except Exception:
            self.success = False
            # Surface the failure instead of swallowing it silently.
            traceback.print_exc()
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None


if __name__ == '__main__':
    JdCrawlerV2().run()