| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- import base64
- import hashlib
- import json
- import random
- import re
- import secrets
- import string
- import time
- import zlib
- from datetime import datetime, timedelta
- import requests
- from Crypto.Cipher import AES
- from commons.Logger import get_spider_logger
- from pipelines.drug_pipelines import DrugPipeline
- from area_info.city_name_to_id import get_city
- from commons.conn_mysql import MySQLPoolOnline
- from spiders.yaoshibang.login_yaoshibang import YaoShiBangLogin
# Module-level logger, created under the name "yaoshibang"; used by the YsbSpider methods below.
- logger = get_spider_logger("yaoshibang")
class YsbSpider:
    """Search dian.ysbang.cn wholesale listings for one collection task.

    The task dict passed to the constructor supplies the product name, brand
    and specification that form the search keyword, plus bookkeeping ids that
    are copied onto every stored row.  Each search response carries a
    base64 / AES-ECB / gzip wrapped JSON body under ``data.o``; it is decoded
    locally, every listing is mapped to a row by :meth:`to_product`, and the
    row is handed to ``DrugPipeline``.
    """

    # Paging stops before this page number is requested.
    MAX_PAGE = 100
    # Timeout in seconds applied to every HTTP request.
    REQUEST_TIMEOUT = 30
    # Seconds to wait between failed attempts of one search request.
    RETRY_DELAY = 10
    # How often a single page may be re-requested after a re-login or a
    # decryption failure before the crawl is abandoned.
    MAX_PAGE_RETRIES = 3

    # A number with exactly two decimals that is not embedded in more digits.
    _PRICE_PATTERN = re.compile(r'(?<!\d)(\d+\.\d{2})(?!\d)')
    # Bracketed text in the delivery policy; group 1 drops its last character.
    _CITY_PATTERN = re.compile(r"\[(\w+)\w+\]")

    def __init__(self, drug_dict=None):
        """Prepare endpoint, headers, storage and, if given, the task fields.

        Args:
            drug_dict: Task description.  When non-empty its fields are copied
                onto the instance by :meth:`get_product_data`; with ``None``
                or an empty dict :meth:`search_data` returns immediately.
        """
        self.url = "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270"
        self.headers = self.build_headers()
        # Timestamp 500 minutes in the past; sent as the first part of ``ex``.
        self.start_date = (datetime.now() - timedelta(minutes=500)).strftime("%Y-%m-%d %H:%M")
        self.platform = 5
        self.approval_num = ""
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.token = None
        # Placeholder; replaced by the account read in get_token().
        self.account_name = "17097980383"
        self.pipeline = DrugPipeline("yaoshibang")
        if self.task_dict:
            self.get_product_data()
        # NOTE(review): nothing in this class ever sets this to False, so
        # run() reports success whenever a token was available - confirm
        # whether callers expect failures to be reflected here.
        self.is_success = True
        self.db_online = MySQLPoolOnline()

    def get_token(self, _retry_login=False):
        """Read an account's cookie from the database and extract its token.

        The active account with the oldest ``cookie_timestamp`` is selected.
        If its cookie is empty, one login is attempted and the lookup is
        repeated once.

        Args:
            _retry_login: Internal flag set on the second lookup so that an
                empty cookie after login ends the attempt instead of looping.

        Returns:
            The token string (also stored on ``self.token``), or ``None``
            when no account, cookie or token could be obtained.
        """
        sql_account = (
            " select `name`,`cookie_str` from `accounts_platform`"
            " where `platform`=5 and `status`=1 and `equipment_id`=1"
            " order by `cookie_timestamp` asc limit 1 "
        )
        account_list = self.db_online.select_data(sql_account)
        if not account_list:
            logger.error("无可用爬取账号")
            return None

        account_dict = account_list[0]
        self.account_name = account_dict["name"]
        cookie_str = account_dict.get("cookie_str") or ""
        if not cookie_str:
            if _retry_login:
                logger.error("账号 %s 登录后 cookie 仍为空", self.account_name)
                return None
            logger.warning("账号 %s cookie_str 为空,尝试登录", self.account_name)
            YaoShiBangLogin().run()
            time.sleep(5)
            return self.get_token(_retry_login=True)

        try:
            cookie_dict = json.loads(cookie_str)
        except json.JSONDecodeError:
            logger.exception("账号 %s cookie_str 不是合法 JSON", self.account_name)
            return None
        if not isinstance(cookie_dict, dict):
            # Valid JSON that is not an object cannot hold a token; fall
            # through to the "no Token field" error below.
            cookie_dict = {}

        token = cookie_dict.get("Token") or cookie_dict.get("token")
        if not token:
            logger.error("账号 %s cookie 中无 Token 字段: %s", self.account_name, list(cookie_dict.keys()))
            return None
        self.token = token
        logger.info("已刷新 token,账号=%s", self.account_name)
        return self.token

    def get_product_data(self):
        """Copy the task dict's fields onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required (a missing
        key raises ``KeyError``); every other field falls back to a default.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    def pkcs7_unpad(self, data):
        """Strip PKCS#7 padding (block size 16) from ``data``.

        Raises:
            ValueError: If ``data`` is empty or its padding is malformed.
        """
        if not data:
            raise ValueError("Empty data for PKCS7 unpad")
        pad_len = data[-1]
        if pad_len < 1 or pad_len > 16:
            raise ValueError("Invalid PKCS7 padding length")
        if data[-pad_len:] != bytes([pad_len]) * pad_len:
            raise ValueError("Invalid PKCS7 padding bytes")
        return data[:-pad_len]

    def derive_key(self):
        """Return the 16-byte AES key: the first 16 upper-cased hex digits of
        the MD5 of a fixed seed string."""
        base = "BhCLxFfFhd12K4qRGPfy"
        md5_hex = hashlib.md5(base.encode("utf-8")).hexdigest()
        return md5_hex[:16].upper().encode("utf-8")

    def decrypt_payload(self, cipher_text_b64):
        """Decode an encrypted response body into a Python object.

        Steps: base64 decode, AES-ECB decrypt with :meth:`derive_key`, PKCS#7
        unpad, gzip decompress, parse as UTF-8 JSON.  Any failing step raises.
        """
        cipher = AES.new(self.derive_key(), AES.MODE_ECB)
        decrypted = cipher.decrypt(base64.b64decode(cipher_text_b64))
        unpadded = self.pkcs7_unpad(decrypted)
        # wbits = MAX_WBITS | 16 tells zlib to expect a gzip header/trailer.
        json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16)
        return json.loads(json_bytes.decode("utf-8"))

    def gen_pair(self, ex1_len=9, o_raw_len=16):
        """Generate random values for the ``ex1`` and ``o`` request fields.

        Args:
            ex1_len: Length of the lowercase-alphanumeric ``ex1`` string.
            o_raw_len: Number of random bytes base64-encoded into ``o``.

        Returns:
            ``{"ex1": str, "o": str}``.
        """
        alphabet = string.ascii_lowercase + string.digits
        ex1 = "".join(secrets.choice(alphabet) for _ in range(ex1_len))
        o = base64.b64encode(secrets.token_bytes(o_raw_len)).decode("ascii")
        return {"ex1": ex1, "o": o}

    def build_headers(self):
        """Return the fixed browser-like headers sent with every request."""
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Connection": "keep-alive",
            "Content-Type": "application/json",
            "Origin": "https://dian.ysbang.cn",
            "Referer": "https://dian.ysbang.cn/",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
            ),
            "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
        }

    def build_base_payload(self):
        """Build the search request body for page 1 of the current task.

        The search key is ``"<brand> <product>"`` (or just the product when
        no brand is set) with the specification appended directly.  ``ex1``,
        ``o`` and ``page`` are placeholders the caller overwrites.
        """
        keyword = self.product
        if self.brand:
            keyword = self.brand + " " + self.product
        if self.product_desc:
            keyword = keyword + self.product_desc
        date_str = time.strftime("%Y-%m-%d %H:%M:%S")
        return {
            "platform": "pc",
            "version": "6.0.0",
            "ua": "Chrome146",
            'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
            "trafficType": 1,
            "ex1": "",
            "o": "",
            "lastClick": -1,
            "page": 1,
            "pagesize": "60",
            "classify_id": "",
            "searchkey": keyword,
            "onlyTcm": 0,
            "operationtype": 1,
            "qualifiedLoanee": 0,
            "drugId": -1,
            "tagId": "",
            "showRecentlyPurchasedFlag": True,
            "onlySimpleLoan": 0,
            "sn": "",
            "buttons": [],
            "buttonList": [],
            "synonymId": 0,
            "activityTypes": [],
            "provider_filter": "",
            "factoryNames": "",
            "tcmGradeNames": [],
            "tcmExeStandardIds": [],
            "specs": "",
            "deliverFloor": 0,
            "purchaseLimitFloor": 0,
            "nextRequestKey": "",
            "adConfigId": 0,
            "stateValue": "",
            "firstSearch": True,
            "token": self.token,
        }

    def get_price(self, price_token):
        """Extract the lowest two-decimal number embedded in a price token.

        The token is base64-decoded, read as UTF-8 (undecodable bytes are
        dropped) and scanned for numbers with exactly two decimals.

        Returns:
            The smallest such number as a float, or ``""`` when the token
            cannot be decoded or contains no such number.
        """
        try:
            decoded = base64.b64decode(price_token)
        except (ValueError, TypeError):
            # binascii.Error (bad padding) is a ValueError; a non-string
            # token raises TypeError.  Either way there is no price here.
            return ""
        text_part = decoded.decode('utf-8', errors='ignore')
        prices = {round(float(found), 2) for found in self._PRICE_PATTERN.findall(text_part)}
        return min(prices) if prices else ""

    def to_product(self, item, type_data):
        """Map one search-result listing to the row stored by the pipeline.

        The price is taken from the first non-empty source among
        ``disPrice``, ``minprice``, ``price``, the decoded ``priceToken`` and
        finally the detail endpoint.

        Args:
            item: One entry of the decrypted ``wholesales`` list.
            type_data: The listing's ``activitytype``; forwarded to
                :meth:`parse_detail` when a detail lookup is needed.

        Returns:
            A dict with the listing's fields plus the task's bookkeeping ids.
        """
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        item_id = item.get("wholesaleid", "")
        provider_id = item.get("providerId", "")
        city_str = item.get("warehouseCity", "")
        city_id = province_id = city = province = ""

        price = item.get("disPrice") or item.get("minprice") or item.get("price") or ""
        if not price:
            price_token = item.get("priceToken", "")
            if price_token:
                price = self.get_price(price_token)
            if not price:
                detail_city, price = self.parse_detail(item_id, type_data)
                # Prefer the city from the detail page but keep the listing's
                # own value when the detail lookup yields none.
                city_str = detail_city or city_str
        if city_str:
            city_id, province_id, city, province = get_city(city_str)

        shop_name = item.get("provider_name", "") or item.get("abbreviation", "")
        return {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item.get("drugname", ""),
            "spec": item.get("specification", ""),
            "one_price": '',
            "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
            "shop_name": shop_name,
            "anonymous_store_name": "",
            "shop_url": f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
            "city_name": city,
            "city_id": city_id,
            "province_name": province,
            "province_id": province_id,
            "area_info": "",
            "factory_name": item.get("manufacturer", ""),
            "scrape_date": time.strftime("%Y-%m-%d"),
            "price": price,
            "sales": "",
            "stock_count": item.get("stockAvailable", ""),
            "snapshot_url": "",
            "approval_num": self.approval_num,
            "produced_time": item.get("prodDate", ""),
            "deadline": item.get("valid_date", ""),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {"sampling_cycle": self.sampling_cycle, "sampling_start_time": self.sampling_start_time,
                 "sampling_end_time": self.sampling_end_time}),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 0
        }

    def parse_detail(self, product_id, type_data):
        """Fetch a listing's detail page for its city, price and approval no.

        Activity type 7 is queried through the team-buy endpoint, everything
        else through the preference-detail endpoint.  On a successful reply
        ``self.approval_num`` is overwritten with the approval number found
        (or ``""``); on a failed request it is left untouched.

        Args:
            product_id: The listing's ``wholesaleid``.
            type_data: The listing's ``activitytype``.

        Returns:
            ``(city_str, price)``; both are ``""`` when the request fails or
            the reply carries no such data.
        """
        date_str = time.strftime("%Y-%m-%d %H:%M:%S")
        json_data = {
            'platform': 'pc',
            'version': '6.0.0',
            'ua': 'Chrome146',
            'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
            'trafficType': 1,
            'ex1': 'qtrcqlxew',
            'wholesaleid': str(product_id),
            'showRecentlyPurchasedFlag': True,
            'isClinic': 0,
            'scene': [1, ],
            'adConfigId': 0,
            'token': self.token,
        }
        if type_data == 7:
            json_data["wholesaleId"] = str(product_id)
            url = "https://dian.ysbang.cn/wholesale-drug/api/teambuy/getActivityDetail/v4260"
        else:
            url = 'https://dian.ysbang.cn/wholesale-drug/sales/getPreferenceDetail/v5280'

        # Random pause before each detail request.
        time.sleep(random.uniform(1, 3))
        try:
            response = requests.post(
                url, headers=self.headers, json=json_data, timeout=self.REQUEST_TIMEOUT
            )
            data_json = response.json()
        except (requests.RequestException, ValueError):
            # A single failed detail lookup must not abort the whole crawl.
            logger.exception("详情请求失败 wholesaleid=%s", product_id)
            return "", ""

        data = data_json.get("data") if isinstance(data_json, dict) else None
        if not isinstance(data, dict):
            data = {}

        self.approval_num = (data.get("druginfo") or {}).get("approval", "")
        if not self.approval_num:
            team_info = (data.get("teamBuyDetailDrugInfo") or {}).get(
                "teamBuyDetailSingleSaleTypeDrugInfo"
            ) or {}
            self.approval_num = team_info.get("approval", "") or ""

        price_dict = data.get("disPriceInfo") or data.get("teamBuyDetailInfo") or {}
        if not isinstance(price_dict, dict):
            price_dict = {}
        price = price_dict.get("disPrice") or price_dict.get("minprice") or data.get("price", "")

        city_match = self._CITY_PATTERN.search(str(data.get("delivery_policy") or ""))
        city_str = city_match.group(1) if city_match else ""
        return city_str, price

    def _request_search_page(self, payload, page):
        """POST one search request, trying up to three times.

        Returns:
            The response with status 200, or ``None`` if every attempt
            raised or returned another status.
        """
        for attempt in range(3):
            try:
                response = requests.post(
                    self.url, headers=self.headers, json=payload, timeout=self.REQUEST_TIMEOUT
                )
            except requests.RequestException as e:
                logger.error("第%s页请求失败 (%s/3): %s", page, attempt + 1, e)
            else:
                if response.status_code == 200:
                    return response
            if attempt < 2:
                # Back off only when another attempt will follow.
                time.sleep(self.RETRY_DELAY)
        return None

    def search_data(self):
        """Walk the search result pages and store every listing found.

        Stops when a page has no encrypted body or no listings, when a
        request or its JSON decoding fails, or when one page has needed more
        than ``MAX_PAGE_RETRIES`` re-logins / decryption retries.  Does
        nothing when the spider was created without a task.
        """
        if not self.task_dict:
            return

        page = 1
        page_retries = 0  # re-requests of the current page so far
        while page < self.MAX_PAGE:
            logger.info(f"药师帮爬取第{page}页")
            payload = self.build_base_payload()
            payload.update(self.gen_pair())
            payload["page"] = page

            response = self._request_search_page(payload, page)
            if response is None:
                logger.error("第%s页请求失败,停止爬取", page)
                return
            try:
                data_json = response.json()
            except ValueError:
                # ValueError covers both the stdlib and the simplejson
                # flavour of the decode error that requests may raise.
                logger.exception("第%s页响应不是合法 JSON", page)
                return

            if data_json.get("message", "") == "该操作需要登录":
                page_retries += 1
                if page_retries > self.MAX_PAGE_RETRIES:
                    logger.error("第%s页重试次数超限,停止爬取", page)
                    return
                logger.info("登录账号中。。。")
                YaoShiBangLogin().run()
                time.sleep(10)
                if not self.get_token():
                    logger.error("登录后仍未从库中读到有效 Token,停止重试")
                    return
                logger.info("token 已刷新,重试第 %s 页", page)
                continue

            encrypted_o = (data_json.get("data") or {}).get("o")
            if not encrypted_o:
                logger.warning("第%s页返回无加密 data.o: %s", page, data_json)
                break
            try:
                json_data = self.decrypt_payload(encrypted_o)
            except Exception as e:
                logger.exception("第%s页解密失败: %s", page, e)
                # Retry the same page a bounded number of times; an
                # unbounded retry would spin forever on a page that can
                # never be decrypted.
                page_retries += 1
                if page_retries > self.MAX_PAGE_RETRIES:
                    logger.error("第%s页重试次数超限,停止爬取", page)
                    return
                continue

            wholesales = json_data.get("wholesales", [])
            if not wholesales:
                logger.info(f"第{page}页无数据,停止")
                break

            # Look up the approval number from the first few listings,
            # stopping at the first detail page that provides one.
            for item in wholesales[0:5]:
                self.parse_detail(item.get("wholesaleid", ""), item.get("activitytype", 0))
                if self.approval_num:
                    break

            for item in wholesales:
                product = self.to_product(item, item.get("activitytype", 0))
                if not product.get("item_id"):
                    continue
                try:
                    self.pipeline.storge_data(product)
                    logger.info("%s", json.dumps(product, ensure_ascii=False))
                except Exception as e:
                    logger.exception("写入数据库失败: %s", e)

            page += 1
            page_retries = 0

    def run(self):
        """Obtain a token, crawl, and report the outcome.

        Returns:
            ``(crawl_count, is_success)``; ``(0, False)`` when no token could
            be obtained.  An exception escaping :meth:`search_data` is logged
            and the rows stored so far are still counted.
        """
        if not self.get_token():
            logger.error("启动失败:无可用 token")
            return 0, False
        try:
            self.search_data()
        except Exception as e:
            # Top-level boundary: keep the traceback in the log.
            logger.exception(e)
        logger.info(f"爬取总数{self.pipeline.crawl_count}")
        return self.pipeline.crawl_count, self.is_success
|