| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
import base64
import math
import os
import random
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from DrissionPage.common import Actions
# Token sent to the captcha-solving HTTP API by JdCrawlerV2.verify().
# It can be overridden with the JFBYM_TOKEN environment variable.
# NOTE(review): the fallback below is a credential committed to source
# control; consider rotating it and removing the literal.
token = os.environ.get(
    "JFBYM_TOKEN",
    "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco",
)

# Chrome executable handed to ChromiumOptions.set_browser_path().
# The default is the standard Windows install location; set
# JD_CRAWLER_CHROME_PATH to run with a different browser binary.
chrome_path = os.environ.get(
    "JD_CRAWLER_CHROME_PATH",
    r"C:\Program Files\Google\Chrome\Application\chrome.exe",
)
class JdCrawlerV2:
    """Browser-driven crawler that opens a JD shop licence page and tries to
    pass its slider captcha.

    The flow implemented by ``run()`` is: start Chrome through DrissionPage,
    load the licence page, screenshot the captcha image, ask a remote
    captcha-solving API for the slide distance, then drag the slider along a
    randomized track.
    """

    def __init__(self, drug_dict=None):
        """Create a crawler, optionally bound to one task description.

        :param drug_dict: optional task dict; when non-empty it must contain
            ``id``, ``company_id`` and ``product_name`` (see
            ``get_product_data``).
        :raises KeyError: if a non-empty ``drug_dict`` lacks a required key.
        """
        self.driver = None
        self.register_signal_handler()
        self.ip = None  # proxy address used by init_browser() when set
        self.account_name = None  # names the Chrome user-data directory
        self.platform = 2  # platform identifier; its meaning is not shown in this file
        self.task_dict = drug_dict or {}

        # Give the task attributes defined values even when no task dict is
        # supplied, so later reads do not fail with AttributeError.
        self.task_id = None
        self.company_id = None
        self.product = None
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.collect_task_id = ""
        if self.task_dict:
            self.get_product_data()

        self.success = True
        self.is_no_prodcut = 0  # (sic) attribute name kept for compatibility

    def get_product_data(self):
        """Copy the fields of ``self.task_dict`` onto instance attributes.

        ``id``, ``company_id`` and ``product_name`` are required; the
        remaining keys default to an empty string.

        :raises KeyError: if a required key is missing.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")

    @staticmethod
    def _get_free_port():
        """Return a local TCP port that is currently free, for Chrome debugging.

        The probing socket is closed before returning, so the port is only
        known to have been free at the time of the call.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS pick an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome via DrissionPage and store the page in ``self.driver``.

        Uses ``chrome_path`` as the browser binary, a per-account user-data
        directory, a freshly probed debugging port, and ``self.ip`` as an
        HTTP proxy when it is set. Finally starts the page's network listener
        for the ``api?appid=search-pc-java`` target.
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # NOTE(review): with account_name unset this resolves to "./None".
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        if self.ip:
            proxy = self.ip.strip()
            # Chrome expects a scheme; default to plain HTTP when none is given.
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT (and SIGTERM where available) handlers that quit the
        browser and exit the process with status 0.

        The handlers are process-wide, so the most recently created instance
        owns them. ``signal.signal`` may only be called from the main thread;
        when this runs in another thread the registration is skipped instead
        of raising ``ValueError``.
        """
        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            if self.driver:
                self.driver.quit()
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError:
            # Raised by signal.signal() outside the main thread.
            print("⚠️ 非主线程，跳过信号处理器注册")

    def get_shop(self):
        """Open the shop licence page and attempt the slider captcha up to twice.

        Each attempt screenshots the captcha image to
        ``./element_screenshot.png``, asks ``verify(2)`` for the slide
        distance and drags the slider with ``human_slide``. The loop stops
        early when the captcha image is not found; the method returns when
        the slider handle is not found.
        """
        url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
        self.driver.get(url, timeout=10)
        time.sleep(5)
        print("为滑块验证码")
        for _ in range(2):
            capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
            if not capt_ele:
                print("未找到验证码主图,可能已通过验证或页面未加载完成")
                break
            capt_ele.get_screenshot('./element_screenshot.png')

            distance = self.verify(2)
            try:
                distance = float(distance)
            except (TypeError, ValueError):
                # Covers both a missing result (None) and a non-numeric one.
                print(f"滑块距离格式异常:{distance}")
                continue
            print(f"滑块距离:{distance}")

            # Locate the draggable slider handle.
            slider = self.driver.ele("xpath://img[@class='move-img']", timeout=2)
            if not slider:
                print("未找到滑块")
                return

            # Start from the handle centre with a little random jitter, and
            # aim at the solved distance with a few pixels of noise.
            start_x, start_y = slider.rect.midpoint
            start_x += random.uniform(-1, 1)
            start_y += random.uniform(-1, 1)
            end_x = start_x + distance + random.uniform(-3, 3)
            end_y = start_y + random.uniform(-1, 1)
            self.human_slide(start_x, start_y, end_x, end_y)

            # NOTE(review): long fixed pause after every drag; the original
            # code had the post-drag "is the captcha gone?" check commented
            # out, so the result of the drag is not verified here.
            time.sleep(100)

    def verify(self, type_num):
        """Send ``element_screenshot.png`` to the captcha-solving API.

        :param type_num: ``1`` selects the coordinate (click) captcha type;
            any other value selects the slider type.
        :return: the value of ``response["data"]["data"]``, or ``None`` when
            the response does not have that shape.
        :raises OSError: if the screenshot file cannot be read.

        Network and JSON-decoding errors from ``requests`` propagate to the
        caller.
        """
        with open('element_screenshot.png', 'rb') as f:
            b = base64.b64encode(f.read()).decode()
        # NOTE(review): plain-HTTP endpoint, so the token travels unencrypted.
        url = "http://api.jfbym.com/api/YmServer/customApi"
        if type_num == 1:
            # Coordinate (click) captcha type.
            data = {
                "token": token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": b,
            }
        else:
            # Slider captcha type.
            data = {
                "token": token,
                "type": "22222",
                "image": b,
            }
        _headers = {
            "Content-Type": "application/json"
        }
        response = requests.request("POST", url, headers=_headers, json=data, timeout=30).json()
        print(response)
        return self._extract_verify_result(response)

    @staticmethod
    def _extract_verify_result(response):
        """Return ``response["data"]["data"]``, or ``None`` if either level is
        missing or is not a dict (for example ``"data": null``)."""
        if not isinstance(response, dict):
            return None
        payload = response.get("data")
        if not isinstance(payload, dict):
            return None
        return payload.get("data")

    @staticmethod
    def _build_slide_track(start_x, start_y, end_x, end_y):
        """Build the list of absolute ``(x, y)`` points for one slider drag.

        The track has 20-30 points. Progress follows a cubic ease-in up to
        30%, a linear middle section up to 80% and a cubic ease-out to 100%,
        with +/-1 px of jitter per point. Intermediate x values never move
        left of the previous point.

        The final point is ``(end_x + offset, end_y)`` where ``offset`` is a
        small random overshoot or undershoot.
        NOTE(review): the track ends at that offset position and never moves
        back to ``(end_x, end_y)`` - confirm this is intended.
        """
        total_steps = random.randint(20, 30)
        total_distance = math.hypot(end_x - start_x, end_y - start_y)

        # Keep the upper bound for randint() at 2 or more, capped at 5.
        max_offset = max(2, min(5, int(total_distance * 0.01) + 1))
        if random.random() < 0.7:
            offset_x = random.randint(1, max_offset)
        else:
            offset_x = -random.randint(1, 3)
        stop_x = end_x + offset_x
        stop_y = end_y

        accel_ratio = random.uniform(0.25, 0.35)
        decel_ratio = random.uniform(0.25, 0.35)

        points = [(start_x, start_y)]
        for i in range(1, total_steps):
            t = i / (total_steps - 1)
            if t < accel_ratio:
                # Acceleration phase: cubic ease-in covering the first 30%.
                p = (t / accel_ratio) ** 3 * 0.3
            elif t < (1 - decel_ratio):
                # Cruise phase: linear from 30% to 80%.
                mid_t = (t - accel_ratio) / (1 - accel_ratio - decel_ratio)
                p = 0.3 + mid_t * 0.5
            else:
                # Deceleration phase: cubic ease-out covering the last 20%.
                end_t = (t - (1 - decel_ratio)) / decel_ratio
                p = 0.8 + (1 - (1 - end_t) ** 3) * 0.2

            jitter_x = random.randint(-1, 1)
            jitter_y = random.randint(-1, 1)
            x = start_x + (stop_x - start_x) * p + jitter_x
            y = start_y + (stop_y - start_y) * p + jitter_y
            # Jitter must not make the pointer move backwards.
            if x < points[-1][0]:
                x = points[-1][0]
            points.append((x, y))

        # Land exactly on the stop position regardless of the last jitter.
        points[-1] = (stop_x, stop_y)
        return points

    def human_slide(self, start_x, start_y, end_x, end_y):
        """Drag the slider handle along a randomized, human-looking track.

        The pointer is first moved onto the ``move-img`` element and pressed;
        the track from ``_build_slide_track`` is then replayed as relative
        moves, so only the differences between consecutive points matter.
        """
        points = self._build_slide_track(start_x, start_y, end_x, end_y)
        print("开始拖动")

        # Move onto the slider handle and press the mouse button.
        slider = self.driver.ele("xpath://img[@class='move-img']")
        actions = Actions(self.driver)
        actions.move_to(slider).hold()

        last_x, last_y = points[0]
        for x, y in points[1:]:
            # NOTE(review): Actions.move() may apply its own per-call
            # duration on top of the sleep below - confirm against the
            # DrissionPage documentation.
            actions.move(x - last_x, y - last_y)
            time.sleep(random.uniform(0.005, 0.02))
            last_x, last_y = x, y

        actions.release()
        time.sleep(random.uniform(1, 2))

    def run(self):
        """Run one crawl: start the browser, process the shop page, and always
        quit the browser afterwards.

        Any exception is printed and recorded by setting ``self.success`` to
        ``False``; it is not re-raised.
        """
        try:
            self.init_browser()
            self.get_shop()
        except Exception as e:
            self.success = False
            print(f"运行异常: {e}")
        finally:
            if self.driver:
                self.driver.quit()
                self.driver = None
# Script entry point: build a crawler with no task data and run it once.
if __name__ == '__main__':
    crawler = JdCrawlerV2()
    crawler.run()
|