"""Yaoshibang (dian.ysbang.cn) product-list scraper with detail-page snapshots.

The module drives a local Chrome through DrissionPage to log in, calls the
wholesale list API with ``requests``, decrypts the ``data.o`` payload and, for
every matching item, opens the detail page, takes a screenshot, uploads it and
stores the record through ``DrugPipeline``.
"""
import base64
import hashlib
import json
import math
import random
import re
import secrets
import signal
import socket
import string
import sys
import time
import zlib
from datetime import datetime, timedelta
from pathlib import Path

import requests
from Crypto.Cipher import AES
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.conn_mysql import MySQLPoolOnline
from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader
from commons.config import YSB_ACCOUNT
from pipelines.drug_pipelines import DrugPipeline
from area_info.city_name_to_id import get_city

# NOTE(review): credential committed to source control; consider loading it
# from configuration or the environment instead.
CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
SLIDER_OFFSET_FIX = 10
DETAIL_GET_TIMEOUT = 15
DETAIL_URL_WAIT = 10
DETAIL_DOM_WAIT = 8
DETAIL_NAV_RETRIES = 3
DETAIL_APPROVAL_XPATH = (
    'xpath://div[@class="drug-info"]//span[contains(text(),"批准文号")]'
)
# Number of consecutive list items that may fail the brand/product title
# filter before the search gives up.
MAX_CONSECUTIVE_MISMATCHES = 20
LIST_API_URL = "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270"
LIST_REQUEST_ATTEMPTS = 3

chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
PROJECT_ROOT = Path(__file__).resolve().parents[2]
YSB_SPIDER_DIR = PROJECT_ROOT / "spiders" / "yaoshibang"
BROWSER_PROFILE_SUBDIR = "chrome_profile"

headers = {
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
    "Content-Type": "application/json",
    "Origin": "https://dian.ysbang.cn",
    "Referer": "https://dian.ysbang.cn/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
    ),
    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
}


def pkcs7_unpad(data):
    """Strip PKCS#7 padding from *data* (16-byte block size).

    Raises:
        ValueError: if *data* is empty, the pad length is outside 1..16, or
            the trailing bytes are not all equal to the pad length.
    """
    if not data:
        raise ValueError("Empty data for PKCS7 unpad")
    pad_len = data[-1]
    if pad_len < 1 or pad_len > 16:
        raise ValueError("Invalid PKCS7 padding length")
    if data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("Invalid PKCS7 padding bytes")
    return data[:-pad_len]


def derive_ysb_key():
    """Return the 16-byte AES key: upper-cased first 16 hex chars of an MD5."""
    base = "BhCLxFfFhd12K4qRGPfy"
    md5_hex = hashlib.md5(base.encode("utf-8")).hexdigest()
    return md5_hex[:16].upper().encode("utf-8")


def decrypt_ysb_payload(cipher_text_b64):
    """Decrypt the ``data.o`` field of the list API and return the JSON object.

    Pipeline: base64 decode -> AES-ECB decrypt -> PKCS#7 unpad -> gzip
    decompress -> UTF-8 JSON parse.
    """
    key = derive_ysb_key()
    cipher_bytes = base64.b64decode(cipher_text_b64)
    cipher = AES.new(key, AES.MODE_ECB)
    decrypted = cipher.decrypt(cipher_bytes)
    unpadded = pkcs7_unpad(decrypted)
    # MAX_WBITS | 16 tells zlib to expect a gzip header and trailer.
    json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16)
    return json.loads(json_bytes.decode("utf-8"))


