# yaoex_crawl.py
  1. import json
  2. import random
  3. import time
  4. import requests
  5. from commons.Logger import get_spider_logger
  6. import base64
  7. from Crypto.Cipher import AES
  8. from Crypto.Util.Padding import unpad
  9. from pipelines.drug_pipelines import DrugPipeline
  10. from area_info.city_name_to_id import get_city
  11. import hashlib
  12. logger = get_spider_logger("yaoex")
  13. TOKEN = "Sm45MzRmREtiaStVTnJORXEySHhYYzNwUmQ2RUprWXlwelRDem4wV2RZUCtUUU5jMGVCVTRYYjNLVjdNSnFWSjg1YStxWllGQ2RQSExjaEVqU0dOaDFJczl4bTB1V09CZHZzVml2dU0xazd3UDdla3FTUzZBZlZkMHFSVHlaaDhDcFp3SWNDb3JNSDhuNC9vUzI1RVdEaU01YjcxQW5TS21Sdy90ZDRENi9VR2E0SW5wOWF4UE1VZ0poTDhhVkJtP2FwcElkPTEyNTAma2V5SWQ9MTI1MA=="
  14. headers = {
  15. "Accept": "application/json, text/plain, */*",
  16. "Accept-Language": "zh-CN,zh;q=0.9",
  17. "Connection": "keep-alive",
  18. "Content-Type": "application/x-www-form-urlencoded",
  19. "Origin": "https://mall.yaoex.com",
  20. "Referer": "https://mall.yaoex.com/",
  21. "Sec-Fetch-Dest": "empty",
  22. "Sec-Fetch-Mode": "cors",
  23. "Sec-Fetch-Site": "cross-site",
  24. "User-Agent": (
  25. "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
  26. "AppleWebKit/537.36 (KHTML, like Gecko) "
  27. "Chrome/146.0.0.0 Safari/537.36"
  28. ),
  29. "X-Request-Agent": "Axios",
  30. "X-Requested-With": "XMLHttpRequest",
  31. "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
  32. "sec-ch-ua-mobile": "?0",
  33. "sec-ch-ua-platform": '"Windows"',
  34. }
  35. REQUEST_RETRY_COUNT = 3
  36. REQUEST_TIMEOUT_SEC = 20
  37. class YaoexCrawler:
  38. def __init__(self, drug_dict=None):
  39. self.token = TOKEN
  40. self.user_id = "181680"
  41. self.platform = 6
  42. self.task_dict = drug_dict or {}
  43. self.collect_task_id = None
  44. self.account_name = None
  45. self.pipeline = DrugPipeline("yaoex")
  46. if self.task_dict:
  47. self.get_product_data()
  48. self.is_success = True
  49. self.is_not_product = 0
  50. def _post_with_retry(self, url, payload, retries=REQUEST_RETRY_COUNT, timeout=REQUEST_TIMEOUT_SEC):
  51. last_err = None
  52. for attempt in range(1, retries + 1):
  53. try:
  54. resp = requests.post(
  55. url,
  56. headers=headers,
  57. data=payload,
  58. timeout=timeout,
  59. )
  60. resp.raise_for_status()
  61. return resp
  62. except Exception as e:
  63. last_err = e
  64. if attempt < retries:
  65. logger.warning("请求失败,第%s/%s次重试: %s", attempt, retries, e)
  66. time.sleep(min(2 * attempt, 5))
  67. else:
  68. logger.error("请求失败,已达最大重试次数(%s): %s", retries, e)
  69. raise last_err
  70. def get_product_data(self):
  71. self.task_id = self.task_dict["id"]
  72. self.company_id = self.task_dict["company_id"]
  73. self.product = self.task_dict["product_name"]
  74. self.product_desc = self.task_dict.get("product_specs", "")
  75. self.brand = self.task_dict.get("product_brand", "")
  76. self.product_keyword = self.task_dict.get("product_keyword", "")
  77. self.collect_task_id = self.task_dict.get("collect_task_id", "")
  78. self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
  79. self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
  80. self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
  81. self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
  82. self.account_id = self.task_dict.get("collect_equipment_account_id", "")
  83. self.collect_region_id = self.task_dict.get("collect_region_id", "")
  84. self.collect_round = self.task_dict.get("collect_round", 1)
  85. @staticmethod
  86. def _timestamp_ms() -> str:
  87. return str(int(time.time() * 1000))
  88. def _list_payload(self, keyword, page):
  89. return {
  90. "traderName": "yaoex_pc",
  91. "trader": "pc",
  92. "closesignature": "yes",
  93. "signature_method": "md5",
  94. "signature": "****",
  95. "timestamp": self._timestamp_ms(),
  96. "token": self.token,
  97. "userToken": self.token,
  98. "userId": self.user_id,
  99. "roleId": "101",
  100. "userType": "下游客户",
  101. "buyerCode": self.user_id,
  102. "nowPage": str(page),
  103. "per": "20",
  104. "keyword": keyword,
  105. "catSearchId": "",
  106. "specs": "",
  107. "factoryIds": "",
  108. "sellerCodes": "",
  109. "sellerFilterMode": "0",
  110. "sortColumn": "default",
  111. "sortMode": "default",
  112. "ver": "1",
  113. "stock_mode": "1",
  114. "showExtendCard": "true",
  115. "needDinnerPrice": "true",
  116. "limitStart": "",
  117. "limitEnd": "",
  118. "deadLineStart": "",
  119. "deadLineEnd": "",
  120. "filterDtos": "",
  121. "showWholePurchase": "true",
  122. }
  123. def _detail_payload(self, spu_code, seller_code):
  124. return {
  125. "traderName": "yaoex_pc",
  126. "trader": "pc",
  127. "closesignature": "yes",
  128. "signature_method": "md5",
  129. "signature": "****",
  130. "timestamp": self._timestamp_ms(),
  131. "token": self.token,
  132. "userToken": self.token,
  133. "spuCode": str(spu_code),
  134. "sellerCode": str(seller_code),
  135. }
  136. def _shop_payload(self, enterprise_id):
  137. return {
  138. 'traderName': 'yaoex_pc',
  139. 'trader': 'pc',
  140. 'closesignature': 'yes',
  141. 'signature_method': 'md5',
  142. 'signature': '****',
  143. 'timestamp': self._timestamp_ms(),
  144. 'token': TOKEN,
  145. 'userToken': TOKEN,
  146. 'enterpriseId': enterprise_id,
  147. }
  148. def fetch_list_page(self, keyword, page):
  149. payload = self._list_payload(keyword, page)
  150. list_url = "https://gateway-b2b.fangkuaiyi.com/home/search/homeSearchList"
  151. resp = self._post_with_retry(list_url, payload)
  152. data = resp.json()
  153. return data.get("data", {}).get("shopProducts", []) or []
  154. def fetch_detail(self, spu_code, seller_code):
  155. payload = self._detail_payload(spu_code, seller_code)
  156. detail_url = "https://gateway-b2b.fangkuaiyi.com/product/detail"
  157. resp = self._post_with_retry(detail_url, payload)
  158. return resp.json().get("data", {}) or {}
  159. def fetch_shop(self, seller_code):
  160. payload = self._shop_payload(seller_code)
  161. detail_url = 'https://gateway-b2b.fangkuaiyi.com/ycapp/shop/enterpriseQualification'
  162. resp = self._post_with_retry(detail_url, payload)
  163. shop_res = resp.json().get("data", {})
  164. base_info = shop_res.get("baseInfo", {})
  165. address = base_info.get("address", "")
  166. company_name = base_info.get("enterpriseName", "")
  167. return address, company_name
  168. def parse_product(self, item):
  169. seller_code = item.get("sellerCode")
  170. spu_code = item.get("spuCode")
  171. name_part = (item.get("productName") or "").strip()
  172. short_part = (item.get("shortName") or "").strip()
  173. product_name = f"{name_part} {short_part}".strip()
  174. shop_url = f"https://mall.yaoex.com/v2/store/#/detail/{seller_code}/home"
  175. # 这里读取数据库,获取城市
  176. company_adress, company_name = self.fetch_shop(seller_code)
  177. detail_json = self.fetch_detail(spu_code, seller_code)
  178. address = detail_json.get("enterpriseIntroduce", {}).get("address", "")
  179. city_id = province_id = city = province = ""
  180. if address:
  181. city_id, province_id, city, province = get_city(address.split("市")[0])
  182. raw_price = item.get("price")
  183. price = self.decrypt_price(raw_price)
  184. hash_text = str(seller_code)+str(price)
  185. item_id = hashlib.md5(hash_text.encode('utf-8')).hexdigest()
  186. is_sold_out = 0
  187. is_sold_out_text = item.get("statusDescription", "")
  188. if "商品已售罄" in is_sold_out_text:
  189. is_sold_out= 1
  190. shop_name = item.get("storeName")
  191. if not shop_name:
  192. shop_name = item.get("shopName")
  193. anonymous_store_name = ""
  194. if shop_name == "预约配送中心":
  195. anonymous_store_name = item.get("supplyName", "")
  196. inventory = item.get("currentInventory")
  197. if not inventory:
  198. inventory = item.get("stockCount")
  199. now = time.strftime("%Y-%m-%d %H:%M:%S")
  200. # 字段与 yaofangwang_crawl 中 product 对齐(供 DrugPipeline)
  201. product = {
  202. "platform": self.platform,
  203. "item_id": item_id,
  204. "enterprise_id": self.company_id,
  205. "product_name": product_name,
  206. "spec": item.get("spec"),
  207. "one_price": "",
  208. "detail_url": f"https://mall.yaoex.com/v2/product/#/spuCode/{spu_code}/sellerCode/{seller_code}",
  209. "shop_name": shop_name,
  210. "anonymous_store_name": anonymous_store_name,
  211. "shop_url": shop_url,
  212. "city_name": city,
  213. "city_id": city_id,
  214. "province_name": province,
  215. "province_id": province_id,
  216. "shipment_city_name": "",
  217. "shipment_city_id": "",
  218. "shipment_province_name": "",
  219. "shipment_province_id": "",
  220. "area_info": company_adress if company_adress else address,
  221. "factory_name": item.get("factoryName"),
  222. "scrape_date": time.strftime("%Y-%m-%d"),
  223. "price": price,
  224. "sales": "",
  225. "stock_count": inventory,
  226. "snapshot_url": "",
  227. "approval_num": item.get("approvalNum"),
  228. "produced_time": item.get("productionTime"),
  229. "deadline": item.get("deadLine"),
  230. "update_time": now,
  231. "insert_time": now,
  232. "number": 1,
  233. "product_brand": self.brand or "",
  234. "collect_task_id": self.collect_task_id,
  235. "search_name": self.product,
  236. "company_name": company_name,
  237. "collect_config_info": json.dumps(
  238. {
  239. "sampling_cycle": self.sampling_cycle,
  240. "sampling_start_time": self.sampling_start_time,
  241. "sampling_end_time": self.sampling_end_time,
  242. }
  243. ),
  244. "account_id": self.account_id,
  245. "collect_region_id": self.collect_region_id,
  246. "collect_round": self.collect_round,
  247. "is_sold_out": is_sold_out
  248. }
  249. return product
  250. def decrypt_price(self, ciphertext_b64):
  251. if not ciphertext_b64 or not str(ciphertext_b64).strip():
  252. return ""
  253. _KEY_FIXED = "GDLSAUO1KUMIIBCE"
  254. if not self.user_id:
  255. key = _KEY_FIXED.encode("utf-8")
  256. else:
  257. uid = str(self.user_id)[:6].rjust(6, "0")
  258. key = (_KEY_FIXED[:10] + uid).encode("utf-8")
  259. raw = base64.b64decode(ciphertext_b64.strip())
  260. cipher = AES.new(key, AES.MODE_ECB)
  261. plain = unpad(cipher.decrypt(raw), AES.block_size)
  262. price = plain.decode("utf-8")
  263. return price
  264. def search_data(self):
  265. keyword = self.product
  266. if self.brand:
  267. keyword = self.brand + " " + self.product
  268. if self.product_desc:
  269. keyword = keyword + " " + self.product_desc
  270. for page in range(1, 100):
  271. logger.info("正在爬取%s %s,第%s页数据", self.brand, self.product, page)
  272. page_items = self.fetch_list_page(keyword=keyword, page=page)
  273. if not page_items:
  274. break
  275. for item in page_items:
  276. if not item.get("productId") and item.get("groupBuyProductDto"):
  277. item = item.get("groupBuyProductDto") or {}
  278. name_part = (item.get("productName") or "").strip()
  279. short_part = (item.get("shortName") or "").strip()
  280. product_name = f"{name_part} {short_part}".strip()
  281. if self.product not in product_name:
  282. self.is_not_product += 1
  283. continue
  284. # if self.brand not in product_name:
  285. # self.is_not_product += 1
  286. # continue
  287. self.is_not_product = 0
  288. product = self.parse_product(item)
  289. if not product.get("item_id"):
  290. continue
  291. logger.info(f"爬取到数据{json.dumps(product,ensure_ascii=False)}")
  292. try:
  293. self.pipeline.storge_data(product)
  294. logger.info("%s", json.dumps(product, ensure_ascii=False))
  295. except Exception as e:
  296. logger.exception("写入数据库失败: %s", e)
  297. time.sleep(random.randint(1, 3))
  298. if self.is_not_product > 15:
  299. break
  300. time.sleep(random.randint(2, 5))
  301. def run(self):
  302. try:
  303. self.search_data()
  304. except Exception as e:
  305. logger.error(e)
  306. self.is_success = False
  307. logger.info(f"爬取总数{self.pipeline.crawl_count}")
  308. return self.pipeline.crawl_count, self.is_success