# jd_shop_info3.py
import base64
import json
import math
import os
import random
import re
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions
from DrissionPage.common import Actions
from PIL import Image
  13. token = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
  14. chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
  15. class JdCrawlerV2:
  16. def __init__(self, drug_dict=None):
  17. self.driver = None
  18. self.register_signal_handler()
  19. self.ip = None
  20. self.account_name = None
  21. self.platform = 2
  22. self.task_dict = drug_dict or {}
  23. if self.task_dict:
  24. self.get_product_data()
  25. self.success = True
  26. self.is_no_prodcut = 0
  27. def get_product_data(self):
  28. self.task_id = self.task_dict["id"]
  29. self.company_id = self.task_dict["company_id"]
  30. self.product = self.task_dict["product_name"]
  31. self.product_desc = self.task_dict.get("product_specs", "")
  32. self.brand = self.task_dict.get("product_brand", "")
  33. self.product_keyword = self.task_dict.get("product_keyword", "")
  34. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  35. @staticmethod
  36. def _get_free_port():
  37. """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
  38. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  39. s.bind(("127.0.0.1", 0))
  40. return s.getsockname()[1]
  41. def init_browser(self):
  42. co = ChromiumOptions().set_browser_path(chrome_path)
  43. debug_port = self._get_free_port()
  44. co.set_user_data_path(f"./{self.account_name}")
  45. co.set_local_port(debug_port)
  46. co.set_argument(f"--remote-debugging-port={debug_port}")
  47. co.set_argument("--remote-debugging-address=127.0.0.1")
  48. # co.set_argument("--disable-blink-features=AutomationControlled")
  49. co.set_argument("--disable-dev-shm-usage")
  50. co.set_argument("--no-first-run") # 避免首次运行弹窗
  51. co.set_argument("--no-default-browser-check") # 避免默认浏览器检查
  52. if self.ip:
  53. proxy = self.ip.strip()
  54. if not proxy.startswith(("http://", "https://")):
  55. proxy = f"http://{proxy}"
  56. co.set_argument(f"--proxy-server={proxy}")
  57. self.driver = ChromiumPage(co)
  58. self.driver.listen.start("api?appid=search-pc-java")
  59. def register_signal_handler(self):
  60. def handler(signum, frame):
  61. print("\n⚠️ 程序退出")
  62. if self.driver:
  63. self.driver.quit()
  64. sys.exit(0)
  65. signal.signal(signal.SIGINT, handler)
  66. if hasattr(signal, "SIGTERM"):
  67. signal.signal(signal.SIGTERM, handler)
  68. def get_shop(self):
  69. # url = "https://mall.jd.com/index-10305746.html?from=pc"
  70. #
  71. # self.driver.get(url, timeout=10)
  72. # time.sleep(3)
  73. # hover_ele = self.driver.ele("xpath=//div[@class='j-shopHeader']//div[@class='jLogo']")
  74. # if not hover_ele:
  75. # logger.error("未找到店铺 Logo,无法执行悬浮操作")
  76. # return
  77. # hover_ele.hover()
  78. # time.sleep(1.5)
  79. #
  80. # # 先在主文档中找“营业执照”
  81. # target_ele = self.driver.ele(
  82. # "xpath=//a[contains(@title,'营业执照') or contains(normalize-space(text()),'营业执照')]",
  83. # timeout=5,
  84. # )
  85. #
  86. #
  87. # if not target_ele:
  88. # logger.error("悬浮后仍未找到“营业执照”链接")
  89. # return
  90. #
  91. # try:
  92. # target_ele.scroll.to_see()
  93. # except Exception:
  94. # pass
  95. #
  96. # try:
  97. # target_ele.click()
  98. # except Exception:
  99. # # 回退到 JS 点击,避免被遮挡导致常规点击失败
  100. # target_ele.click(by_js=True)
  101. # logger.info("已点击“营业执照”链接")
  102. # time.sleep(10)
  103. url = "https://mall.jd.com/showLicence-4fc010bb739186871c97fe8159fdb58e68030b5168522fc2aa8be6dedfec0d63.html"
  104. self.driver.get(url, timeout=10)
  105. time.sleep(2)
  106. print("为滑块验证码")
  107. for i in range(2):
  108. capt_ele = self.driver.ele('xpath://img[@id="main_img"]', timeout=2)
  109. capt_ele.get_screenshot('./element_screenshot.png')
  110. distance = self.verify(2)
  111. print(f"滑块距离:{distance}")
  112. slider_element = self.driver.ele(
  113. "xpath://img[@class='move-img']")
  114. self.simulate_slider_drag(slider_element, float(distance)-1.5)
  115. # 滑块验证处理
  116. time.sleep(5)
  117. capt_ele = self.driver.ele('xpath://*[@id="captcha_modal"]', timeout=2)
  118. if not capt_ele:
  119. break
  120. time.sleep(5)
  121. def verify(self, type_num):
  122. """调用云码平台服务"""
  123. with open('element_screenshot.png', 'rb') as f:
  124. b = base64.b64encode(f.read()).decode()
  125. url = "http://api.jfbym.com/api/YmServer/customApi"
  126. if type_num == 1:
  127. # 坐标类型
  128. data = {
  129. "token": token,
  130. "type": "30332",
  131. "direction": "top",
  132. "click_num": 3,
  133. "image": b,
  134. }
  135. else:
  136. # 滑块类型
  137. data = {
  138. "token": token,
  139. "type": "22222",
  140. "image": b,
  141. }
  142. _headers = {
  143. "Content-Type": "application/json"
  144. }
  145. response = requests.request("POST", url, headers=_headers, json=data).json()
  146. print(response)
  147. return response["data"]["data"]
  148. import random
  149. import math
  150. def build_track(self,distance):
  151. """
  152. 通用滑动轨迹(UI测试用)
  153. """
  154. track = []
  155. current = 0
  156. mid = distance * 0.6
  157. t = 0.2
  158. v = 0
  159. while current < distance:
  160. if current < mid:
  161. a = random.uniform(2.0, 3.5) # 加速
  162. else:
  163. a = random.uniform(-3.0, -1.5) # 减速
  164. v0 = v
  165. v = max(0.5, v0 + a * t)
  166. move = v0 * t + 0.5 * a * t * t
  167. current += move
  168. # 防止超出
  169. if current > distance:
  170. move -= (current - distance)
  171. x = move
  172. y = random.uniform(-1, 1)
  173. track.append((x, y))
  174. # 轻微回调(模拟人手修正)
  175. if random.random() < 0.3:
  176. track.append((-random.uniform(1, 3), 0))
  177. return track
  178. def simulate_slider_drag(self, slider_element, target_distance):
  179. """
  180. 模拟人类拖动滑块
  181. """
  182. actions = Actions(self.driver)
  183. track = self.build_track(target_distance)
  184. actions.move_to(slider_element).hold()
  185. for x, y in track:
  186. actions.move(x, y)
  187. time.sleep(random.uniform(0.01, 0.03))
  188. actions.release()
  189. def run(self):
  190. try:
  191. self.init_browser()
  192. self.get_shop()
  193. except Exception as e:
  194. self.success = False
  195. finally:
  196. if self.driver:
  197. self.driver.quit()
  198. self.driver = None
  199. if __name__ == '__main__':
  200. JdCrawlerV2().run()