class YaoShiBangSnapshot:
    """Scrape one product task from Yaoshibang and snapshot each detail page.

    Task-specific attributes (``product``, ``brand``, ``company_id`` ...) are
    only populated when a non-empty ``drug_dict`` is passed to the constructor.
    """

    def __init__(self, drug_dict=None):
        self.driver = None
        self.db = MySQLPoolOnline()
        self.ip = None
        self.login_username = None
        self.login_password = None
        self.platform = 5
        self.pipeline = DrugPipeline("ysb")
        self.task_dict = drug_dict or {}
        self.ossuploader = AliyunOSSUploader()
        self.start_page = 1
        self.end_page = 1
        self.account_name = YSB_ACCOUNT.get("username", "ysb_default")
        self._register_signal_handler()
        if self.task_dict:
            self.get_product_data()
        self.success = True
        self.is_no_prodcut = 0
        # Count of consecutive list items whose title failed the task filter.
        self.is_product_count = 0
        self.token = ""
        self._state_value = ""
        self.start_date = (datetime.now() - timedelta(minutes=500)).strftime("%Y-%m-%d %H:%M")

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required keys; the
        rest fall back to defaults when absent.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)
        # NOTE(review): start_page/end_page are parsed here but search()
        # always walks pages 1..99 - confirm whether that is intended.
        self.start_page = self._parse_page(task.get("start_page"), 1)
        self.end_page = max(
            self.start_page,
            self._parse_page(task.get("end_page"), self.start_page),
        )

    @staticmethod
    def _parse_page(value, default=1):
        """Return *value* as an int >= 1, or *default* if it is not one."""
        try:
            page = int(value)
        except (TypeError, ValueError):
            return default
        return page if page >= 1 else default

    def _register_signal_handler(self):
        """Close the browser and exit on SIGINT (and SIGTERM where available)."""

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        if hasattr(signal, "SIGTERM"):
            signal.signal(signal.SIGTERM, handler)

    def _quit_browser(self):
        """Quit the browser if one is open; errors during quit are ignored."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception:
                # Best effort: this runs during shutdown and signal handling.
                pass
            self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a currently free local port for Chrome remote debugging."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 lets the OS choose an unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def _resolve_browser_profile_dir(self):
        """Pick the Chrome user-data directory for the current account.

        Browser data lives under ``<project root>/spiders/yaoshibang/``.
        ``chrome_profile/<account>`` is preferred; an older directory layout
        is reused when it already contains profile data.
        """
        preferred = YSB_SPIDER_DIR / BROWSER_PROFILE_SUBDIR / self.account_name
        legacy_flat = YSB_SPIDER_DIR / self.account_name
        legacy_nested = YSB_SPIDER_DIR / "spiders" / "yaoshibang" / self.account_name
        for candidate in (preferred, legacy_flat, legacy_nested):
            if (candidate / "Default").is_dir() or (candidate / "Local State").is_file():
                logger.info("使用已有浏览器配置目录: %s", candidate)
                return candidate
        preferred.parent.mkdir(parents=True, exist_ok=True)
        logger.info("新建浏览器配置目录: %s", preferred)
        return preferred

    def init_browser(self):
        """Launch Chrome on a free debug port with the account's profile dir."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        profile_dir = self._resolve_browser_profile_dir()
        profile_dir.mkdir(parents=True, exist_ok=True)
        co.set_user_data_path(str(profile_dir))
        logger.info("浏览器用户目录(绝对路径): %s", profile_dir.resolve())
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--start-maximized")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        self.driver = ChromiumPage(co)

    def _solve_slider_captcha(self):
        """Detect and handle the Yidun slider captcha.

        Returns:
            True when no captcha modal is present or a drag was performed;
            False when recognition failed, the slider element was not found,
            or the distance was unusable. Whether the drag actually passed
            the captcha is not checked here.
        """
        self.driver.wait.doc_loaded()
        time.sleep(2)
        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not yidun:
            return True
        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            logger.error("验证码识别失败")
            return False
        logger.info("滑块距离: %s", distance)
        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False
        try:
            drag_distance = float(distance) + SLIDER_OFFSET_FIX
        except (TypeError, ValueError):
            logger.error("滑块距离非数字: %r", distance)
            return False
        if not math.isfinite(drag_distance) or drag_distance <= 0:
            logger.error("滑块距离无效: %s", drag_distance)
            return False
        # NOTE(review): the net offset is SLIDER_OFFSET_FIX - 5; presumably
        # tuned empirically - confirm before folding into one constant.
        self._simulate_slider_drag(slider, drag_distance - 5)
        time.sleep(3)
        return True

    def _call_captcha_api(self, image_bytes):
        """Ask the captcha service for the slider distance.

        Returns:
            The distance as a finite float, or None on any failure.
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            if not isinstance(resp, dict):
                return None
            data = resp.get("data")
            # The distance is either resp["data"]["data"] or resp["data"].
            dist = data.get("data") if isinstance(data, dict) else data
            if dist is None:
                logger.error("验证码 API 未返回距离字段: %s", resp)
                return None
            try:
                d = float(dist)
            except (TypeError, ValueError):
                logger.error("验证码距离无法解析为数字: %r", dist)
                return None
            if not math.isfinite(d):
                logger.error("验证码距离非有限数值: %r", dist)
                return None
            return d
        except Exception as e:
            logger.exception("验证码 API 调用失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance):
        """Build a randomised drag track of ``(dx, dy, duration)`` steps.

        The track accelerates over the first 70% of *distance*, then
        decelerates, and with probability 0.7 ends with a small backward
        step. Returns an empty list when *distance* is not a positive
        finite number.
        """
        try:
            distance = float(distance)
        except (TypeError, ValueError):
            return []
        if distance <= 0 or not math.isfinite(distance):
            return []
        tracks = []
        current = 0
        mid = distance * 0.7
        t = 0.2
        v = 0
        move_points = []
        # Acceleration phase.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        # Deceleration phase; velocity is floored at 0.5 so that every step
        # still moves forward and the loop terminates.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        total_points = len(move_points)
        for i, move in enumerate(move_points):
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            if random.random() < 0.05:
                # Occasional longer pause.
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance):
        """Hold the slider, replay a generated track, then release."""
        if target_distance <= 0:
            logger.warning("滑块目标距离无效: %s", target_distance)
            return
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): the generated durations are already fractions of
            # a second, so dividing by 1000 makes each step near-instant.
            # Confirm against DrissionPage's Actions.move units.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _is_logged_in(self):
        """Return True when the page shows a ``span.logout`` element."""
        title = self.driver.ele(
            "xpath=//span[@class='logout']",
            timeout=5,
        )
        return bool(title)

    def _current_url(self):
        """Return the browser URL, or an empty string if unavailable."""
        try:
            return self.driver.url or ""
        except Exception:
            return ""

    def _goto_detail_page(self, item_id, detail_url):
        """Open *detail_url* and refresh once so the SPA re-renders the detail.

        Retries up to ``DETAIL_NAV_RETRIES`` times.

        Returns:
            True once the browser URL contains *item_id*, otherwise False.
        """
        for attempt in range(1, DETAIL_NAV_RETRIES + 1):
            try:
                self.driver.get(detail_url, timeout=5)
                time.sleep(1.5)
                eles = self.driver.eles(
                    "xpath=//div[@class='y-dialog']//button[contains(text(),'确认')]",
                    timeout=3,
                )
                if len(eles) == 2:
                    eles[1].click()
                    time.sleep(1)
                self.driver.refresh()
                time.sleep(1.5)
                # _current_url() never returns None, so "in" cannot raise.
                if str(item_id) in self._current_url():
                    return True
            except Exception as e:
                logger.warning(
                    "跳转详情异常 item_id=%s attempt=%s: %s",
                    item_id,
                    attempt,
                    e,
                )
            time.sleep(random.uniform(0.8, 1.5))
        return False

    def login(self):
        """Log in with the configured account, solving the slider if shown.

        Returns:
            True when the logged-in marker appears within three captcha
            rounds, otherwise False.
        """
        logger.info("开始登录药师帮")
        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
        if not input_name:
            logger.error("未找到账号输入框")
            return False
        input_name.input(YSB_ACCOUNT["username"])
        time.sleep(random.uniform(1.5, 2.5))
        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input(YSB_ACCOUNT["password"])
        time.sleep(random.uniform(1.5, 2.5))
        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
        if not login_btn:
            logger.error("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(3)
        for _ in range(3):
            self._solve_slider_captcha()
            time.sleep(3)
            if self._is_logged_in():
                logger.info("登录成功")
                return True
        logger.error("登录后未检测到目标店铺名,登录可能失败")
        return False

    def _take_snapshot(self, upload_key):
        """Screenshot the current page and upload it; no navigation happens.

        Returns:
            The uploaded image URL, or an empty string on any failure.
        """
        time.sleep(1)
        self._dismiss_popup_before_screenshot()
        try:
            jpg_bytes = self.driver.get_screenshot(as_bytes="jpg")
            if not jpg_bytes:
                logger.warning("截图为空 upload_key=%s", upload_key)
                return ""
            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
        except Exception:
            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
            return ""
        if not img_url:
            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
            return ""
        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
        time.sleep(random.uniform(1, 2))
        return img_url

    def gen_pair(self, ex1_len=9, o_raw_len=16):
        """Return random ``ex1`` / ``o`` values for the list request payload.

        ``ex1`` is *ex1_len* lowercase alphanumerics; ``o`` is the base64 of
        *o_raw_len* random bytes.
        """
        alphabet = string.ascii_lowercase + string.digits
        ex1 = "".join(secrets.choice(alphabet) for _ in range(ex1_len))
        o = base64.b64encode(secrets.token_bytes(o_raw_len)).decode("ascii")
        return {"ex1": ex1, "o": o}

    def build_base_payload(self, keyword, page, first_search):
        """Return the list-API request body for *keyword* at *page*.

        ``ex1`` and ``o`` are left empty for the caller to fill in.
        """
        date_str = time.strftime("%Y-%m-%d %H:%M:%S")
        return {
            "platform": "pc",
            "version": "6.0.0",
            "ua": "Chrome146",
            'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
            "trafficType": 1,
            "ex1": "",
            "o": "",
            "lastClick": -1,
            "page": page,
            "pagesize": "60",
            "classify_id": "",
            "searchkey": keyword,
            "onlyTcm": 0,
            "operationtype": 1,
            "qualifiedLoanee": 0,
            "drugId": -1,
            "tagId": "",
            "showRecentlyPurchasedFlag": True,
            "onlySimpleLoan": 0,
            "sn": "",
            "buttons": [],
            "buttonList": [],
            "synonymId": 0,
            "activityTypes": [],
            "provider_filter": "",
            "factoryNames": "",
            "tcmGradeNames": [],
            "tcmExeStandardIds": [],
            "specs": "",
            "deliverFloor": 0,
            "purchaseLimitFloor": 0,
            "nextRequestKey": "",
            "adConfigId": 0,
            "stateValue": self._state_value,
            "firstSearch": first_search,
            "token": self.token,
        }

    @staticmethod
    def _extract_state_value(json_data, data_block):
        """Return the first truthy ``stateValue``/``state_value`` as a string.

        *json_data* is checked before *data_block*; non-dict sources are
        skipped. Returns None when neither carries a value.
        """
        for src in (json_data, data_block):
            if not isinstance(src, dict):
                continue
            val = src.get("stateValue") or src.get("state_value")
            if val:
                return str(val)
        return None

    def _dismiss_popup_before_screenshot(self):
        """Close or hide marketing pop-ups so they do not cover the screenshot.

        Entirely best effort: every failure is ignored.
        """
        close_locs = [
            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
            "xpath=//button[contains(@class,'close')]",
            "xpath=//span[text()='×']",
            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
        ]
        for loc in close_locs:
            try:
                btn = self.driver.ele(loc, timeout=0.5)
                if btn:
                    btn.click()
                    time.sleep(0.2)
            except Exception:
                pass
        try:
            # Fallback: hide common high z-index dialogs and masks.
            self.driver.run_js(
                """
                const sels = [
                    '[class*="modal"]', '[class*="popup"]', '[class*="dialog"]',
                    '[class*="mask"]', '[class*="overlay"]'
                ];
                for (const s of sels) {
                    document.querySelectorAll(s).forEach(el => {
                        const style = getComputedStyle(el);
                        const z = parseInt(style.zIndex || '0', 10);
                        if (z >= 999 && style.display !== 'none') {
                            el.style.display = 'none';
                        }
                    });
                }
                document.body.style.overflow = 'auto';
                """
            )
            time.sleep(0.2)
        except Exception:
            pass

    def to_product(self, item):
        """Map one list-API *item* to the record dict the pipeline stores.

        Price falls back ``disPrice`` -> ``minprice`` -> ``price``; shop name
        falls back ``provider_name`` -> ``abbreviation``. City and province
        fields are left empty here.
        """
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        item_id = item.get("wholesaleid", "")
        provider_id = item.get("providerId", "")
        city_id = province_id = city = province = ""
        price = item.get("disPrice", "") or item.get("minprice", "") or item.get("price", "")
        shop_name = item.get("provider_name", "") or item.get("abbreviation", "")
        product = {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item.get("drugname", ""),
            "spec": item.get("specification", ""),
            "one_price": '',
            "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
            "shop_name": shop_name,
            "anonymous_store_name": "",
            "shop_url": f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
            "city_name": city,
            "city_id": city_id,
            "province_name": province,
            "province_id": province_id,
            "area_info": "",
            "factory_name": item.get("manufacturer", ""),
            "scrape_date": time.strftime("%Y-%m-%d"),
            "price": price,
            "sales": "",
            "stock_count": item.get("stockAvailable", ""),
            "snapshot_url": "",
            "approval_num": "",
            "produced_time": item.get("prodDate", ""),
            "deadline": item.get("valid_date", ""),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {"sampling_cycle": self.sampling_cycle,
                 "sampling_start_time": self.sampling_start_time,
                 "sampling_end_time": self.sampling_end_time}),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 0
        }
        return product

    def parse_detail(self, product):
        """Fill *product* with price and approval number from the detail DOM.

        The tooltip (discount) price wins over the current-price element.
        Fields are only overwritten when a value was found. Returns the same
        *product* dict.
        """
        appvolnum_ele = self.driver.ele(
            'xpath://div[@class="drug-info"]//span[contains(text(),"批准文号")]/following-sibling::span[1]')
        appvolnum_value = appvolnum_ele.text if appvolnum_ele else ""
        price = ""
        discount_ele = self.driver.ele(
            'xpath://div[@class="sale-info-wrap"]//div[@class="tooltip-content"]',
            timeout=2,
        )
        discount_value = discount_ele.text if discount_ele else ""
        if discount_value:
            price_re = re.search(r"¥([0-9.]+)", discount_value)
            if price_re:
                price = price_re.group(1).strip()
        current_ele = self.driver.ele(
            'xpath://div[@class="sale-info-wrap"]//span[contains(@class,"current-price")]',
            timeout=3,
        )
        if current_ele and not price:
            price = (current_ele.text or "").replace("¥", "").strip()
        list_price = product.get("price", "")
        if price:
            product["price"] = price
        if appvolnum_value:
            product["approval_num"] = appvolnum_value
        logger.info(
            "详情解析 wholesaleid=%s list_price=%s dom_price=%s url=%s",
            product.get("item_id"),
            list_price,
            product.get("price"),
            self._current_url(),
        )
        return product

    def _fetch_list_page(self, payload, page):
        """POST one list request with retries; return the 200 response or None."""
        for attempt in range(1, LIST_REQUEST_ATTEMPTS + 1):
            try:
                response = requests.post(
                    LIST_API_URL,
                    headers=headers,
                    json=payload,
                    timeout=30
                )
            except requests.RequestException as e:
                logger.error("第%s页请求失败 (%s/3): %s", page, attempt, e)
            else:
                if response.status_code == 200:
                    return response
            if attempt < LIST_REQUEST_ATTEMPTS:
                time.sleep(10)
        return None

    def _title_matches_task(self, title):
        """Return True when *title* contains both the task brand and product."""
        brand = self.brand or ""
        return brand in title and self.product in title

    def _collect_item(self, item_id, product):
        """Visit the detail page of *product*, snapshot it and persist it."""
        detail_url = (
            f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1"
        )
        if self._goto_detail_page(item_id, detail_url):
            product = self.parse_detail(product)
            upload_key = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
            product["snapshot_url"] = self._take_snapshot(upload_key)
        else:
            # The browser is still on some other page: parsing or capturing
            # it would attach another product's data to this record.
            logger.warning("详情页跳转失败,仅保存列表数据 item_id=%s", item_id)
        try:
            self.pipeline.storge_data(product)
            logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
        except Exception as e:
            logger.exception("写入数据库失败: %s", e)

    def search(self):
        """Walk the list API page by page and collect every matching item.

        Returns:
            False when login fails, a page cannot be fetched or parsed, the
            API reports that login is required, or
            ``MAX_CONSECUTIVE_MISMATCHES`` consecutive items fail the
            brand/product title filter. Otherwise returns None once the
            listing is exhausted.
        """
        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        if not self._is_logged_in() and not self.login():
            return False
        cookies_list = self.driver.cookies()
        cookies_dict = {c['name']: c['value'] for c in cookies_list}
        self.token = cookies_dict.get("Token") or cookies_dict.get("token")

        keyword = self.product
        if self.brand:
            keyword = (self.brand + " " + self.product).strip()
        if self.product_desc:
            keyword = (keyword + " " + self.product_desc).strip()

        self._state_value = ""
        for page in range(1, 100):
            first_search = page == 1
            logger.info("药师帮爬取第%s页 firstSearch=%s stateValue=%s", page, first_search,
                        self._state_value or "(空)")
            pair = self.gen_pair()
            payload = self.build_base_payload(keyword, page=page, first_search=first_search)
            payload["ex1"] = pair["ex1"]
            payload["o"] = pair["o"]

            response = self._fetch_list_page(payload, page)
            if response is None:
                logger.error("第%s页请求失败,停止爬取", page)
                return False
            try:
                data_json = response.json()
            except ValueError:
                # ValueError is the common base of every JSON decode error
                # requests can raise, with or without simplejson installed.
                logger.exception("第%s页响应不是合法 JSON", page)
                return False
            if not isinstance(data_json, dict):
                logger.error("第%s页响应不是合法 JSON", page)
                return False

            data_block = data_json.get("data") or {}
            if str(data_json.get("message", "")) == "该操作需要登录":
                logger.warning("第%s页需要登录,请检查浏览器登录态", page)
                return False
            encrypted_o = data_block.get("o")
            if not encrypted_o:
                logger.warning("第%s页返回无加密 data.o: %s", page, data_json)
                break
            try:
                json_data = decrypt_ysb_payload(encrypted_o)
            except Exception as e:
                logger.exception("第%s页解密失败: %s", page, e)
                continue

            state_val = self._extract_state_value(json_data, data_block)
            if state_val:
                self._state_value = state_val
            wholesales = json_data.get("wholesales", [])
            if not wholesales:
                logger.info("第%s页无数据,停止", page)
                break

            for item in wholesales:
                item_id = item.get("wholesaleid", "")
                if not item_id:
                    continue
                product = self.to_product(item)
                title = product.get("product_name") or ""
                if not self._title_matches_task(title):
                    self.is_product_count += 1
                    # The limit must be checked here: a mismatch skips the
                    # rest of the loop body, so a later check never runs.
                    if self.is_product_count >= MAX_CONSECUTIVE_MISMATCHES:
                        logger.warning(
                            "连续%s条结果与任务不匹配,停止爬取",
                            self.is_product_count,
                        )
                        return False
                    continue
                self.is_product_count = 0
                self._collect_item(item_id, product)

    def run(self):
        """Start the browser, run the search, and always close the browser."""
        try:
            self.init_browser()
            self.search()
        except Exception as e:
            logger.exception("运行异常: %s", e)
        finally:
            self._quit_browser()