jd_shop_info2.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. import random
  2. import signal
  3. import socket
  4. import sys
  5. import time
  6. import base64
  7. from DrissionPage import ChromiumPage, ChromiumOptions
  8. import math
  9. import requests
  10. from DrissionPage.common import Actions
  import os

  # Credential for the jfbym captcha-recognition API that JdCrawlerV2.verify() posts to.
  # NOTE(review): a live credential is committed in source. Set the
  # JD_CRAWLER_CAPTCHA_TOKEN environment variable to override it; the literal is
  # kept only as a backward-compatible fallback and should be rotated/removed.
  token = os.environ.get("JD_CRAWLER_CAPTCHA_TOKEN") or "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"

  # Chrome executable handed to ChromiumOptions.set_browser_path(). The default is
  # a Windows install location; set JD_CRAWLER_CHROME_PATH to use another binary.
  chrome_path = os.environ.get("JD_CRAWLER_CHROME_PATH") or r"C:\Program Files\Google\Chrome\Application\chrome.exe"
  class JdCrawlerV2:
      """Open a JD shop licence page in Chrome and attempt its slider captcha.

      The browser is driven through DrissionPage. The captcha image is saved to
      disk, sent to a third-party recognition API to obtain a slide distance,
      and the slider is then dragged along a generated eased track.
      """

      # Locator of the draggable slider image; used by get_shop() and human_slide().
      _SLIDER_LOCATOR = "xpath://img[@class='move-img']"

      def __init__(self, drug_dict=None):
          """Initialise crawler state and unpack the task dict when one is given.

          Args:
              drug_dict: Optional task description. When non-empty, its fields
                  are copied onto the instance by get_product_data().

          Raises:
              KeyError: If a non-empty ``drug_dict`` lacks ``id``,
                  ``company_id`` or ``product_name``.
          """
          self.driver = None  # ChromiumPage; created by init_browser()
          self.register_signal_handler()
          self.ip = None  # optional proxy address consumed by init_browser()
          self.account_name = None  # used to name the Chrome user-data directory
          self.platform = 2
          self.task_dict = drug_dict or {}
          if self.task_dict:
              self.get_product_data()
          self.success = True  # flipped to False by run() when an exception occurs
          # Misspelled name kept as-is; it is not read anywhere inside this class.
          self.is_no_prodcut = 0

      def get_product_data(self):
          """Copy the task fields from ``self.task_dict`` onto the instance.

          ``id``, ``company_id`` and ``product_name`` are required (KeyError if
          missing); the remaining fields default to an empty string.
          """
          task = self.task_dict
          self.task_id = task["id"]
          self.company_id = task["company_id"]
          self.product = task["product_name"]
          self.product_desc = task.get("product_specs", "")
          self.brand = task.get("product_brand", "")
          self.product_keyword = task.get("product_keyword", "")
          self.collect_task_id = task.get("collect_task_id", "")

      @staticmethod
      def _get_free_port():
          """Return a local TCP port that was free at call time, for Chrome debugging.

          The probing socket is closed before returning, so the port is not
          reserved; another process could claim it before Chrome binds to it.
          """
          with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
              # Port 0 asks the OS to pick any unused port.
              s.bind(("127.0.0.1", 0))
              return s.getsockname()[1]

      def init_browser(self):
          """Launch Chrome on a fresh debugging port and start the network listener.

          Uses ``./<account_name>`` as the user-data directory and routes traffic
          through ``self.ip`` when it is set. Stores the page in ``self.driver``.
          """
          co = ChromiumOptions().set_browser_path(chrome_path)
          debug_port = self._get_free_port()
          # NOTE(review): account_name is None unless a caller sets it, which
          # yields the directory "./None" - confirm that is intended.
          co.set_user_data_path(f"./{self.account_name}")
          co.set_local_port(debug_port)
          co.set_argument(f"--remote-debugging-port={debug_port}")
          co.set_argument("--remote-debugging-address=127.0.0.1")
          co.set_argument("--disable-dev-shm-usage")
          co.set_argument("--no-first-run")  # avoid the first-run dialog
          co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
          if self.ip:
              proxy = self.ip.strip()
              # Chrome expects a scheme; assume plain HTTP when none was given.
              if not proxy.startswith(("http://", "https://")):
                  proxy = f"http://{proxy}"
              co.set_argument(f"--proxy-server={proxy}")
          self.driver = ChromiumPage(co)
          self.driver.listen.start("api?appid=search-pc-java")

      def register_signal_handler(self):
          """Install a SIGINT (and SIGTERM, where available) handler that quits the browser.

          The handler closes ``self.driver`` if one exists and exits with status 0.
          NOTE(review): ``signal.signal`` may only be called from the main
          thread, so constructing this class elsewhere would raise ValueError.
          """
          def handler(signum, frame):
              print("\n⚠️ 程序退出")
              if self.driver:
                  self.driver.quit()
              sys.exit(0)

          signal.signal(signal.SIGINT, handler)
          if hasattr(signal, "SIGTERM"):
              signal.signal(signal.SIGTERM, handler)

      def get_shop(self):
          """Open the shop licence page and make up to two slider-captcha attempts.

          Each attempt screenshots the captcha image, asks verify() for the
          slide distance and drags the slider with human_slide(). The loop stops
          early when the captcha image is absent; the method returns early when
          the slider element cannot be found.
          """
          url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
          self.driver.get(url, timeout=10)
          time.sleep(5)
          print("为滑块验证码")
          for _ in range(2):
              capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
              if not capt_ele:
                  print("未找到验证码主图,可能已通过验证或页面未加载完成")
                  break
              # verify() reads this same file back from the working directory.
              capt_ele.get_screenshot('./element_screenshot.png')
              distance = self.verify(2)
              try:
                  distance = float(distance)
              except (TypeError, ValueError):
                  # None or a non-numeric answer: retry with a fresh screenshot.
                  print(f"滑块距离格式异常:{distance}")
                  continue
              print(f"滑块距离:{distance}")
              # Locate the slider element.
              slider = self.driver.ele(self._SLIDER_LOCATOR, timeout=2)
              if not slider:
                  print("未找到滑块")
                  return
              start_x, start_y = slider.rect.midpoint
              start_x += random.uniform(-1, 1)
              start_y += random.uniform(-1, 1)
              end_x = start_x + distance + random.uniform(-3, 3)
              end_y = start_y + random.uniform(-1, 1)
              # Reuse the element found above instead of querying the page again.
              self.human_slide(start_x, start_y, end_x, end_y, slider=slider)
              # NOTE(review): 100-second pause after every attempt, and no check
              # of whether the captcha was accepted - confirm this is intended.
              time.sleep(100)

      @staticmethod
      def _extract_result(response):
          """Return ``response["data"]["data"]``, or None if either level is missing or not a dict."""
          if not isinstance(response, dict):
              return None
          payload = response.get("data")
          if not isinstance(payload, dict):
              # e.g. {"data": null} - the old chained .get() raised AttributeError here.
              return None
          return payload.get("data")

      def verify(self, type_num):
          """Send the saved captcha screenshot to the jfbym recognition API.

          Args:
              type_num: 1 builds the coordinate-click request; any other value
                  builds the slider request.

          Returns:
              The nested ``data.data`` value of the JSON reply, or None when the
              reply does not contain it.

          Raises:
              OSError: If ``element_screenshot.png`` cannot be read.
              requests.RequestException: On network failure or timeout.
              ValueError: If the reply body is not valid JSON.
          """
          with open('element_screenshot.png', 'rb') as f:
              b = base64.b64encode(f.read()).decode()
          url = "http://api.jfbym.com/api/YmServer/customApi"
          if type_num == 1:
              # Coordinate-click captcha type.
              data = {
                  "token": token,
                  "type": "30332",
                  "direction": "top",
                  "click_num": 3,
                  "image": b,
              }
          else:
              # Slider captcha type.
              data = {
                  "token": token,
                  "type": "22222",
                  "image": b,
              }
          _headers = {
              "Content-Type": "application/json"
          }
          response = requests.request("POST", url, headers=_headers, json=data, timeout=30).json()
          print(response)
          return self._extract_result(response)

      @staticmethod
      def _build_track(start_x, start_y, end_x, end_y):
          """Generate the list of absolute (x, y) points the pointer passes through.

          The track has 20-30 points. Progress follows a cubic ease-in, a linear
          middle section and a cubic ease-out, with +/-1 px jitter per point.
          x never decreases from one generated point to the next. The final
          point is ``end_x`` plus a small random offset (1..max_offset px beyond,
          or 1..3 px short) at ``end_y``.
          """
          total_steps = random.randint(20, 30)
          total_distance = math.sqrt((end_x - start_x) ** 2 + (end_y - start_y) ** 2)
          # Bound the overshoot to 2..5 px so randint(1, max_offset) is always valid.
          max_offset = max(2, min(5, int(total_distance * 0.01) + 1))
          if random.random() < 0.7:
              offset_x = random.randint(1, max_offset)
          else:
              offset_x = -random.randint(1, 3)
          stop_x = end_x + offset_x
          stop_y = end_y
          accel_ratio = random.uniform(0.25, 0.35)
          decel_ratio = random.uniform(0.25, 0.35)

          points = [(start_x, start_y)]
          for i in range(1, total_steps):
              t = i / (total_steps - 1)
              if t < accel_ratio:
                  # Ease-in: covers the first 30% of the distance.
                  p = (t / accel_ratio) ** 3 * 0.3
              elif t < (1 - decel_ratio):
                  # Constant speed: covers the next 50%.
                  mid_t = (t - accel_ratio) / (1 - accel_ratio - decel_ratio)
                  p = 0.3 + mid_t * 0.5
              else:
                  # Ease-out: covers the final 20%.
                  end_t = (t - (1 - decel_ratio)) / decel_ratio
                  p = 0.8 + (1 - (1 - end_t) ** 3) * 0.2
              jitter_x = random.randint(-1, 1)
              jitter_y = random.randint(-1, 1)
              x = start_x + (stop_x - start_x) * p + jitter_x
              y = start_y + (stop_y - start_y) * p + jitter_y
              # Jitter must not move the pointer backwards along x.
              x = max(x, points[-1][0])
              points.append((x, y))
          # Land exactly on the stop point regardless of the last jitter.
          points[-1] = (stop_x, stop_y)
          return points

      def human_slide(self, start_x, start_y, end_x, end_y, slider=None):
          """Drag the slider along a generated eased track.

          Args:
              start_x, start_y: Nominal starting point of the drag.
              end_x, end_y: Target point of the drag.
              slider: Optional, already-located slider element. When omitted the
                  element is looked up on the page, as before.

          Only the differences between consecutive track points are sent to the
          browser, so the drag physically starts wherever ``move_to(slider)``
          places the pointer.
          """
          points = self._build_track(start_x, start_y, end_x, end_y)
          print("开始拖动")
          if slider is None:
              slider = self.driver.ele(self._SLIDER_LOCATOR)
          actions = Actions(self.driver)
          # Move onto the slider first, then press and hold it.
          actions.move_to(slider).hold()
          for (last_x, last_y), (x, y) in zip(points, points[1:]):
              # NOTE(review): Actions.move() has its own duration argument;
              # confirm the library default suits these short steps.
              actions.move(x - last_x, y - last_y)
              time.sleep(random.uniform(0.005, 0.02))
          actions.release()
          time.sleep(random.uniform(1, 2))

      def run(self):
          """Start the browser, run get_shop(), and always close the browser afterwards.

          Any exception is printed and recorded as ``self.success = False``
          rather than propagated.
          """
          try:
              self.init_browser()
              self.get_shop()
          except Exception as e:
              self.success = False
              print(f"运行异常: {e}")
          finally:
              if self.driver:
                  self.driver.quit()
                  self.driver = None
  if __name__ == "__main__":
      # Executed as a script: run one crawl with no task dict.
      crawler = JdCrawlerV2()
      crawler.run()