# login_yaoex.py
import base64
import os
import random
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader
  11. chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
  12. CAPTCHA_TOKEN = "12445"
  13. CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
  14. SLIDER_OFFSET_FIX = 10
  15. class YaoExCrawl:
  16. def __init__(self, product=None):
  17. self.product = product
  18. self.driver = None
  19. self.account_name = "yiyaocheng_1"
  20. self.ossuploader = AliyunOSSUploader()
  21. self._register_signal_handler()
  22. def _register_signal_handler(self):
  23. def handler(signum, frame):
  24. logger.info("收到退出信号,准备关闭浏览器")
  25. self._quit_browser()
  26. sys.exit(0)
  27. signal.signal(signal.SIGINT, handler)
  28. if hasattr(signal, "SIGTERM"):
  29. signal.signal(signal.SIGTERM, handler)
  30. def _quit_browser(self):
  31. if self.driver:
  32. try:
  33. self.driver.quit()
  34. except Exception:
  35. pass
  36. self.driver = None
  37. @staticmethod
  38. def _get_free_port():
  39. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  40. s.bind(("127.0.0.1", 0))
  41. return s.getsockname()[1]
  42. def init_browser(self):
  43. co = ChromiumOptions().set_browser_path(chrome_path)
  44. debug_port = self._get_free_port()
  45. co.set_user_data_path(f"./{self.account_name}")
  46. co.set_local_port(debug_port)
  47. co.set_argument(f"--remote-debugging-port={debug_port}")
  48. co.set_argument("--remote-debugging-address=127.0.0.1")
  49. co.set_argument("--disable-dev-shm-usage")
  50. co.set_argument("--no-first-run")
  51. co.set_argument("--no-default-browser-check")
  52. co.set_user_agent(
  53. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
  54. "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
  55. )
  56. co.headless(False)
  57. self.driver = ChromiumPage(co)
  58. def _is_logged_in(self):
  59. title = self.driver.ele(
  60. "xpath=//*[contains(text(),'云南靓桐医药有限公司蒙自益寿大药房二店')]",
  61. timeout=5,
  62. )
  63. return bool(title)
  64. def _call_captcha_api(self, image_bytes):
  65. try:
  66. b64 = base64.b64encode(image_bytes).decode()
  67. resp = requests.post(
  68. CAPTCHA_API_URL,
  69. json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
  70. headers={"Content-Type": "application/json"},
  71. timeout=15,
  72. ).json()
  73. logger.info("验证码 API 返回: %s", resp)
  74. return resp["data"]["data"]
  75. except Exception as e:
  76. logger.exception("验证码识别失败: %s", e)
  77. return None
  78. @staticmethod
  79. def _generate_human_track(distance):
  80. tracks = []
  81. current = 0
  82. mid = distance * 0.7
  83. t = 0.2
  84. v = 0
  85. move_points = []
  86. while current < mid:
  87. a = random.uniform(2, 4)
  88. v0 = v
  89. v = v0 + a * t
  90. move = v0 * t + 0.5 * a * t * t
  91. current += move
  92. move_points.append(move)
  93. while current < distance:
  94. a = -random.uniform(0.5, 1.5)
  95. v0 = v
  96. v = v0 + a * t
  97. if v < 0.5:
  98. v = 0.5
  99. move = v0 * t + 0.5 * a * t * t
  100. current += move
  101. move_points.append(move)
  102. total_points = len(move_points)
  103. for i, move in enumerate(move_points):
  104. y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
  105. if i < total_points * 0.3:
  106. duration = random.uniform(0.01, 0.03)
  107. elif i > total_points * 0.7:
  108. duration = random.uniform(0.03, 0.08)
  109. else:
  110. duration = random.uniform(0.02, 0.05)
  111. if random.random() < 0.05:
  112. duration += random.uniform(0.05, 0.1)
  113. tracks.append((move, y_offset, duration))
  114. if random.random() < 0.7:
  115. tracks.append((-random.randint(1, 3), 0, 0.05))
  116. return tracks
  117. def _simulate_slider_drag(self, slider_element, target_distance):
  118. self.driver.actions.move_to(slider_element).hold()
  119. for offset_x, offset_y, duration in self._generate_human_track(target_distance):
  120. self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
  121. self.driver.actions.release()
  122. def _solve_slider_if_present(self):
  123. modal = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
  124. if not modal:
  125. return True
  126. logger.info("检测到滑块验证码,开始处理")
  127. jpg_bytes = modal.get_screenshot(as_bytes="jpg")
  128. distance = self._call_captcha_api(jpg_bytes)
  129. if distance is None:
  130. return False
  131. slider = self.driver.ele(
  132. "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
  133. )
  134. if not slider:
  135. logger.error("未找到滑块元素")
  136. return False
  137. self._simulate_slider_drag(slider, float(distance) + SLIDER_OFFSET_FIX)
  138. time.sleep(3)
  139. return True
  140. def login(self):
  141. self.driver.get("https://mall.yaoex.com/login", timeout=15)
  142. self.driver.wait.doc_loaded(timeout=10)
  143. input_name = self.driver.ele("xpath://input[@name='username']", timeout=5)
  144. if not input_name:
  145. logger.error("未找到用户名输入框")
  146. return False
  147. input_name.input("18687653982")
  148. time.sleep(random.uniform(1.2, 2.0))
  149. input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
  150. if not input_pass:
  151. logger.error("未找到密码输入框")
  152. return False
  153. input_pass.input("liu198810060814")
  154. time.sleep(random.uniform(1.2, 2.0))
  155. geetest_click = self.driver.ele(
  156. "xpath=//div[contains(@class,'geetest_btn_click')]", timeout=3
  157. )
  158. if geetest_click:
  159. geetest_click.click()
  160. time.sleep(1.5)
  161. login_button = self.driver.ele("xpath://input[@id='login-btn']", timeout=5)
  162. if not login_button:
  163. logger.error("未找到登录按钮")
  164. return False
  165. login_button.click()
  166. self.driver.wait.doc_loaded(timeout=10)
  167. time.sleep(2)
  168. if not self._solve_slider_if_present():
  169. return False
  170. return self._is_logged_in()
  171. def get_snapshot(self):
  172. detail_url = "https://mall.yaoex.com/v2/product/#/spuCode/2918544090/sellerCode/8353"
  173. item_id = "2918544090"
  174. self.driver.get(detail_url, timeout=10)
  175. self.driver.wait.doc_loaded(timeout=10)
  176. time.sleep(3)
  177. ele = self.driver.ele("xpath=//div[@class='yaoex-product-detail__content']", timeout=8)
  178. if not ele:
  179. logger.warning("未找到详情截图区域")
  180. return ""
  181. jpg_bytes = ele.get_screenshot(as_bytes="jpg")
  182. img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(item_id))
  183. logger.info("截图上传完成: %s", img_url)
  184. time.sleep(random.uniform(0.5, 1.2))
  185. return img_url
  186. def search(self):
  187. self.driver.get("https://mall.yaoex.com/", timeout=15)
  188. self.driver.wait.doc_loaded(timeout=10)
  189. if not self._is_logged_in():
  190. if not self.login():
  191. logger.error("登录失败")
  192. return False
  193. self.get_snapshot()
  194. return True
  195. def run(self):
  196. try:
  197. self.init_browser()
  198. self.search()
  199. except Exception as e:
  200. logger.exception("爬取异常: %s", e)
  201. time.sleep(3)
  202. finally:
  203. self._quit_browser()
  204. if __name__ == "__main__":
  205. YaoExCrawl().run()