import base64
import hashlib
import json
import math
import os
import random
import signal
import socket
import sys
import time
import zlib
from pathlib import Path
from urllib.parse import quote

import requests
from Crypto.Cipher import AES
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.conn_mysql import MySQLPoolOnline
from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader
from commons.config import YSB_ACCOUNT
from pipelines.drug_pipelines import DrugPipeline
from area_info.city_name_to_id import get_city

# Captcha-solving service token. The YSB_CAPTCHA_TOKEN environment variable
# takes precedence so the secret does not have to live in source control; the
# literal is kept only as a backward-compatible fallback.
CAPTCHA_TOKEN = (
    os.environ.get("YSB_CAPTCHA_TOKEN")
    or "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
)
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
SLIDER_OFFSET_FIX = 10
LISTEN_CLEAR_ROUNDS = 3
LISTEN_CLEAR_TIMEOUT = 0.3
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
PROJECT_ROOT = Path(__file__).resolve().parents[2]
YSB_SPIDER_DIR = PROJECT_ROOT / "spiders" / "yaoshibang"
BROWSER_PROFILE_SUBDIR = "chrome_profile"


def pkcs7_unpad(data):
    """Strip PKCS#7 padding from *data* (bytes) and return the payload.

    Raises:
        ValueError: if *data* is empty, the pad length is outside 1..16, or
            the trailing bytes are not all equal to the pad length.
    """
    if not data:
        raise ValueError("Empty data for PKCS7 unpad")
    pad_len = data[-1]
    if pad_len < 1 or pad_len > 16:
        raise ValueError("Invalid PKCS7 padding length")
    if data[-pad_len:] != bytes([pad_len]) * pad_len:
        raise ValueError("Invalid PKCS7 padding bytes")
    return data[:-pad_len]


def derive_ysb_key():
    """Return the 16-byte AES key: upper-cased first 16 hex chars of an MD5."""
    base = "BhCLxFfFhd12K4qRGPfy"
    md5_hex = hashlib.md5(base.encode("utf-8")).hexdigest()
    return md5_hex[:16].upper().encode("utf-8")


def decrypt_ysb_payload(cipher_text_b64):
    """Decrypt the ``data.o`` field of the list API and return parsed JSON.

    Pipeline: base64 decode -> AES-ECB decrypt -> PKCS#7 unpad -> gzip
    decompress -> UTF-8 JSON parse.
    """
    key = derive_ysb_key()
    cipher_bytes = base64.b64decode(cipher_text_b64)
    cipher = AES.new(key, AES.MODE_ECB)
    decrypted = cipher.decrypt(cipher_bytes)
    unpadded = pkcs7_unpad(decrypted)
    # MAX_WBITS | 16 tells zlib to expect a gzip header/trailer.
    json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16)
    return json.loads(json_bytes.decode("utf-8"))


