ysbang_crawl.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. import base64
  2. import hashlib
  3. import json
  4. import random
  5. import re
  6. import secrets
  7. import string
  8. import time
  9. import zlib
  10. from datetime import datetime, timedelta
  11. import requests
  12. from Crypto.Cipher import AES
  13. from commons.Logger import get_spider_logger
  14. from pipelines.drug_pipelines import DrugPipeline
  15. from area_info.city_name_to_id import get_city
  16. from commons.conn_mysql import MySQLPoolOnline
  17. from spiders.yaoshibang.login_yaoshibang import YaoShiBangLogin
  18. logger = get_spider_logger("yaoshibang")
  19. class YsbSpider:
  def __init__(self, drug_dict=None):
      """Prepare request settings, task state and storage helpers.

      Args:
          drug_dict: Optional task description. When it is non-empty its
              fields are copied onto the instance by ``get_product_data``.
      """
      # Static request configuration.
      self.url = "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270"
      self.headers = self.build_headers()

      # Timestamp 500 minutes in the past; it is embedded in the "ex" field
      # of the request payloads.
      window_start = datetime.now() - timedelta(minutes=500)
      self.start_date = window_start.strftime("%Y-%m-%d %H:%M")

      # Crawl state defaults.
      self.platform = 5
      self.approval_num = ""
      self.task_dict = drug_dict if drug_dict else {}
      self.collect_task_id = None  # overwritten by get_product_data() below
      self.token = None
      self.account_name = "17097980383"

      self.pipeline = DrugPipeline("yaoshibang")

      # Task fields are only unpacked when a task was actually supplied.
      if self.task_dict:
          self.get_product_data()

      self.is_success = True
      self.db_online = MySQLPoolOnline()
  def get_token(self, _retry_login=False):
      """Load an API token for a platform-5 account from the database.

      The query selects the enabled account (``status=1``, ``equipment_id=1``)
      with the smallest ``cookie_timestamp``. Its ``cookie_str`` is expected
      to be a JSON object holding the token under ``Token`` or ``token``.
      When the cookie is empty, one login attempt is triggered and the
      lookup is repeated once.

      Args:
          _retry_login: Internal flag set on the recursive call made after a
              login attempt; it prevents a second login.

      Returns:
          The token (also stored in ``self.token``), or ``None`` when no
          account is available or no usable token could be read.
      """
      # Constant query: nothing is interpolated, so a plain string is used.
      sql_account = (
          " select `name`,`cookie_str` from `accounts_platform`"
          " where `platform`=5 and `status`=1 and `equipment_id`=1"
          " order by `cookie_timestamp` asc limit 1 "
      )
      account_list = self.db_online.select_data(sql_account)
      if not account_list:
          logger.error("无可用爬取账号")
          return None

      account_dict = account_list[0]
      self.account_name = account_dict["name"]
      cookie_str = account_dict.get("cookie_str") or ""

      if not cookie_str:
          if _retry_login:
              logger.error("账号 %s 登录后 cookie 仍为空", self.account_name)
              return None
          logger.warning("账号 %s cookie_str 为空,尝试登录", self.account_name)
          YaoShiBangLogin().run()
          # Wait before re-reading the account row after the login attempt.
          time.sleep(5)
          return self.get_token(_retry_login=True)

      try:
          cookie_dict = json.loads(cookie_str)
      except (json.JSONDecodeError, TypeError):
          # TypeError covers a column value that is neither str nor bytes.
          logger.exception("账号 %s cookie_str 不是合法 JSON", self.account_name)
          return None

      # Valid JSON that is not an object (list, string, number) has no
      # .get(); reject it explicitly instead of raising AttributeError.
      if not isinstance(cookie_dict, dict):
          logger.error(
              "账号 %s cookie_str 不是 JSON 对象: %s",
              self.account_name,
              type(cookie_dict).__name__,
          )
          return None

      token = cookie_dict.get("Token") or cookie_dict.get("token")
      if not token:
          logger.error(
              "账号 %s cookie 中无 Token 字段: %s",
              self.account_name,
              list(cookie_dict.keys()),
          )
          return None

      self.token = token
      logger.info("已刷新 token,账号=%s", self.account_name)
      return self.token
  def get_product_data(self):
      """Copy the fields of ``self.task_dict`` onto instance attributes.

      ``id``, ``company_id`` and ``product_name`` are mandatory and raise
      ``KeyError`` when missing. Every other field falls back to a default.
      """
      task = self.task_dict

      # (attribute name, task key) - mandatory fields.
      required_fields = (
          ("task_id", "id"),
          ("company_id", "company_id"),
          ("product", "product_name"),
      )
      for attr_name, task_key in required_fields:
          setattr(self, attr_name, task[task_key])

      # (attribute name, task key, default) - optional fields.
      optional_fields = (
          ("product_desc", "product_specs", ""),
          ("brand", "product_brand", ""),
          ("product_keyword", "product_keyword", ""),
          ("collect_task_id", "collect_task_id", ""),
          ("sampling_cycle", "sampling_cycle", ""),
          ("sampling_start_time", "sampling_start_time", ""),
          ("sampling_end_time", "sampling_end_time", ""),
          ("collect_equipment_id", "collect_equipment_id", ""),
          ("account_id", "collect_equipment_account_id", ""),
          ("collect_region_id", "collect_region_id", ""),
          ("collect_round", "collect_round", 1),
      )
      for attr_name, task_key, fallback in optional_fields:
          setattr(self, attr_name, task.get(task_key, fallback))
  def pkcs7_unpad(self, data):
      """Remove PKCS#7 padding (block size 16) from ``data``.

      Args:
          data: Padded bytes.

      Returns:
          ``data`` without its trailing padding bytes.

      Raises:
          ValueError: If ``data`` is empty, the final byte is not in 1..16,
              or the trailing bytes do not all equal the padding length.
      """
      if not data:
          raise ValueError("Empty data for PKCS7 unpad")

      # The last byte states how many padding bytes were appended.
      padding_size = data[-1]
      if not 1 <= padding_size <= 16:
          raise ValueError("Invalid PKCS7 padding length")

      expected_tail = bytes([padding_size]) * padding_size
      if data[-padding_size:] != expected_tail:
          raise ValueError("Invalid PKCS7 padding bytes")

      return data[:-padding_size]
  def derive_key(self):
      """Return the 16-byte AES key derived from the embedded seed string.

      The key is the first 16 hex characters of the seed's MD5 digest,
      upper-cased and encoded as UTF-8.
      """
      seed = "BhCLxFfFhd12K4qRGPfy"
      digest = hashlib.md5(seed.encode("utf-8"))
      key_text = digest.hexdigest()[:16].upper()
      return key_text.encode("utf-8")
  def decrypt_payload(self, cipher_text_b64):
      """Decode an encrypted response field into a Python object.

      Steps: base64-decode, AES-ECB decrypt with ``derive_key()``, strip
      PKCS#7 padding, decompress, then parse the UTF-8 text as JSON.

      Args:
          cipher_text_b64: Base64 text of the encrypted payload.

      Returns:
          The parsed JSON value.
      """
      aes_key = self.derive_key()
      encrypted = base64.b64decode(cipher_text_b64)
      padded_plain = AES.new(aes_key, AES.MODE_ECB).decrypt(encrypted)
      compressed = self.pkcs7_unpad(padded_plain)
      # wbits = MAX_WBITS | 16 makes zlib expect a gzip header and trailer.
      text = zlib.decompress(compressed, zlib.MAX_WBITS | 16).decode("utf-8")
      return json.loads(text)
  def gen_pair(self, ex1_len=9, o_raw_len=16):
      """Create the random ``ex1`` / ``o`` values sent with a search request.

      Args:
          ex1_len: Number of characters in ``ex1``.
          o_raw_len: Number of random bytes encoded into ``o``.

      Returns:
          A dict with ``ex1`` (random lowercase letters and digits) and
          ``o`` (base64 text of ``o_raw_len`` random bytes).
      """
      charset = string.ascii_lowercase + string.digits
      ex1_chars = [secrets.choice(charset) for _ in range(ex1_len)]
      random_bytes = secrets.token_bytes(o_raw_len)
      return {
          "ex1": "".join(ex1_chars),
          "o": base64.b64encode(random_bytes).decode("ascii"),
      }
  def build_headers(self):
      """Return the fixed HTTP headers sent with every request.

      The values describe a Chrome 146 browser on Windows calling the API
      from the https://dian.ysbang.cn origin.
      """
      user_agent = (
          "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
          "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36"
      )
      headers = {
          "Accept": "*/*",
          "Accept-Language": "zh-CN,zh;q=0.9",
          "Connection": "keep-alive",
          "Content-Type": "application/json",
          "Origin": "https://dian.ysbang.cn",
          "Referer": "https://dian.ysbang.cn/",
      }
      # Fetch-metadata headers.
      headers.update({
          "Sec-Fetch-Dest": "empty",
          "Sec-Fetch-Mode": "cors",
          "Sec-Fetch-Site": "same-origin",
      })
      headers["User-Agent"] = user_agent
      # Client-hint headers, kept consistent with the User-Agent above.
      headers.update({
          "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
          "sec-ch-ua-mobile": "?0",
          "sec-ch-ua-platform": '"Windows"',
      })
      return headers
  def build_base_payload(self):
      """Build the JSON body for a search request (first page).

      The search keyword is the product name, prefixed by ``"<brand> "``
      when a brand is set and suffixed directly (no separator) by the
      product spec when one is set. ``ex1``, ``o`` and ``page`` hold
      placeholder values that the caller overwrites per request.

      Returns:
          A new dict holding every request field, including the token.
      """
      search_key = self.brand + " " + self.product if self.brand else self.product
      if self.product_desc:
          search_key += self.product_desc

      request_time = time.strftime("%Y-%m-%d %H:%M:%S")

      # Fixed template. The three None entries are filled in below; they
      # are declared here so the key order of the body stays unchanged.
      payload = {
          "platform": "pc",
          "version": "6.0.0",
          "ua": "Chrome146",
          "ex": None,
          "trafficType": 1,
          "ex1": "",
          "o": "",
          "lastClick": -1,
          "page": 1,
          "pagesize": "60",
          "classify_id": "",
          "searchkey": None,
          "onlyTcm": 0,
          "operationtype": 1,
          "qualifiedLoanee": 0,
          "drugId": -1,
          "tagId": "",
          "showRecentlyPurchasedFlag": True,
          "onlySimpleLoan": 0,
          "sn": "",
          "buttons": [],
          "buttonList": [],
          "synonymId": 0,
          "activityTypes": [],
          "provider_filter": "",
          "factoryNames": "",
          "tcmGradeNames": [],
          "tcmExeStandardIds": [],
          "specs": "",
          "deliverFloor": 0,
          "purchaseLimitFloor": 0,
          "nextRequestKey": "",
          "adConfigId": 0,
          "stateValue": "",
          "firstSearch": True,
          "token": None,
      }
      payload["ex"] = '{} drugInfo {} {}'.format(self.start_date, request_time, request_time)
      payload["searchkey"] = search_key
      payload["token"] = self.token
      return payload
  def get_price(self, price_token):
      """Extract the lowest two-decimal price embedded in a price token.

      The token is base64-decoded, interpreted as UTF-8 (undecodable bytes
      are dropped) and scanned for numbers of the form ``<digits>.<dd>``
      that are not adjacent to other digits.

      Args:
          price_token: Base64 text (or bytes) taken from a listing item.

      Returns:
          The smallest price found as a ``float``, or ``""`` when the token
          cannot be base64-decoded or contains no such number. The empty
          string is falsy, so the caller can fall back to another source.
      """
      try:
          decoded = base64.b64decode(price_token)
      except (ValueError, TypeError):
          # binascii.Error (bad padding / bad characters) subclasses
          # ValueError; TypeError covers None or other non-text input.
          # A broken token must not abort the crawl.
          return ""

      text_part = decoded.decode("utf-8", errors="ignore")
      matches = re.findall(r"(?<!\d)(\d+\.\d{2})(?!\d)", text_part)
      prices = {round(float(value), 2) for value in matches}
      return min(prices) if prices else ""
  def to_product(self, item, type_data):
      """Convert one search-result item into the record stored by the pipeline.

      The price is taken from the first non-empty source, in this order:
      ``disPrice``, ``minprice``, ``price``, the decoded ``priceToken``, and
      finally the detail endpoint via ``parse_detail``. Note that
      ``parse_detail`` also overwrites ``self.approval_num``.

      Args:
          item: Dict describing one listing from the search response.
          type_data: The item's ``activitytype``; forwarded to
              ``parse_detail`` to pick the detail endpoint.

      Returns:
          A dict with the listing fields plus task metadata.
      """
      now = time.strftime("%Y-%m-%d %H:%M:%S")
      item_id = item.get("wholesaleid", "")
      provider_id = item.get("providerId", "")
      city_str = item.get("warehouseCity", "")

      # The last operand is returned even when falsy, so an item without
      # any price field still yields item.get("price", "").
      price = (
          item.get("disPrice", "")
          or item.get("minprice", "")
          or item.get("price", "")
      )
      if not price:
          price_token = item.get("priceToken", "")
          if price_token:
              price = self.get_price(price_token)
      if not price:
          detail_city, price = self.parse_detail(item_id, type_data)
          # Keep the warehouse city from the listing when the detail page
          # does not provide one, instead of discarding it.
          city_str = detail_city or city_str

      city_id = province_id = city = province = ""
      if city_str:
          city_id, province_id, city, province = get_city(city_str)

      shop_name = item.get("provider_name", "") or item.get("abbreviation", "")

      collect_config = {
          "sampling_cycle": self.sampling_cycle,
          "sampling_start_time": self.sampling_start_time,
          "sampling_end_time": self.sampling_end_time,
      }

      product = {
          "platform": self.platform,
          "item_id": item_id,
          "enterprise_id": self.company_id,
          "product_name": item.get("drugname", ""),
          "spec": item.get("specification", ""),
          "one_price": '',
          "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
          "shop_name": shop_name,
          "anonymous_store_name": "",
          "shop_url": f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4",
          "city_name": city,
          "city_id": city_id,
          "province_name": province,
          "province_id": province_id,
          "area_info": "",
          "factory_name": item.get("manufacturer", ""),
          "scrape_date": time.strftime("%Y-%m-%d"),
          "price": price,
          "sales": "",
          "stock_count": item.get("stockAvailable", ""),
          "snapshot_url": "",
          "approval_num": self.approval_num,
          "produced_time": item.get("prodDate", ""),
          "deadline": item.get("valid_date", ""),
          "update_time": now,
          "insert_time": now,
          "number": 1,
          "product_brand": self.brand or "",
          "collect_task_id": self.collect_task_id,
          "search_name": self.product,
          "company_name": "",
          "collect_config_info": json.dumps(collect_config),
          "account_id": self.account_id,
          "collect_region_id": self.collect_region_id,
          "collect_round": self.collect_round,
          "is_sold_out": 0,
      }
      return product
  def parse_detail(self, product_id, type_data):
      """Fetch a listing's detail data and extract its city and price.

      Side effect: on a successful response ``self.approval_num`` is
      overwritten with the approval number from the response (``""`` when
      the response has none). It is left untouched when the request fails.

      Args:
          product_id: The listing's ``wholesaleid``.
          type_data: The listing's ``activitytype``. The value 7 is routed
              to the team-buy detail endpoint; every other value goes to
              the preference detail endpoint.

      Returns:
          A ``(city_str, price)`` tuple. ``city_str`` is the text captured
          from the first ``[...]`` group of ``delivery_policy`` minus its
          final character, or ``""``. ``price`` is the first non-empty of
          ``disPrice``, ``minprice`` and the top-level ``price``.
          ``("", "")`` is returned when the request fails or the response
          is not valid JSON.
      """
      date_str = time.strftime("%Y-%m-%d %H:%M:%S")
      json_data = {
          'platform': 'pc',
          'version': '6.0.0',
          'ua': 'Chrome146',
          'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
          'trafficType': 1,
          'ex1': 'qtrcqlxew',
          'wholesaleid': str(product_id),
          'showRecentlyPurchasedFlag': True,
          'isClinic': 0,
          'scene': [1, ],
          'adConfigId': 0,
          'token': self.token,
      }
      if type_data == 7:
          # This endpoint additionally receives the camel-case key.
          json_data["wholesaleId"] = str(product_id)
          url = "https://dian.ysbang.cn/wholesale-drug/api/teambuy/getActivityDetail/v4260"
      else:
          url = 'https://dian.ysbang.cn/wholesale-drug/sales/getPreferenceDetail/v5280'

      # Random pause before each detail request.
      time.sleep(random.uniform(1, 3))
      try:
          # The timeout matches the search request; without one a stalled
          # connection would block the crawl indefinitely.
          response = requests.post(url, headers=self.headers, json=json_data, timeout=30)
          data_json = response.json()
      except (requests.RequestException, ValueError) as e:
          # ValueError covers JSON decode errors raised by response.json().
          logger.exception("详情请求失败 wholesaleid=%s: %s", product_id, e)
          return "", ""

      # `or {}` also covers fields that are present but JSON null.
      data = (data_json or {}).get("data") or {}

      approval = (data.get("druginfo") or {}).get("approval", "")
      if not approval:
          team_info = (data.get("teamBuyDetailDrugInfo") or {}).get(
              "teamBuyDetailSingleSaleTypeDrugInfo"
          ) or {}
          approval = team_info.get("approval", "") or ""
      self.approval_num = approval

      city_data = data.get("delivery_policy") or ""
      # Greedy group: captures the bracketed word minus its last character.
      city_re = re.search(r"\[(\w+)\w+\]", str(city_data))
      city_str = city_re.group(1) if city_re else ""

      price_dict = data.get("disPriceInfo") or data.get("teamBuyDetailInfo") or {}
      price = (
          price_dict.get("disPrice", "")
          or price_dict.get("minprice", "")
          or data.get("price", "")
      )
      return city_str, price
  def search_data(self):
      """Crawl the search result pages for the current task and store them.

      Pages 1..99 are requested in order. For each page the encrypted
      ``data.o`` field is decrypted, an approval number is looked up from
      the first few items, and every item with an ``item_id`` is handed to
      ``self.pipeline.storge_data``.

      The crawl stops when a page has no encrypted data or no listings.
      It also stops, and sets ``self.is_success`` to ``False``, when:
        * a page cannot be fetched with status 200 after the retries,
        * a response is not valid JSON,
        * the server keeps demanding a login after several re-logins,
        * no token can be read after a re-login,
        * decryption keeps failing for the same page.

      Returns:
          None. Does nothing when no task dict was supplied.
      """
      if not self.task_dict:
          return

      # Bounds for retrying the *same* page; both paths previously looped
      # without limit because `continue` does not advance `page`.
      max_relogins = 3
      max_decrypt_failures = 3
      relogin_count = 0
      decrypt_failures = 0

      page = 1
      while page < 100:
          logger.info(f"药师帮爬取第{page}页")
          pair = self.gen_pair()
          payload = self.build_base_payload()
          payload["ex1"] = pair["ex1"]
          payload["o"] = pair["o"]
          payload["page"] = page

          response = self._post_search_page(payload, page)
          if response is None:
              logger.error("第%s页请求失败,停止爬取", page)
              self.is_success = False
              return

          try:
              data_json = response.json()
          except ValueError:
              # json.JSONDecodeError and the decode errors raised by
              # requests are all ValueError subclasses.
              logger.exception("第%s页响应不是合法 JSON", page)
              self.is_success = False
              return

          data_block = data_json.get("data") or {}

          if data_json.get("message", "") == "该操作需要登录":
              relogin_count += 1
              if relogin_count > max_relogins:
                  logger.error("第%s页连续 %s 次登录后仍提示需要登录,停止爬取", page, max_relogins)
                  self.is_success = False
                  return
              logger.info("登录账号中。。。")
              YaoShiBangLogin().run()
              time.sleep(10)
              if not self.get_token():
                  logger.error("登录后仍未从库中读到有效 Token,停止重试")
                  self.is_success = False
                  return
              logger.info("token 已刷新,重试第 %s 页", page)
              # Retry the same page; the payload is rebuilt with the new token.
              continue
          relogin_count = 0

          encrypted_o = data_block.get("o")
          if not encrypted_o:
              logger.warning("第%s页返回无加密 data.o: %s", page, data_json)
              break

          try:
              json_data = self.decrypt_payload(encrypted_o)
          except Exception as e:
              logger.exception("第%s页解密失败: %s", page, e)
              decrypt_failures += 1
              if decrypt_failures >= max_decrypt_failures:
                  logger.error("第%s页连续 %s 次解密失败,停止爬取", page, max_decrypt_failures)
                  self.is_success = False
                  return
              # Short pause so the retry does not hit the server back-to-back.
              time.sleep(random.uniform(1, 3))
              continue
          decrypt_failures = 0

          wholesales = json_data.get("wholesales", [])
          if not wholesales:
              logger.info(f"第{page}页无数据,停止")
              break

          # Look up the approval number from the first items of the page;
          # parse_detail stores it in self.approval_num.
          for item in wholesales[:5]:
              self.parse_detail(item.get("wholesaleid", ""), item.get("activitytype", 0))
              if self.approval_num:
                  break

          for item in wholesales:
              product = self.to_product(item, item.get("activitytype", 0))
              if not product.get("item_id"):
                  continue
              try:
                  self.pipeline.storge_data(product)
                  logger.info("%s", json.dumps(product, ensure_ascii=False))
              except Exception as e:
                  # One failed write must not stop the remaining items.
                  logger.exception("写入数据库失败: %s", e)

          page += 1

  def _post_search_page(self, payload, page, attempts=3, retry_delay=10):
      """POST one search request, retrying on errors and non-200 statuses.

      Args:
          payload: JSON body for the request.
          page: Page number, used only in log messages.
          attempts: Maximum number of tries.
          retry_delay: Seconds to wait between tries (not after the last).

      Returns:
          The response with status 200, or ``None`` if every try failed.
      """
      for attempt in range(1, attempts + 1):
          try:
              response = requests.post(
                  self.url, headers=self.headers, json=payload, timeout=30
              )
          except requests.RequestException as e:
              logger.error("第%s页请求失败 (%s/%s): %s", page, attempt, attempts, e)
          else:
              if response.status_code == 200:
                  return response
              logger.warning(
                  "第%s页返回状态码 %s (%s/%s)",
                  page, response.status_code, attempt, attempts,
              )
          if attempt < attempts:
              time.sleep(retry_delay)
      return None
  def run(self):
      """Run the crawl for the configured task.

      Returns:
          A ``(crawl_count, is_success)`` tuple. ``(0, False)`` is returned
          when no token is available. ``is_success`` is also ``False`` when
          the crawl raised an unexpected exception; items stored before the
          failure are still counted.
      """
      if not self.get_token():
          logger.error("启动失败:无可用 token")
          return 0, False

      try:
          self.search_data()
      except Exception:
          # Top-level boundary: keep the traceback and report the failure
          # instead of returning success after a swallowed error.
          logger.exception("爬取过程中发生未处理异常")
          self.is_success = False

      logger.info(f"爬取总数{self.pipeline.crawl_count}")
      return self.pipeline.crawl_count, self.is_success