import base64 import hashlib import json import random import re import secrets import string import time import zlib from datetime import datetime, timedelta import requests from Crypto.Cipher import AES from commons.Logger import get_spider_logger from pipelines.drug_pipelines import DrugPipeline from area_info.city_name_to_id import get_city from commons.conn_mysql import MySQLPoolOnline from spiders.yaoshibang.login_yaoshibang import YaoShiBangLogin logger = get_spider_logger("yaoshibang") class YsbSpider: def __init__(self, drug_dict=None): self.url = "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270" self.headers = self.build_headers() self.start_date = (datetime.now() - timedelta(minutes=500)).strftime("%Y-%m-%d %H:%M") self.platform = 5 self.approval_num = "" self.task_dict = drug_dict or {} self.collect_task_id = None self.token = None self.account_name = "17097980383" self.pipeline = DrugPipeline("yaoshibang") if self.task_dict: self.get_product_data() self.is_success = True self.db_online = MySQLPoolOnline() def get_token(self, _retry_login=False): sql_account = f""" select `name`,`cookie_str` from `accounts_platform` where `platform`=5 and `status`=1 and `equipment_id`=1 order by `cookie_timestamp` asc limit 1 """ account_list = self.db_online.select_data(sql_account) if not account_list: logger.error("无可用爬取账号") return None account_dict = account_list[0] self.account_name = account_dict["name"] cookie_str = account_dict.get("cookie_str") or "" if not cookie_str: if _retry_login: logger.error("账号 %s 登录后 cookie 仍为空", self.account_name) return None logger.warning("账号 %s cookie_str 为空,尝试登录", self.account_name) YaoShiBangLogin().run() time.sleep(5) return self.get_token(_retry_login=True) try: cookie_dict = json.loads(cookie_str) except json.JSONDecodeError: logger.exception("账号 %s cookie_str 不是合法 JSON", self.account_name) return None token = cookie_dict.get("Token") or cookie_dict.get("token") if not token: logger.error("账号 %s cookie 中无 Token 字段: %s", self.account_name, list(cookie_dict.keys())) return None self.token = token logger.info("已刷新 token,账号=%s", self.account_name) return self.token def get_product_data(self): self.task_id = self.task_dict["id"] self.company_id = self.task_dict["company_id"] self.product = self.task_dict["product_name"] self.product_desc = self.task_dict.get("product_specs", "") self.brand = self.task_dict.get("product_brand", "") self.product_keyword = self.task_dict.get("product_keyword", "") self.collect_task_id = self.task_dict.get("collect_task_id", "") self.sampling_cycle = self.task_dict.get("sampling_cycle", "") self.sampling_start_time = self.task_dict.get("sampling_start_time", "") self.sampling_end_time = self.task_dict.get("sampling_end_time", "") self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "") self.account_id = self.task_dict.get("collect_equipment_account_id", "") self.collect_region_id = self.task_dict.get("collect_region_id", "") self.collect_round = self.task_dict.get("collect_round", 1) def pkcs7_unpad(self, data): if not data: raise ValueError("Empty data for PKCS7 unpad") pad_len = data[-1] if pad_len < 1 or pad_len > 16: raise ValueError("Invalid PKCS7 padding length") if data[-pad_len:] != bytes([pad_len]) * pad_len: raise ValueError("Invalid PKCS7 padding bytes") return data[:-pad_len] def derive_key(self): base = "BhCLxFfFhd12K4qRGPfy" md5_hex = hashlib.md5(base.encode("utf-8")).hexdigest() return md5_hex[:16].upper().encode("utf-8") def decrypt_payload(self, cipher_text_b64): key = self.derive_key() cipher_bytes = base64.b64decode(cipher_text_b64) cipher = AES.new(key, AES.MODE_ECB) decrypted = cipher.decrypt(cipher_bytes) unpadded = self.pkcs7_unpad(decrypted) json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16) return json.loads(json_bytes.decode("utf-8")) def gen_pair(self, ex1_len=9, o_raw_len=16): alphabet = string.ascii_lowercase + string.digits ex1 = "".join(secrets.choice(alphabet) for _ in range(ex1_len)) o = base64.b64encode(secrets.token_bytes(o_raw_len)).decode("ascii") return {"ex1": ex1, "o": o} def build_headers(self): return { "Accept": "*/*", "Accept-Language": "zh-CN,zh;q=0.9", "Connection": "keep-alive", "Content-Type": "application/json", "Origin": "https://dian.ysbang.cn", "Referer": "https://dian.ysbang.cn/", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36" ), "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', } def build_base_payload(self): keyword = self.product if self.brand: keyword = self.brand + " " + self.product if self.product_desc: keyword = keyword + self.product_desc date_str = time.strftime("%Y-%m-%d %H:%M:%S") return { "platform": "pc", "version": "6.0.0", "ua": "Chrome146", 'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str), "trafficType": 1, "ex1": "", "o": "", "lastClick": -1, "page": 1, "pagesize": "60", "classify_id": "", "searchkey": keyword, "onlyTcm": 0, "operationtype": 1, "qualifiedLoanee": 0, "drugId": -1, "tagId": "", "showRecentlyPurchasedFlag": True, "onlySimpleLoan": 0, "sn": "", "buttons": [], "buttonList": [], "synonymId": 0, "activityTypes": [], "provider_filter": "", "factoryNames": "", "tcmGradeNames": [], "tcmExeStandardIds": [], "specs": "", "deliverFloor": 0, "purchaseLimitFloor": 0, "nextRequestKey": "", "adConfigId": 0, "stateValue": "", "firstSearch": True, "token": self.token, } def get_price(self, price_token): pattern = re.compile(r'(?