"""Take product-detail screenshots on dian.ysbang.cn and store their URLs.

The script reads pending rows from ``retrieve_process_lowprice_product``,
opens every ``link_url`` in a Chrome instance driven by DrissionPage,
screenshots the product detail area, uploads the image through
``AliyunOSSUploader`` and writes the returned URL back to the same row.
"""

import base64
import math
import random
import signal
import socket
import sys
import time

import requests
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.conn_mysql import MySQLPoolOnline
from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader
from commons.config import YSB_ACCOUNT, YSB_PASSWORD

# NOTE(review): credential committed to source control; consider moving it
# next to YSB_ACCOUNT / YSB_PASSWORD in commons.config.
CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
# Constant added to the distance returned by the captcha API before dragging.
SLIDER_OFFSET_FIX = 10
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
# Text that must be visible on the page for the session to count as logged in.
# It matches the shop name shown for the current account; update it when the
# account/shop changes.
LOGGED_IN_SHOP_NAME = "广西好药师大药房连锁有限公司天峨远大药店"


class YaoShiBangSnapshot:
    """Browser-driven snapshot job for the YaoShiBang (ysbang) shop site."""

    def __init__(self, product=None):
        """Create DB/OSS helpers and install exit-signal handlers.

        Args:
            product: Stored on the instance as ``self.product``; not used by
                any method in this file.
        """
        self.product = product
        # ChromiumPage instance; created lazily by init_browser().
        self.driver = None
        # Used as the last path component of the Chrome user-data directory.
        self.account_name = "ysbang_1"
        # Value of the `platform` column used by every SQL statement here.
        self.platform = 5
        self.db_online = MySQLPoolOnline()
        self.ossuploader = AliyunOSSUploader()
        self._register_signal_handler()

    def _register_signal_handler(self):
        """Close the browser and exit when SIGINT/SIGTERM is received.

        ``signal.signal`` may only be called from the main thread; when this
        object is created elsewhere the registration is skipped with a
        warning instead of failing construction.
        """

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError:
            # Raised by signal.signal() outside the main thread.
            logger.warning("当前不在主线程,跳过退出信号处理器注册")

    def _quit_browser(self):
        """Quit the browser if one is open; never raises for quit failures."""
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception:
            # Best effort: shutdown must continue, but leave a trace.
            logger.exception("关闭浏览器失败")
        finally:
            self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a local TCP port that was free at the time of the call.

        The probing socket is closed before returning, so another process
        could still grab the port before Chrome binds it.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome with a per-account profile and a fresh debug port."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        self.driver = ChromiumPage(co)

    def _solve_slider_captcha(self) -> bool:
        """Detect the Yidun slider captcha and attempt to drag it.

        Returns:
            True when no captcha modal is present or after a drag has been
            performed (the outcome of the drag is not verified here);
            False when recognition fails, the slider handle is missing or
            the computed distance is unusable.
        """
        self.driver.wait.doc_loaded()
        time.sleep(2)
        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not yidun:
            return True

        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            logger.error("验证码识别失败")
            return False
        logger.info("滑块距离: %s", distance)

        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False

        try:
            drag_distance = float(distance) + SLIDER_OFFSET_FIX
        except (TypeError, ValueError):
            logger.error("滑块距离非数字: %r", distance)
            return False
        if not math.isfinite(drag_distance) or drag_distance <= 0:
            logger.error("滑块距离无效: %s", drag_distance)
            return False

        self._simulate_slider_drag(slider, drag_distance)
        time.sleep(3)
        return True

    def _call_captcha_api(self, image_bytes):
        """Ask the captcha-recognition API for the slider distance.

        Args:
            image_bytes: Screenshot of the captcha modal as raw bytes.

        Returns:
            The distance as a finite float, or None on any failure (network
            error, non-JSON/non-dict response, missing or non-numeric value).
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            if not isinstance(resp, dict):
                return None

            # The distance is read from resp["data"]["data"] when resp["data"]
            # is a dict, otherwise from resp["data"] itself.
            data = resp.get("data")
            dist = data.get("data") if isinstance(data, dict) else data
            if dist is None:
                logger.error("验证码 API 未返回距离字段: %s", resp)
                return None

            try:
                d = float(dist)
            except (TypeError, ValueError):
                logger.error("验证码距离无法解析为数字: %r", dist)
                return None
            if not math.isfinite(d):
                logger.error("验证码距离非有限数值: %r", dist)
                return None
            return d
        except Exception as e:
            logger.exception("验证码 API 调用失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance):
        """Build a randomized drag track covering at least ``distance``.

        The track accelerates over the first 70% of the distance and then
        decelerates (speed floor 0.5) until the accumulated movement reaches
        ``distance``; the last step may overshoot slightly. With probability
        0.7 a final backwards step of 1-3 is appended.

        Args:
            distance: Target horizontal distance; anything convertible to
                float.

        Returns:
            A list of ``(dx, dy, duration)`` tuples, or an empty list when
            ``distance`` is not a finite positive number.
        """
        try:
            distance = float(distance)
        except (TypeError, ValueError):
            return []
        if distance <= 0 or not math.isfinite(distance):
            return []

        tracks = []
        current = 0
        mid = distance * 0.7
        t = 0.2  # time slice used by the kinematic formulas below
        v = 0
        move_points = []

        # Acceleration phase: s = v0*t + a*t^2/2 with a random positive a.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        # Deceleration phase; the speed floor keeps every step positive so
        # the loop always terminates.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)

        total_points = len(move_points)
        for i, move in enumerate(move_points):
            # Occasional vertical jitter.
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            # Shorter steps at the start, longer ones near the end.
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            # Rare extra pause.
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))

        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance):
        """Hold the slider, replay a generated track, then release.

        Args:
            slider_element: The slider handle element to grab.
            target_distance: Horizontal distance to drag; non-positive values
                are rejected with a warning.
        """
        if target_distance <= 0:
            logger.warning("滑块目标距离无效: %s", target_distance)
            return
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): the generated durations already look like seconds
            # (about 0.01-0.18); dividing by 1000 requests a near-zero
            # duration for every step. Confirm against DrissionPage's
            # Actions.move whether this is intended.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _is_logged_in(self) -> bool:
        """Return True when the expected shop name is visible on the page."""
        title = self.driver.ele(
            f"xpath=//*[contains(text(),'{LOGGED_IN_SHOP_NAME}')]",
            timeout=5,
        )
        return bool(title)

    def login(self) -> bool:
        """Log in with the configured account, handling the slider captcha.

        Returns:
            True once the logged-in marker is detected; False when a form
            element is missing or the marker does not appear after three
            captcha rounds.
        """
        logger.info("开始登录药师帮")
        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)

        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
        if not input_name:
            logger.error("未找到账号输入框")
            return False
        input_name.input(YSB_ACCOUNT)
        time.sleep(random.uniform(1.5, 2.5))

        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input(YSB_PASSWORD)
        time.sleep(random.uniform(1.5, 2.5))

        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
        if not login_btn:
            logger.error("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(3)

        # The captcha helper's return value is deliberately not checked: the
        # logged-in marker is the only success criterion.
        for _ in range(3):
            self._solve_slider_captcha()
            time.sleep(3)
            if self._is_logged_in():
                logger.info("登录成功")
                return True

        logger.error("登录后未检测到目标店铺名,登录可能失败")
        return False

    def get_snapshot(self, detail_url, row_id) -> str:
        """Screenshot the detail area of ``detail_url`` and upload it.

        Args:
            detail_url: Product detail page to open.
            row_id: Row id; its string form is passed to the uploader and it
                is included in log messages.

        Returns:
            The URL returned by the uploader, or ``""`` when the detail
            element is missing, the screenshot is empty, or the
            screenshot/upload step fails.
        """
        self.driver.get(detail_url, timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        self._dismiss_popup_before_screenshot()

        # Prefer the shopping wrapper; fall back to the plain info block.
        ele = self.driver.ele("xpath=//div[@class='drug-shopping-wrap']", timeout=8)
        if not ele:
            ele = self.driver.ele("xpath=//div[@class='drug-info']", timeout=5)
        if not ele:
            logger.warning("未找到详情区域元素,跳过截图 row_id=%s", row_id)
            return ""

        try:
            jpg_bytes = ele.get_screenshot(as_bytes="jpg")
            if not jpg_bytes:
                logger.warning("截图为空 row_id=%s", row_id)
                return ""
            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(row_id))
        except Exception:
            logger.exception("截图或 OSS 上传失败 row_id=%s url=%s", row_id, detail_url)
            return ""

        if not img_url:
            logger.warning("OSS 未返回有效地址 row_id=%s", row_id)
            return ""

        logger.info("截图上传完成 row_id=%s url=%s", row_id, img_url)
        time.sleep(random.uniform(0.5, 1.5))
        return img_url

    def _dismiss_popup_before_screenshot(self):
        """Close or hide marketing popups so they do not cover the screenshot.

        Best effort only: failures are logged and never propagated.
        """
        close_locs = [
            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
            "xpath=//button[contains(@class,'close')]",
            "xpath=//span[text()='×']",
            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
        ]
        for loc in close_locs:
            try:
                btn = self.driver.ele(loc, timeout=0.5)
                if btn:
                    btn.click()
                    time.sleep(0.2)
            except Exception as e:
                logger.warning("关闭弹窗失败 loc=%s err=%s", loc, e)

        try:
            # Fallback: hide visible popup/mask-like elements whose z-index is
            # at least 999, then re-enable page scrolling.
            self.driver.run_js(
                """
                const sels = [
                    '[class*="modal"]',
                    '[class*="popup"]',
                    '[class*="dialog"]',
                    '[class*="mask"]',
                    '[class*="overlay"]'
                ];
                for (const s of sels) {
                    document.querySelectorAll(s).forEach(el => {
                        const style = getComputedStyle(el);
                        const z = parseInt(style.zIndex || '0', 10);
                        if (z >= 999 && style.display !== 'none') {
                            el.style.display = 'none';
                        }
                    });
                }
                document.body.style.overflow = 'auto';
                """
            )
            time.sleep(0.2)
        except Exception as e:
            logger.warning("隐藏弹窗脚本执行失败 err=%s", e)

    def _save_snapshot_url(self, row_id, img_url):
        """Write the uploaded URL back so the row is not selected again.

        Does nothing when ``row_id`` is None or ``img_url`` is empty.
        """
        if row_id is None or not img_url:
            return
        sql = (
            "UPDATE `retrieve_process_lowprice_product` "
            "SET `snapshot_url` = %s WHERE `id` = %s AND `platform` = %s"
        )
        n = self.db_online.execute(sql, (img_url, row_id, self.platform))
        # Only compare when a numeric row count came back; a None result
        # would otherwise raise TypeError.
        if isinstance(n, int) and n <= 0:
            logger.warning("snapshot_url 回写未影响行数 id=%s platform=%s", row_id, self.platform)

    def search(self, data_list) -> bool:
        """Snapshot every row in ``data_list`` and store the resulting URLs.

        Args:
            data_list: Rows exposing ``id`` and ``link_url`` via ``.get``.

        Returns:
            False when login fails; otherwise True if at least one snapshot
            was taken and written back.
        """
        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        if not self._is_logged_in() and not self.login():
            return False

        ok, fail = 0, 0
        for data in data_list:
            row_id = data.get("id")
            link_url = data.get("link_url")
            if not link_url:
                logger.warning("缺少 link_url,跳过 id=%s", row_id)
                fail += 1
                continue
            try:
                img_url = self.get_snapshot(link_url, row_id)
                if img_url:
                    self._save_snapshot_url(row_id, img_url)
            except Exception:
                # Isolate failures so one bad row does not abort the batch.
                logger.exception("快照处理异常 id=%s url=%s", row_id, link_url)
                fail += 1
                continue
            if img_url:
                ok += 1
            else:
                fail += 1

        logger.info("快照任务结束 成功=%s 失败=%s 总计=%s", ok, fail, len(data_list))
        return ok > 0

    def run(self):
        """Process up to 100 of today's rows that have no snapshot yet."""
        date_str = time.strftime("%Y-%m-%d")
        sql = """
            SELECT `id`,`link_url`
            FROM `retrieve_process_lowprice_product`
            WHERE `platform`=%s AND `snapshot_url` IS NULL AND `scrape_date`=%s
            LIMIT 100
        """
        data_list = self.db_online.select_data(sql, (self.platform, date_str))
        if not data_list:
            logger.info("当前不需要更新快照")
            return
        try:
            self.init_browser()
            self.search(data_list)
        except Exception as e:
            logger.exception("运行异常: %s", e)
        finally:
            self._quit_browser()


if __name__ == "__main__":
    YaoShiBangSnapshot().run()