| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754 |
- import base64
- import hashlib
- import json
- import math
- import random
- import re
- import signal
- import socket
- import sys
- import time
- import zlib
- from pathlib import Path
- import requests
- import secrets
- import string
- from Crypto.Cipher import AES
- from commons.conn_mysql import MySQLPoolOnline
- from DrissionPage import ChromiumPage, ChromiumOptions
- from commons.Logger import logger
- from oss_upload.oss_upload import AliyunOSSUploader
- from commons.config import YSB_ACCOUNT
- from pipelines.drug_pipelines import DrugPipeline
- from datetime import datetime, timedelta
- from area_info.city_name_to_id import get_city
# Token and endpoint used by _call_captcha_api to request the slider distance.
# NOTE(review): the token is committed in source — consider loading it from
# configuration or the environment instead.
CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
# Added to the recognised slider distance before dragging (see _solve_slider_captcha).
SLIDER_OFFSET_FIX = 10
# Detail-page tuning values. Only DETAIL_NAV_RETRIES is referenced by the code
# visible in this file; the others appear unused here — TODO confirm.
DETAIL_GET_TIMEOUT = 15
DETAIL_URL_WAIT = 10
DETAIL_DOM_WAIT = 8
DETAIL_NAV_RETRIES = 3
# Locator of the "approval number" label inside the detail page's drug-info block.
DETAIL_APPROVAL_XPATH = (
    'xpath://div[@class="drug-info"]//span[contains(text(),"批准文号")]'
)
# Chrome executable handed to ChromiumOptions.set_browser_path in init_browser.
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
# Two directory levels above the directory containing this file.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Base directory under which per-account browser profiles are looked up/created.
YSB_SPIDER_DIR = PROJECT_ROOT / "spiders" / "yaoshibang"
BROWSER_PROFILE_SUBDIR = "chrome_profile"
# Static HTTP headers sent with the list-API POST issued from search().
headers = {
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
    "Content-Type": "application/json",
    "Origin": "https://dian.ysbang.cn",
    "Referer": "https://dian.ysbang.cn/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
    ),
    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
}
def pkcs7_unpad(data):
    """Strip PKCS#7 padding from ``data`` and return what precedes it.

    Raises:
        ValueError: if ``data`` is empty, if its last byte is outside 1..16,
            or if the trailing bytes are not all equal to that pad length.
    """
    if not data:
        raise ValueError("Empty data for PKCS7 unpad")

    n_pad = data[-1]
    if not 1 <= n_pad <= 16:
        raise ValueError("Invalid PKCS7 padding length")

    # A tail shorter than n_pad can never equal the expected run, so
    # too-short inputs are rejected by the same comparison.
    payload, tail = data[:-n_pad], data[-n_pad:]
    if tail != bytes([n_pad]) * n_pad:
        raise ValueError("Invalid PKCS7 padding bytes")
    return payload
def derive_ysb_key():
    """Return the 16-byte AES key used for the list payload.

    The key is the upper-cased first 16 hex characters of the MD5 digest of a
    fixed seed string, encoded as UTF-8.
    """
    seed = "BhCLxFfFhd12K4qRGPfy"
    digest = hashlib.md5(seed.encode("utf-8"))
    key_text = digest.hexdigest()[:16].upper()
    return key_text.encode("utf-8")
def decrypt_ysb_payload(cipher_text_b64):
    """Decrypt the ``data.o`` field of the YaoShiBang list API and return the parsed JSON.

    Pipeline: base64-decode, AES-ECB decrypt with the key from
    ``derive_ysb_key``, strip PKCS#7 padding, gunzip, decode as UTF-8,
    parse as JSON.
    """
    raw = base64.b64decode(cipher_text_b64)
    padded_plain = AES.new(derive_ysb_key(), AES.MODE_ECB).decrypt(raw)
    gz_blob = pkcs7_unpad(padded_plain)
    # wbits = MAX_WBITS | 16 tells zlib to expect a gzip header and trailer.
    text = zlib.decompress(gz_blob, zlib.MAX_WBITS | 16).decode("utf-8")
    return json.loads(text)
