| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
import base64
import hashlib
import json
import os
import random
import time

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

from area_info.city_name_to_id import get_city
from commons.Logger import get_spider_logger
from pipelines.drug_pipelines import DrugPipeline
logger = get_spider_logger("yaoex")

# SECURITY: this value is sent as the `token` / `userToken` form field of every
# request, i.e. it is a credential, and credentials should not live in source
# control. The YAOEX_TOKEN environment variable takes precedence when it is set
# to a non-empty value; the embedded value is kept only as a fallback so that
# existing deployments keep working unchanged.
TOKEN = os.environ.get("YAOEX_TOKEN") or (
    "Sm45MzRmREtiaStVTnJORXEySHhYYzNwUmQ2RUprWXlwelRDem4wV2RZUCtUUU5jMGVCVTRYYjNLVjdNSnFWSjg1YStxWllGQ2RQSExjaEVqU0dOaDFJczl4bTB1V09CZHZzVml2dU0xazd3UDdla3FTUzZBZlZkMHFSVHlaaDhDcFp3SWNDb3JNSDhuNC9vUzI1RVdEaU01YjcxQW5TS21Sdy90ZDRENi9VR2E0SW5wOWF4UE1VZ0poTDhhVkJtP2FwcElkPTEyNTAma2V5SWQ9MTI1MA=="
)

# Browser-like request headers passed to every POST issued by the crawler.
headers = {
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": "https://mall.yaoex.com",
    "Referer": "https://mall.yaoex.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "cross-site",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/146.0.0.0 Safari/537.36"
    ),
    "X-Request-Agent": "Axios",
    "X-Requested-With": "XMLHttpRequest",
    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
}

