import base64
import random
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader

# NOTE(review): this API token is committed in source; consider loading it
# from configuration or the environment instead.
CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
# Constant added to the recognized slider distance before dragging.
SLIDER_OFFSET_FIX = 10
# How many times login() tries the captcha step before giving up.
MAX_CAPTCHA_ATTEMPTS = 3
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"


class YaoShiBangCrawl:
    """Browser-driven crawler for dian.ysbang.cn.

    Opens Chrome through DrissionPage, logs in (handling the slider captcha
    when it appears), screenshots a product detail area and uploads the image
    through ``AliyunOSSUploader``.
    """

    def __init__(self, product=None):
        """Store the optional *product* and register exit-signal handlers."""
        self.product = product
        self.driver = None
        self.account_name = "ysbang_1"
        self.ossuploader = AliyunOSSUploader()
        self._register_signal_handler()

    def _register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit.

        ``signal.signal`` raises ``ValueError`` when called outside the main
        thread; in that case registration is skipped with a warning instead
        of failing construction.
        """

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError as exc:
            logger.warning("无法注册退出信号处理器: %s", exc)

    def _quit_browser(self):
        """Quit the browser if one is open; errors are logged, never raised."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception as exc:
                # Best-effort cleanup: keep going, but leave a trace.
                logger.debug("关闭浏览器时出错: %s", exc)
            self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a currently free local port for Chrome's debugging endpoint."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Binding to port 0 lets the OS pick an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Create ``self.driver`` with a per-account profile and a free debug port."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # suppress first-run dialogs
        co.set_argument("--no-default-browser-check")  # skip default-browser check
        self.driver = ChromiumPage(co)

    def _solve_slider_captcha(self):
        """Detect and handle the Yidun slider captcha.

        Returns:
            True when no captcha modal is present or a drag was performed;
            False when recognition fails, the recognized value is not a
            number, or the slider element cannot be found.
        """
        self.driver.wait.doc_loaded()
        time.sleep(2)

        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not yidun:
            return True

        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            logger.error("验证码识别失败")
            return False
        logger.info("滑块距离: %s", distance)

        # The API payload is external data; a non-numeric value must not
        # abort the whole run.
        try:
            target_distance = float(distance) + SLIDER_OFFSET_FIX
        except (TypeError, ValueError):
            logger.error("滑块距离无法解析: %r", distance)
            return False

        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False

        self._simulate_slider_drag(slider, target_distance)
        time.sleep(3)
        return True

    def _call_captcha_api(self, image_bytes):
        """Send the captcha image to the recognition API.

        Args:
            image_bytes: Raw image bytes; sent base64-encoded.

        Returns:
            The value at ``["data"]["data"]`` of the JSON response, or None
            if the request or the lookup raises.
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            return resp["data"]["data"]
        except Exception as e:
            logger.exception("验证码 API 调用失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance):
        """Build a randomized drag track covering at least *distance*.

        The track accelerates until 70% of the distance is covered and then
        decelerates. Because the loops stop only once the running total
        reaches the target, the summed x-moves may overshoot *distance*.

        Returns:
            A list of ``(x_move, y_offset, duration)`` tuples. With 70%
            probability a final small backward step is appended.
        """
        tracks = []
        current = 0
        mid = distance * 0.7
        t = 0.2
        v = 0
        move_points = []

        # Acceleration phase.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        # Deceleration phase; velocity is floored at 0.5 so each step still
        # advances and the loop terminates.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        total_points = len(move_points)
        for i, move in enumerate(move_points):
            # Occasional vertical jitter.
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            # Rare extra pause.
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))

        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance):
        """Hold *slider_element*, replay a generated track, then release."""
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): track durations are already small fractions
            # (0.01-0.18); dividing by 1000 makes them near zero. Confirm the
            # unit expected by actions.move() before changing this.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _is_logged_in(self):
        """Return True if the expected shop name is present on the page."""
        title = self.driver.ele(
            "xpath=//*[contains(text(),'广西好药师大药房连锁有限公司天峨远大药店')]",
            timeout=5,
        )
        return bool(title)

    def login(self):
        """Fill in the login form, handle the captcha and verify the result.

        Returns:
            True once the logged-in marker is detected; False if a form
            element is missing or the marker never appears within
            ``MAX_CAPTCHA_ATTEMPTS`` captcha rounds.
        """
        logger.info("开始登录药师帮")
        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)

        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
        if not input_name:
            logger.error("未找到账号输入框")
            return False
        # NOTE(review): account and password are hard-coded; consider moving
        # them to configuration.
        input_name.input("13097980383")
        time.sleep(random.uniform(1.5, 2.5))

        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input("a123456")
        time.sleep(random.uniform(1.5, 2.5))

        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
        if not login_btn:
            logger.error("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(3)

        for attempt in range(1, MAX_CAPTCHA_ATTEMPTS + 1):
            if not self._solve_slider_captcha():
                logger.warning("第 %d 次滑块验证处理失败", attempt)
            time.sleep(3)
            if self._is_logged_in():
                logger.info("登录成功")
                cookies_list = self.driver.cookies()
                cookies_dict = {c['name']: c['value'] for c in cookies_list}
                # NOTE(review): this writes session cookies to stdout.
                print(cookies_dict)
                return True

        logger.error("登录后未检测到目标店铺名,登录可能失败")
        return False

    def get_snapshot(self, detail_url, item_id):
        """Screenshot the detail area of *detail_url* and upload it.

        Args:
            detail_url: Page to open.
            item_id: Passed (as ``str``) to the uploader and used in logs.

        Returns:
            The value returned by ``upload_from_bytes``, or ``""`` when
            neither detail element is found.
        """
        self.driver.get(detail_url, timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        self._dismiss_popup_before_screenshot()

        ele = self.driver.ele("xpath=//div[@class='drug-shopping-wrap']", timeout=8)
        if not ele:
            # Fall back to the secondary container.
            ele = self.driver.ele("xpath=//div[@class='drug-info']", timeout=5)
        if not ele:
            logger.warning("未找到详情区域元素,跳过截图 item_id=%s", item_id)
            return ""

        jpg_bytes = ele.get_screenshot(as_bytes="jpg")
        img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(item_id))
        logger.info("截图上传完成: %s", img_url)
        time.sleep(random.uniform(0.5, 1.5))
        return img_url

    def _dismiss_popup_before_screenshot(self):
        """Close or hide marketing popups before a screenshot is taken.

        Best-effort: every failure is logged at debug level and ignored.
        """
        close_locs = [
            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
            "xpath=//button[contains(@class,'close')]",
            "xpath=//span[text()='×']",
            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
        ]
        for loc in close_locs:
            try:
                btn = self.driver.ele(loc, timeout=0.5)
                if btn:
                    btn.click()
                    time.sleep(0.2)
            except Exception as exc:
                logger.debug("关闭弹窗失败 %s: %s", loc, exc)

        try:
            # Fallback: hide visible popup/overlay-like elements whose
            # z-index is at least 999, then re-enable body scrolling.
            self.driver.run_js(
                """
                const sels = [
                    '[class*="modal"]',
                    '[class*="popup"]',
                    '[class*="dialog"]',
                    '[class*="mask"]',
                    '[class*="overlay"]'
                ];
                for (const s of sels) {
                    document.querySelectorAll(s).forEach(el => {
                        const style = getComputedStyle(el);
                        const z = parseInt(style.zIndex || '0', 10);
                        if (z >= 999 && style.display !== 'none') {
                            el.style.display = 'none';
                        }
                    });
                }
                document.body.style.overflow = 'auto';
                """
            )
            time.sleep(0.2)
        except Exception as exc:
            logger.debug("隐藏弹窗脚本执行失败: %s", exc)

    def search(self):
        """Open the home page, log in if needed and snapshot one product.

        Returns:
            False when login is required and fails; True otherwise.
        """
        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)

        if not self._is_logged_in() and not self.login():
            return False

        detail_url = "https://dian.ysbang.cn/#/drugInfo?wholesaleid=376456110&isAssemble=true&trafficType=15"
        self.get_snapshot(detail_url, "376456110")
        return True

    def run(self):
        """Start the browser, run the search, and always close the browser."""
        try:
            self.init_browser()
            self.search()
        except Exception as e:
            logger.exception("运行异常: %s", e)
        finally:
            self._quit_browser()


if __name__ == "__main__":
    YaoShiBangCrawl().run()