"""Crawler for product listings on mall.yaoex.com (gateway-b2b.fangkuaiyi.com API)."""

import base64
import hashlib
import json
import random
import time

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

from area_info.city_name_to_id import get_city
from commons.Logger import get_spider_logger
from pipelines.drug_pipelines import DrugPipeline

logger = get_spider_logger("yaoex")

# NOTE(review): this value is sent as both `token` and `userToken` on every
# request; it looks like a session credential committed to source — consider
# loading it from configuration instead.
TOKEN = "Sm45MzRmREtiaStVTnJORXEySHhYYzNwUmQ2RUprWXlwelRDem4wV2RZUCtUUU5jMGVCVTRYYjNLVjdNSnFWSjg1YStxWllGQ2RQSExjaEVqU0dOaDFJczl4bTB1V09CZHZzVml2dU0xazd3UDdla3FTUzZBZlZkMHFSVHlaaDhDcFp3SWNDb3JNSDhuNC9vUzI1RVdEaU01YjcxQW5TS21Sdy90ZDRENi9VR2E0SW5wOWF4UE1VZ0poTDhhVkJtP2FwcElkPTEyNTAma2V5SWQ9MTI1MA=="

# Browser-like headers sent with every POST.
headers = {
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": "https://mall.yaoex.com",
    "Referer": "https://mall.yaoex.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "cross-site",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/146.0.0.0 Safari/537.36"
    ),
    "X-Request-Agent": "Axios",
    "X-Requested-With": "XMLHttpRequest",
    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
}

REQUEST_RETRY_COUNT = 3
REQUEST_TIMEOUT_SEC = 20


