# login_yaoshibang.py
import base64
import os
import random
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader
  11. CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
  12. CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
  13. SLIDER_OFFSET_FIX = 10
  14. chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
  15. class YaoShiBangCrawl:
  16. def __init__(self, product=None):
  17. self.product = product
  18. self.driver = None
  19. self.account_name = "ysbang_1"
  20. self.ossuploader = AliyunOSSUploader()
  21. self._register_signal_handler()
  22. def _register_signal_handler(self):
  23. def handler(signum, frame):
  24. logger.info("收到退出信号,正在关闭浏览器...")
  25. self._quit_browser()
  26. sys.exit(0)
  27. signal.signal(signal.SIGINT, handler)
  28. if hasattr(signal, "SIGTERM"):
  29. signal.signal(signal.SIGTERM, handler)
  30. def _quit_browser(self):
  31. if self.driver:
  32. try:
  33. self.driver.quit()
  34. except Exception:
  35. pass
  36. self.driver = None
  37. @staticmethod
  38. def _get_free_port():
  39. """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
  40. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  41. s.bind(("127.0.0.1", 0))
  42. return s.getsockname()[1]
  43. def init_browser(self):
  44. co = ChromiumOptions().set_browser_path(chrome_path)
  45. debug_port = self._get_free_port()
  46. co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
  47. co.set_local_port(debug_port)
  48. co.set_argument(f"--remote-debugging-port={debug_port}")
  49. co.set_argument("--remote-debugging-address=127.0.0.1")
  50. # co.set_argument("--disable-blink-features=AutomationControlled")
  51. co.set_argument("--disable-dev-shm-usage")
  52. co.set_argument("--no-first-run") # 避免首次运行弹窗
  53. co.set_argument("--no-default-browser-check") # 避免默认浏览器检查
  54. self.driver = ChromiumPage(co)
  55. def _solve_slider_captcha(self):
  56. """检测并处理易盾滑块验证码,成功返回 True。"""
  57. self.driver.wait.doc_loaded()
  58. time.sleep(2)
  59. yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
  60. if not yidun:
  61. return True
  62. logger.info("检测到滑块验证码,开始处理")
  63. jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
  64. distance = self._call_captcha_api(jpg_bytes)
  65. if distance is None:
  66. logger.error("验证码识别失败")
  67. return False
  68. logger.info("滑块距离: %s", distance)
  69. slider = self.driver.ele(
  70. "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
  71. )
  72. if not slider:
  73. logger.error("未找到滑块元素")
  74. return False
  75. self._simulate_slider_drag(slider, float(distance) + SLIDER_OFFSET_FIX)
  76. time.sleep(3)
  77. return True
  78. def _call_captcha_api(self, image_bytes):
  79. """调用云码平台识别滑块距离,失败返回 None。"""
  80. try:
  81. b64 = base64.b64encode(image_bytes).decode()
  82. resp = requests.post(
  83. CAPTCHA_API_URL,
  84. json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
  85. headers={"Content-Type": "application/json"},
  86. timeout=15,
  87. ).json()
  88. logger.info("验证码 API 返回: %s", resp)
  89. return resp["data"]["data"]
  90. except Exception as e:
  91. logger.exception("验证码 API 调用失败: %s", e)
  92. return None
  93. @staticmethod
  94. def _generate_human_track(distance):
  95. tracks = []
  96. current = 0
  97. mid = distance * 0.7
  98. t = 0.2
  99. v = 0
  100. move_points = []
  101. while current < mid:
  102. a = random.uniform(2, 4)
  103. v0 = v
  104. v = v0 + a * t
  105. move = v0 * t + 0.5 * a * t * t
  106. current += move
  107. move_points.append(move)
  108. while current < distance:
  109. a = -random.uniform(0.5, 1.5)
  110. v0 = v
  111. v = v0 + a * t
  112. if v < 0.5:
  113. v = 0.5
  114. move = v0 * t + 0.5 * a * t * t
  115. current += move
  116. move_points.append(move)
  117. total_points = len(move_points)
  118. for i, move in enumerate(move_points):
  119. y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
  120. if i < total_points * 0.3:
  121. duration = random.uniform(0.01, 0.03)
  122. elif i > total_points * 0.7:
  123. duration = random.uniform(0.03, 0.08)
  124. else:
  125. duration = random.uniform(0.02, 0.05)
  126. if random.random() < 0.05:
  127. duration += random.uniform(0.05, 0.1)
  128. tracks.append((move, y_offset, duration))
  129. if random.random() < 0.7:
  130. tracks.append((-random.randint(1, 3), 0, 0.05))
  131. return tracks
  132. def _simulate_slider_drag(self, slider_element, target_distance):
  133. self.driver.actions.move_to(slider_element).hold()
  134. for offset_x, offset_y, duration in self._generate_human_track(target_distance):
  135. self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
  136. self.driver.actions.release()
  137. def _is_logged_in(self):
  138. title = self.driver.ele(
  139. "xpath=//*[contains(text(),'广西好药师大药房连锁有限公司天峨远大药店')]",
  140. timeout=5,
  141. )
  142. return bool(title)
  143. def login(self):
  144. logger.info("开始登录药师帮")
  145. self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
  146. self.driver.wait.doc_loaded(timeout=10)
  147. time.sleep(2)
  148. input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
  149. if not input_name:
  150. logger.error("未找到账号输入框")
  151. return False
  152. input_name.input("13097980383")
  153. time.sleep(random.uniform(1.5, 2.5))
  154. input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
  155. if not input_pass:
  156. logger.error("未找到密码输入框")
  157. return False
  158. input_pass.input("a123456")
  159. time.sleep(random.uniform(1.5, 2.5))
  160. login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
  161. if not login_btn:
  162. logger.error("未找到登录按钮")
  163. return False
  164. login_btn.click()
  165. time.sleep(3)
  166. for i in range(3):
  167. self._solve_slider_captcha()
  168. time.sleep(3)
  169. if self._is_logged_in():
  170. logger.info("登录成功")
  171. cookies_list = self.driver.cookies()
  172. cookies_dict = {c['name']: c['value'] for c in cookies_list}
  173. print(cookies_dict)
  174. return True
  175. logger.error("登录后未检测到目标店铺名,登录可能失败")
  176. return False
  177. def get_snapshot(self, detail_url, item_id):
  178. self.driver.get(detail_url, timeout=15)
  179. self.driver.wait.doc_loaded(timeout=10)
  180. time.sleep(2)
  181. self._dismiss_popup_before_screenshot()
  182. ele = self.driver.ele("xpath=//div[@class='drug-shopping-wrap']", timeout=8)
  183. if not ele:
  184. ele = self.driver.ele("xpath=//div[@class='drug-info']", timeout=5)
  185. if not ele:
  186. logger.warning("未找到详情区域元素,跳过截图 item_id=%s", item_id)
  187. return ""
  188. jpg_bytes = ele.get_screenshot(as_bytes="jpg")
  189. img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(item_id))
  190. logger.info("截图上传完成: %s", img_url)
  191. time.sleep(random.uniform(0.5, 1.5))
  192. return img_url
  193. def _dismiss_popup_before_screenshot(self):
  194. """截图前关闭或隐藏营销弹窗,避免遮挡。"""
  195. close_locs = [
  196. "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
  197. "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
  198. "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
  199. "xpath=//button[contains(@class,'close')]",
  200. "xpath=//span[text()='×']",
  201. "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
  202. ]
  203. for loc in close_locs:
  204. try:
  205. btn = self.driver.ele(loc, timeout=0.5)
  206. if btn:
  207. btn.click()
  208. time.sleep(0.2)
  209. except Exception:
  210. pass
  211. try:
  212. # 兜底:隐藏常见高层弹窗和遮罩
  213. self.driver.run_js(
  214. """
  215. const sels = [
  216. '[class*="modal"]',
  217. '[class*="popup"]',
  218. '[class*="dialog"]',
  219. '[class*="mask"]',
  220. '[class*="overlay"]'
  221. ];
  222. for (const s of sels) {
  223. document.querySelectorAll(s).forEach(el => {
  224. const style = getComputedStyle(el);
  225. const z = parseInt(style.zIndex || '0', 10);
  226. if (z >= 999 && style.display !== 'none') {
  227. el.style.display = 'none';
  228. }
  229. });
  230. }
  231. document.body.style.overflow = 'auto';
  232. """
  233. )
  234. time.sleep(0.2)
  235. except Exception:
  236. pass
  237. def search(self):
  238. self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
  239. self.driver.wait.doc_loaded(timeout=10)
  240. time.sleep(2)
  241. if not self._is_logged_in():
  242. if not self.login():
  243. return False
  244. detail_url = "https://dian.ysbang.cn/#/drugInfo?wholesaleid=376456110&isAssemble=true&trafficType=15"
  245. self.get_snapshot(detail_url, "376456110")
  246. return True
  247. def run(self):
  248. try:
  249. self.init_browser()
  250. self.search()
  251. except Exception as e:
  252. logger.exception("运行异常: %s", e)
  253. finally:
  254. self._quit_browser()
  255. if __name__ == "__main__":
  256. YaoShiBangCrawl().run()