"""Spider for the hezongyy.com ("yaoyigou") drug search API.

Logs in, searches products by keyword, filters results by manufacturer and
forwards the matching records to ``DrugPipeline``.
"""
import json
import time

import requests

from commons.Logger import get_spider_logger
from pipelines.drug_pipelines import DrugPipeline

logger = get_spider_logger("yaoyigou")

# NOTE(review): credentials are hard-coded in source control; consider moving
# them to configuration or a secret store.
user_name = '青羊新洋诊所'
password = '123321'

# Default request headers; every Yaoyigou instance works on its own copy.
headers = {
    'accept': 'application/json, text/plain, */*',
    'accept-language': 'zh-CN,zh;q=0.9',
    'content-type': 'application/json;charset=UTF-8',
    'origin': 'https://www.hezongyy.com',
    'priority': 'u=1, i',
    'referer': 'https://www.hezongyy.com/',
    'sec-ch-ua': '"Google Chrome";v="147", "Not.A/Brand";v="8", "Chromium";v="147"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'sec-fetch-dest': 'empty',
    'sec-fetch-mode': 'cors',
    'sec-fetch-site': 'same-site',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/147.0.0.0 Safari/537.36',
}


class Yaoyigou:
    """Crawl product listings for one collection task.

    Task-specific attributes (``product``, ``product_keyword``, sampling
    fields, ...) are populated by ``get_product_data`` and therefore only
    exist when a truthy ``task_dict`` was passed to the constructor.
    """

    # Number of results requested per page, shared by both search endpoints.
    PAGE_SIZE = 20
    # Highest page number requested by ``search_data`` (pages 1..MAX_PAGES).
    MAX_PAGES = 19

    def __init__(self, task_dict=None):
        """Set up per-instance state and load ``task_dict`` when provided."""
        self.hesytoken = None
        self.headers = headers.copy()
        self.platform = 8
        self.collect_task_id = None
        self.pipeline = DrugPipeline("yaoyigou")
        self.company_id = 0
        self.brand = ""
        self.is_success = True
        self.task_dict = task_dict
        if self.task_dict:
            self.get_product_data()

    def get_product_data(self):
        """Copy the task description from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required keys
        (``KeyError`` if absent); every other key is optional.
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")
        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
        self.collect_region_id = self.task_dict.get("collect_region_id", "")
        self.collect_round = self.task_dict.get("collect_round", 1)

    def _post_json_with_retry(self, url, payload, max_retry=1, timeout=20):
        """POST ``payload`` as JSON to ``url`` and return the decoded JSON body.

        A failed attempt (connection error, HTTP error status, or invalid
        JSON) is retried up to ``max_retry`` times with a 2 second pause, so
        the default makes at most two attempts.

        Raises:
            Exception: the error of the final attempt, re-raised unchanged.
        """
        # Always make at least one attempt, even for a negative max_retry.
        total_attempts = max(max_retry, 0) + 1
        for attempt in range(total_attempts):
            try:
                response = requests.post(
                    url,
                    headers=self.headers,
                    json=payload,
                    timeout=timeout,
                )
                response.raise_for_status()
                return response.json()
            except Exception as e:
                if attempt < total_attempts - 1:
                    logger.warning("请求失败,准备重试(%s/%s): %s", attempt + 1, total_attempts, e)
                    time.sleep(2)
                    continue
                logger.error("请求失败,已达最大重试次数: %s", e)
                raise

    def login_yaoyigou(self):
        """Log in and store the returned token in ``self.hesytoken``.

        Returns:
            The ``content`` field of the login response, or ``""`` when the
            request fails or the field is missing/empty.
        """
        json_data = {
            'username': user_name,
            'password': password,
            'channel': 1,
            'timeout': 604800,
            'fingerprint': 'b86bae775df98bc20c5199dab0ac4edb',
            'companyLimited': 0,
        }
        try:
            data_json = self._post_json_with_retry(
                'https://newapi.hezongyy.com/users/UserLogin/login',
                json_data,
                max_retry=1,
                timeout=20,
            )
            # A null "content" is normalised to "" so callers see one
            # "no token" value.
            self.hesytoken = data_json.get('content') or ""
        except Exception as e:
            logger.error("登录失败: %s", e)
            self.hesytoken = ""
        return self.hesytoken

    def parse_product(self, item):
        """Map one API result ``item`` (a dict) onto the pipeline record dict.

        Returns:
            A dict with the fields handed to ``DrugPipeline``; fields the API
            item does not supply are left as empty strings.
        """
        item_id = item.get("id")
        # Read the clock once so the date and timestamp fields cannot
        # disagree when the call straddles midnight.
        current = time.localtime()
        now = time.strftime("%Y-%m-%d %H:%M:%S", current)
        safe_item_id = str(item_id).strip() if item_id not in (None, "") else ""
        detail_url = ""
        if safe_item_id:
            detail_url = f"https://www.hezongyy.com/#/goodsDetails?id={safe_item_id}"
        # Field layout mirrors the product dict in yaofangwang_crawl
        # (consumed by DrugPipeline).
        product = {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item.get("name", ""),
            "spec": item.get("specification", ""),
            "one_price": "",
            "detail_url": detail_url,
            "shop_name": "",
            "anonymous_store_name": "",
            "shop_url": "",
            "city_name": "",
            "city_id": "",
            "province_name": "",
            "province_id": "",
            "shipment_city_name": "",
            "shipment_city_id": "",
            "shipment_province_name": "",
            "shipment_province_id": "",
            "area_info": "",
            "factory_name": item.get("manufacturerName", ""),
            "scrape_date": time.strftime("%Y-%m-%d", current),
            "price": item.get("sellingPrice"),
            "sales": "",
            "stock_count": item.get("quantity", ""),
            "snapshot_url": "",
            "approval_num": item.get("licenseNumber"),
            "produced_time": "",
            "deadline": item.get("expirationDate", ""),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 1,
        }
        return product

    def get_good_ids(self, page_num):
        """Return the goods ids found on search result page ``page_num``.

        Each id is the search hit's ``goodsId`` with ``'0'`` appended.
        NOTE(review): the trailing '0' is presumably the id format expected by
        the listNormal endpoint -- confirm against the API.

        Returns:
            A list of id strings; empty when the page has no results.
        """
        keyword = self.product_keyword or self.product
        json_data = {
            'channelType': 0,
            'columnType': 0,
            'searchType': 0,
            'pageNumber': page_num,
            'pageSize': self.PAGE_SIZE,
            'keyWords': keyword,
            'newCategoryLabelId': '-1',
        }
        data_json = self._post_json_with_retry(
            'https://newapi.hezongyy.com/goods/search/onCondition',
            json_data,
            max_retry=1,
            timeout=20,
        )
        # "content" / "list" may be present but null; treat that as no results
        # instead of failing with AttributeError / TypeError.
        content = data_json.get("content") or {}
        list_data = content.get("list") or []
        # Skip entries without a goodsId rather than sending the bogus
        # id "None0" to the follow-up request.
        return [
            str(list_item.get("goodsId")) + '0'
            for list_item in list_data
            if list_item.get("goodsId") is not None
        ]

    def search_data(self):
        """Page through the search results and store every matching product.

        For each page the goods ids are fetched first, then their details;
        items whose manufacturer name does not contain ``self.brand`` are
        skipped. Paging stops at the first page without ids or details, or
        after ``MAX_PAGES`` pages.
        """
        keyword = self.product_keyword or self.product
        # Brand and manufacturer may be None; normalise both so the substring
        # test cannot raise TypeError and abort the whole crawl.
        brand = self.brand or ""
        for page_num in range(1, self.MAX_PAGES + 1):
            good_ids = self.get_good_ids(page_num)
            if not good_ids:
                break
            payload = {
                'goodsIdList': good_ids,
                'limitSize': self.PAGE_SIZE,
                'limitStart': (page_num - 1) * self.PAGE_SIZE,
                'keyword': keyword,
                'clientType': 1,
            }
            response_json = self._post_json_with_retry(
                'https://newapi.hezongyy.com/goods/goods/listNormal',
                payload,
                max_retry=1,
                timeout=20,
            )
            contents = response_json.get("content") or []
            if not contents:
                break
            for content in contents:
                company = content.get("manufacturerName") or ""
                if brand not in company:
                    continue
                try:
                    product = self.parse_product(content)
                    if not product.get("item_id"):
                        continue
                    self.pipeline.storge_data(product)
                    logger.info(f"入库成功: {json.dumps(product, ensure_ascii=False)}")
                except Exception as e:
                    # One bad record must not stop the remaining ones.
                    logger.exception("入库失败: %s", e)

    def run(self):
        """Log in, run the search and report the outcome.

        Returns:
            A ``(crawl_count, is_success)`` tuple, where ``crawl_count`` is
            read from the pipeline and ``is_success`` is False when login or
            the search failed.
        """
        self.login_yaoyigou()
        if not self.hesytoken:
            logger.error("请检查账号是否出现问题")
            self.is_success = False
            return self.pipeline.crawl_count, self.is_success
        self.headers["hesytoken"] = self.hesytoken
        try:
            self.search_data()
        except Exception as e:
            logger.error(f"爬取搜索失败{str(e)}")
            self.is_success = False
        return self.pipeline.crawl_count, self.is_success