ysbang_crawl.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. import base64
  2. import hashlib
  3. import json
  4. import random
  5. import re
  6. import secrets
  7. import string
  8. import time
  9. import token
  10. import zlib
  11. from datetime import datetime, timedelta
  12. import requests
  13. from Crypto.Cipher import AES
  14. from openpyxl.worksheet import page
  15. from commons.Logger import get_spider_logger
  16. from pipelines.drug_pipelines import DrugPipeline
  17. from area_info.city_name_to_id import get_city
  18. from commons.conn_mysql import MySQLPoolOnline
  19. from spiders.yaoshibang.login_yaoshibang import YaoShiBangLogin
  # Module-level logger used by every method below; obtained from the project's
  # get_spider_logger helper under the name "yaoshibang".
  logger = get_spider_logger("yaoshibang")
  21. class YsbSpider:
  def __init__(self, drug_dict=None):
      """Initialise crawl state and, when a task is supplied, load its fields.

      Args:
          drug_dict: Optional task dict. A falsy value is stored as an empty
              dict, in which case ``get_product_data`` is not called and the
              task-derived attributes are left unset.
      """
      # Static request configuration.
      self.platform = 5
      self.url = "https://dian.ysbang.cn/wholesale-drug/sales/getWholesaleList/v4270"
      self.headers = self.build_headers()
      window_start = datetime.now() - timedelta(minutes=500)
      self.start_date = window_start.strftime("%Y-%m-%d %H:%M")

      # Mutable per-run state.
      self.token = None
      self.approval_num = ""
      self.collect_task_id = None
      self.is_product_count = 0
      self.is_success = True
      self.account_name = "17097980383"
      self.task_dict = drug_dict if drug_dict else {}

      # Collaborators are created in the same relative order as before, with
      # the task fields loaded between them.
      self.pipeline = DrugPipeline("yaoshibang")
      if self.task_dict:
          self.get_product_data()
      self.db_online = MySQLPoolOnline()
  def get_token(self, _retry_login=False):
      """Load a crawl account from the database and cache its token on ``self``.

      Selects one row from ``accounts_platform`` via ``self.db_online``, stores
      the account name in ``self.account_name`` and parses the row's
      ``cookie_str`` as JSON to find a ``Token`` (or ``token``) entry.

      Args:
          _retry_login: Internal flag. When the cookie is empty, a login is
              triggered once and the method calls itself with this set to
              ``True`` so that a second empty cookie gives up instead of
              looping.

      Returns:
          The token value (also stored in ``self.token``), or ``None`` when no
          account row exists, the cookie is empty after a login attempt, the
          cookie is not a JSON object, or it carries no token.
      """
      # Plain string: the query has no placeholders, so no f-string is needed.
      sql_account = """ select `name`,`cookie_str` from `accounts_platform` where `platform`=5 and `status`=1 and `equipment_id`=3 order by `cookie_timestamp` asc limit 1 """
      account_list = self.db_online.select_data(sql_account)
      if not account_list:
          logger.error("无可用爬取账号")
          return None

      account_dict = account_list[0]
      self.account_name = account_dict["name"]
      cookie_str = account_dict.get("cookie_str") or ""
      if not cookie_str:
          if _retry_login:
              logger.error("账号 %s 登录后 cookie 仍为空", self.account_name)
              return None
          logger.warning("账号 %s cookie_str 为空,尝试登录", self.account_name)
          YaoShiBangLogin().run()
          time.sleep(5)
          return self.get_token(_retry_login=True)

      try:
          cookie_dict = json.loads(cookie_str)
      except (TypeError, json.JSONDecodeError):
          logger.exception("账号 %s cookie_str 不是合法 JSON", self.account_name)
          return None

      # Valid JSON is not necessarily an object; a list or scalar here would
      # otherwise fail with AttributeError on ``.get``.
      if not isinstance(cookie_dict, dict):
          logger.error(
              "账号 %s cookie_str 不是 JSON 对象: %s",
              self.account_name,
              type(cookie_dict).__name__,
          )
          return None

      # Named ``account_token`` so the local does not shadow the ``token``
      # module imported at the top of the file.
      account_token = cookie_dict.get("Token") or cookie_dict.get("token")
      if not account_token:
          logger.error("账号 %s cookie 中无 Token 字段: %s", self.account_name, list(cookie_dict.keys()))
          return None

      self.token = account_token
      logger.info("已刷新 token,账号=%s", self.account_name)
      return self.token
  def get_product_data(self):
      """Copy the fields of ``self.task_dict`` onto instance attributes.

      ``id``, ``company_id`` and ``product_name`` are mandatory and raise
      ``KeyError`` when absent. Every other field falls back to a default
      (an empty string, or ``1`` for ``collect_round``).
      """
      task = self.task_dict

      # Mandatory keys, read in the same order as before.
      self.task_id = task["id"]
      self.company_id = task["company_id"]
      self.product = task["product_name"]

      # (attribute name, task key, default when the key is missing)
      optional_fields = (
          ("product_desc", "product_specs", ""),
          ("brand", "product_brand", ""),
          ("product_keyword", "product_keyword", ""),
          ("collect_task_id", "collect_task_id", ""),
          ("sampling_cycle", "sampling_cycle", ""),
          ("sampling_start_time", "sampling_start_time", ""),
          ("sampling_end_time", "sampling_end_time", ""),
          ("collect_equipment_id", "collect_equipment_id", ""),
          ("account_id", "collect_equipment_account_id", ""),
          ("collect_region_id", "collect_region_id", ""),
          ("collect_round", "collect_round", 1),
      )
      for attr_name, task_key, fallback in optional_fields:
          setattr(self, attr_name, task.get(task_key, fallback))
  def pkcs7_unpad(self, data):
      """Strip PKCS#7 padding from ``data`` and return the remaining bytes.

      Raises:
          ValueError: If ``data`` is empty, the final byte is not in 1..16,
              or the trailing bytes are not all equal to the final byte.
      """
      if not data:
          raise ValueError("Empty data for PKCS7 unpad")

      pad_len = data[-1]
      if not 1 <= pad_len <= 16:
          raise ValueError("Invalid PKCS7 padding length")

      body, tail = data[:-pad_len], data[-pad_len:]
      # A short input yields a short tail, which can never match the expected
      # run of pad bytes, so it is rejected here as well.
      if tail != bytes([pad_len]) * pad_len:
          raise ValueError("Invalid PKCS7 padding bytes")
      return body
  def derive_key(self):
      """Return the 16-byte key used by ``decrypt_payload``.

      The key is the first 16 characters of the upper-cased MD5 hex digest of
      a fixed seed string, encoded as UTF-8 bytes.
      """
      seed = "BhCLxFfFhd12K4qRGPfy".encode("utf-8")
      digest_hex = hashlib.md5(seed).hexdigest()
      key_text = digest_hex.upper()[:16]
      return key_text.encode("utf-8")
  def decrypt_payload(self, cipher_text_b64):
      """Decode an encrypted response field into a Python object.

      Steps: base64-decode, AES-ECB decrypt with ``derive_key()``, remove the
      PKCS#7 padding, gzip-decompress, then parse the UTF-8 text as JSON.

      Args:
          cipher_text_b64: Base64 text of the encrypted payload.

      Returns:
          The object produced by ``json.loads``.
      """
      aes_key = self.derive_key()
      raw = base64.b64decode(cipher_text_b64)
      plain_padded = AES.new(aes_key, AES.MODE_ECB).decrypt(raw)
      compressed = self.pkcs7_unpad(plain_padded)
      # wbits = MAX_WBITS | 16 makes zlib expect a gzip header and trailer.
      inflated = zlib.decompress(compressed, zlib.MAX_WBITS | 16)
      return json.loads(inflated.decode("utf-8"))
  def gen_pair(self, ex1_len=9, o_raw_len=16):
      """Generate the random ``ex1`` / ``o`` values sent with a request.

      Args:
          ex1_len: Number of characters in ``ex1`` (lowercase letters/digits).
          o_raw_len: Number of random bytes that are base64-encoded into ``o``.

      Returns:
          A dict with the keys ``"ex1"`` and ``"o"``.
      """
      charset = string.ascii_lowercase + string.digits
      picked = [secrets.choice(charset) for _ in range(ex1_len)]
      random_blob = secrets.token_bytes(o_raw_len)
      return {
          "ex1": "".join(picked),
          "o": base64.b64encode(random_blob).decode("ascii"),
      }
  def build_headers(self):
      """Return a new dict of browser-like HTTP headers for ysbang requests."""
      user_agent = " ".join((
          "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
          "(KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36",
      ))
      client_hint_brands = '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"'
      site_origin = "https://dian.ysbang.cn"

      # Built from ordered pairs so the header order matches the original.
      header_pairs = [
          ("Accept", "*/*"),
          ("Accept-Language", "zh-CN,zh;q=0.9"),
          ("Connection", "keep-alive"),
          ("Content-Type", "application/json"),
          ("Origin", site_origin),
          ("Referer", site_origin + "/"),
          ("Sec-Fetch-Dest", "empty"),
          ("Sec-Fetch-Mode", "cors"),
          ("Sec-Fetch-Site", "same-origin"),
          ("User-Agent", user_agent),
          ("sec-ch-ua", client_hint_brands),
          ("sec-ch-ua-mobile", "?0"),
          ("sec-ch-ua-platform", '"Windows"'),
      ]
      return dict(header_pairs)
  def build_base_payload(self, page=1):
      """Build the JSON body for one page of the wholesale search request.

      The search keyword is ``self.product``, prefixed with ``self.brand`` and
      a space when a brand is set, with ``self.product_desc`` appended when
      present. ``ex1`` and ``o`` are left empty for the caller to fill in.

      Args:
          page: Page number to request. Defaults to ``1``. Previously the
              ``page`` entry picked up the module imported by
              ``from openpyxl.worksheet import page``, which is not
              JSON-serialisable unless the caller overwrote it.

      Returns:
          A new dict ready to be sent as the request's JSON body.
      """
      keyword = self.product
      if self.brand:
          keyword = self.brand + " " + self.product
      if self.product_desc:
          keyword = keyword + self.product_desc

      date_str = time.strftime("%Y-%m-%d %H:%M")
      json_data = {
          'platform': 'pc',
          'version': '6.1.10',
          'ua': 'Chrome148',
          'ex': f'{date_str} https://dian.ysbang.cn os=Windows 10 indexContent 05-25 11:20:53 05-27 17:27:01',
          'trafficType': 1,
          'ex1': '',
          'o': '',
          'lastClick': -1,
          'page': page,
          'pagesize': '60',
          'classify_id': '',
          'searchkey': keyword,
          'onlyTcm': 0,
          'operationtype': 1,
          'qualifiedLoanee': 0,
          'drugId': -1,
          'tagId': '',
          'showRecentlyPurchasedFlag': True,
          'onlySimpleLoan': 0,
          'sn': '',
          'buttons': [],
          'buttonList': [],
          'synonymId': 0,
          'activityTypes': [],
          'provider_filter': '',
          'factoryNames': '',
          'tcmGradeNames': [],
          'tcmExeStandardIds': [],
          'specs': '',
          'deliverFloor': 0,
          'purchaseLimitFloor': 0,
          'nextRequestKey': '',
          'adConfigId': 0,
          'stateValue': '',
          'filterLeyoProvider': False,
          'firstSearch': False,
          'token': self.token,
      }
      return json_data
  def get_price(self, price_token):
      """Extract the lowest two-decimal number embedded in a base64 token.

      The token is base64-decoded, read as UTF-8 with undecodable bytes
      dropped, and scanned for numbers of the form ``<digits>.<2 digits>``
      that are not adjacent to further digits.

      Args:
          price_token: Base64 text taken from an item's ``priceToken`` field.

      Returns:
          The smallest matched number as a ``float`` rounded to two places,
          or ``""`` when the token cannot be decoded or contains no such
          number. ``""`` is the "no price" value the caller already tests
          for, so a bad token falls through to the next price source.
      """
      try:
          decoded = base64.b64decode(price_token)
      except (TypeError, ValueError):
          # binascii.Error (bad length or padding) is a ValueError subclass.
          return ""

      text_part = decoded.decode('utf-8', errors='ignore')
      matches = re.findall(r'(?<!\d)(\d+\.\d{2})(?!\d)', text_part)
      if not matches:
          return ""
      return min(round(float(m), 2) for m in matches)
  def to_product(self, item, type_data):
      """Convert one search-result item into the record stored by the pipeline.

      The price is the first truthy value among the item's ``disPrice``,
      ``minprice`` and ``price`` fields, then the number decoded from
      ``priceToken``, and finally the price returned by ``parse_detail``.

      Args:
          item: Dict describing one entry of the ``wholesales`` list.
          type_data: The item's ``activitytype``; forwarded to
              ``parse_detail`` when a detail lookup is needed.

      Returns:
          A dict with the product fields. City and province fields stay empty
          strings when no city text is available.
      """
      now = time.strftime("%Y-%m-%d %H:%M:%S")
      item_id = item.get("wholesaleid", "")
      provider_id = item.get("providerId", "")
      # The original assignment ended in a stray comma, which built an unused
      # 1-tuple; the URL is now a plain string that is used in the record.
      shop_url = f"https://dian.ysbang.cn/#/supplierstore?providerId={provider_id}&trafficType=4"
      city_str = item.get("warehouseCity", "")
      city_id = province_id = city = province = ""

      price = ""
      for price_key in ("disPrice", "minprice", "price"):
          price = item.get(price_key, "")
          if price:
              break
      if not price:
          price_token = item.get("priceToken", "")
          if price_token:
              price = self.get_price(price_token)
      if not price:
          detail_city, price = self.parse_detail(item_id, type_data)
          # Prefer the detail page's city, but keep the listing's
          # warehouseCity when the detail page yields none.
          city_str = detail_city or city_str

      if city_str:
          city_id, province_id, city, province = get_city(city_str)

      shop_name = item.get("provider_name", "") or item.get("abbreviation", "")
      brand = item.get("brand", "")

      product = {
          "platform": self.platform,
          "item_id": item_id,
          "enterprise_id": self.company_id,
          "product_name": item.get("drugname", ""),
          "spec": item.get("specification", ""),
          "one_price": '',
          "detail_url": f"https://dian.ysbang.cn/#/drugInfo?wholesaleid={item_id}&trafficType=1",
          "shop_name": shop_name,
          "anonymous_store_name": "",
          "shop_url": shop_url,
          "city_name": city,
          "city_id": city_id,
          "province_name": province,
          "province_id": province_id,
          "area_info": "",
          "factory_name": item.get("manufacturer", ""),
          "scrape_date": time.strftime("%Y-%m-%d"),
          "price": price,
          "sales": "",
          "stock_count": item.get("stockAvailable", ""),
          "snapshot_url": "",
          # Read after any parse_detail call above, which updates this attribute.
          "approval_num": self.approval_num,
          "produced_time": item.get("prodDate", ""),
          "deadline": item.get("valid_date", ""),
          "update_time": now,
          "insert_time": now,
          "number": 1,
          "product_brand": brand,
          "collect_task_id": self.collect_task_id,
          "search_name": self.product,
          "company_name": "",
          "collect_config_info": json.dumps(
              {"sampling_cycle": self.sampling_cycle, "sampling_start_time": self.sampling_start_time,
               "sampling_end_time": self.sampling_end_time}),
          "account_id": self.account_id,
          "collect_region_id": self.collect_region_id,
          "collect_round": self.collect_round,
          "is_sold_out": 0
      }
      return product
  def parse_detail(self, product_id, type_data):
      """Fetch one item's detail endpoint and extract its city and price.

      As a side effect ``self.approval_num`` is overwritten with the approval
      value found in the response, or ``""`` when none is present.
      NOTE(review): because callers invoke this per item, a detail response
      without an approval value clears a value found earlier — confirm that
      this is intended.

      Args:
          product_id: The item's ``wholesaleid``.
          type_data: The item's ``activitytype``. The value ``7`` selects the
              team-buy endpoint; anything else uses the preference-detail one.

      Returns:
          ``(city_str, price)``. ``city_str`` is the text captured from the
          bracketed part of ``delivery_policy`` (``""`` when there is no
          match). ``price`` is the first truthy value among the price fields
          of the response, otherwise the last fallback value.
      """
      date_str = time.strftime("%Y-%m-%d %H:%M:%S")
      json_data = {
          'platform': 'pc',
          'version': '6.0.0',
          'ua': 'Chrome146',
          'ex': '{} drugInfo {} {}'.format(self.start_date, date_str, date_str),
          'trafficType': 1,
          'ex1': 'qtrcqlxew',
          'wholesaleid': str(product_id),
          'showRecentlyPurchasedFlag': True,
          'isClinic': 0,
          'scene': [1, ],
          'adConfigId': 0,
          'token': self.token,
      }
      if type_data == 7:
          # The team-buy endpoint receives the id under a camel-cased key too.
          json_data["wholesaleId"] = str(product_id)
          url = "https://dian.ysbang.cn/wholesale-drug/api/teambuy/getActivityDetail/v4260"
      else:
          url = 'https://dian.ysbang.cn/wholesale-drug/sales/getPreferenceDetail/v5280'

      # Random pause before each detail request.
      time.sleep(random.uniform(1, 3))
      # Same timeout as the list request, so a stalled connection cannot hang
      # the crawl indefinitely.
      response = requests.post(url, headers=self.headers, json=json_data, timeout=30)
      data_json = response.json()

      # ``or {}`` also covers keys that are present with a JSON null value.
      data = data_json.get("data") or {}
      drug_info = data.get("druginfo") or {}
      self.approval_num = drug_info.get("approval", "")
      if not self.approval_num:
          team_info = (data.get("teamBuyDetailDrugInfo") or {}).get(
              "teamBuyDetailSingleSaleTypeDrugInfo"
          ) or {}
          self.approval_num = team_info.get("approval", "") or ""

      city_data = data.get("delivery_policy") or ""
      city_re = re.search(r"\[(\w+)\w+\]", str(city_data))
      city_str = city_re.group(1) if city_re else ""

      price_dict = data.get("disPriceInfo") or data.get("teamBuyDetailInfo") or {}
      price = (
          price_dict.get("disPrice", "")
          or price_dict.get("minprice", "")
          or data.get("price", "")
      )
      return city_str, price
  def search_data(self):
      """Crawl the search result pages for the current task and store matches.

      For each page (1 to 99) the method posts the search payload, decrypts
      ``data.o`` from the response, looks up an approval number from the first
      few items, converts every item with ``to_product`` and hands items whose
      title contains ``self.product`` to ``self.pipeline.storge_data``.

      ``self.is_success`` is set to ``False`` when a page cannot be fetched,
      parsed or decrypted, or when re-login does not help. The crawl stops
      normally on an empty page, on a response without ``data.o``, or once
      ``self.is_product_count`` reaches 20.

      NOTE(review): this file was recovered without indentation. The nesting
      of the retry sleep and of the ``>= 20`` stop check is a reconstruction;
      confirm both against the intended behaviour.
      """
      if not self.task_dict:
          return

      max_request_attempts = 3
      # Upper bound on consecutive re-logins for one page. Without it a
      # server that keeps answering "login required" / "verification
      # required" would make this loop spin forever.
      max_relogin_attempts = 3
      relogin_attempts = 0
      login_required_messages = ("该操作需要登录", "需要前端行为验证!")

      page = 1
      while page < 100:
          logger.info(f"药师帮爬取第{page}页")
          pair = self.gen_pair()
          payload = self.build_base_payload()
          payload["ex1"] = pair["ex1"]
          payload["o"] = pair["o"]
          payload["page"] = page

          response = None
          for attempt in range(max_request_attempts):
              try:
                  response = requests.post(
                      self.url, headers=self.headers, json=payload, timeout=30
                  )
                  if response.status_code == 200:
                      break
              except requests.RequestException as e:
                  logger.error("第%s页请求失败 (%s/3): %s", page, attempt + 1, e)
                  response = None
              # Back off before the next attempt; no wait after the last one.
              if attempt < max_request_attempts - 1:
                  time.sleep(10)

          if response is None or response.status_code != 200:
              self.is_success = False
              logger.error("第%s页请求失败,停止爬取", page)
              return

          try:
              data_json = response.json()
          except ValueError:
              # Every JSON decode error requests can raise (stdlib json or
              # simplejson) derives from ValueError.
              logger.exception("第%s页响应不是合法 JSON", page)
              self.is_success = False
              return

          data_block = data_json.get("data") or {}
          if data_json.get("message", "") in login_required_messages:
              relogin_attempts += 1
              if relogin_attempts > max_relogin_attempts:
                  logger.error(
                      "第%s页连续 %s 次重新登录后仍被要求登录或验证,停止爬取",
                      page,
                      max_relogin_attempts,
                  )
                  self.is_success = False
                  return
              logger.info("登录账号中。。。")
              YaoShiBangLogin().run()
              time.sleep(10)
              if not self.get_token():
                  logger.error("登录后仍未从库中读到有效 Token,停止重试")
                  self.is_success = False
                  return
              logger.info("token 已刷新,重试第 %s 页", page)
              continue
          relogin_attempts = 0

          encrypted_o = data_block.get("o")
          if not encrypted_o:
              logger.warning("第%s页返回无加密 data.o: %s", page, data_json)
              break

          try:
              json_data = self.decrypt_payload(encrypted_o)
          except Exception as e:
              logger.exception("第%s页解密失败: %s", page, e)
              self.is_success = False
              return

          wholesales = json_data.get("wholesales", [])
          if not wholesales:
              logger.info(f"第{page}页无数据,停止")
              break

          # Look up the drug approval number from the first few items;
          # parse_detail stores it on self.approval_num.
          for item in wholesales[0:5]:
              product_id = item.get("wholesaleid", "")
              type_data = item.get("activitytype", 0)
              self.parse_detail(product_id, type_data)
              if self.approval_num:
                  break

          for item in wholesales:
              type_data = item.get("activitytype", 0)
              product = self.to_product(item, type_data)
              if not product.get("item_id"):
                  continue

              # ``or ""`` guards against a null drugname in the response.
              title = product.get("product_name", "") or ""
              brand_matches = self.brand in title
              name_matches = self.product in title
              if not brand_matches:
                  self.is_product_count += 1
              if not name_matches:
                  self.is_product_count += 1
                  continue
              if brand_matches:
                  # A full match resets the mismatch counter.
                  self.is_product_count = 0

              try:
                  self.pipeline.storge_data(product)
                  logger.info("%s", json.dumps(product, ensure_ascii=False))
              except Exception as e:
                  logger.exception("写入数据库失败: %s", e)

          # Stop once enough mismatches have accumulated since the last full
          # match.
          if self.is_product_count >= 20:
              return
          page += 1
  def run(self):
      """Run one crawl: obtain a token, search, and report the outcome.

      Returns:
          ``(crawl_count, is_success)``. ``(0, False)`` when no token could
          be obtained; otherwise ``self.pipeline.crawl_count`` together with
          ``self.is_success``, which is ``False`` if the search raised or
          recorded a failure.
      """
      if not self.get_token():
          logger.error("启动失败:无可用 token")
          return 0, False

      try:
          self.search_data()
      except Exception as e:
          # Top-level boundary: record the failure and keep the traceback in
          # the log; the previous logger.error(e) kept only the message.
          self.is_success = False
          logger.exception("爬取过程异常: %s", e)

      logger.info(f"爬取总数{self.pipeline.crawl_count}")
      return self.pipeline.crawl_count, self.is_success