| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
import base64
import json
import math
import os
import random
import re
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from DrissionPage.common import Actions
from PIL import Image
# Captcha-service API token. Prefer the JFBYM_TOKEN environment variable so the
# credential does not have to live in source control.
# NOTE(review): the literal fallback is kept only so existing deployments keep
# working; it is a secret committed to the repository and should be rotated
# and removed.
token = os.environ.get("JFBYM_TOKEN") or "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"

# Path of the Chrome executable handed to ChromiumOptions. The default is the
# standard Windows install location; set CHROME_PATH to override it elsewhere.
chrome_path = (
    os.environ.get("CHROME_PATH")
    or r"C:\Program Files\Google\Chrome\Application\chrome.exe"
)
class JdCrawlerV2:
    """Open a JD shop page in Chrome (via DrissionPage) and work through its slider captcha.

    The crawler owns one ``ChromiumPage`` in ``self.driver``. ``run()`` is the
    entry point: it starts the browser, calls ``get_shop()`` and always shuts
    the browser down afterwards. ``self.success`` records whether ``run()``
    finished without an exception.
    """

    # Page opened by get_shop() when the caller does not pass a URL.
    LICENCE_URL = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
    # Endpoint of the captcha-solving service used by verify().
    # NOTE(review): plain HTTP — the API token travels unencrypted.
    CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
    # File the captcha screenshot is written to by get_shop() and read by verify().
    CAPTCHA_IMAGE_PATH = "./element_screenshot.png"
    # Seconds to wait for the captcha service before giving up.
    CAPTCHA_API_TIMEOUT = 30

    def __init__(self, drug_dict=None):
        """Create a crawler, optionally bound to one task description.

        Args:
            drug_dict: Optional task mapping. When non-empty, its fields are
                copied onto the instance by ``get_product_data()``.
        """
        # Must exist before the signal handler is installed: the handler reads it.
        self.driver = None
        self.register_signal_handler()
        self.ip = None
        self.account_name = None
        self.platform = 2
        self.task_dict = drug_dict or {}
        if self.task_dict:
            self.get_product_data()
        self.success = True
        # Historical spelling kept as-is in case other code reads this attribute.
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: if ``id``, ``company_id`` or ``product_name`` is missing.
                The remaining fields are optional and default to ``""``.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")

    @staticmethod
    def _get_free_port():
        """Return a local TCP port that is free right now, for Chrome's debugging endpoint.

        The probing socket is closed before returning, so another process can
        in principle grab the port before Chrome binds it.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 asks the OS to pick any unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome with a per-account profile and begin listening for search API traffic.

        Uses ``self.account_name`` for the user-data directory and, when
        ``self.ip`` is set, routes the browser through it as a proxy
        (``http://`` is prepended when no scheme is given). The resulting page
        object is stored in ``self.driver``.
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        if self.ip:
            proxy = self.ip.strip()
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT (and SIGTERM, where available) handlers that close the browser and exit.

        ``signal.signal`` may only be called from the main thread, so the
        crawler has to be constructed there.
        """

        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            if self.driver:
                self.driver.quit()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        if hasattr(signal, "SIGTERM"):
            signal.signal(signal.SIGTERM, handler)

    def get_shop(self, url=None, max_attempts=3):
        """Open the shop page and try to solve its slider captcha.

        Each attempt screenshots the captcha image, asks ``verify()`` for the
        slide distance and drags the slider with ``human_slide()``. The loop
        stops early as soon as the captcha image is no longer on the page.

        Args:
            url: Page to open; defaults to ``LICENCE_URL``.
            max_attempts: Maximum number of solve attempts.
        """
        self.driver.get(url or self.LICENCE_URL, timeout=10)
        time.sleep(2)
        print("为滑块验证码")
        for _ in range(max_attempts):
            captcha_img = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
            if not captcha_img:
                # A falsy lookup result means the image is absent: either no
                # captcha was shown or the previous attempt cleared it.
                break
            captcha_img.get_screenshot(self.CAPTCHA_IMAGE_PATH)

            # Converted explicitly because the value is used in float
            # arithmetic below and may arrive as a string.
            distance = float(self.verify(2))
            print(f"滑块距离:{distance}")

            slider = self.driver.ele("xpath://img[@class='move-img']")
            if not slider:
                print("slider element not found; giving up on the captcha")
                break

            # NOTE(review): mid_x / mid_y are assumed to be the slider centre
            # in the coordinate space Actions.move_to() expects — confirm
            # against the installed DrissionPage version.
            start_x = slider.mid_x + random.uniform(-4, 4)
            start_y = slider.mid_y + random.uniform(-3, 3)
            end_x = start_x + distance + random.uniform(-3, 3)
            end_y = start_y + random.uniform(-1, 1)
            self.human_slide(start_x, start_y, end_x, end_y)
            time.sleep(2)

    def verify(self, type_num):
        """Send the saved captcha screenshot to the solving service and return its answer.

        Args:
            type_num: ``1`` requests the coordinate-type recogniser; any other
                value requests the slider-type recogniser.

        Returns:
            The ``data.data`` field of the service's JSON response.

        Raises:
            RuntimeError: if the response does not contain ``data.data``.
            requests.RequestException: on network errors, timeouts or HTTP
                error statuses.
        """
        with open(self.CAPTCHA_IMAGE_PATH, "rb") as f:
            image_b64 = base64.b64encode(f.read()).decode()

        if type_num == 1:
            # Coordinate-type captcha.
            payload = {
                "token": token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": image_b64,
            }
        else:
            # Slider-type captcha.
            payload = {
                "token": token,
                "type": "22222",
                "image": image_b64,
            }

        # Without a timeout a stalled service would hang the crawler forever.
        reply = requests.post(
            self.CAPTCHA_API_URL, json=payload, timeout=self.CAPTCHA_API_TIMEOUT
        )
        reply.raise_for_status()
        response = reply.json()
        print(response)
        try:
            return response["data"]["data"]
        except (KeyError, TypeError) as exc:
            raise RuntimeError(
                f"captcha service returned no result: {response!r}"
            ) from exc

    @staticmethod
    def _landing_offset(total_distance):
        """Return the deliberate horizontal miss (in px) added to the target.

        70% of the time the offset is positive (up to 1% of the distance,
        capped at 5 px), otherwise negative (up to 2% of the distance, capped
        at 3 px). Slides too short for a whole pixel of offset get 0, which
        avoids calling ``random.randint`` with an empty range.
        """
        if random.random() < 0.7:
            bound = min(5, int(total_distance * 0.01))
            sign = 1
        else:
            bound = min(3, int(total_distance * 0.02))
            sign = -1
        if bound < 1:
            return 0
        return sign * random.randint(1, bound)

    @staticmethod
    def _slide_progress(t, accel_ratio, decel_ratio):
        """Return the fraction of the slide covered at normalised time ``t`` (0..1).

        This is the integral of the speed profile used by the track builder
        (quadratic ramp-up, constant cruise, quadratic ramp-down), divided by
        its total so the result runs continuously and monotonically from 0 to 1.
        """
        cruise_ratio = 1 - accel_ratio - decel_ratio
        total = accel_ratio / 3 + cruise_ratio + 2 * decel_ratio / 3
        if t < accel_ratio:
            p = t / accel_ratio
            covered = accel_ratio * p ** 3 / 3
        elif t < 1 - decel_ratio:
            covered = accel_ratio / 3 + (t - accel_ratio)
        else:
            q = (t - (1 - decel_ratio)) / decel_ratio
            covered = accel_ratio / 3 + cruise_ratio + decel_ratio * (q - q ** 3 / 3)
        return max(0.0, min(1.0, covered / total))

    @classmethod
    def _build_slide_track(cls, start_x, start_y, end_x, end_y):
        """Generate the list of ``(x, y, delay)`` samples for one drag.

        The last sample is exactly the stop position (target plus the landing
        offset) with a delay of 0. Intermediate x values never move against
        the slide direction.
        """
        total_steps = random.randint(60, 85)
        distance_x = end_x - start_x
        distance_y = end_y - start_y
        total_distance = math.hypot(distance_x, distance_y)

        stop_x = end_x + cls._landing_offset(total_distance)
        stop_y = end_y

        accel_ratio = random.uniform(0.25, 0.35)  # share of time spent accelerating
        decel_ratio = random.uniform(0.25, 0.35)  # share of time spent decelerating
        max_speed = random.uniform(1.5, 2.2)  # peak speed multiplier

        points = []
        for i in range(total_steps):
            t = i / (total_steps - 1)  # normalised time, 0..1

            # Relative speed at t; only used to pace the delays below.
            if t < accel_ratio:
                phase_t = t / accel_ratio
                speed_factor = max_speed * phase_t * phase_t
            elif t < 1 - decel_ratio:
                speed_factor = max_speed + random.uniform(-0.05, 0.05)
            else:
                phase_t = (t - (1 - decel_ratio)) / decel_ratio
                speed_factor = max_speed * (1 - phase_t * phase_t)

            progress = cls._slide_progress(t, accel_ratio, decel_ratio)

            # Hand jitter: smallest at the ends, occasional in the middle.
            if t < 0.1 or t > 0.9:
                jitter_x = random.randint(-1, 1)
                jitter_y = random.randint(-1, 1)
            elif t < 0.3 or t > 0.7:
                jitter_x = random.randint(-2, 2)
                jitter_y = random.randint(-2, 2)
            else:
                jitter_x = random.randint(-2, 2) if random.random() < 0.3 else 0
                jitter_y = random.randint(-2, 2) if random.random() < 0.3 else 0

            current_x = start_x + (stop_x - start_x) * progress + jitter_x
            current_y = start_y + (stop_y - start_y) * progress + jitter_y

            # Jitter must not make the pointer travel backwards.
            if points:
                if distance_x > 0:
                    current_x = max(points[-1][0], current_x)
                elif distance_x < 0:
                    current_x = min(points[-1][0], current_x)

            # Delay per step: short at the start, inversely related to speed
            # in the middle, growing towards the end.
            if t < 0.1:
                delay = random.uniform(0.002, 0.008)
            elif t < 0.9:
                base_delay = 0.008
                delay = base_delay / (speed_factor + 0.5) + random.uniform(-0.002, 0.002)
                delay = max(0.005, min(delay, 0.015))
            else:
                slow_factor = 1.0 + (t - 0.9) * 10
                delay = random.uniform(0.015, 0.025) * slow_factor

            points.append((current_x, current_y, delay))

        # Land exactly on the stop position.
        points[-1] = (stop_x, stop_y, 0)
        return points

    def human_slide(self, start_x, start_y, end_x, end_y, hold_time=0):
        """Drag from the start point to (roughly) the end point along a generated track.

        Args:
            start_x, start_y: Where the button is pressed.
            end_x, end_y: Target position; the actual stop is offset by a few
                pixels horizontally (see ``_landing_offset``).
            hold_time: Seconds to pause after releasing. ``0`` (the default)
                picks a random pause between 1 and 2 seconds.

        Returns:
            The list of ``(x, y, delay)`` samples that was played back.
        """
        points = self._build_slide_track(start_x, start_y, end_x, end_y)

        # DrissionPage action chains execute each call immediately, so there
        # is no final perform() step.
        ac = Actions(self.driver)
        # Position the pointer on the start point before pressing; hold()
        # presses wherever the pointer currently is.
        ac.move_to((start_x, start_y), duration=random.uniform(0.1, 0.3))
        ac.hold()
        time.sleep(random.uniform(0.002, 0.006))

        # Actions.move() takes offsets relative to the current pointer
        # position, so feed it the step between consecutive samples. The
        # per-step delay is passed as the move duration (floored at 20 ms).
        prev_x, prev_y = start_x, start_y
        for x, y, delay in points[1:]:
            ac.move(x - prev_x, y - prev_y, duration=max(delay, 0.02))
            prev_x, prev_y = x, y

        ac.release()

        pause = hold_time if hold_time and hold_time > 0 else random.uniform(1, 2)
        time.sleep(pause)
        return points

    def run(self):
        """Start the browser, process the shop page, and always close the browser.

        Any exception marks the run as failed (``self.success = False``) and is
        reported on stdout instead of propagating.
        """
        try:
            self.init_browser()
            self.get_shop()
        except Exception as e:
            self.success = False
            print(f"crawl failed: {e!r}")
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None
# Script entry point: build a crawler with no task attached and run it once.
if __name__ == "__main__":
    crawler = JdCrawlerV2()
    crawler.run()
|