class YaoShiBangSnapshot:
    """Search dian.ysbang.cn for one task's product and archive every matching hit.

    For each list item whose name contains both the task's brand and product
    name, the detail page is opened in a DrissionPage-driven Chrome, price and
    approval number are read from the DOM, a screenshot is uploaded to OSS and
    the record is handed to the drug pipeline.
    """

    # Number of consecutive off-target list items after which search() gives up.
    MAX_IRRELEVANT_STREAK = 20

    def __init__(self, drug_dict=None):
        """Create the collaborators and load task fields from ``drug_dict`` if given.

        Args:
            drug_dict: task description; see ``get_product_data`` for the keys
                that are read. ``None`` or an empty dict leaves the task
                fields at their empty defaults.
        """
        self.driver = None
        self.db = MySQLPoolOnline()
        self.ip = None
        self.login_username = None
        self.login_password = None
        self.platform = 5
        self.pipeline = DrugPipeline("ysb")
        self.task_dict = drug_dict or {}
        self.ossuploader = AliyunOSSUploader()
        self.start_page = 1
        self.end_page = 1
        self.account_name = YSB_ACCOUNT.get("username", "ysb_default")
        self._register_signal_handler()
        # Defaults first, so later methods never hit an undefined attribute
        # when the instance was created without a task.
        self._reset_task_fields()
        if self.task_dict:
            self.get_product_data()
        self.success = True
        self.is_no_prodcut = 0
        self.is_product_count = 0
        self.token = ""
        self._state_value = ""
        self.start_date = (datetime.now() - timedelta(minutes=500)).strftime("%Y-%m-%d %H:%M")

    def _reset_task_fields(self):
        """Set every task-derived attribute to an empty default."""
        self.task_id = None
        self.company_id = None
        self.product = ""
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.collect_task_id = ""
        self.sampling_cycle = ""
        self.sampling_start_time = ""
        self.sampling_end_time = ""
        self.collect_equipment_id = ""
        self.account_id = ""
        self.collect_region_id = ""
        self.collect_round = 1

    def get_product_data(self):
        """Copy task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required (KeyError if
        missing); every other key falls back to a default. ``end_page`` is
        clamped so it is never smaller than ``start_page``.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)
        self.start_page = self._parse_page(task.get("start_page"), 1)
        self.end_page = max(
            self.start_page,
            self._parse_page(task.get("end_page"), self.start_page),
        )

    @staticmethod
    def _parse_page(value, default=1):
        """Return ``value`` as an int >= 1, or ``default`` if it is not one."""
        try:
            page = int(value)
        except (TypeError, ValueError):
            return default
        return page if page >= 1 else default

    def _register_signal_handler(self):
        """Install SIGINT/SIGTERM handlers that close the browser and exit."""

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, handler)
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, handler)
        except ValueError as e:
            # signal.signal() raises ValueError outside the main thread; the
            # crawler is still usable there, just without graceful shutdown.
            logger.warning("无法注册退出信号处理器(非主线程): %s", e)

    def _quit_browser(self):
        """Close the browser if one is open; errors are logged, never raised."""
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception as e:
            logger.warning("关闭浏览器失败: %s", e)
        finally:
            self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a local TCP port that was free at the time of the call.

        The probing socket is closed before returning, so the port is not
        reserved for the caller.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def _resolve_browser_profile_dir(self):
        """Pick the Chrome user-data directory for the current account.

        Candidates under ``YSB_SPIDER_DIR`` are checked in order: the
        preferred ``chrome_profile/<account>`` and two legacy layouts. The
        first one that already holds a profile (a ``Default`` directory or a
        ``Local State`` file) is reused; otherwise the preferred path is
        returned after its parent directory has been created.
        """
        preferred = YSB_SPIDER_DIR / BROWSER_PROFILE_SUBDIR / self.account_name
        legacy_flat = YSB_SPIDER_DIR / self.account_name
        legacy_nested = YSB_SPIDER_DIR / "spiders" / "yaoshibang" / self.account_name
        for candidate in (preferred, legacy_flat, legacy_nested):
            if (candidate / "Default").is_dir() or (candidate / "Local State").is_file():
                logger.info("使用已有浏览器配置目录: %s", candidate)
                return candidate
        preferred.parent.mkdir(parents=True, exist_ok=True)
        logger.info("新建浏览器配置目录: %s", preferred)
        return preferred

    def init_browser(self):
        """Start Chrome on a free debug port with the account's profile directory."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        profile_dir = self._resolve_browser_profile_dir()
        profile_dir.mkdir(parents=True, exist_ok=True)
        co.set_user_data_path(str(profile_dir))
        logger.info("浏览器用户目录(绝对路径): %s", profile_dir.resolve())
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--start-maximized")
        co.set_argument("--no-first-run")  # suppress the first-run dialog
        co.set_argument("--no-default-browser-check")  # suppress the default-browser prompt
        self.driver = ChromiumPage(co)

    def _solve_slider_captcha(self):
        """Detect the Yidun slider captcha and attempt to drag it.

        Returns:
            True when no captcha modal is present or a drag was attempted;
            False when the distance could not be obtained or the slider
            element was not found. The outcome of the drag is not verified.
        """
        self.driver.wait.doc_loaded()
        time.sleep(2)
        yidun = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not yidun:
            return True
        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = yidun.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            logger.error("验证码识别失败")
            return False
        logger.info("滑块距离: %s", distance)
        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False
        try:
            drag_distance = float(distance) + SLIDER_OFFSET_FIX
        except (TypeError, ValueError):
            logger.error("滑块距离非数字: %r", distance)
            return False
        if not math.isfinite(drag_distance) or drag_distance <= 0:
            logger.error("滑块距离无效: %s", drag_distance)
            return False
        # NOTE(review): the extra -5 px looks like a manual calibration on top
        # of SLIDER_OFFSET_FIX — confirm before changing either value.
        self._simulate_slider_drag(slider, drag_distance - 5)
        time.sleep(3)
        return True

    def _call_captcha_api(self, image_bytes):
        """Ask the captcha service for the slider distance.

        Args:
            image_bytes: screenshot of the captcha modal.

        Returns:
            The distance as a finite float, or None on any failure.
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            if not isinstance(resp, dict):
                return None
            data = resp.get("data")
            # The distance is read from data.data when "data" is an object,
            # otherwise from "data" itself.
            dist = data.get("data") if isinstance(data, dict) else data
            if dist is None:
                logger.error("验证码 API 未返回距离字段: %s", resp)
                return None
            try:
                d = float(dist)
            except (TypeError, ValueError):
                logger.error("验证码距离无法解析为数字: %r", dist)
                return None
            if not math.isfinite(d):
                logger.error("验证码距离非有限数值: %r", dist)
                return None
            return d
        except Exception as e:
            logger.exception("验证码 API 调用失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance):
        """Build a randomised drag track of ``(dx, dy, duration)`` steps.

        The track accelerates over roughly the first 70% of ``distance`` and
        decelerates afterwards; the positive steps sum to at least
        ``distance``. With probability 0.7 a final backward step of 1-3 px is
        appended. Returns ``[]`` for a non-numeric, non-finite or non-positive
        distance.
        """
        try:
            distance = float(distance)
        except (TypeError, ValueError):
            return []
        if distance <= 0 or not math.isfinite(distance):
            return []
        tracks = []
        current = 0
        mid = distance * 0.7
        t = 0.2
        v = 0
        move_points = []
        # Acceleration phase.
        while current < mid:
            a = random.uniform(2, 4)
            v0 = v
            v = v0 + a * t
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        # Deceleration phase; the speed floor keeps every step positive so the
        # loop always terminates.
        while current < distance:
            a = -random.uniform(0.5, 1.5)
            v0 = v
            v = max(v0 + a * t, 0.5)
            move = v0 * t + 0.5 * a * t * t
            current += move
            move_points.append(move)
        total_points = len(move_points)
        for i, move in enumerate(move_points):
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            if random.random() < 0.05:
                # Occasional hesitation.
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance):
        """Hold the slider, replay a generated track, then release."""
        if target_distance <= 0:
            logger.warning("滑块目标距离无效: %s", target_distance)
            return
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): the track durations are divided by 1000 before
            # being passed on — confirm the unit expected by actions.move.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _is_logged_in(self):
        """Return True when a ``span.logout`` element is found within 5 seconds."""
        logout_ele = self.driver.ele(
            "xpath=//span[@class='logout']",
            timeout=5,
        )
        return bool(logout_ele)

    def _current_url(self):
        """Return the browser's current URL, or "" if it cannot be read."""
        try:
            return self.driver.url or ""
        except Exception:
            return ""

    def _goto_detail_page(self, item_id, detail_url):
        """Open ``detail_url`` and refresh once so the SPA re-renders for it.

        Makes up to ``DETAIL_NAV_RETRIES`` attempts.

        Returns:
            True once the browser URL contains ``item_id`` after the refresh,
            False if every attempt failed.
        """
        for attempt in range(1, DETAIL_NAV_RETRIES + 1):
            try:
                self.driver.get(detail_url, timeout=5)
                time.sleep(1.5)
                confirm_buttons = self.driver.eles(
                    "xpath=//div[@class='y-dialog']//button[contains(text(),'确认')]",
                    timeout=3,
                )
                # Only when exactly two confirm buttons are present is the
                # second one clicked.
                if len(confirm_buttons) == 2:
                    confirm_buttons[1].click()
                    time.sleep(1)
                self.driver.refresh()
                time.sleep(1.5)
                if str(item_id) in self.driver.url:
                    return True
            except Exception as e:
                logger.warning(
                    "跳转详情异常 item_id=%s attempt=%s: %s",
                    item_id, attempt, e,
                )
            if attempt < DETAIL_NAV_RETRIES:
                time.sleep(random.uniform(0.8, 1.5))
        return False

    def login(self):
        """Fill in the login form and try up to three captcha rounds.

        Returns:
            True once ``_is_logged_in`` succeeds, False if a form element is
            missing or login is still not detected after three rounds.
        """
        logger.info("开始登录药师帮")
        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
        if not input_name:
            logger.error("未找到账号输入框")
            return False
        input_name.input(YSB_ACCOUNT["username"])
        time.sleep(random.uniform(1.5, 2.5))
        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input(YSB_ACCOUNT["password"])
        time.sleep(random.uniform(1.5, 2.5))
        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
        if not login_btn:
            logger.error("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(3)
        for _ in range(3):
            # The captcha result is not trusted; the login check decides.
            self._solve_slider_captcha()
            time.sleep(3)
            if self._is_logged_in():
                logger.info("登录成功")
                return True
        logger.error("登录后未检测到目标店铺名,登录可能失败")
        return False

    def _take_snapshot(self, upload_key):
        """Screenshot the current page and upload it under ``upload_key``.

        Returns:
            The URL returned by the uploader, or "" on any failure.
        """
        time.sleep(1)
        self._dismiss_popup_before_screenshot()
        try:
            jpg_bytes = self.driver.get_screenshot(as_bytes="jpg")
            if not jpg_bytes:
                logger.warning("截图为空 upload_key=%s", upload_key)
                return ""
            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
        except Exception:
            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
            return ""
        if not img_url:
            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
            return ""
        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
        time.sleep(random.uniform(1, 2))
        return img_url

    def gen_pair(self, ex1_len=9, o_raw_len=16):
        """Return random ``ex1`` / ``o`` values for a list request.

        ``ex1`` is ``ex1_len`` random lowercase letters/digits; ``o`` is the
        base64 text of ``o_raw_len`` random bytes.
        """
        alphabet = string.ascii_lowercase + string.digits
        ex1 = "".join(secrets.choice(alphabet) for _ in range(ex1_len))
        o = base64.b64encode(secrets.token_bytes(o_raw_len)).decode("ascii")
        return {"ex1": ex1, "o": o}

    def build_base_payload(self, keyword, page, first_search):
        """Build the JSON body of the list request; ``ex1`` and ``o`` are left blank."""
        date_str = time.strftime("%Y-%m-%d %H:%M:%S")
        return {
            "platform": "pc",
            "version": "6.0.0",
            "ua": "Chrome146",
            'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
            "trafficType": 1,
            "ex1": "",
            "o": "",
            "lastClick": -1,
            "page": page,
            "pagesize": "60",
            "classify_id": "",
            "searchkey": keyword,
            "onlyTcm": 0,
            "operationtype": 1,
            "qualifiedLoanee": 0,
            "drugId": -1,
            "tagId": "",
            "showRecentlyPurchasedFlag": True,
            "onlySimpleLoan": 0,
            "sn": "",
            "buttons": [],
            "buttonList": [],
            "synonymId": 0,
            "activityTypes": [],
            "provider_filter": "",
            "factoryNames": "",
            "tcmGradeNames": [],
            "tcmExeStandardIds": [],
            "specs": "",
            "deliverFloor": 0,
            "purchaseLimitFloor": 0,
            "nextRequestKey": "",
            "adConfigId": 0,
            "stateValue": self._state_value,
            "firstSearch": first_search,
            "token": self.token,
        }

    @staticmethod
    def _extract_state_value(json_data, data_block):
        """Return the first truthy ``stateValue``/``state_value`` as a string.

        ``json_data`` is checked before ``data_block``; non-dict arguments are
        skipped. Returns None when neither provides a value.
        """
        for src in (json_data, data_block):
            if not isinstance(src, dict):
                continue
            val = src.get("stateValue") or src.get("state_value")
            if val:
                return str(val)
        return None

    def _dismiss_popup_before_screenshot(self):
        """Best-effort removal of pop-ups that would cover the screenshot.

        Clicks any close control matched by a list of locators, then hides
        remaining modal/popup/dialog/mask/overlay elements whose z-index is at
        least 999. Every failure is ignored on purpose.
        """
        close_locs = [
            "xpath=//div[contains(@class,'dialog')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'popup')]//i[contains(@class,'close')]",
            "xpath=//div[contains(@class,'modal')]//i[contains(@class,'close')]",
            "xpath=//button[contains(@class,'close')]",
            "xpath=//span[text()='×']",
            "xpath=//*[contains(text(),'智能采购')]/ancestor::div[1]//*[contains(@class,'close')]",
        ]
        for loc in close_locs:
            try:
                btn = self.driver.ele(loc, timeout=0.5)
                if btn:
                    btn.click()
                    time.sleep(0.2)
            except Exception:
                # A missing or unclickable close button must not block the snapshot.
                pass
        try:
            # Fallback: hide common high z-index pop-ups and masks.
            self.driver.run_js(
                """
                const sels = [
                '[class*="modal"]',
                '[class*="popup"]',
                '[class*="dialog"]',
                '[class*="mask"]',
                '[class*="overlay"]'
                ];
                for (const s of sels) {
                document.querySelectorAll(s).forEach(el => {
                const style = getComputedStyle(el);
                const z = parseInt(style.zIndex || '0', 10);
                if (z >= 999 && style.display !== 'none') {
                el.style.display = 'none';
                }
                });
                }
                document.body.style.overflow = 'auto';
                """
            )
            time.sleep(0.2)
        except Exception:
            pass

    def to_product(self, item):
        """Map one list-API item to the record format handed to the pipeline.

        Price falls back from ``disPrice`` to ``minprice`` to ``price``; shop
        name falls back from ``provider_name`` to ``abbreviation``.
        """
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        item_id = item.get("wholesaleid", "")
        provider_id = item.get("providerId", "")
        # NOTE(review): the item's "warehouseCity" is not mapped; the region
        # fields are always emitted empty.
        city_id = province_id = city = province = ""
        price = item.get("disPrice", "") or item.get("minprice", "") or item.get("price", "")
        shop_name = item.get("provider_name", "") or item.get("abbreviation", "")
        return {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item.get("drugname", ""),
            "spec": item.get("specification", ""),
            "one_price": '',
            "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
            "shop_name": shop_name,
            "anonymous_store_name": "",
            "shop_url": f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
            "city_name": city,
            "city_id": city_id,
            "province_name": province,
            "province_id": province_id,
            "area_info": "",
            "factory_name": item.get("manufacturer", ""),
            "scrape_date": time.strftime("%Y-%m-%d"),
            "price": price,
            "sales": "",
            "stock_count": item.get("stockAvailable", ""),
            "snapshot_url": "",
            "approval_num": "",
            "produced_time": item.get("prodDate", ""),
            "deadline": item.get("valid_date", ""),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {"sampling_cycle": self.sampling_cycle, "sampling_start_time": self.sampling_start_time,
                 "sampling_end_time": self.sampling_end_time}),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 0
        }

    def parse_detail(self, product):
        """Overlay price and approval number read from the open detail page.

        The price comes from the ``¥`` amount in the discount tooltip if there
        is one, otherwise from the current-price element. ``product`` is
        updated in place and returned; fields keep their list values when the
        page yields nothing.
        """
        appvolnum_ele = self.driver.ele(
            'xpath://div[@class="drug-info"]//span[contains(text(),"批准文号")]/following-sibling::span[1]')
        appvolnum_value = appvolnum_ele.text if appvolnum_ele else ""
        price = ""
        discount_ele = self.driver.ele(
            'xpath://div[@class="sale-info-wrap"]//div[@class="tooltip-content"]',
            timeout=2,
        )
        discount_value = discount_ele.text if discount_ele else ""
        if discount_value:
            price_re = re.search(r"¥([0-9.]+)", discount_value)
            if price_re:
                price = price_re.group(1).strip()
        current_ele = self.driver.ele(
            'xpath://div[@class="sale-info-wrap"]//span[contains(@class,"current-price")]',
            timeout=3,
        )
        if current_ele and not price:
            price = (current_ele.text or "").replace("¥", "").strip()
        list_price = product.get("price", "")
        if price:
            product["price"] = price
        if appvolnum_value:
            product["approval_num"] = appvolnum_value
        logger.info(
            "详情解析 wholesaleid=%s list_price=%s dom_price=%s url=%s",
            product.get("item_id"),
            list_price,
            product.get("price"),
            self._current_url(),
        )
        return product

    def _build_keyword(self):
        """Join brand, product name and spec (those that are set) into the search keyword."""
        keyword = self.product or ""
        if self.brand:
            keyword = f"{self.brand} {keyword}".strip()
        if self.product_desc:
            keyword = f"{keyword} {self.product_desc}".strip()
        return keyword

    def _is_relevant(self, title):
        """Return True when ``title`` contains both the task's brand and product name.

        A missing brand or product name (``None`` or "") matches every title.
        """
        title = title or ""
        return (self.brand or "") in title and (self.product or "") in title

    def _irrelevant_streak_exceeded(self, relevant):
        """Update the consecutive-miss counter and report whether to stop.

        A relevant item resets ``is_product_count``; an irrelevant one
        increments it. Returns True once the counter reaches
        ``MAX_IRRELEVANT_STREAK``.
        """
        if relevant:
            self.is_product_count = 0
            return False
        self.is_product_count += 1
        return self.is_product_count >= self.MAX_IRRELEVANT_STREAK

    def _fetch_list_page(self, payload, page, attempts=3):
        """POST one list request, retrying on errors and non-200 responses.

        Returns:
            The response with status 200, or None when all attempts failed.
        """
        for attempt in range(1, attempts + 1):
            try:
                response = requests.post(
                    "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270", headers=headers,
                    json=payload, timeout=30
                )
                if response.status_code == 200:
                    return response
                logger.warning(
                    "第%s页请求返回状态码 %s (%s/%s)",
                    page, response.status_code, attempt, attempts,
                )
            except Exception as e:
                logger.error("第%s页请求失败 (%s/%s): %s", page, attempt, attempts, e)
            if attempt < attempts:
                # Back off only when another attempt will follow.
                time.sleep(10)
        return None

    def _collect_detail(self, product):
        """Enrich ``product`` from its detail page, snapshot it, and store it.

        When navigation to the detail page fails, detail parsing and the
        snapshot are skipped so data from an unrelated page is never attached;
        the list-derived record is still stored.
        """
        item_id = product["item_id"]
        detail_url = product["detail_url"]
        if self._goto_detail_page(item_id, detail_url):
            product = self.parse_detail(product)
            upload_key = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
            product["snapshot_url"] = self._take_snapshot(upload_key)
        else:
            logger.warning("详情页跳转失败,跳过详情解析与截图 item_id=%s", item_id)
        try:
            self.pipeline.storge_data(product)
            logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
        except Exception as e:
            logger.exception("写入数据库失败: %s", e)

    def search(self):
        """Log in if needed, page through the list API and collect matching items.

        Returns:
            True when the crawl ended normally (results ran out or the page
            limit was reached); False on a fatal error (login failure, request
            failure, malformed response, login required) or when
            ``MAX_IRRELEVANT_STREAK`` consecutive items did not match the task.
        """
        keyword = self._build_keyword()
        if not keyword:
            logger.error("搜索关键词为空,停止爬取")
            return False

        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        if not self._is_logged_in() and not self.login():
            return False

        cookies_dict = {c['name']: c['value'] for c in self.driver.cookies()}
        self.token = cookies_dict.get("Token") or cookies_dict.get("token") or ""
        self._state_value = ""

        # NOTE(review): start_page/end_page parsed from the task are not used
        # here; pages 1..99 are always walked — confirm this is intended.
        for page in range(1, 100):
            first_search = page == 1
            logger.info("药师帮爬取第%s页 firstSearch=%s stateValue=%s", page, first_search,
                        self._state_value or "(空)")
            payload = self.build_base_payload(keyword, page=page, first_search=first_search)
            payload.update(self.gen_pair())

            response = self._fetch_list_page(payload, page)
            if response is None:
                logger.error("第%s页请求失败,停止爬取", page)
                return False
            try:
                data_json = response.json()
            except ValueError:
                # Every JSON decode error raised by requests derives from ValueError.
                logger.exception("第%s页响应不是合法 JSON", page)
                return False
            if not isinstance(data_json, dict):
                logger.error("第%s页响应结构异常: %r", page, data_json)
                return False
            data_block = data_json.get("data")
            if not isinstance(data_block, dict):
                data_block = {}
            if str(data_json.get("message", "")) == "该操作需要登录":
                logger.warning("第%s页需要登录,请检查浏览器登录态", page)
                return False
            encrypted_o = data_block.get("o")
            if not encrypted_o:
                logger.warning("第%s页返回无加密 data.o: %s", page, data_json)
                break
            try:
                json_data = decrypt_ysb_payload(encrypted_o)
            except Exception as e:
                logger.exception("第%s页解密失败: %s", page, e)
                continue
            if not isinstance(json_data, dict):
                logger.error("第%s页解密结果结构异常: %r", page, type(json_data))
                continue

            state_val = self._extract_state_value(json_data, data_block)
            if state_val:
                self._state_value = state_val
            wholesales = json_data.get("wholesales", [])
            if not wholesales:
                logger.info(f"第{page}页无数据,停止")
                break

            for item in wholesales:
                if not item.get("wholesaleid", ""):
                    continue
                product = self.to_product(item)
                relevant = self._is_relevant(product.get("product_name"))
                if self._irrelevant_streak_exceeded(relevant):
                    logger.warning("连续 %s 条结果与任务不匹配,停止爬取", self.is_product_count)
                    return False
                if not relevant:
                    continue
                self._collect_detail(product)
        return True

    def run(self):
        """Start the browser, run the search, and always close the browser afterwards."""
        try:
            self.init_browser()
            self.search()
        except Exception as e:
            logger.exception("运行异常: %s", e)
        finally:
            self._quit_browser()
|