jd_shop_info.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
import base64
import json
import os
import random
import re
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumOptions, ChromiumPage
from PIL import Image
  12. token = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
  13. chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
  14. class JdCrawlerV2:
  15. def __init__(self, drug_dict=None):
  16. self.driver = None
  17. self.register_signal_handler()
  18. self.ip = None
  19. self.account_name = None
  20. self.platform = 2
  21. self.task_dict = drug_dict or {}
  22. if self.task_dict:
  23. self.get_product_data()
  24. self.success = True
  25. self.is_no_prodcut = 0
  26. def get_product_data(self):
  27. self.task_id = self.task_dict["id"]
  28. self.company_id = self.task_dict["company_id"]
  29. self.product = self.task_dict["product_name"]
  30. self.product_desc = self.task_dict.get("product_specs", "")
  31. self.brand = self.task_dict.get("product_brand", "")
  32. self.product_keyword = self.task_dict.get("product_keyword", "")
  33. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  34. @staticmethod
  35. def _get_free_port():
  36. """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
  37. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  38. s.bind(("127.0.0.1", 0))
  39. return s.getsockname()[1]
  40. def init_browser(self):
  41. co = ChromiumOptions().set_browser_path(chrome_path)
  42. debug_port = self._get_free_port()
  43. co.set_user_data_path(f"./{self.account_name}")
  44. co.set_local_port(debug_port)
  45. co.set_argument(f"--remote-debugging-port={debug_port}")
  46. co.set_argument("--remote-debugging-address=127.0.0.1")
  47. # co.set_argument("--disable-blink-features=AutomationControlled")
  48. co.set_argument("--disable-dev-shm-usage")
  49. co.set_argument("--no-first-run") # 避免首次运行弹窗
  50. co.set_argument("--no-default-browser-check") # 避免默认浏览器检查
  51. if self.ip:
  52. proxy = self.ip.strip()
  53. if not proxy.startswith(("http://", "https://")):
  54. proxy = f"http://{proxy}"
  55. co.set_argument(f"--proxy-server={proxy}")
  56. self.driver = ChromiumPage(co)
  57. self.driver.listen.start("api?appid=search-pc-java")
  58. def register_signal_handler(self):
  59. def handler(signum, frame):
  60. print("\n⚠️ 程序退出")
  61. if self.driver:
  62. self.driver.quit()
  63. sys.exit(0)
  64. signal.signal(signal.SIGINT, handler)
  65. if hasattr(signal, "SIGTERM"):
  66. signal.signal(signal.SIGTERM, handler)
  67. def get_shop(self):
  68. # url = "https://mall.jd.com/index-10305746.html?from=pc"
  69. #
  70. # self.driver.get(url, timeout=10)
  71. # time.sleep(3)
  72. # hover_ele = self.driver.ele("xpath=//div[@class='j-shopHeader']//div[@class='jLogo']")
  73. # if not hover_ele:
  74. # logger.error("未找到店铺 Logo,无法执行悬浮操作")
  75. # return
  76. # hover_ele.hover()
  77. # time.sleep(1.5)
  78. #
  79. # # 先在主文档中找“营业执照”
  80. # target_ele = self.driver.ele(
  81. # "xpath=//a[contains(@title,'营业执照') or contains(normalize-space(text()),'营业执照')]",
  82. # timeout=5,
  83. # )
  84. #
  85. #
  86. # if not target_ele:
  87. # logger.error("悬浮后仍未找到“营业执照”链接")
  88. # return
  89. #
  90. # try:
  91. # target_ele.scroll.to_see()
  92. # except Exception:
  93. # pass
  94. #
  95. # try:
  96. # target_ele.click()
  97. # except Exception:
  98. # # 回退到 JS 点击,避免被遮挡导致常规点击失败
  99. # target_ele.click(by_js=True)
  100. # logger.info("已点击“营业执照”链接")
  101. # time.sleep(10)
  102. url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
  103. self.driver.get(url, timeout=10)
  104. time.sleep(2)
  105. print("为滑块验证码")
  106. for i in range(3):
  107. capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
  108. capt_ele.get_screenshot('./element_screenshot.png')
  109. distance = self.verify(2)
  110. print(f"滑块距离:{distance}")
  111. slider_element = self.driver.ele(
  112. "xpath://img[@class='move-img']")
  113. self.simulate_slider_drag(slider_element, float(distance)-1.5)
  114. # 滑块验证处理
  115. time.sleep(5)
  116. capt_ele = self.driver.ele('xpath://*[@id="captcha_modal"]', timeout=2)
  117. if not capt_ele:
  118. break
  119. time.sleep(5)
  120. def verify(self, type_num):
  121. """调用云码平台服务"""
  122. with open('element_screenshot.png', 'rb') as f:
  123. b = base64.b64encode(f.read()).decode()
  124. url = "http://api.jfbym.com/api/YmServer/customApi"
  125. if type_num == 1:
  126. # 坐标类型
  127. data = {
  128. "token": token,
  129. "type": "30332",
  130. "direction": "top",
  131. "click_num": 3,
  132. "image": b,
  133. }
  134. else:
  135. # 滑块类型
  136. data = {
  137. "token": token,
  138. "type": "22222",
  139. "image": b,
  140. }
  141. _headers = {
  142. "Content-Type": "application/json"
  143. }
  144. response = requests.request("POST", url, headers=_headers, json=data).json()
  145. print(response)
  146. return response["data"]["data"]
  147. def generate_human_track(self, distance):
  148. """
  149. 生成人类拖动的轨迹
  150. :param distance: 需要拖动的距离(像素)
  151. :return: 轨迹点列表,每个点包含(x偏移, y偏移, 延迟时间)
  152. """
  153. tracks = []
  154. current = 0
  155. mid = distance * 0.7 # 70%处开始减速
  156. t = 0.2
  157. v = 0
  158. move_points = []
  159. # 第一阶段:加速
  160. while current < mid:
  161. a = random.uniform(2, 4)
  162. v0 = v
  163. v = v0 + a * t
  164. move = v0 * t + 0.5 * a * t * t
  165. current += move
  166. move_points.append(move)
  167. # 第二阶段:减速
  168. while current < distance:
  169. a = -random.uniform(0.5, 1.5)
  170. v0 = v
  171. v = v0 + a * t
  172. if v < 0.5: # 防止速度过小
  173. v = 0.5
  174. move = v0 * t + 0.5 * a * t * t
  175. current += move
  176. move_points.append(move)
  177. # 添加随机性并生成最终轨迹
  178. total_points = len(move_points)
  179. for i, move in enumerate(move_points):
  180. x_offset = move
  181. # 添加垂直抖动(模拟手抖)
  182. if i % random.randint(2, 4) == 0:
  183. y_offset = random.randint(-2, 2)
  184. else:
  185. y_offset = 0
  186. # 时间间隔(模拟人类反应)
  187. if i < total_points * 0.3: # 开始阶段较快
  188. duration = random.uniform(0.01, 0.03)
  189. elif i > total_points * 0.7: # 结束阶段较慢
  190. duration = random.uniform(0.03, 0.08)
  191. else: # 中间阶段
  192. duration = random.uniform(0.02, 0.05)
  193. # 随机添加微小停顿
  194. if random.random() < 0.05:
  195. duration += random.uniform(0.05, 0.1)
  196. tracks.append((x_offset, y_offset, duration))
  197. # 最终微调:到达终点后轻微回拉
  198. if random.random() < 0.7:
  199. tracks.append((-random.randint(1, 3), 0, 0.05))
  200. return tracks
  201. def simulate_slider_drag(self, slider_element, target_distance):
  202. """
  203. 模拟人类拖动滑块
  204. """
  205. # 移动到滑块并按住
  206. self.driver.actions.move_to(slider_element).hold()
  207. # 生成轨迹
  208. tracks = self.generate_human_track(target_distance)
  209. # 按轨迹拖动
  210. for track in tracks:
  211. offset_x, offset_y, duration = track
  212. self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
  213. time.sleep(0.8)
  214. # 释放鼠标
  215. self.driver.actions.release()
  216. def run(self):
  217. try:
  218. self.init_browser()
  219. self.get_shop()
  220. except Exception as e:
  221. self.success = False
  222. finally:
  223. if self.driver:
  224. self.driver.quit()
  225. self.driver = None
  226. if __name__ == '__main__':
  227. JdCrawlerV2().run()