class YaoexCrawler:
    """Search yaoex for one task's product and push matching rows to DrugPipeline.

    Args:
        drug_dict: Task description. When non-empty it must contain ``id``,
            ``company_id`` and ``product_name``; the remaining keys read in
            ``get_product_data`` are optional.
    """

    def __init__(self, drug_dict=None):
        self.token = TOKEN
        self.user_id = "181680"
        self.platform = 6
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.account_name = None
        self.pipeline = DrugPipeline("yaoex")
        if self.task_dict:
            self.get_product_data()
        self.is_success = True
        # Counts consecutive search hits whose name does not contain the
        # product; reset to 0 on every matching item.
        self.is_not_product = 0

    def _post_with_retry(self, url, payload, retries=REQUEST_RETRY_COUNT, timeout=REQUEST_TIMEOUT_SEC):
        """POST ``payload`` as form data, retrying on request failures.

        Args:
            url: Endpoint to call.
            payload: Form fields passed as ``data=``.
            retries: Maximum number of attempts; values below 1 are treated
                as 1 so the request is always tried once.
            timeout: Per-attempt timeout in seconds.

        Returns:
            The ``requests.Response`` of the first attempt that succeeds with
            a non-error HTTP status.

        Raises:
            requests.RequestException: The error from the final attempt.
        """
        attempts = max(1, retries)
        for attempt in range(1, attempts + 1):
            try:
                resp = requests.post(
                    url,
                    headers=headers,
                    data=payload,
                    timeout=timeout,
                )
                resp.raise_for_status()
                return resp
            except requests.RequestException as e:
                if attempt >= attempts:
                    logger.error("请求失败,已达最大重试次数(%s): %s", attempts, e)
                    raise
                logger.warning("请求失败,第%s/%s次重试: %s", attempt, attempts, e)
                # Linear back-off capped at 5 seconds.
                time.sleep(min(2 * attempt, 5))

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: If ``id``, ``company_id`` or ``product_name`` is missing.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    @staticmethod
    def _timestamp_ms() -> str:
        """Return the current Unix time in milliseconds as a string."""
        return str(int(time.time() * 1000))

    @staticmethod
    def _full_name(item):
        """Join an item's ``productName`` and ``shortName`` into one display name."""
        name_part = (item.get("productName") or "").strip()
        short_part = (item.get("shortName") or "").strip()
        return f"{name_part} {short_part}".strip()

    def _base_payload(self):
        """Return the form fields shared by every gateway request."""
        return {
            "traderName": "yaoex_pc",
            "trader": "pc",
            "closesignature": "yes",
            "signature_method": "md5",
            "signature": "****",
            "timestamp": self._timestamp_ms(),
            "token": self.token,
            "userToken": self.token,
        }

    def _list_payload(self, keyword, page):
        """Build the form fields for one page of a keyword search."""
        return {
            **self._base_payload(),
            "userId": self.user_id,
            "roleId": "101",
            "userType": "下游客户",
            "buyerCode": self.user_id,
            "nowPage": str(page),
            "per": "20",
            "keyword": keyword,
            "catSearchId": "",
            "specs": "",
            "factoryIds": "",
            "sellerCodes": "",
            "sellerFilterMode": "0",
            "sortColumn": "default",
            "sortMode": "default",
            "ver": "1",
            "stock_mode": "1",
            "showExtendCard": "true",
            "needDinnerPrice": "true",
            "limitStart": "",
            "limitEnd": "",
            "deadLineStart": "",
            "deadLineEnd": "",
            "filterDtos": "",
            "showWholePurchase": "true",
        }

    def _detail_payload(self, spu_code, seller_code):
        """Build the form fields for a product-detail request."""
        return {
            **self._base_payload(),
            "spuCode": str(spu_code),
            "sellerCode": str(seller_code),
        }

    def _shop_payload(self, enterprise_id):
        """Build the form fields for a shop-qualification request."""
        # Uses self.token (via _base_payload) like the other payload builders,
        # instead of the module-level TOKEN.
        return {
            **self._base_payload(),
            "enterpriseId": enterprise_id,
        }

    def fetch_list_page(self, keyword, page):
        """Return the ``shopProducts`` list for one search page.

        Returns an empty list when the response has no (or a null) ``data``
        or ``shopProducts`` entry.
        """
        payload = self._list_payload(keyword, page)
        list_url = "https://gateway-b2b.fangkuaiyi.com/home/search/homeSearchList"
        resp = self._post_with_retry(list_url, payload)
        body = resp.json() or {}
        return (body.get("data") or {}).get("shopProducts") or []

    def fetch_detail(self, spu_code, seller_code):
        """Return the ``data`` object of the product-detail response (``{}`` if absent)."""
        payload = self._detail_payload(spu_code, seller_code)
        detail_url = "https://gateway-b2b.fangkuaiyi.com/product/detail"
        resp = self._post_with_retry(detail_url, payload)
        return (resp.json() or {}).get("data") or {}

    def fetch_shop(self, seller_code):
        """Return ``(address, enterpriseName)`` from the shop's ``baseInfo``.

        Missing or null ``data`` / ``baseInfo`` yields ``("", "")``.
        """
        payload = self._shop_payload(seller_code)
        shop_info_url = 'https://gateway-b2b.fangkuaiyi.com/ycapp/shop/enterpriseQualification'
        resp = self._post_with_retry(shop_info_url, payload)
        shop_res = (resp.json() or {}).get("data") or {}
        base_info = shop_res.get("baseInfo") or {}
        address = base_info.get("address", "")
        company_name = base_info.get("enterpriseName", "")
        return address, company_name

    def parse_product(self, item):
        """Turn one search-result item into the row dict stored by DrugPipeline.

        Issues two extra requests per item (shop qualification and product
        detail) and decrypts the listed price.

        Args:
            item: One element of the search response's ``shopProducts`` list
                (or its ``groupBuyProductDto``).

        Returns:
            dict: The product row.
        """
        seller_code = item.get("sellerCode")
        spu_code = item.get("spuCode")
        product_name = self._full_name(item)
        shop_url = f"https://mall.yaoex.com/v2/store/#/detail/{seller_code}/home"

        # Resolve the seller's address/company, then map the address to a city
        # (original note: the city lookup reads from the database).
        company_address, company_name = self.fetch_shop(seller_code)
        detail_json = self.fetch_detail(spu_code, seller_code)
        address = (detail_json.get("enterpriseIntroduce") or {}).get("address", "")
        city_id = province_id = city = province = ""
        if address:
            # Only the text before the first "市" is passed to the lookup.
            city_id, province_id, city, province = get_city(address.split("市")[0])

        raw_price = item.get("price")
        price = self.decrypt_price(raw_price)
        # item_id is derived from seller code + decrypted price.
        hash_text = str(seller_code) + str(price)
        item_id = hashlib.md5(hash_text.encode('utf-8')).hexdigest()

        # statusDescription may be present but null; treat that as "not sold out".
        status_text = item.get("statusDescription") or ""
        is_sold_out = 1 if "商品已售罄" in status_text else 0

        shop_name = item.get("storeName") or item.get("shopName")
        anonymous_store_name = ""
        if shop_name == "预约配送中心":
            anonymous_store_name = item.get("supplyName", "")

        inventory = item.get("currentInventory")
        if not inventory:
            inventory = item.get("stockCount")

        now = time.strftime("%Y-%m-%d %H:%M:%S")
        # Fields are aligned with the product dict in yaofangwang_crawl
        # (consumed by DrugPipeline).
        product = {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": product_name,
            "spec": item.get("spec"),
            "one_price": "",
            "detail_url": f"https://mall.yaoex.com/v2/product/#/spuCode/{spu_code}/sellerCode/{seller_code}",
            "shop_name": shop_name,
            "anonymous_store_name": anonymous_store_name,
            "shop_url": shop_url,
            "city_name": city,
            "city_id": city_id,
            "province_name": province,
            "province_id": province_id,
            "shipment_city_name": "",
            "shipment_city_id": "",
            "shipment_province_name": "",
            "shipment_province_id": "",
            "area_info": company_address if company_address else address,
            "factory_name": item.get("factoryName"),
            "scrape_date": time.strftime("%Y-%m-%d"),
            "price": price,
            "sales": "",
            "stock_count": inventory,
            "snapshot_url": "",
            "approval_num": item.get("approvalNum"),
            "produced_time": item.get("productionTime"),
            "deadline": item.get("deadLine"),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": company_name,
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": is_sold_out,
        }
        return product

    def decrypt_price(self, ciphertext_b64):
        """Decrypt a base64-encoded, AES-ECB-encrypted price string.

        The key is the fixed 16-character key when ``self.user_id`` is empty;
        otherwise it is the first 10 characters of the fixed key followed by
        the first 6 characters of the user id, left-padded with ``"0"``.

        Args:
            ciphertext_b64: Base64 text from the item's ``price`` field.

        Returns:
            str: The decrypted price, or ``""`` for empty/blank input.
        """
        if not ciphertext_b64 or not str(ciphertext_b64).strip():
            return ""
        _KEY_FIXED = "GDLSAUO1KUMIIBCE"
        if not self.user_id:
            key = _KEY_FIXED.encode("utf-8")
        else:
            uid = str(self.user_id)[:6].rjust(6, "0")
            key = (_KEY_FIXED[:10] + uid).encode("utf-8")
        raw = base64.b64decode(ciphertext_b64.strip())
        cipher = AES.new(key, AES.MODE_ECB)
        plain = unpad(cipher.decrypt(raw), AES.block_size)
        return plain.decode("utf-8")

    def search_data(self):
        """Page through search results and store every item matching the product.

        The keyword is ``[brand ]product[ specs]``. Paging stops on an empty
        page, after page 99, or once more than 15 consecutive items failed
        the product-name check.
        """
        keyword = self.product
        if self.brand:
            keyword = self.brand + " " + self.product
        if self.product_desc:
            keyword = keyword + " " + self.product_desc

        for page in range(1, 100):
            logger.info("正在爬取%s %s,第%s页数据", self.brand, self.product, page)
            page_items = self.fetch_list_page(keyword=keyword, page=page)
            if not page_items:
                break

            for item in page_items:
                # Entries without a productId carry the real product under
                # groupBuyProductDto.
                if not item.get("productId") and item.get("groupBuyProductDto"):
                    item = item.get("groupBuyProductDto") or {}

                product_name = self._full_name(item)
                if self.product not in product_name:
                    self.is_not_product += 1
                    continue
                # NOTE: an equivalent brand-name filter is disabled in the
                # original source (it was commented out).
                self.is_not_product = 0

                product = self.parse_product(item)
                if not product.get("item_id"):
                    continue
                logger.info(f"爬取到数据{json.dumps(product,ensure_ascii=False)}")
                try:
                    self.pipeline.storge_data(product)
                    logger.info("%s", json.dumps(product, ensure_ascii=False))
                except Exception as e:
                    # Best effort: a failed write must not stop the crawl.
                    logger.exception("写入数据库失败: %s", e)
                # Throttle between items (each one costs two detail requests).
                time.sleep(random.randint(1, 3))

            if self.is_not_product > 15:
                break
            # Throttle between result pages.
            time.sleep(random.randint(2, 5))

    def run(self):
        """Run the crawl and report the outcome.

        Returns:
            tuple: ``(self.pipeline.crawl_count, self.is_success)``;
            ``is_success`` is ``False`` when ``search_data`` raised.
        """
        try:
            self.search_data()
        except Exception as e:
            # Top-level boundary: keep the traceback and flag the failure.
            logger.exception(e)
            self.is_success = False
        logger.info(f"爬取总数{self.pipeline.crawl_count}")
        return self.pipeline.crawl_count, self.is_success