# Default number of attempts per HTTP request (the first try counts as one).
REQUEST_RETRY_COUNT = 3
# Default per-request timeout, in seconds.
REQUEST_TIMEOUT_SEC = 20
class YaoexCrawler:
    """Keyword-driven crawler for product listings on mall.yaoex.com.

    Given a task dict describing a product (name, brand, specs and sampling
    metadata), the crawler pages through the search endpoint, enriches every
    matching listing with shop and detail information, and hands the resulting
    record to ``DrugPipeline`` for storage.
    """

    # Highest search-result page that will be requested (pages are 1-based).
    MAX_PAGES = 99
    # Paging stops once more than this many consecutive listings failed the
    # product-name filter.
    MISMATCH_LIMIT = 15
    # Base of the 16-byte AES key used by ``decrypt_price``.
    _PRICE_KEY_BASE = "GDLSAUO1KUMIIBCE"

    def __init__(self, drug_dict=None):
        """Create a crawler, loading task fields when ``drug_dict`` is non-empty.

        Args:
            drug_dict: Task description; see ``get_product_data`` for the keys
                that are read. ``None`` or an empty dict leaves the task
                attributes unset.

        Raises:
            KeyError: If a non-empty ``drug_dict`` lacks a required key.
        """
        self.token = TOKEN
        self.user_id = "181680"
        self.platform = 6
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.account_name = None
        self.pipeline = DrugPipeline("yaoex")
        self.is_success = True
        # Number of consecutive listings rejected by the name filter.
        self.is_not_product = 0
        if self.task_dict:
            self.get_product_data()

    def _post_with_retry(
        self,
        url,
        payload,
        retries=REQUEST_RETRY_COUNT,
        timeout=REQUEST_TIMEOUT_SEC,
    ):
        """POST ``payload`` as form data to ``url``, retrying on request errors.

        Args:
            url: Target endpoint.
            payload: Form fields to send.
            retries: Total number of attempts; must be at least 1.
            timeout: Per-attempt timeout in seconds.

        Returns:
            The ``requests.Response`` of the first attempt that returned a
            non-error HTTP status.

        Raises:
            ValueError: If ``retries`` is smaller than 1.
            requests.RequestException: The error of the last attempt, once all
                attempts have failed.
        """
        if retries < 1:
            # Without this guard the loop would never run and there would be no
            # error to report.
            raise ValueError("retries must be at least 1")

        for attempt in range(1, retries + 1):
            try:
                resp = requests.post(
                    url,
                    headers=headers,
                    data=payload,
                    timeout=timeout,
                )
                resp.raise_for_status()
            except requests.RequestException as e:
                if attempt >= retries:
                    logger.error("请求失败,已达最大重试次数(%s): %s", retries, e)
                    raise
                logger.warning("请求失败,第%s/%s次重试: %s", attempt, retries, e)
                # Back off 2s, 4s, then at most 5s between attempts.
                time.sleep(min(2 * attempt, 5))
            else:
                return resp

    def get_product_data(self):
        """Copy the fields of ``self.task_dict`` onto instance attributes.

        ``id``, ``company_id`` and ``product_name`` are required; every other
        key is optional and falls back to ``""`` (``collect_round`` to ``1``).

        Raises:
            KeyError: If one of the required keys is missing.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    @staticmethod
    def _timestamp_ms() -> str:
        """Return the current Unix time in milliseconds as a decimal string."""
        return str(int(time.time() * 1000))

    @staticmethod
    def _display_name(item):
        """Join a listing's ``productName`` and ``shortName`` with one space."""
        name_part = (item.get("productName") or "").strip()
        short_part = (item.get("shortName") or "").strip()
        return f"{name_part} {short_part}".strip()

    def _base_payload(self):
        """Return the form fields shared by every request, in wire order."""
        return {
            "traderName": "yaoex_pc",
            "trader": "pc",
            "closesignature": "yes",
            "signature_method": "md5",
            "signature": "****",
            "timestamp": self._timestamp_ms(),
            "token": self.token,
            "userToken": self.token,
        }

    def _list_payload(self, keyword, page):
        """Build the form body of a search request for ``keyword`` / ``page``."""
        return {
            **self._base_payload(),
            "userId": self.user_id,
            "roleId": "101",
            "userType": "下游客户",
            "buyerCode": self.user_id,
            "nowPage": str(page),
            "per": "20",
            "keyword": keyword,
            "catSearchId": "",
            "specs": "",
            "factoryIds": "",
            "sellerCodes": "",
            "sellerFilterMode": "0",
            "sortColumn": "default",
            "sortMode": "default",
            "ver": "1",
            "stock_mode": "1",
            "showExtendCard": "true",
            "needDinnerPrice": "true",
            "limitStart": "",
            "limitEnd": "",
            "deadLineStart": "",
            "deadLineEnd": "",
            "filterDtos": "",
            "showWholePurchase": "true",
        }

    def _detail_payload(self, spu_code, seller_code):
        """Build the form body of a product-detail request."""
        return {
            **self._base_payload(),
            "spuCode": str(spu_code),
            "sellerCode": str(seller_code),
        }

    def _shop_payload(self, enterprise_id):
        """Build the form body of a shop-qualification request."""
        # Uses self.token (via the base payload) like the other builders, so a
        # token replaced on the instance is honoured here as well.
        return {
            **self._base_payload(),
            "enterpriseId": enterprise_id,
        }

    def fetch_list_page(self, keyword, page):
        """Return the ``shopProducts`` list of one search page.

        An empty list is returned when the response carries no products,
        including when ``data`` or ``shopProducts`` is null.
        """
        payload = self._list_payload(keyword, page)
        list_url = "https://gateway-b2b.fangkuaiyi.com/home/search/homeSearchList"
        resp = self._post_with_retry(list_url, payload)
        data = resp.json()
        # `or {}` also covers an explicit JSON null, which a .get() default
        # would not.
        return (data.get("data") or {}).get("shopProducts") or []

    def fetch_detail(self, spu_code, seller_code):
        """Return the ``data`` object of the product-detail response, or ``{}``."""
        payload = self._detail_payload(spu_code, seller_code)
        detail_url = "https://gateway-b2b.fangkuaiyi.com/product/detail"
        resp = self._post_with_retry(detail_url, payload)
        return resp.json().get("data") or {}

    def fetch_shop(self, seller_code):
        """Return ``(address, company_name)`` of the shop ``seller_code``.

        Both values come from ``data.baseInfo`` of the qualification endpoint
        and default to ``""`` when the section or the field is absent.
        """
        payload = self._shop_payload(seller_code)
        shop_url = "https://gateway-b2b.fangkuaiyi.com/ycapp/shop/enterpriseQualification"
        resp = self._post_with_retry(shop_url, payload)
        shop_res = resp.json().get("data") or {}
        base_info = shop_res.get("baseInfo") or {}
        address = base_info.get("address", "")
        company_name = base_info.get("enterpriseName", "")
        return address, company_name

    def parse_product(self, item):
        """Turn one search-result listing into a storage record.

        Issues one shop request and one detail request for the listing,
        decrypts its price and resolves city/province from the detail address.

        Args:
            item: A listing dict from ``fetch_list_page``.

        Returns:
            The record dict handed to ``DrugPipeline``.
        """
        seller_code = item.get("sellerCode")
        spu_code = item.get("spuCode")
        product_name = self._display_name(item)
        shop_url = f"https://mall.yaoex.com/v2/store/#/detail/{seller_code}/home"

        shop_address, company_name = self.fetch_shop(seller_code)
        detail_json = self.fetch_detail(spu_code, seller_code)
        address = (detail_json.get("enterpriseIntroduce") or {}).get("address", "")

        # Resolve city/province from the part of the address before "市".
        city_id = province_id = city = province = ""
        if address:
            city_id, province_id, city, province = get_city(address.split("市")[0])

        price = self.decrypt_price(item.get("price"))
        # NOTE(review): the id is derived from seller code and price only, so
        # two different products of one seller at the same price share an id.
        # Left unchanged because stored ids depend on it - confirm the intent.
        item_id = hashlib.md5(f"{seller_code}{price}".encode("utf-8")).hexdigest()

        # statusDescription may be missing or null.
        status_text = item.get("statusDescription") or ""
        is_sold_out = 1 if "商品已售罄" in status_text else 0

        shop_name = item.get("storeName") or item.get("shopName")
        anonymous_store_name = ""
        if shop_name == "预约配送中心":
            anonymous_store_name = item.get("supplyName", "")

        inventory = item.get("currentInventory") or item.get("stockCount")

        # Read the clock once so the date and the timestamps cannot disagree.
        now_struct = time.localtime()
        now = time.strftime("%Y-%m-%d %H:%M:%S", now_struct)

        # Field names are aligned with the product record of yaofangwang_crawl
        # (consumed by DrugPipeline).
        product = {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": product_name,
            "spec": item.get("spec"),
            "one_price": "",
            "detail_url": f"https://mall.yaoex.com/v2/product/#/spuCode/{spu_code}/sellerCode/{seller_code}",
            "shop_name": shop_name,
            "anonymous_store_name": anonymous_store_name,
            "shop_url": shop_url,
            "city_name": city,
            "city_id": city_id,
            "province_name": province,
            "province_id": province_id,
            "shipment_city_name": "",
            "shipment_city_id": "",
            "shipment_province_name": "",
            "shipment_province_id": "",
            "area_info": shop_address if shop_address else address,
            "factory_name": item.get("factoryName"),
            "scrape_date": time.strftime("%Y-%m-%d", now_struct),
            "price": price,
            "sales": "",
            "stock_count": inventory,
            "snapshot_url": "",
            "approval_num": item.get("approvalNum"),
            "produced_time": item.get("productionTime"),
            "deadline": item.get("deadLine"),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": company_name,
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": is_sold_out,
        }
        return product

    def decrypt_price(self, ciphertext_b64):
        """Decrypt a base64-encoded, AES-ECB encrypted price string.

        The key is ``_PRICE_KEY_BASE`` itself when no user id is set; otherwise
        its first 10 characters followed by the first 6 characters of the user
        id, left-padded with ``"0"`` to 6 characters.

        Args:
            ciphertext_b64: Encrypted price as found in a listing.

        Returns:
            The decrypted price text, or ``""`` for empty/blank input.

        Raises:
            ValueError: If the ciphertext is not valid base64 or its padding
                is wrong after decryption.
        """
        if not ciphertext_b64 or not str(ciphertext_b64).strip():
            return ""

        if self.user_id:
            uid = str(self.user_id)[:6].rjust(6, "0")
            key_text = self._PRICE_KEY_BASE[:10] + uid
        else:
            key_text = self._PRICE_KEY_BASE

        raw = base64.b64decode(ciphertext_b64.strip())
        cipher = AES.new(key_text.encode("utf-8"), AES.MODE_ECB)
        plain = unpad(cipher.decrypt(raw), AES.block_size)
        return plain.decode("utf-8")

    def search_data(self):
        """Page through the search results and store every matching listing.

        The keyword is built from brand, product name and specs (whichever are
        non-empty). Paging ends on an empty page, after ``MAX_PAGES`` pages, or
        once more than ``MISMATCH_LIMIT`` consecutive listings did not contain
        the product name. Storage errors are logged and skipped; request and
        parsing errors propagate.
        """
        # Always include the product name: previously it was dropped whenever
        # the brand was empty, so the search ran without the product name.
        keyword = " ".join(
            part for part in (self.brand, self.product, self.product_desc) if part
        )

        for page in range(1, self.MAX_PAGES + 1):
            logger.info("正在爬取%s %s,第%s页数据", self.brand, self.product, page)
            page_items = self.fetch_list_page(keyword=keyword, page=page)
            if not page_items:
                break

            for item in page_items:
                # Entries without a productId that carry a groupBuyProductDto
                # are replaced by that nested object.
                if not item.get("productId") and item.get("groupBuyProductDto"):
                    item = item.get("groupBuyProductDto") or {}

                if self.product not in self._display_name(item):
                    self.is_not_product += 1
                    continue
                self.is_not_product = 0

                product = self.parse_product(item)
                if not product.get("item_id"):
                    continue

                try:
                    self.pipeline.storge_data(product)
                    logger.info("%s", json.dumps(product, ensure_ascii=False))
                except Exception as e:
                    # Best effort: one failed write must not abort the crawl.
                    logger.exception("写入数据库失败: %s", e)
                # Random pause after each processed listing.
                time.sleep(random.randint(1, 3))

            if self.is_not_product > self.MISMATCH_LIMIT:
                break
            time.sleep(random.randint(2, 5))

    def run(self):
        """Run the crawl and return ``(crawl_count, is_success)``.

        Exceptions raised by ``search_data`` propagate to the caller. Nothing
        in this class sets ``is_success`` to ``False``, so the flag is ``True``
        whenever this method returns.
        """
        self.search_data()
        logger.info("爬取总数%s", self.pipeline.crawl_count)
        return self.pipeline.crawl_count, self.is_success
|