"""Crawler for mall.yaoex.com.

Starts a local Chrome through DrissionPage, logs in (handling the slider
captcha through an external recognition API when it appears), takes a
screenshot of a product-detail element and uploads it via AliyunOSSUploader.
"""

import base64
import os
import random
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader

chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"

# Secrets can be supplied through the environment. The literal fallbacks keep
# the previous behavior when the variables are unset.
# TODO(review): remove the literal fallbacks once deployments set the variables.
CAPTCHA_TOKEN = os.environ.get("YAOEX_CAPTCHA_TOKEN", "12445")
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
# Constant added to the distance returned by the captcha API before dragging.
SLIDER_OFFSET_FIX = 10
LOGIN_USERNAME = os.environ.get("YAOEX_USERNAME", "18687653982")
LOGIN_PASSWORD = os.environ.get("YAOEX_PASSWORD", "liu198810060814")


class YaoExCrawl:
    """Drive one browser session: login, product snapshot, upload."""

    def __init__(self, product=None):
        """Store the optional ``product`` and prepare uploader and signal hooks.

        The browser itself is not started here; see ``init_browser``.
        """
        self.product = product
        self.driver = None
        # Also used as the name of the Chrome user-data directory.
        self.account_name = "yiyaocheng_1"
        self.ossuploader = AliyunOSSUploader()
        self._register_signal_handler()

    def _register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit.

        ``signal.signal`` may only be called from the main thread and raises
        ``ValueError`` otherwise. In that case the handlers are skipped with a
        warning instead of failing construction.
        """

        def handler(signum, frame):
            logger.info("收到退出信号,准备关闭浏览器")
            self._quit_browser()
            sys.exit(0)

        signals_to_hook = [signal.SIGINT]
        if hasattr(signal, "SIGTERM"):
            signals_to_hook.append(signal.SIGTERM)

        try:
            for sig in signals_to_hook:
                signal.signal(sig, handler)
        except ValueError as e:
            logger.warning("无法注册信号处理器: %s", e)

    def _quit_browser(self):
        """Close the browser if one is open; never raises.

        Shutdown is best-effort: a failure is logged and the driver reference
        is dropped either way, so calling this repeatedly is harmless.
        """
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception as e:
            logger.warning("关闭浏览器时出错: %s", e)
        finally:
            self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a TCP port number that was free on 127.0.0.1 when probed.

        The probe socket is closed before returning, so another process could
        claim the port before Chrome binds it.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Configure Chrome options and create ``self.driver``."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")
        co.set_argument("--no-default-browser-check")
        co.set_user_agent(
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
        )
        co.headless(False)
        self.driver = ChromiumPage(co)

    def _is_logged_in(self):
        """Return True when the page contains the account's shop name."""
        title = self.driver.ele(
            "xpath=//*[contains(text(),'云南靓桐医药有限公司蒙自益寿大药房二店')]",
            timeout=5,
        )
        return bool(title)

    def _call_captcha_api(self, image_bytes):
        """Send a captcha screenshot to the recognition API.

        Args:
            image_bytes: Raw image bytes of the captcha modal.

        Returns:
            The value found at ``["data"]["data"]`` in the JSON reply, or
            ``None`` when the request fails or the reply has another shape.
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            return resp["data"]["data"]
        except (requests.RequestException, ValueError, KeyError, TypeError) as e:
            # Network failure, non-JSON body, or unexpected payload shape.
            logger.exception("验证码识别失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance):
        """Split ``distance`` into a list of small drag steps.

        The steps accelerate until 70% of the distance is covered, then
        decelerate with the velocity floored at 0.5. Because whole steps are
        added until the total reaches ``distance``, the sum may overshoot it
        slightly. With 70% probability one backward step of 1-3 units is
        appended at the end.

        Args:
            distance: Total horizontal distance to cover.

        Returns:
            A list of ``(x_move, y_offset, duration)`` tuples; empty when
            ``distance`` is not positive.
        """
        if distance <= 0:
            return []

        tracks = []
        current = 0
        mid = distance * 0.7
        t = 0.2
        v = 0
        move_points = []

        # Acceleration phase.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        # Deceleration phase; the velocity floor keeps every step positive so
        # the loop always terminates.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        total_points = len(move_points)
        for i, move in enumerate(move_points):
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))

        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance):
        """Hold the slider, replay a generated track, then release."""
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): the track durations are divided by 1000 here.
            # If `duration` is meant to be seconds this makes every step
            # near-instant; left unchanged, confirm the intended unit.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _solve_slider_if_present(self):
        """Handle the slider captcha modal when it is shown.

        Returns:
            True when no modal is present or the drag was performed; False
            when recognition fails, the API answer is not numeric, or the
            slider element cannot be found.
        """
        modal = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not modal:
            return True

        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = modal.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            return False

        try:
            target_distance = float(distance) + SLIDER_OFFSET_FIX
        except (TypeError, ValueError):
            logger.error("验证码 API 返回的距离无效: %s", distance)
            return False

        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False

        self._simulate_slider_drag(slider, target_distance)
        time.sleep(3)
        return True

    def login(self):
        """Fill in the login form and submit it.

        Returns:
            True when the logged-in marker is found afterwards, False when a
            form element is missing, the captcha step fails, or the marker is
            absent.
        """
        self.driver.get("https://mall.yaoex.com/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)

        input_name = self.driver.ele("xpath://input[@name='username']", timeout=5)
        if not input_name:
            logger.error("未找到用户名输入框")
            return False
        input_name.input(LOGIN_USERNAME)
        time.sleep(random.uniform(1.2, 2.0))

        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input(LOGIN_PASSWORD)
        time.sleep(random.uniform(1.2, 2.0))

        # Optional click-to-verify button; skipped when it is not rendered.
        geetest_click = self.driver.ele(
            "xpath=//div[contains(@class,'geetest_btn_click')]", timeout=3
        )
        if geetest_click:
            geetest_click.click()
            time.sleep(1.5)

        login_button = self.driver.ele("xpath://input[@id='login-btn']", timeout=5)
        if not login_button:
            logger.error("未找到登录按钮")
            return False
        login_button.click()
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)

        if not self._solve_slider_if_present():
            return False
        return self._is_logged_in()

    def get_snapshot(self, spu_code="2918544090", seller_code="8353"):
        """Screenshot a product-detail element and upload it.

        Args:
            spu_code: Product code used in the detail URL and as the upload
                name. Defaults to the previously hard-coded product.
            seller_code: Seller code used in the detail URL.

        Returns:
            Whatever ``upload_from_bytes`` returns (logged as the image URL),
            or ``""`` when the detail element is not found.
        """
        detail_url = (
            f"https://mall.yaoex.com/v2/product/#/spuCode/{spu_code}"
            f"/sellerCode/{seller_code}"
        )
        self.driver.get(detail_url, timeout=10)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(3)

        ele = self.driver.ele(
            "xpath=//div[@class='yaoex-product-detail__content']", timeout=8
        )
        if not ele:
            logger.warning("未找到详情截图区域")
            return ""

        jpg_bytes = ele.get_screenshot(as_bytes="jpg")
        img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(spu_code))
        logger.info("截图上传完成: %s", img_url)
        time.sleep(random.uniform(0.5, 1.2))
        return img_url

    def search(self):
        """Open the home page, log in if needed, then take the snapshot.

        Returns:
            False when login fails, True otherwise.
        """
        self.driver.get("https://mall.yaoex.com/", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        if not self._is_logged_in():
            if not self.login():
                logger.error("登录失败")
                return False
        self.get_snapshot()
        return True

    def run(self):
        """Run the full session and always close the browser afterwards.

        Returns:
            The result of ``search()``, or False when an exception was raised.
        """
        try:
            self.init_browser()
            return self.search()
        except Exception as e:
            logger.exception("爬取异常: %s", e)
            time.sleep(3)
            return False
        finally:
            self._quit_browser()


if __name__ == "__main__":
    YaoExCrawl().run()