import base64
import hashlib
import json
import random
import re
import secrets
import string
import time
import zlib
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import requests
from Crypto.Cipher import AES

from commons.Logger import get_spider_logger
from pipelines.drug_pipelines import DrugPipeline
from area_info.city_name_to_id import get_city
from commons.conn_mysql import MySQLPoolOnline

logger = get_spider_logger("yaoshibang")

# NOTE(review): credential committed to source; consider loading it from
# configuration or the environment instead.
TOKEN = "bd2197bc55da4a11a94ca40c428c5529"


class YsbSpider:
    """Request/response helper for the ysbang wholesale drug listing endpoint.

    The visible part of this class builds the HTTP headers and the JSON search
    payload for ``getWholesaleList`` and decrypts encrypted response payloads
    (base64 -> AES-ECB -> PKCS#7 unpad -> gzip -> JSON).
    """

    def __init__(self, drug_dict: Optional[Dict[str, Any]] = None):
        """Set up request constants, collaborators and task fields.

        Args:
            drug_dict: Task description. When it is ``None`` or empty the
                task-derived attributes keep neutral defaults; otherwise they
                are loaded by :meth:`get_product_data`.

        Raises:
            KeyError: If a non-empty ``drug_dict`` lacks ``id``,
                ``company_id`` or ``product_name``.
        """
        self.url = "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270"
        self.headers = self.build_headers()
        # Timestamp 500 minutes in the past; it is only used as the first
        # component of the "ex" field in build_base_payload().
        self.start_date = (datetime.now() - timedelta(minutes=500)).strftime("%Y-%m-%d %H:%M")
        self.platform = 5
        self.approval_num = ""
        self.task_dict = drug_dict or {}
        # Always define the task-derived attributes so that methods reading
        # them (e.g. build_base_payload) do not raise AttributeError when the
        # spider is created without a task.
        self._init_task_defaults()
        # NOTE(review): hard-coded account identifier; consider moving it to
        # configuration.
        self.account_name = "17097980383"
        self.pipeline = DrugPipeline("yaoshibang")
        if self.task_dict:
            self.get_product_data()
        self.is_success = True
        self.db_online = MySQLPoolOnline()

    def _init_task_defaults(self) -> None:
        """Give every task-derived attribute a neutral default value."""
        self.task_id = None
        self.company_id = None
        self.product = ""
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.collect_task_id = None
        self.sampling_cycle = ""
        self.sampling_start_time = ""
        self.sampling_end_time = ""
        self.collect_equipment_id = ""
        self.account_id = ""
        self.collect_region_id = ""
        self.collect_round = 1

    def get_product_data(self) -> None:
        """Copy the fields of ``self.task_dict`` onto instance attributes.

        ``id``, ``company_id`` and ``product_name`` are mandatory; every other
        key is optional and falls back to ``""`` (``collect_round`` to ``1``).

        Raises:
            KeyError: If one of the mandatory keys is missing.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    def pkcs7_unpad(self, data: bytes) -> bytes:
        """Strip PKCS#7 padding (block size 16) from ``data``.

        Args:
            data: Decrypted bytes whose last byte encodes the padding length.

        Returns:
            ``data`` without its trailing padding bytes.

        Raises:
            ValueError: If ``data`` is empty, the padding length is outside
                1..16, or the trailing bytes are not all equal to that length.
        """
        if not data:
            raise ValueError("Empty data for PKCS7 unpad")
        pad_len = data[-1]
        if pad_len < 1 or pad_len > 16:
            raise ValueError("Invalid PKCS7 padding length")
        if data[-pad_len:] != bytes([pad_len]) * pad_len:
            raise ValueError("Invalid PKCS7 padding bytes")
        return data[:-pad_len]

    def derive_key(self) -> bytes:
        """Return the 16-byte AES key used by :meth:`decrypt_payload`.

        The key is the first 16 hex characters of the MD5 digest of a fixed
        seed string, upper-cased and encoded as UTF-8.
        """
        base = "BhCLxFfFhd12K4qRGPfy"
        md5_hex = hashlib.md5(base.encode("utf-8")).hexdigest()
        return md5_hex[:16].upper().encode("utf-8")

    def decrypt_payload(self, cipher_text_b64: str) -> Any:
        """Decrypt an encrypted payload and return the parsed JSON value.

        Steps: base64-decode, AES-ECB decrypt with :meth:`derive_key`, remove
        PKCS#7 padding, gzip-decompress, then parse the UTF-8 text as JSON.

        Args:
            cipher_text_b64: Base64 text of the encrypted payload.

        Returns:
            Whatever ``json.loads`` produces for the decrypted document.

        Raises:
            ValueError: If the padding is invalid (see :meth:`pkcs7_unpad`);
                ``json.loads`` failures are ``ValueError`` subclasses too.
            zlib.error: If the decrypted bytes are not valid gzip data.
        """
        key = self.derive_key()
        cipher_bytes = base64.b64decode(cipher_text_b64)
        cipher = AES.new(key, AES.MODE_ECB)
        decrypted = cipher.decrypt(cipher_bytes)
        unpadded = self.pkcs7_unpad(decrypted)
        # wbits = MAX_WBITS | 16 tells zlib to expect a gzip header/trailer.
        json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16)
        return json.loads(json_bytes.decode("utf-8"))

    def gen_pair(self, ex1_len: int = 9, o_raw_len: int = 16) -> Dict[str, str]:
        """Generate a random ``{"ex1": ..., "o": ...}`` pair.

        Args:
            ex1_len: Number of characters in ``ex1`` (lowercase letters and
                digits).
            o_raw_len: Number of random bytes that are base64-encoded into
                ``o``.

        Returns:
            A dict with the keys ``"ex1"`` and ``"o"``; presumably merged into
            the same-named payload fields by the caller (not shown here).
        """
        alphabet = string.ascii_lowercase + string.digits
        ex1 = "".join(secrets.choice(alphabet) for _ in range(ex1_len))
        o = base64.b64encode(secrets.token_bytes(o_raw_len)).decode("ascii")
        return {"ex1": ex1, "o": o}

    def build_headers(self) -> Dict[str, str]:
        """Return the fixed browser-like HTTP headers sent with each request."""
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "Connection": "keep-alive",
            "Content-Type": "application/json",
            "Origin": "https://dian.ysbang.cn",
            "Referer": "https://dian.ysbang.cn/",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
            ),
            "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
        }

    def _build_search_keyword(self) -> str:
        """Compose the search key from brand, product name and specs.

        With a brand the result is ``"<brand> <product><specs>"``. Without a
        brand it falls back to ``"<product><specs>"`` so that the product name
        is never dropped from the search.
        NOTE(review): previously an empty brand produced a key without the
        product name - confirm no caller relied on that.
        """
        keyword = self.product
        if self.brand:
            keyword = self.brand + " " + self.product
        if self.product_desc:
            # Specs are appended without a separator, as before.
            keyword = keyword + self.product_desc
        return keyword

    def build_base_payload(self) -> Dict[str, Any]:
        """Return the JSON body for the first page of a wholesale search.

        Reads ``self.brand``, ``self.product``, ``self.product_desc`` and
        ``self.start_date`` plus the module-level ``TOKEN``. ``"ex1"`` and
        ``"o"`` are left empty here.
        """
        keyword = self._build_search_keyword()
        date_str = time.strftime("%Y-%m-%d %H:%M:%S")
        return {
            "platform": "pc",
            "version": "6.0.0",
            "ua": "Chrome146",
            'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
            "trafficType": 1,
            "ex1": "",
            "o": "",
            "lastClick": -1,
            "page": 1,
            "pagesize": "60",
            "classify_id": "",
            "searchkey": keyword,
            "onlyTcm": 0,
            "operationtype": 1,
            "qualifiedLoanee": 0,
            "drugId": -1,
            "tagId": "",
            "showRecentlyPurchasedFlag": True,
            "onlySimpleLoan": 0,
            "sn": "",
            "buttons": [],
            "buttonList": [],
            "synonymId": 0,
            "activityTypes": [],
            "provider_filter": "",
            "factoryNames": "",
            "tcmGradeNames": [],
            "tcmExeStandardIds": [],
            "specs": "",
            "deliverFloor": 0,
            "purchaseLimitFloor": 0,
            "nextRequestKey": "",
            "adConfigId": 0,
            "stateValue": "",
            "firstSearch": True,
            "token": TOKEN,
        }

    # NOTE(review): the original source continues here with
    # ``def get_price(self, price_token):``, but its body is cut off in the
    # available text (it ends inside an unterminated regex literal), so the
    # method is not reproduced in this block and must be restored from the
    # complete file.