class YaoShiBangSnapshot:
    """Crawl the product list of dian.ysbang.cn for one task and snapshot it."""

    def __init__(self, drug_dict=None):
        """Set up collaborators and, when *drug_dict* is given, load the task."""
        self.driver = None
        self.db = MySQLPoolOnline()
        self.ip = None
        self.login_username = None
        self.login_password = None
        self.platform = 5
        self.pipeline = DrugPipeline("ysb")
        self.task_dict = drug_dict or {}
        self.ossuploader = AliyunOSSUploader()
        self.start_page = 1
        self.end_page = 1
        self.account_name = YSB_ACCOUNT.get("username", "ysb_default")

        # Defaults for the task fields so that methods reading them do not
        # raise AttributeError when no task dict was supplied.
        self.task_id = None
        self.company_id = None
        self.product = ""
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.collect_task_id = ""
        self.sampling_cycle = ""
        self.sampling_start_time = ""
        self.sampling_end_time = ""
        self.collect_equipment_id = ""
        self.account_id = ""
        self.collect_region_id = ""
        self.collect_round = 1

        self._register_signal_handler()
        if self.task_dict:
            self.get_product_data()
        self.success = True
        self.is_no_prodcut = 0
        # Running count of title mismatches; reset on a full match in search().
        self.is_product_count = 0
        self._listen_started = False

    def get_product_data(self):
        """Copy task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required keys
        (KeyError if missing); every other field falls back to a default.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")
        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
        self.collect_region_id = self.task_dict.get("collect_region_id", "")
        self.collect_round = self.task_dict.get("collect_round", 1)
        self.start_page = self._parse_page(self.task_dict.get("start_page"), 1)
        self.end_page = max(
            self.start_page,
            self._parse_page(self.task_dict.get("end_page"), self.start_page),
        )

    @staticmethod
    def _parse_page(value, default=1):
        """Return *value* as an int >= 1, or *default* if it is not one."""
        try:
            page = int(value)
            return page if page >= 1 else default
        except (TypeError, ValueError):
            return default

    def _register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit.

        ``signal.signal`` raises ValueError when called outside the main
        thread; in that case the handlers are skipped with a warning instead
        of breaking construction.
        """

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError as e:
            logger.warning("无法注册退出信号处理器: %s", e)

    def _quit_browser(self):
        """Quit the browser if one is open; never raises."""
        if self.driver:
            try:
                self.driver.quit()
            except Exception as e:
                # Best effort: the browser may already be gone.
                logger.debug("关闭浏览器失败: %s", e)
            self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a currently free local TCP port for Chrome remote debugging."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 asks the OS to pick any free port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def _resolve_browser_profile_dir(self):
        """Pick the Chrome user-data directory for the current account.

        Candidates live under ``<project root>/spiders/yaoshibang/``. The
        first one that already looks like a Chrome profile (has ``Default/``
        or ``Local State``) is reused, checking ``chrome_profile/<account>``
        before the two legacy layouts. Otherwise the preferred path is
        returned after its parent directory has been created.
        """
        preferred = YSB_SPIDER_DIR / BROWSER_PROFILE_SUBDIR / self.account_name
        legacy_flat = YSB_SPIDER_DIR / self.account_name
        legacy_nested = YSB_SPIDER_DIR / "spiders" / "yaoshibang" / self.account_name
        for candidate in (preferred, legacy_flat, legacy_nested):
            if (candidate / "Default").is_dir() or (candidate / "Local State").is_file():
                logger.info("使用已有浏览器配置目录: %s", candidate)
                return candidate
        preferred.parent.mkdir(parents=True, exist_ok=True)
        logger.info("新建浏览器配置目录: %s", preferred)
        return preferred

    def init_browser(self):
        """Launch Chrome with the account profile on a free debugging port."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        profile_dir = self._resolve_browser_profile_dir()
        profile_dir.mkdir(parents=True, exist_ok=True)
        co.set_user_data_path(str(profile_dir))
        logger.info("浏览器用户目录(绝对路径): %s", profile_dir.resolve())
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # Currently disabled:
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--start-maximized")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid default-browser prompt
        self.driver = ChromiumPage(co)

    def _solve_slider_captcha(self):
        """Detect and solve the Yidun slider captcha.

        Returns True when no captcha modal is present or a drag was
        performed, and False when recognition or element lookup failed. The
        drag result itself is not verified here.
        """
        self.driver.wait.doc_loaded()
        time.sleep(2)
        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not yidun:
            return True
        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            logger.error("验证码识别失败")
            return False
        logger.info("滑块距离: %s", distance)
        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False
        try:
            drag_distance = float(distance) + SLIDER_OFFSET_FIX
        except (TypeError, ValueError):
            logger.error("滑块距离非数字: %r", distance)
            return False
        if not math.isfinite(drag_distance) or drag_distance <= 0:
            logger.error("滑块距离无效: %s", drag_distance)
            return False
        # NOTE(review): the net offset is SLIDER_OFFSET_FIX - 5; the reason
        # for the extra -5 is not visible here — confirm before changing.
        self._simulate_slider_drag(slider, drag_distance - 5)
        time.sleep(3)
        return True

    def _call_captcha_api(self, image_bytes):
        """Ask the captcha service for the slider distance.

        Returns the distance as a finite float, or None on any failure (HTTP
        error, unexpected payload shape, non-numeric value).
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            if not isinstance(resp, dict):
                return None
            data = resp.get("data")
            # The distance is read from data.data when "data" is a dict,
            # otherwise from "data" itself.
            if isinstance(data, dict):
                dist = data.get("data")
            else:
                dist = data
            if dist is None:
                logger.error("验证码 API 未返回距离字段: %s", resp)
                return None
            try:
                d = float(dist)
            except (TypeError, ValueError):
                logger.error("验证码距离无法解析为数字: %r", dist)
                return None
            if not math.isfinite(d):
                logger.error("验证码距离非有限数值: %r", dist)
                return None
            return d
        except Exception as e:
            logger.exception("验证码 API 调用失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance):
        """Build a randomized drag track of ``(dx, dy, duration)`` steps.

        The track accelerates over the first 70% of *distance*, then
        decelerates. With probability 0.7 a small backward step is appended.
        Returns an empty list for non-numeric, non-finite or non-positive
        distances.
        """
        try:
            distance = float(distance)
        except (TypeError, ValueError):
            return []
        if distance <= 0 or not math.isfinite(distance):
            return []
        tracks = []
        current = 0
        mid = distance * 0.7
        t = 0.2
        v = 0
        move_points = []
        # Acceleration phase.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        # Deceleration phase; velocity is floored so the loop always advances.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = v0 + a * t
            if v < 0.5:
                v = 0.5
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        total_points = len(move_points)
        for i, move in enumerate(move_points):
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            # Occasional short hesitation.
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance):
        """Hold the slider, replay a generated track, then release."""
        if target_distance <= 0:
            logger.warning("滑块目标距离无效: %s", target_distance)
            return
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): track durations are 0.01-0.18; dividing by 1000
            # makes each step near-instant if `duration` is in seconds —
            # confirm this is intended.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _is_logged_in(self):
        """Return True when the page shows the ``logout`` span."""
        title = self.driver.ele(
            "xpath=//span[@class='logout']",
            timeout=5,
        )
        return bool(title)

    def _start_listen(self):
        """Start (or restart) listening for the getWholesaleList endpoint."""
        target = "wholesale-drug/sales/getWholesaleList/v4270"
        if self._listen_started and getattr(self.driver.listen, "listening", False):
            self.driver.listen.stop()
        self.driver.listen.start(target)
        self._listen_started = True
        logger.info("已启动监听: %s", target)

    def clear_listen_buffer(self, rounds=LISTEN_CLEAR_ROUNDS, timeout=LISTEN_CLEAR_TIMEOUT):
        """Drain already-captured packets so the next read sees fresh data.

        Reads up to *rounds* times, stopping early once a read comes back
        empty. Errors are logged at debug level and otherwise ignored.
        """
        if not self.driver:
            return
        try:
            for _ in range(rounds):
                resps = list(self.driver.listen.steps(timeout=timeout))
                if not resps:
                    break
        except Exception as e:
            logger.debug("清空监听缓冲失败: %s", e)

    @staticmethod
    def _parse_listen_body(resp):
        """Return the response body as a dict, or None if it is not one."""
        body = resp.response.body
        if isinstance(body, str):
            body = json.loads(body)
        if not isinstance(body, dict):
            return None
        return body

    @staticmethod
    def _extract_encrypted_o(body):
        """Return ``body["data"]["o"]`` when present, else None."""
        data_block = (body or {}).get("data") or {}
        if isinstance(data_block, dict):
            return data_block.get("o")
        return None

    def _consume_list_listen(self, page, timeout=10):
        """Read captured list responses and return the first decrypted payload.

        *page* is used for log messages only. Returns None when no usable
        response arrives within *timeout*.
        """
        for resp in self.driver.listen.steps(timeout=timeout):
            try:
                body = self._parse_listen_body(resp)
                if not body:
                    continue
                message = str(body.get("message", ""))
                # A non-empty message without "成功" is treated as a failure.
                if message and "成功" not in message:
                    logger.warning("第%s页 message=%s", page, message)
                    continue
                encrypted_o = self._extract_encrypted_o(body)
                if not encrypted_o:
                    continue
                json_data = decrypt_ysb_payload(encrypted_o)
                logger.info(
                    "第%s页列表解密成功 wholesales=%s",
                    page,
                    len(json_data.get("wholesales", [])),
                )
                return json_data
            except Exception as e:
                logger.warning("第%s页解析列表监听失败: %s", page, e)
        return None

    def login(self):
        """Log in with the configured account; return True on success."""
        logger.info("开始登录药师帮")
        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
        if not input_name:
            logger.error("未找到账号输入框")
            return False
        input_name.input(YSB_ACCOUNT["username"])
        time.sleep(random.uniform(1.5, 2.5))
        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input(YSB_ACCOUNT["password"])
        time.sleep(random.uniform(1.5, 2.5))
        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
        if not login_btn:
            logger.error("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(3)
        # Up to three captcha attempts, checking the login state after each.
        for _ in range(3):
            self._solve_slider_captcha()
            time.sleep(3)
            if self._is_logged_in():
                logger.info("登录成功")
                return True
        logger.error("登录后未检测到目标店铺名,登录可能失败")
        return False

    def _take_snapshot(self, upload_key, image_ele):
        """Screenshot *image_ele* on the current page and upload it.

        Returns the uploaded image URL, or "" on any failure.
        """
        time.sleep(1)
        self._dismiss_popup_before_screenshot()
        try:
            jpg_bytes = image_ele.get_screenshot(as_bytes="jpg")
            if not jpg_bytes:
                logger.warning("截图为空 upload_key=%s", upload_key)
                return ""
            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
        except Exception:
            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
            return ""
        if not img_url:
            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
            return ""
        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
        time.sleep(random.uniform(1, 2))
        return img_url

    def _human_click(self, element):
        """Click *element* via JS after a random pause; return True on success.

        The click is dispatched on the node itself rather than at a screen
        position, so a layout shift cannot redirect it onto a product link.
        Falls back to ``element.click()`` if the JS click fails.
        """
        if not element:
            return False
        try:
            time.sleep(random.uniform(0.8, 2.0))
            try:
                self.driver.run_js(
                    "arguments[0].scrollIntoView({block:'center',behavior:'instant'});",
                    element,
                )
            except Exception:
                # Scrolling is cosmetic; the click below is still attempted.
                pass
            time.sleep(random.uniform(0.3, 1))
            self.driver.run_js("arguments[0].click();", element)
            return True
        except Exception as e:
            logger.warning("点击失败: %s", e)
            try:
                element.click()
                return True
            except Exception:
                return False

    def _dismiss_popup_before_screenshot(self):
        """Close or hide marketing popups so they do not cover the screenshot."""
        close_locs = [
            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
            "xpath=//button[contains(@class,'close')]",
            "xpath=//span[text()='×']",
            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
        ]
        for loc in close_locs:
            try:
                btn = self.driver.ele(loc, timeout=0.5)
                if btn:
                    btn.click()
                    time.sleep(0.2)
            except Exception as e:
                # Best effort: a missing or stale close button is not an error.
                logger.debug("关闭弹窗失败 loc=%s: %s", loc, e)
        try:
            # Fallback: hide visible high z-index popups and masks.
            self.driver.run_js(
                """
                const sels = [
                    '[class*="modal"]',
                    '[class*="popup"]',
                    '[class*="dialog"]',
                    '[class*="mask"]',
                    '[class*="overlay"]'
                ];
                for (const s of sels) {
                    document.querySelectorAll(s).forEach(el => {
                        const style = getComputedStyle(el);
                        const z = parseInt(style.zIndex || '0', 10);
                        if (z >= 999 && style.display !== 'none') {
                            el.style.display = 'none';
                        }
                    });
                }
                document.body.style.overflow = 'auto';
                """
            )
            time.sleep(0.2)
        except Exception as e:
            logger.debug("隐藏弹窗脚本执行失败: %s", e)

    def to_product(self, item):
        """Map one list-API *item* (dict) to the storage record dict."""
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        item_id = item.get("wholesaleid", "")
        provider_id = item.get("providerId", "")
        city_id = province_id = city = province = ""
        city_str = item.get("warehouseCity", "")
        if city_str:
            city_id, province_id, city, province = get_city(city_str)
        # Price preference: disPrice, then minprice, then price.
        price = item.get("disPrice", "")
        if not price:
            price = item.get("minprice", "")
        if not price:
            price = item.get("price", "")
        shop_name = item.get("provider_name", "")
        if not shop_name:
            shop_name = item.get("abbreviation", "")
        product = {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item.get("drugname", ""),
            "spec": item.get("specification", ""),
            "one_price": '',
            "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
            "shop_name": shop_name,
            "anonymous_store_name": "",
            "shop_url": f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
            "city_name": city,
            "city_id": city_id,
            "province_name": province,
            "province_id": province_id,
            "area_info": "",
            "factory_name": item.get("manufacturer", ""),
            "scrape_date": time.strftime("%Y-%m-%d"),
            "price": price,
            "sales": "",
            "stock_count": item.get("stockAvailable", ""),
            "snapshot_url": "",
            "approval_num": "",
            "produced_time": item.get("prodDate", ""),
            "deadline": item.get("valid_date", ""),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 0,
        }
        return product

    def search(self):
        """Search for the task's product and store every matching list item.

        Returns False when no product name is configured or login fails;
        otherwise returns None once paging stops.
        """
        if not self.product:
            logger.error("未配置商品名称,无法搜索")
            return False
        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        if not self._is_logged_in():
            if not self.login():
                return False

        # The brand may be None in the task dict; normalize it so the
        # substring checks below cannot raise TypeError.
        brand = self.brand or ""
        keyword = self.product
        if brand:
            keyword = (brand + " " + self.product).strip()
        if self.product_desc:
            keyword = (keyword + " " + self.product_desc).strip()
        search_key = quote(keyword)
        first_page = self.start_page
        url = (
            f"https://dian.ysbang.cn/#/indexContent?lastClick=-1&page={first_page}"
            f"&pagesize=60&classify_id=&searchkey={search_key}"
        )
        self._start_listen()
        self.driver.get(url)

        # Same 99-page cap as before, but numbered from the real start page
        # so the log output matches the page being shown.
        # NOTE(review): self.end_page is parsed from the task but is not
        # used to bound this loop — confirm whether it should be.
        for page in range(first_page, first_page + 99):
            self.driver.wait.doc_loaded(timeout=10)
            time.sleep(1.5)
            json_data = self._consume_list_listen(page)
            if not json_data:
                logger.warning("第%s页未收到列表监听数据", page)
                break
            wholesales = json_data.get("wholesales", [])
            if not wholesales:
                logger.info("第%s页无数据,停止", page)
                break
            list_items = wholesales
            goods_wrappers = self.driver.eles(
                "xpath=//div[@class='drugListPage']//div[@class='drug-list']/div[contains(@class,'all-goods-wrapper')]"
            )
            for list_idx, item in enumerate(list_items, start=1):
                item_id = item.get("wholesaleid", "")
                logger.info(
                    "第%s页 列表第%s/%s条 wholesaleid=%s",
                    page,
                    list_idx,
                    len(list_items),
                    item_id,
                )
                if not item_id:
                    continue
                detail_url = (
                    f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1"
                )
                product = self.to_product(item)
                # The API may return a null drug name; compare as a string.
                title = str(product.get("product_name") or "")
                if brand not in title:
                    self.is_product_count += 1
                if self.product not in title:
                    self.is_product_count += 1
                    continue
                if self.product in title and brand in title:
                    self.is_product_count = 0
                # NOTE(review): the `continue` above skips this check, so the
                # stop only fires on an item whose title contains the product
                # name — confirm that is the intended behavior.
                if self.is_product_count >= 20:
                    return

                # DOM cards are matched to API items by position. If the page
                # rendered fewer cards than the API returned, skip the
                # snapshot rather than raise IndexError.
                dom_idx = list_idx - 1
                if dom_idx < len(goods_wrappers):
                    image_ele = goods_wrappers[dom_idx]
                    upload_key = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
                    product["snapshot_url"] = self._take_snapshot(upload_key, image_ele)
                else:
                    logger.warning(
                        "第%s页 列表第%s条未找到对应页面节点,跳过截图", page, list_idx
                    )
                try:
                    self.pipeline.storge_data(product)
                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
                except Exception as e:
                    logger.exception("写入数据库失败: %s", e)

            # Move to the next page; drop stale packets first so the next
            # read only sees the response triggered by this click.
            self.clear_listen_buffer()
            next_button = self.driver.ele(
                "xpath=//div[@class='condition']//div[@class='btn next']"
            )
            if not next_button:
                logger.info("没有下一页,停止")
                break
            self._human_click(next_button)

    def run(self):
        """Run the crawl and return ``(crawl_count, success)``.

        ``success`` is False when the search reports a failure (returns
        False) or an exception is raised. The browser is always closed.
        """
        try:
            self.init_browser()
            if self.search() is False:
                self.success = False
        except Exception as e:
            self.success = False
            logger.exception("运行异常: %s", e)
        finally:
            self._quit_browser()
        return self.pipeline.crawl_count, self.success