| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
import base64
import json
import os
import random
import re
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from PIL import Image
# Credential for the captcha-solving HTTP API called by JdCrawlerV2.verify().
# SECURITY: a live credential is committed in source. Set the JFBYM_TOKEN
# environment variable to override it; the literal is kept only as a
# backward-compatible default and should be rotated and removed.
token = os.environ.get("JFBYM_TOKEN", "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco")

# Chrome executable handed to ChromiumOptions.set_browser_path(). The default
# is a Windows install location; set CHROME_PATH to use a different binary.
chrome_path = os.environ.get(
    "CHROME_PATH",
    r"C:\Program Files\Google\Chrome\Application\chrome.exe",
)
class JdCrawlerV2:
    """Open a JD shop licence page in Chrome and try to pass its slider captcha.

    The crawler launches Chrome through DrissionPage, loads a licence URL,
    screenshots the captcha image, asks a remote solving service for the
    slider distance and replays a human-looking drag.
    """

    # Seconds to wait for the captcha-solving service before giving up, so a
    # stalled connection cannot hang the crawler forever.
    _VERIFY_TIMEOUT = 30

    def __init__(self, drug_dict=None):
        """Create a crawler.

        :param drug_dict: optional task description; when non-empty its fields
            are copied onto the instance by :meth:`get_product_data`.
        :raises KeyError: if ``drug_dict`` is non-empty but lacks ``id``,
            ``company_id`` or ``product_name``.
        """
        self.driver = None
        self.register_signal_handler()
        self.ip = None
        self.account_name = None
        self.platform = 2
        self.task_dict = drug_dict or {}
        # Give the task attributes defined values even when no task was
        # supplied, so reading them never raises AttributeError.
        self.task_id = None
        self.company_id = None
        self.product = None
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.collect_task_id = ""
        if self.task_dict:
            self.get_product_data()
        self.success = True
        # Spelling (sic) is kept: it is a public attribute name.
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required (KeyError if
        missing); the remaining fields default to an empty string.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")

    @staticmethod
    def _get_free_port():
        """Return a currently free local TCP port for Chrome's debugging interface.

        The probing socket is closed before the port number is used, so another
        process could claim the port in between.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 asks the OS to pick any unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome with a per-account profile, a fresh debug port and optional proxy.

        The resulting page object is stored in ``self.driver``.
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # NOTE(review): account_name is None unless set by the caller, which
        # makes the profile directory "./None" - confirm this is intended.
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # Disabled in the original:
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        if self.ip:
            proxy = self.ip.strip()
            # Chrome needs a scheme; assume plain HTTP when none is given.
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)
        # Presumably captures network packets whose URL contains this fragment
        # - verify against the DrissionPage listener documentation.
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit.

        Registration is skipped when called outside the main thread, where
        ``signal.signal`` raises ``ValueError``.
        """
        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            if self.driver:
                try:
                    self.driver.quit()
                except Exception as exc:
                    # A failing browser shutdown must not block process exit.
                    print(f"browser quit failed: {exc!r}", file=sys.stderr)
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError:
            # Handlers can only be installed from the main thread; a crawler
            # built in a worker thread simply runs without them.
            pass

    def get_shop(self):
        """Load the hard-coded licence page and try up to three times to pass the slider captcha.

        Each attempt screenshots the captcha image, obtains the slider distance
        from :meth:`verify` and drags the slider. The loop stops as soon as the
        captcha is no longer present.
        """
        # An earlier (removed) flow opened the shop home page, hovered the shop
        # logo and clicked the business-licence link; the licence URL is now
        # opened directly.
        url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
        self.driver.get(url, timeout=10)
        time.sleep(2)
        print("为滑块验证码")
        for _ in range(3):
            capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
            if not capt_ele:
                # No captcha image on the page: nothing to solve.
                break
            capt_ele.get_screenshot('./element_screenshot.png')
            distance = self.verify(2)
            print(f"滑块距离:{distance}")
            slider_element = self.driver.ele(
                "xpath://img[@class='move-img']")
            # The 1.5 px correction comes from the original code.
            self.simulate_slider_drag(slider_element, float(distance) - 1.5)
            # Give the page time to evaluate the drag.
            time.sleep(5)
            capt_ele = self.driver.ele('xpath://*[@id="captcha_modal"]', timeout=2)
            if not capt_ele:
                break
        # NOTE(review): the source indentation was lost; this final pause is
        # assumed to follow the retry loop - confirm.
        time.sleep(5)

    def verify(self, type_num):
        """Send the saved captcha screenshot to the solving service and return its answer.

        :param type_num: ``1`` requests the coordinate-click captcha type; any
            other value requests the slider type.
        :return: the ``data.data`` field of the service's JSON response.
        :raises KeyError: if the response does not contain ``data.data``.
        """
        with open('element_screenshot.png', 'rb') as f:
            b = base64.b64encode(f.read()).decode()
        # NOTE(review): plain HTTP sends the token unencrypted - check whether
        # the service offers an HTTPS endpoint.
        url = "http://api.jfbym.com/api/YmServer/customApi"
        if type_num == 1:
            # Coordinate-click captcha
            data = {
                "token": token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": b,
            }
        else:
            # Slider captcha
            data = {
                "token": token,
                "type": "22222",
                "image": b,
            }
        _headers = {
            "Content-Type": "application/json"
        }
        response = requests.request(
            "POST", url, headers=_headers, json=data,
            timeout=self._VERIFY_TIMEOUT,
        ).json()
        print(response)
        return response["data"]["data"]

    def generate_human_track(self, distance):
        """Build a randomised, human-looking drag trajectory.

        The track accelerates over the first 70% of ``distance`` and then
        decelerates. The last forward step is not clamped, so the forward
        steps may overshoot ``distance`` slightly; with probability 0.7 a final
        1-3 px pull-back step is appended.

        :param distance: distance to drag, in pixels.
        :return: list of ``(x_offset, y_offset, duration)`` tuples, with
            ``duration`` in seconds; empty when ``distance`` is not positive.
        """
        if distance <= 0:
            # Nothing to drag; in particular do not emit a lone pull-back step.
            return []
        tracks = []
        current = 0
        mid = distance * 0.7  # deceleration starts at 70% of the way
        t = 0.2
        v = 0
        move_points = []
        # Phase 1: accelerate
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        # Phase 2: decelerate
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:  # keep a minimum speed so the loop always progresses
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        # Add jitter and timing to produce the final track
        total_points = len(move_points)
        for i, move in enumerate(move_points):
            x_offset = move
            # Occasional vertical jitter (hand tremor)
            if i % random.randint(2, 4) == 0:
                y_offset = random.randint(-2, 2)
            else:
                y_offset = 0
            # Step duration: fast at the start, slow near the end
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            # Rare short hesitation
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)
            tracks.append((x_offset, y_offset, duration))
        # Final adjustment: small pull-back after reaching the end
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def simulate_slider_drag(self, slider_element, target_distance):
        """Press the slider, replay a generated track, then release the mouse.

        :param slider_element: the slider handle element to grab.
        :param target_distance: horizontal distance to drag, in pixels.
        """
        # Move onto the slider and press the button
        self.driver.actions.move_to(slider_element).hold()
        try:
            tracks = self.generate_human_track(target_distance)
            for offset_x, offset_y, duration in tracks:
                # Track durations are already in seconds; the previous
                # division by 1000 made every step effectively instantaneous.
                self.driver.actions.move(offset_x, offset_y, duration=duration)
            time.sleep(0.8)
        finally:
            # Always release, even if a move failed, so the button is not
            # left held down.
            self.driver.actions.release()

    def run(self):
        """Run the whole flow once; never raises for ordinary errors.

        On failure ``self.success`` is set to ``False`` and the error is
        reported on stderr. The browser is always closed afterwards.
        """
        try:
            self.init_browser()
            self.get_shop()
        except Exception as e:
            self.success = False
            # Report the cause instead of discarding it silently.
            print(f"JdCrawlerV2.run failed: {e!r}", file=sys.stderr)
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None
# Script entry point: build a crawler without a task dict and run it once.
if __name__ == "__main__":
    crawler = JdCrawlerV2()
    crawler.run()
|