# Source file: yaoex_crawl.py
  1. import json
  2. import random
  3. import time
  4. import requests
  5. from commons.Logger import get_spider_logger
  6. import base64
  7. from Crypto.Cipher import AES
  8. from Crypto.Util.Padding import unpad
  9. from pipelines.drug_pipelines import DrugPipeline
  10. from area_info.city_name_to_id import get_city
  11. import hashlib
  12. logger = get_spider_logger("yaoex")
  13. TOKEN = "Sm45MzRmREtiaStVTnJORXEySHhYYzNwUmQ2RUprWXlwelRDem4wV2RZUCtUUU5jMGVCVTRYYjNLVjdNSnFWSjg1YStxWllGQ2RQSExjaEVqU0dOaDFJczl4bTB1V09CZHZzVml2dU0xazd3UDdla3FTUzZBZlZkMHFSVHlaaDhDcFp3SWNDb3JNSDhuNC9vUzI1RVdEaU01YjcxQW5TS21Sdy90ZDRENi9VR2E0SW5wOWF4UE1VZ0poTDhhVkJtP2FwcElkPTEyNTAma2V5SWQ9MTI1MA=="
  14. headers = {
  15. "Accept": "application/json, text/plain, */*",
  16. "Accept-Language": "zh-CN,zh;q=0.9",
  17. "Connection": "keep-alive",
  18. "Content-Type": "application/x-www-form-urlencoded",
  19. "Origin": "https://mall.yaoex.com",
  20. "Referer": "https://mall.yaoex.com/",
  21. "Sec-Fetch-Dest": "empty",
  22. "Sec-Fetch-Mode": "cors",
  23. "Sec-Fetch-Site": "cross-site",
  24. "User-Agent": (
  25. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
  26. "AppleWebKit/537.36 (KHTML, like Gecko) "
  27. "Chrome/146.0.0.0 Safari/537.36"
  28. ),
  29. "X-Request-Agent": "Axios",
  30. "X-Requested-With": "XMLHttpRequest",
  31. "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
  32. "sec-ch-ua-mobile": "?0",
  33. "sec-ch-ua-platform": '"Windows"',
  34. }
  35. REQUEST_RETRY_COUNT = 3
  36. REQUEST_TIMEOUT_SEC = 20
  37. class YaoexCrawler:
  38. def __init__(self, drug_dict=None):
  39. self.token = TOKEN
  40. self.user_id = "181680"
  41. self.platform = 6
  42. self.task_dict = drug_dict or {}
  43. self.collect_task_id = None
  44. self.account_name = None
  45. self.pipeline = DrugPipeline("yaoex")
  46. if self.task_dict:
  47. self.get_product_data()
  48. self.is_success = True
  49. self.is_not_product = 0
  50. def _post_with_retry(self, url, payload, retries=REQUEST_RETRY_COUNT, timeout=REQUEST_TIMEOUT_SEC):
  51. last_err = None
  52. for attempt in range(1, retries + 1):
  53. try:
  54. resp = requests.post(
  55. url,
  56. headers=headers,
  57. data=payload,
  58. timeout=timeout,
  59. )
  60. resp.raise_for_status()
  61. return resp
  62. except Exception as e:
  63. last_err = e
  64. if attempt < retries:
  65. logger.warning("请求失败,第%s/%s次重试: %s", attempt, retries, e)
  66. time.sleep(min(2 * attempt, 5))
  67. else:
  68. logger.error("请求失败,已达最大重试次数(%s): %s", retries, e)
  69. raise last_err
  70. def get_product_data(self):
  71. self.task_id = self.task_dict["id"]
  72. self.company_id = self.task_dict["company_id"]
  73. self.product = self.task_dict["product_name"]
  74. self.product_desc = self.task_dict.get("product_specs", "")
  75. self.brand = self.task_dict.get("product_brand", "")
  76. self.product_keyword = self.task_dict.get("product_keyword", "")
  77. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  78. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  79. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  80. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  81. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  82. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  83. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  84. self.collect_round = self.task_dict.get("collect_round", 1)
  85. @staticmethod
  86. def _timestamp_ms() -> str:
  87. return str(int(time.time() * 1000))
  88. def _list_payload(self, keyword, page):
  89. return {
  90. "traderName": "yaoex_pc",
  91. "trader": "pc",
  92. "closesignature": "yes",
  93. "signature_method": "md5",
  94. "signature": "****",
  95. "timestamp": self._timestamp_ms(),
  96. "token": self.token,
  97. "userToken": self.token,
  98. "userId": self.user_id,
  99. "roleId": "101",
  100. "userType": "下游客户",
  101. "buyerCode": self.user_id,
  102. "nowPage": str(page),
  103. "per": "20",
  104. "keyword": keyword,
  105. "catSearchId": "",
  106. "specs": "",
  107. "factoryIds": "",
  108. "sellerCodes": "",
  109. "sellerFilterMode": "0",
  110. "sortColumn": "default",
  111. "sortMode": "default",
  112. "ver": "1",
  113. "stock_mode": "1",
  114. "showExtendCard": "true",
  115. "needDinnerPrice": "true",
  116. "limitStart": "",
  117. "limitEnd": "",
  118. "deadLineStart": "",
  119. "deadLineEnd": "",
  120. "filterDtos": "",
  121. "showWholePurchase": "true",
  122. }
  123. def _detail_payload(self, spu_code, seller_code):
  124. return {
  125. "traderName": "yaoex_pc",
  126. "trader": "pc",
  127. "closesignature": "yes",
  128. "signature_method": "md5",
  129. "signature": "****",
  130. "timestamp": self._timestamp_ms(),
  131. "token": self.token,
  132. "userToken": self.token,
  133. "spuCode": str(spu_code),
  134. "sellerCode": str(seller_code),
  135. }
  136. def _shop_payload(self, enterprise_id):
  137. return {
  138. 'traderName': 'yaoex_pc',
  139. 'trader': 'pc',
  140. 'closesignature': 'yes',
  141. 'signature_method': 'md5',
  142. 'signature': '****',
  143. 'timestamp': self._timestamp_ms(),
  144. 'token': TOKEN,
  145. 'userToken': TOKEN,
  146. 'enterpriseId': enterprise_id,
  147. }
  148. def fetch_list_page(self, keyword, page):
  149. payload = self._list_payload(keyword, page)
  150. list_url = "https://gateway-b2b.fangkuaiyi.com/home/search/homeSearchList"
  151. resp = self._post_with_retry(list_url, payload)
  152. data = resp.json()
  153. recall_status = data.get("data", {}).get("recallStatus",0)
  154. if int(recall_status)==1:
  155. return data.get("data", {}).get("shopProducts", []) or []
  156. else:
  157. return []
  158. def fetch_detail(self, spu_code, seller_code):
  159. payload = self._detail_payload(spu_code, seller_code)
  160. detail_url = "https://gateway-b2b.fangkuaiyi.com/product/detail"
  161. resp = self._post_with_retry(detail_url, payload)
  162. return resp.json().get("data", {}) or {}
  163. def fetch_shop(self, seller_code):
  164. payload = self._shop_payload(seller_code)
  165. detail_url = 'https://gateway-b2b.fangkuaiyi.com/ycapp/shop/enterpriseQualification'
  166. resp = self._post_with_retry(detail_url, payload)
  167. shop_res = resp.json().get("data", {})
  168. base_info = shop_res.get("baseInfo", {})
  169. address = base_info.get("address", "")
  170. company_name = base_info.get("enterpriseName", "")
  171. return address, company_name
  172. def parse_product(self, item):
  173. seller_code = item.get("sellerCode")
  174. spu_code = item.get("spuCode")
  175. name_part = (item.get("productName") or "").strip()
  176. short_part = (item.get("shortName") or "").strip()
  177. product_name = f"{name_part} {short_part}".strip()
  178. shop_url = f"https://mall.yaoex.com/v2/store/#/detail/{seller_code}/home"
  179. # 这里读取数据库,获取城市
  180. company_adress, company_name = self.fetch_shop(seller_code)
  181. detail_json = self.fetch_detail(spu_code, seller_code)
  182. address = detail_json.get("enterpriseIntroduce", {}).get("address", "")
  183. city_id = province_id = city = province = ""
  184. if address:
  185. city_id, province_id, city, province = get_city(address.split("市")[0])
  186. raw_price = item.get("price")
  187. price = self.decrypt_price(raw_price)
  188. hash_text = str(seller_code)+str(price)
  189. item_id = hashlib.md5(hash_text.encode('utf-8')).hexdigest()
  190. is_sold_out = 0
  191. is_sold_out_text = item.get("statusDescription", "")
  192. if "商品已售罄" in is_sold_out_text:
  193. is_sold_out= 1
  194. shop_name = item.get("storeName")
  195. if not shop_name:
  196. shop_name = item.get("shopName")
  197. anonymous_store_name = ""
  198. if shop_name == "预约配送中心":
  199. anonymous_store_name = item.get("supplyName", "")
  200. inventory = item.get("currentInventory")
  201. if not inventory:
  202. inventory = item.get("stockCount")
  203. now = time.strftime("%Y-%m-%d %H:%M:%S")
  204. # 字段与 yaofangwang_crawl 中 product 对齐(供 DrugPipeline)
  205. product = {
  206. "platform": self.platform,
  207. "item_id": item_id,
  208. "enterprise_id": self.company_id,
  209. "product_name": product_name,
  210. "spec": item.get("spec"),
  211. "one_price": "",
  212. "detail_url": f"https://mall.yaoex.com/v2/product/#/spuCode/{spu_code}/sellerCode/{seller_code}",
  213. "shop_name": shop_name,
  214. "anonymous_store_name": anonymous_store_name,
  215. "shop_url": shop_url,
  216. "city_name": city,
  217. "city_id": city_id,
  218. "province_name": province,
  219. "province_id": province_id,
  220. "shipment_city_name": "",
  221. "shipment_city_id": "",
  222. "shipment_province_name": "",
  223. "shipment_province_id": "",
  224. "area_info": company_adress if company_adress else address,
  225. "factory_name": item.get("factoryName"),
  226. "scrape_date": time.strftime("%Y-%m-%d"),
  227. "price": price,
  228. "sales": "",
  229. "stock_count": inventory,
  230. "snapshot_url": "",
  231. "approval_num": item.get("approvalNum"),
  232. "produced_time": item.get("productionTime"),
  233. "deadline": item.get("deadLine"),
  234. "update_time": now,
  235. "insert_time": now,
  236. "number": 1,
  237. "product_brand": self.brand or "",
  238. "collect_task_id": self.collect_task_id,
  239. "search_name": self.product,
  240. "company_name": company_name,
  241. "collect_config_info": json.dumps(
  242. {
  243. "sampling_cycle": self.sampling_cycle,
  244. "sampling_start_time": self.sampling_start_time,
  245. "sampling_end_time": self.sampling_end_time,
  246. }
  247. ),
  248. "account_id": self.account_id,
  249. "collect_region_id": self.collect_region_id,
  250. "collect_round": self.collect_round,
  251. "is_sold_out": is_sold_out
  252. }
  253. return product
  254. def decrypt_price(self, ciphertext_b64):
  255. if not ciphertext_b64 or not str(ciphertext_b64).strip():
  256. return ""
  257. _KEY_FIXED = "GDLSAUO1KUMIIBCE"
  258. if not self.user_id:
  259. key = _KEY_FIXED.encode("utf-8")
  260. else:
  261. uid = str(self.user_id)[:6].rjust(6, "0")
  262. key = (_KEY_FIXED[:10] + uid).encode("utf-8")
  263. raw = base64.b64decode(ciphertext_b64.strip())
  264. cipher = AES.new(key, AES.MODE_ECB)
  265. plain = unpad(cipher.decrypt(raw), AES.block_size)
  266. price = plain.decode("utf-8")
  267. return price
  268. def search_data(self):
  269. keyword = self.product
  270. if self.brand:
  271. keyword = self.brand + " " + self.product
  272. if self.product_desc:
  273. keyword = keyword + " " + self.product_desc
  274. for page in range(1, 100):
  275. logger.info("正在爬取%s %s,第%s页数据", self.brand, self.product, page)
  276. page_items = self.fetch_list_page(keyword=keyword, page=page)
  277. if not page_items:
  278. break
  279. for item in page_items:
  280. if not item.get("productId") and item.get("groupBuyProductDto"):
  281. item = item.get("groupBuyProductDto") or {}
  282. name_part = (item.get("productName") or "").strip()
  283. short_part = (item.get("shortName") or "").strip()
  284. product_name = f"{name_part} {short_part}".strip()
  285. if self.product not in product_name:
  286. self.is_not_product += 1
  287. continue
  288. if self.brand not in product_name:
  289. self.is_not_product += 1
  290. continue
  291. self.is_not_product = 0
  292. product = self.parse_product(item)
  293. if not product.get("item_id"):
  294. continue
  295. logger.info(f"爬取到数据{json.dumps(product,ensure_ascii=False)}")
  296. try:
  297. self.pipeline.storge_data(product)
  298. logger.info("%s", json.dumps(product, ensure_ascii=False))
  299. except Exception as e:
  300. logger.exception("写入数据库失败: %s", e)
  301. time.sleep(random.randint(1, 3))
  302. if self.is_not_product > 15:
  303. break
  304. time.sleep(random.randint(2, 5))
  305. def run(self):
  306. try:
  307. self.search_data()
  308. except Exception as e:
  309. logger.error(e)
  310. self.is_success = False
  311. logger.info(f"爬取总数{self.pipeline.crawl_count}")
  312. return self.pipeline.crawl_count, self.is_success