ysbang_crawl.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. import base64
  2. import hashlib
  3. import json
  4. import random
  5. import re
  6. import secrets
  7. import string
  8. import time
  9. import zlib
  10. from datetime import datetime, timedelta
  11. import requests
  12. from Crypto.Cipher import AES
  13. from commons.Logger import get_spider_logger
  14. from pipelines.drug_pipelines import DrugPipeline
  15. from area_info.city_name_to_id import get_city
  16. from commons.conn_mysql import MySQLPoolOnline
# Module-level logger, obtained from the project's spider-logger factory
# under the name "yaoshibang".
logger = get_spider_logger("yaoshibang")
# Value sent as the "token" field of every request payload built in this file.
# NOTE(review): this looks like a hard-coded session credential — consider
# loading it from configuration instead of source; confirm with the owner.
TOKEN = "bd2197bc55da4a11a94ca40c428c5529"
  19. class YsbSpider:
  20. def __init__(self, drug_dict=None):
  21. self.url = "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270"
  22. self.headers = self.build_headers()
  23. self.start_date = (datetime.now() - timedelta(minutes=500)).strftime("%Y-%m-%d %H:%M")
  24. self.platform = 5
  25. self.approval_num = ""
  26. self.task_dict = drug_dict or {}
  27. self.collect_task_id = None
  28. self.account_name = "17097980383"
  29. self.pipeline = DrugPipeline("yaoshibang")
  30. if self.task_dict:
  31. self.get_product_data()
  32. self.is_success = True
  33. self.db_online = MySQLPoolOnline()
  34. def get_product_data(self):
  35. self.task_id = self.task_dict["id"]
  36. self.company_id = self.task_dict["company_id"]
  37. self.product = self.task_dict["product_name"]
  38. self.product_desc = self.task_dict.get("product_specs", "")
  39. self.brand = self.task_dict.get("product_brand", "")
  40. self.product_keyword = self.task_dict.get("product_keyword", "")
  41. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  42. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  43. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  44. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  45. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  46. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  47. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  48. self.collect_round = self.task_dict.get("collect_round", 1)
  49. def pkcs7_unpad(self, data):
  50. if not data:
  51. raise ValueError("Empty data for PKCS7 unpad")
  52. pad_len = data[-1]
  53. if pad_len < 1 or pad_len > 16:
  54. raise ValueError("Invalid PKCS7 padding length")
  55. if data[-pad_len:] != bytes([pad_len]) * pad_len:
  56. raise ValueError("Invalid PKCS7 padding bytes")
  57. return data[:-pad_len]
  58. def derive_key(self):
  59. base = "BhCLxFfFhd12K4qRGPfy"
  60. md5_hex = hashlib.md5(base.encode("utf-8")).hexdigest()
  61. return md5_hex[:16].upper().encode("utf-8")
  62. def decrypt_payload(self, cipher_text_b64):
  63. key = self.derive_key()
  64. cipher_bytes = base64.b64decode(cipher_text_b64)
  65. cipher = AES.new(key, AES.MODE_ECB)
  66. decrypted = cipher.decrypt(cipher_bytes)
  67. unpadded = self.pkcs7_unpad(decrypted)
  68. json_bytes = zlib.decompress(unpadded, zlib.MAX_WBITS | 16)
  69. return json.loads(json_bytes.decode("utf-8"))
  70. def gen_pair(self, ex1_len=9, o_raw_len=16):
  71. alphabet = string.ascii_lowercase + string.digits
  72. ex1 = "".join(secrets.choice(alphabet) for _ in range(ex1_len))
  73. o = base64.b64encode(secrets.token_bytes(o_raw_len)).decode("ascii")
  74. return {"ex1": ex1, "o": o}
  75. def build_headers(self):
  76. return {
  77. "Accept": "*/*",
  78. "Accept-Language": "zh-CN,zh;q=0.9",
  79. "Connection": "keep-alive",
  80. "Content-Type": "application/json",
  81. "Origin": "https://dian.ysbang.cn",
  82. "Referer": "https://dian.ysbang.cn/",
  83. "Sec-Fetch-Dest": "empty",
  84. "Sec-Fetch-Mode": "cors",
  85. "Sec-Fetch-Site": "same-origin",
  86. "User-Agent": (
  87. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
  88. "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
  89. ),
  90. "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
  91. "sec-ch-ua-mobile": "?0",
  92. "sec-ch-ua-platform": '"Windows"',
  93. }
  94. def build_base_payload(self):
  95. keyword = ""
  96. if self.brand:
  97. keyword = self.brand + " " + self.product
  98. if self.product_desc:
  99. keyword = keyword + self.product_desc
  100. date_str = time.strftime("%Y-%m-%d %H:%M:%S")
  101. return {
  102. "platform": "pc",
  103. "version": "6.0.0",
  104. "ua": "Chrome146",
  105. 'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
  106. "trafficType": 1,
  107. "ex1": "",
  108. "o": "",
  109. "lastClick": -1,
  110. "page": 1,
  111. "pagesize": "60",
  112. "classify_id": "",
  113. "searchkey": keyword,
  114. "onlyTcm": 0,
  115. "operationtype": 1,
  116. "qualifiedLoanee": 0,
  117. "drugId": -1,
  118. "tagId": "",
  119. "showRecentlyPurchasedFlag": True,
  120. "onlySimpleLoan": 0,
  121. "sn": "",
  122. "buttons": [],
  123. "buttonList": [],
  124. "synonymId": 0,
  125. "activityTypes": [],
  126. "provider_filter": "",
  127. "factoryNames": "",
  128. "tcmGradeNames": [],
  129. "tcmExeStandardIds": [],
  130. "specs": "",
  131. "deliverFloor": 0,
  132. "purchaseLimitFloor": 0,
  133. "nextRequestKey": "",
  134. "adConfigId": 0,
  135. "stateValue": "",
  136. "firstSearch": True,
  137. "token": TOKEN,
  138. }
  139. def get_price(self, price_token):
  140. pattern = re.compile(r'(?<!\d)(\d+\.\d{2})(?!\d)')
  141. decoded = base64.b64decode(price_token)
  142. text_part = decoded.decode('utf-8', errors='ignore')
  143. numbers_from_text = pattern.findall(text_part)
  144. unique_prices = list(set(round(float(a), 2) for a in numbers_from_text))
  145. last_prices = sorted(unique_prices, reverse=True)
  146. return last_prices[-1]
  147. def to_product(self, item, type_data):
  148. now = time.strftime("%Y-%m-%d %H:%M:%S")
  149. item_id = item.get("wholesaleid", "")
  150. provider_id = item.get("providerId", "")
  151. shop_url = f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
  152. city_str = item.get("warehouseCity", "")
  153. city_id = province_id = city = province = ""
  154. price = item.get("disPrice", "")
  155. if not price:
  156. price = item.get("minprice", "")
  157. if not price:
  158. price = item.get("price", "")
  159. if not price:
  160. price_token = item.get("priceToken", "")
  161. if price_token:
  162. price = self.get_price(price_token)
  163. if not price:
  164. city_str, price = self.parse_detail(item_id, type_data)
  165. if city_str:
  166. city_id, province_id, city, province = get_city(city_str)
  167. shop_name = item.get("provider_name", "")
  168. if not shop_name:
  169. shop_name = item.get("abbreviation", "")
  170. product = {
  171. "platform": self.platform,
  172. "item_id": item_id,
  173. "enterprise_id": self.company_id,
  174. "product_name": item.get("drugname", ""),
  175. "spec": item.get("specification", ""),
  176. "one_price": '',
  177. "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
  178. "shop_name": shop_name,
  179. "anonymous_store_name": "",
  180. "shop_url": f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
  181. "city_name": city,
  182. "city_id": city_id,
  183. "province_name": province,
  184. "province_id": province_id,
  185. "area_info": "",
  186. "factory_name": item.get("manufacturer", ""),
  187. "scrape_date": time.strftime("%Y-%m-%d"),
  188. "price": price,
  189. "sales": "",
  190. "stock_count": item.get("stockAvailable", ""),
  191. "snapshot_url": "",
  192. "approval_num": self.approval_num,
  193. "produced_time": item.get("prodDate", ""),
  194. "deadline": item.get("valid_date", ""),
  195. "update_time": now,
  196. "insert_time": now,
  197. "number": 1,
  198. "product_brand": self.brand or "",
  199. "collect_task_id": self.collect_task_id,
  200. "search_name": self.product,
  201. "company_name": "",
  202. "collect_config_info": json.dumps(
  203. {"sampling_cycle": self.sampling_cycle, "sampling_start_time": self.sampling_start_time,
  204. "sampling_end_time": self.sampling_end_time}),
  205. "account_id": self.account_id,
  206. "collect_region_id": self.collect_region_id,
  207. "collect_round": self.collect_round,
  208. "is_sold_out": 1
  209. }
  210. return product
  211. def parse_detail(self, product_id, type_data):
  212. date_str = time.strftime("%Y-%m-%d %H:%M:%S")
  213. json_data = {
  214. 'platform': 'pc',
  215. 'version': '6.0.0',
  216. 'ua': 'Chrome146',
  217. 'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
  218. 'trafficType': 1,
  219. 'ex1': 'qtrcqlxew',
  220. 'wholesaleid': str(product_id),
  221. 'showRecentlyPurchasedFlag': True,
  222. 'isClinic': 0,
  223. 'scene': [1, ],
  224. 'adConfigId': 0,
  225. 'token': TOKEN,
  226. }
  227. if type_data in [7]:
  228. json_data["wholesaleId"] = str(product_id)
  229. url = "https://dian.ysbang.cn/wholesale-drug/api/teambuy/getActivityDetail/v4260"
  230. else:
  231. json_data['wholesaleid'] = str(product_id)
  232. url = 'https://dian.ysbang.cn/wholesale-drug/sales/getPreferenceDetail/v5280'
  233. time.sleep(random.uniform(1, 3))
  234. response = requests.post(url, headers=self.headers, json=json_data)
  235. data_json = response.json()
  236. data = data_json.get("data", {})
  237. self.approval_num = data.get("druginfo", {}).get("approval", "")
  238. if not self.approval_num:
  239. self.approval_num = data.get("teamBuyDetailDrugInfo", {}).get("teamBuyDetailSingleSaleTypeDrugInfo",
  240. {}).get("approval")
  241. city_data = data.get("delivery_policy", "")
  242. city_re = re.search(r"\[(\w+)\w+\]", city_data)
  243. price_dict = data.get("disPriceInfo", {})
  244. if not price_dict:
  245. price_dict = data.get("teamBuyDetailInfo", {})
  246. price = price_dict.get("disPrice", "")
  247. if not price:
  248. price = price_dict.get("minprice", "")
  249. if not price:
  250. price = data.get("price", "")
  251. if city_re:
  252. city_str = city_re.group(1)
  253. else:
  254. city_str = ""
  255. return city_str, price
  256. def search_data(self):
  257. if not self.task_dict:
  258. return
  259. for page in range(1, 100):
  260. pair = self.gen_pair()
  261. payload = self.build_base_payload()
  262. payload["ex1"] = pair["ex1"]
  263. payload["o"] = pair["o"]
  264. payload["page"] = page
  265. response = None
  266. for i in range(3):
  267. try:
  268. response = requests.post(
  269. self.url,
  270. headers=self.headers,
  271. json=payload,
  272. timeout=30,
  273. )
  274. if response.status_code == 200:
  275. break
  276. except Exception as e:
  277. response = None
  278. time.sleep(10)
  279. continue
  280. if not response:
  281. self.is_success = False
  282. return
  283. data_json = response.json()
  284. data_block = data_json.get("data") or {}
  285. encrypted_o = data_block.get("o")
  286. if not encrypted_o:
  287. logger.warning("第%s页返回无加密 data.o: %s", page, data_json)
  288. break
  289. json_data = self.decrypt_payload(encrypted_o)
  290. wholesales = json_data.get("wholesales", [])
  291. if not wholesales:
  292. logger.info(f"第{page}页无数据,停止")
  293. break
  294. # 获取国药准字
  295. for item in wholesales[0:5]:
  296. product_id = item.get("wholesaleid", "")
  297. type_data = item.get("activitytype", 0)
  298. self.parse_detail(product_id, type_data)
  299. if self.approval_num:
  300. break
  301. for item in wholesales:
  302. type_data = item.get("activitytype", 0)
  303. product = self.to_product(item, type_data)
  304. if not product.get("item_id"):
  305. continue
  306. try:
  307. self.pipeline.storge_data(product)
  308. logger.info("%s", json.dumps(product, ensure_ascii=False))
  309. except Exception as e:
  310. logger.exception("写入数据库失败: %s", e)
  311. def run(self):
  312. try:
  313. self.search_data()
  314. except Exception as e:
  315. self.is_success = False
  316. logger.error(e)
  317. logger.info(f"爬取总数{self.pipeline.crawl_count}")
  318. return self.pipeline.crawl_count, self.is_success