| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
import base64
import json
import logging
import os
import random
import re
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from DrissionPage.common import Actions
from PIL import Image
# Credential for the captcha-recognition API called by JdCrawlerV2.verify.
# NOTE(review): a secret committed to source control should be rotated; set
# the JFBYM_TOKEN environment variable to override the built-in fallback.
token = os.environ.get("JFBYM_TOKEN") or "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"

# Chrome executable handed to DrissionPage. The fallback is a Windows path;
# set CHROME_PATH on machines where Chrome is installed elsewhere.
chrome_path = os.environ.get("CHROME_PATH") or r"C:\Program Files\Google\Chrome\Application\chrome.exe"
class JdCrawlerV2:
    """Open a JD shop licence page in Chrome and solve its slider captcha.

    The browser is driven through DrissionPage; the captcha offset is obtained
    from a remote recognition service (see ``verify``).

    Attributes set by ``__init__``:
        driver: The ``ChromiumPage`` instance, or ``None`` while no browser
            is running.
        ip: Optional proxy address used by ``init_browser``; ``None`` by
            default, callers may assign it before ``run``.
        account_name: Used to build the Chrome user-data directory name.
        success: ``True`` until ``run`` catches an exception.
    """

    # Licence page opened when ``get_shop`` is called without an explicit URL.
    DEFAULT_LICENCE_URL = (
        "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
    )
    # Seconds to wait for the captcha-recognition service before giving up.
    VERIFY_TIMEOUT = 30

    _log = logging.getLogger(__name__)

    def __init__(self, drug_dict=None):
        """Create a crawler, optionally bound to one task description.

        Args:
            drug_dict: Task dictionary. When non-empty it must contain the
                keys ``id``, ``company_id`` and ``product_name``; the optional
                keys are listed in ``get_product_data``.

        Raises:
            KeyError: If ``drug_dict`` is non-empty but lacks a required key.
        """
        self.driver = None
        self.register_signal_handler()
        self.ip = None
        self.account_name = None
        self.platform = 2
        self.task_dict = drug_dict or {}
        if self.task_dict:
            self.get_product_data()
        self.success = True
        # The misspelled attribute name is kept so existing readers of it
        # keep working.
        self.is_no_prodcut = 0

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required; the
        remaining keys default to an empty string when absent.

        Raises:
            KeyError: If a required key is missing.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")

    @staticmethod
    def _get_free_port():
        """Return a currently free local port for Chrome's debugging endpoint.

        The probing socket is closed before returning, so the port is only
        known to have been free at the time of the call.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 asks the OS to pick any unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome through DrissionPage and store the page in ``driver``.

        Uses a per-account user-data directory, a freshly probed debugging
        port bound to localhost, and ``self.ip`` as proxy when it is set.
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # NOTE(review): account_name is None unless the caller assigns it,
        # which yields the literal profile directory "./None".
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # Currently disabled:
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        if self.ip:
            proxy = self.ip.strip()
            # Chrome expects a scheme; assume plain HTTP when none is given.
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        self.driver = ChromiumPage(co)
        # Start listening for requests matching the search API fragment;
        # nothing in this class reads the captured packets.
        self.driver.listen.start("api?appid=search-pc-java")

    def register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit.

        ``signal.signal`` may only be called from the main thread. When the
        crawler is constructed in another thread the registration is skipped
        instead of failing the constructor.
        """

        def handler(signum, frame):
            print("\n⚠️ 程序退出")
            try:
                if self.driver:
                    self.driver.quit()
            finally:
                # Exit even if closing the browser fails.
                sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError:
            # Raised by signal.signal when not running in the main thread.
            self._log.debug("signal handlers not installed: not in main thread")

    def get_shop(self, url=None, max_attempts=2):
        """Open a shop licence page and try to solve its slider captcha.

        An earlier flow reached the licence page by opening the shop home
        page, hovering the shop logo and clicking the business-licence link;
        the page is now opened directly by URL.

        Args:
            url: Licence page to open. Defaults to ``DEFAULT_LICENCE_URL``.
            max_attempts: Maximum number of captcha-solving attempts.

        Raises:
            RuntimeError: If the captcha image is present but the slider
                handle cannot be found.
        """
        self.driver.get(url or self.DEFAULT_LICENCE_URL, timeout=10)
        time.sleep(2)
        print("为滑块验证码")
        for _ in range(max_attempts):
            captcha_img = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
            if not captcha_img:
                # No captcha image on the page: there is nothing to solve.
                self._log.warning("captcha image not found; skipping slider solve")
                break
            # ``verify`` reads this file back from the working directory.
            captcha_img.get_screenshot('./element_screenshot.png')
            distance = self.verify(2)
            print(f"滑块距离:{distance}")
            slider_element = self.driver.ele("xpath://img[@class='move-img']")
            if not slider_element:
                raise RuntimeError("captcha slider handle not found")
            # NOTE(review): the 1.5 px reduction presumably compensates for
            # the handle's own offset - confirm against the page layout.
            self.simulate_slider_drag(slider_element, float(distance) - 1.5)
            # Give the page time to validate the drag.
            time.sleep(5)
            if not self.driver.ele('xpath://*[@id="captcha_modal"]', timeout=2):
                # The captcha dialog is gone: treat it as solved.
                break
        time.sleep(5)

    def verify(self, type_num):
        """Send the saved captcha screenshot to the recognition service.

        Reads ``element_screenshot.png`` from the working directory and posts
        it, base64-encoded, together with the module-level ``token``.

        Args:
            type_num: ``1`` selects the coordinate (click) payload; any other
                value selects the slider payload.

        Returns:
            The ``data.data`` field of the service's JSON reply.

        Raises:
            OSError: If the screenshot file cannot be read.
            requests.RequestException: On network failure, timeout or an
                HTTP error status.
            KeyError: If the reply does not contain ``data.data``.
        """
        with open('element_screenshot.png', 'rb') as f:
            b = base64.b64encode(f.read()).decode()
        # NOTE(review): plain HTTP - the token and image travel unencrypted.
        url = "http://api.jfbym.com/api/YmServer/customApi"
        if type_num == 1:
            # Coordinate type
            data = {
                "token": token,
                "type": "30332",
                "direction": "top",
                "click_num": 3,
                "image": b,
            }
        else:
            # Slider type
            data = {
                "token": token,
                "type": "22222",
                "image": b,
            }
        _headers = {
            "Content-Type": "application/json"
        }
        # A timeout keeps a stalled service from hanging the crawl forever.
        reply = requests.request(
            "POST", url, headers=_headers, json=data, timeout=self.VERIFY_TIMEOUT
        )
        reply.raise_for_status()
        response = reply.json()
        print(response)
        return response["data"]["data"]

    def build_track(self, distance):
        """Build a list of relative ``(dx, dy)`` mouse moves covering ``distance``.

        The horizontal steps accelerate over the first 60% of the distance
        and decelerate afterwards; each step gets a small random vertical
        jitter. With 30% probability the track ends with a short overshoot
        followed by an equal pull-back, so the horizontal steps always sum to
        ``distance``.

        Args:
            distance: Total horizontal distance to cover.

        Returns:
            A list of ``(dx, dy)`` tuples; empty when ``distance`` is not
            positive.
        """
        track = []
        if distance <= 0:
            return track
        current = 0
        mid = distance * 0.6
        t = 0.2
        v = 0
        while current < distance:
            if current < mid:
                a = random.uniform(2.0, 3.5)  # accelerate
            else:
                a = random.uniform(-3.0, -1.5)  # decelerate
            v0 = v
            # Keep a minimum speed so every step moves forward.
            v = max(0.5, v0 + a * t)
            move = v0 * t + 0.5 * a * t * t
            current += move
            # Clamp the last step so the track does not pass the target.
            if current > distance:
                move -= current - distance
            track.append((move, random.uniform(-1, 1)))
        # Occasional hand-correction: overshoot slightly, then come back by
        # the same amount so the net displacement stays equal to distance.
        if random.random() < 0.3:
            overshoot = random.uniform(1, 3)
            track.append((overshoot, 0))
            track.append((-overshoot, 0))
        return track

    def simulate_slider_drag(self, slider_element, target_distance):
        """Drag ``slider_element`` along a generated track.

        Args:
            slider_element: The slider handle element to press and drag.
            target_distance: Horizontal distance passed to ``build_track``.
        """
        actions = Actions(self.driver)
        track = self.build_track(target_distance)
        actions.move_to(slider_element).hold()
        # NOTE(review): Actions.move may apply its own default duration per
        # call - confirm against the installed DrissionPage version.
        for x, y in track:
            actions.move(x, y)
            time.sleep(random.uniform(0.01, 0.03))
        actions.release()

    def run(self):
        """Start the browser, process the licence page, and always clean up.

        Any exception raised while crawling is logged with its traceback and
        recorded by setting ``success`` to ``False``; it is not re-raised.
        """
        try:
            self.init_browser()
            self.get_shop()
        except Exception:
            self.success = False
            self._log.exception("JD crawl failed")
        finally:
            if self.driver:
                self.driver.quit()
            self.driver = None
if __name__ == '__main__':
    # Script entry point: run a single crawl with no task attached.
    crawler = JdCrawlerV2()
    crawler.run()
|