import base64
import hashlib
import json
import random
import signal
import socket
import sys
import time
from pathlib import Path
from urllib.parse import quote

import requests
from Crypto.Cipher import AES
from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import logger
from oss_upload.oss_upload import AliyunOSSUploader
from pipelines.drug_pipelines import DrugPipeline
from area_info.city_name_to_id import get_city
from commons.config import YYC_ACCOUNT
from Crypto.Util.Padding import unpad

# NOTE(review): this is a credential committed to source; consider loading it
# from configuration (as YYC_ACCOUNT is) instead of hard-coding it here.
CAPTCHA_TOKEN = "zPzmt1mG1ouCU6GTzsZN2Lmm8pdZypapPcLJTBRETco"
CAPTCHA_API_URL = "http://api.jfbym.com/api/YmServer/customApi"
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"

# Project root -> spiders/yaoex (independent of the directory the script is
# launched from).
PROJECT_ROOT = Path(__file__).resolve().parents[2]
YAOEX_SPIDER_DIR = PROJECT_ROOT / "spiders" / "yaoex"
BROWSER_PROFILE_SUBDIR = "chrome_profile"

# Constant added to the distance returned by the captcha API before dragging.
SLIDER_OFFSET_FIX = 10

DETAIL_GET_TIMEOUT = 15
DETAIL_URL_WAIT = 10
DETAIL_DOM_WAIT = 8  # NOTE(review): not referenced anywhere in this file.
DETAIL_NAV_RETRIES = 3
DETAIL_CONTENT_XPATH = "xpath://div[contains(@class,'yaoex-product-detail__content')]"

REQUEST_RETRY_COUNT = 3
REQUEST_TIMEOUT_SEC = 20

# Number of consecutive non-matching list items after which search() stops.
NOT_PRODUCT_BREAK = 15

headers = {
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Connection": "keep-alive",
    "Content-Type": "application/x-www-form-urlencoded",
    "Origin": "https://mall.yaoex.com",
    "Referer": "https://mall.yaoex.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "cross-site",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/146.0.0.0 Safari/537.36"
    ),
    "X-Request-Agent": "Axios",
    "X-Requested-With": "XMLHttpRequest",
    "sec-ch-ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Google Chrome";v="146"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
}


class YaoexSnapshotCrawl:
    """Search mall.yaoex.com for one product task, snapshot and store each hit.

    The list and shop data come from HTTP POSTs to the gateway API; the
    browser (DrissionPage) is used for login and for detail-page screenshots,
    which are uploaded through ``AliyunOSSUploader`` and stored together with
    the parsed record through ``DrugPipeline``.
    """

    def __init__(self, drug_dict=None):
        """Create the crawler and, when a task dict is given, load its fields.

        Args:
            drug_dict: Task description; see ``get_product_data`` for the keys
                that are read. ``None`` or an empty dict leaves the task
                attributes unset.

        Raises:
            KeyError: If ``YYC_ACCOUNT`` lacks ``user_id`` or ``token``, or the
                task dict lacks one of its required keys.
        """
        self.driver = None
        self.platform = 6
        self.pipeline = DrugPipeline("yaoex")
        self.task_dict = drug_dict or {}
        self.ossuploader = AliyunOSSUploader()
        self.start_page = 1
        self.end_page = 1
        self.account_name = YYC_ACCOUNT.get("username", "yyc_default")
        # seller_code -> (address, enterprise_name); filled by _get_shop_info.
        self._shop_cache = {}
        self._register_signal_handler()
        if self.task_dict:
            self.get_product_data()
        # NOTE(review): success is never updated after this point in this
        # file; confirm whether callers expect run() to clear it on failure.
        self.success = True
        # Count of consecutive list items that did not match product/brand.
        self.is_not_product = 0
        self.user_id = YYC_ACCOUNT["user_id"]
        self.token = YYC_ACCOUNT["token"]

    def get_product_data(self):
        """Copy the task dict fields onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required (KeyError when
        missing); every other key falls back to a default. The page range is
        normalised so that ``1 <= start_page <= end_page``.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)
        self.start_page = self._parse_page(task.get("start_page"), 1)
        self.end_page = max(
            self.start_page,
            self._parse_page(task.get("end_page"), self.start_page),
        )

    @staticmethod
    def _parse_page(value, default=1):
        """Return ``int(value)`` when it is >= 1, otherwise ``default``.

        Values that cannot be converted to int (including ``None``) also yield
        ``default``.
        """
        try:
            page = int(value)
        except (TypeError, ValueError):
            return default
        return page if page >= 1 else default

    def _register_signal_handler(self):
        """Install a SIGINT (and SIGTERM, when available) handler.

        The handler closes the browser and exits the process with status 0.
        ``signal.signal`` only works in the main thread of the main
        interpreter, so constructing this class elsewhere raises ValueError.
        """

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        if hasattr(signal, "SIGTERM"):
            signal.signal(signal.SIGTERM, handler)

    @staticmethod
    def _timestamp_ms() -> str:
        """Return the current Unix time in milliseconds as a string."""
        return str(int(time.time() * 1000))

    def _quit_browser(self):
        """Close the browser if one is open; never raises.

        Shutdown is best-effort (it also runs from the signal handler and from
        ``run``'s ``finally``), so a failure is logged instead of propagated.
        """
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception as e:
            logger.warning("关闭浏览器时出现异常: %s", e)
        self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a TCP port on 127.0.0.1 that was free when probed.

        The probing socket is closed before returning, so the port is not
        reserved for the caller.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def _resolve_browser_profile_dir(self):
        """Pick the Chrome user-data directory for the current account.

        Browser data always lives under ``<project root>/spiders/yaoex/``.
        ``chrome_profile/<account>`` is preferred; if an older layout
        (``yaoex/<account>`` or ``yaoex/spiders/yaoex/<account>``) already
        contains a profile (a ``Default`` directory or a ``Local State`` file)
        it keeps being used so the existing login state is preserved.

        Returns:
            Path of the directory to use. For a new profile only the parent
            directory is created here.
        """
        preferred = YAOEX_SPIDER_DIR / BROWSER_PROFILE_SUBDIR / self.account_name
        legacy_flat = YAOEX_SPIDER_DIR / self.account_name
        legacy_nested = YAOEX_SPIDER_DIR / "spiders" / "yaoex" / self.account_name
        for candidate in (preferred, legacy_flat, legacy_nested):
            if (candidate / "Default").is_dir() or (candidate / "Local State").is_file():
                logger.info("使用已有浏览器配置目录: %s", candidate)
                return candidate
        preferred.parent.mkdir(parents=True, exist_ok=True)
        logger.info("新建浏览器配置目录: %s", preferred)
        return preferred

    def init_browser(self):
        """Start Chrome on a free debugging port with the account's profile."""
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()

        profile_dir = self._resolve_browser_profile_dir()
        profile_dir.mkdir(parents=True, exist_ok=True)
        co.set_user_data_path(str(profile_dir))
        logger.info("浏览器用户目录(绝对路径): %s", profile_dir.resolve())

        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--start-maximized")
        co.set_argument("--no-first-run")
        co.set_argument("--no-default-browser-check")
        self.driver = ChromiumPage(co)

    def _is_logged_in(self):
        """Return True when the logout link is found within 5 seconds."""
        return bool(self.driver.ele("xpath=//a[@id='logout']", timeout=5))

    def _call_captcha_api(self, image_bytes):
        """Send a captcha screenshot to the recognition API.

        Args:
            image_bytes: Raw image bytes; sent base64-encoded.

        Returns:
            ``resp["data"]["data"]`` from the JSON reply, or ``None`` when the
            request or the parsing fails (the error is logged).
        """
        try:
            b64 = base64.b64encode(image_bytes).decode()
            resp = requests.post(
                CAPTCHA_API_URL,
                json={"token": CAPTCHA_TOKEN, "type": "22222", "image": b64},
                headers={"Content-Type": "application/json"},
                timeout=15,
            ).json()
            logger.info("验证码 API 返回: %s", resp)
            return resp["data"]["data"]
        except Exception as e:
            logger.exception("验证码识别失败: %s", e)
            return None

    @staticmethod
    def _generate_human_track(distance):
        """Build a randomised drag track covering roughly ``distance``.

        The first 70% of the distance is covered while accelerating, the rest
        while decelerating (speed clamped to at least 0.5 so the loop always
        advances). The summed x moves can overshoot ``distance`` by up to one
        step, and with probability 0.7 a final backward step of 1-3 is added.

        Returns:
            List of ``(x_move, y_offset, duration)`` tuples.
        """
        step_time = 0.2
        accel_until = distance * 0.7
        travelled = 0
        speed = 0
        move_points = []

        # Acceleration phase.
        while travelled < accel_until:
            accel = random.uniform(2, 4)
            start_speed = speed
            speed = start_speed + accel * step_time
            move = start_speed * step_time + 0.5 * accel * step_time * step_time
            travelled += move
            move_points.append(move)

        # Deceleration phase.
        while travelled < distance:
            accel = -random.uniform(0.5, 1.5)
            start_speed = speed
            speed = start_speed + accel * step_time
            if speed < 0.5:
                speed = 0.5
            move = start_speed * step_time + 0.5 * accel * step_time * step_time
            travelled += move
            move_points.append(move)

        tracks = []
        total_points = len(move_points)
        for i, move in enumerate(move_points):
            # Occasional vertical jitter.
            y_offset = random.randint(-2, 2) if i % random.randint(2, 4) == 0 else 0
            if i < total_points * 0.3:
                duration = random.uniform(0.01, 0.03)
            elif i > total_points * 0.7:
                duration = random.uniform(0.03, 0.08)
            else:
                duration = random.uniform(0.02, 0.05)
            # Rare extra pause.
            if random.random() < 0.05:
                duration += random.uniform(0.05, 0.1)
            tracks.append((move, y_offset, duration))

        # Small backward correction at the end, most of the time.
        if random.random() < 0.7:
            tracks.append((-random.randint(1, 3), 0, 0.05))
        return tracks

    def _simulate_slider_drag(self, slider_element, target_distance):
        """Hold the slider, replay a generated track, then release."""
        self.driver.actions.move_to(slider_element).hold()
        for offset_x, offset_y, duration in self._generate_human_track(target_distance):
            # NOTE(review): the track durations already look like seconds
            # (0.01-0.18); dividing by 1000 makes each step near-instant.
            # Confirm against DrissionPage's Actions.move before changing.
            self.driver.actions.move(offset_x, offset_y, duration=duration / 1000)
        self.driver.actions.release()

    def _solve_slider_if_present(self):
        """Handle the slider captcha modal when it is shown.

        Returns:
            True when no modal appears within 3 seconds or after a drag was
            performed; False when recognition fails or the slider element is
            missing. The drag result itself is not verified here.
        """
        modal = self.driver.ele("xpath://div[@class='yidun_modal']", timeout=3)
        if not modal:
            return True

        logger.info("检测到滑块验证码,开始处理")
        jpg_bytes = modal.get_screenshot(as_bytes="jpg")
        distance = self._call_captcha_api(jpg_bytes)
        if distance is None:
            return False

        slider = self.driver.ele(
            "xpath://div[contains(@class,'yidun_slider--hover')]", timeout=5
        )
        if not slider:
            logger.error("未找到滑块元素")
            return False

        self._simulate_slider_drag(slider, float(distance) + SLIDER_OFFSET_FIX)
        time.sleep(3)
        return True

    def login(self):
        """Log in with the ``YYC_ACCOUNT`` username and password.

        Returns:
            True when the logout link is visible afterwards; False when a form
            element is missing, the slider could not be handled, or the login
            did not take effect.
        """
        self.driver.get("https://mall.yaoex.com/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)

        input_name = self.driver.ele("xpath://input[@name='username']", timeout=5)
        if not input_name:
            logger.error("未找到用户名输入框")
            return False
        input_name.input(YYC_ACCOUNT["username"])
        time.sleep(random.uniform(1.2, 2.0))

        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input(YYC_ACCOUNT["password"])
        time.sleep(random.uniform(1.2, 2.0))

        # Click-to-verify button; only clicked when the page shows one.
        geetest_click = self.driver.ele(
            "xpath=//div[contains(@class,'geetest_btn_click')]", timeout=3
        )
        if geetest_click:
            geetest_click.click()
            time.sleep(1.5)

        login_button = self.driver.ele("xpath://input[@id='login-btn']", timeout=5)
        if not login_button:
            logger.error("未找到登录按钮")
            return False
        login_button.click()
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)

        if not self._solve_slider_if_present():
            return False
        return self._is_logged_in()

    def decrypt_price(self, ciphertext_b64):
        """Decrypt a base64 AES-ECB price string.

        The 16-byte key is the fixed string when ``self.user_id`` is falsy;
        otherwise it is the first 10 characters of the fixed string followed
        by the first 6 characters of the user id, left-padded with "0".

        Args:
            ciphertext_b64: Base64 text; empty/blank values return "".

        Returns:
            The decrypted, unpadded plaintext decoded as UTF-8.

        Raises:
            Whatever ``base64.b64decode``, ``unpad`` or ``decode`` raise for
            malformed input (not caught here).
        """
        if not ciphertext_b64 or not str(ciphertext_b64).strip():
            return ""

        _KEY_FIXED = "GDLSAUO1KUMIIBCE"
        if not self.user_id:
            key = _KEY_FIXED.encode("utf-8")
        else:
            uid = str(self.user_id)[:6].rjust(6, "0")
            key = (_KEY_FIXED[:10] + uid).encode("utf-8")

        raw = base64.b64decode(ciphertext_b64.strip())
        cipher = AES.new(key, AES.MODE_ECB)
        plain = unpad(cipher.decrypt(raw), AES.block_size)
        return plain.decode("utf-8")

    def _post_with_retry(self, url, payload, retries=REQUEST_RETRY_COUNT, timeout=REQUEST_TIMEOUT_SEC):
        """POST form data with the module ``headers``, retrying on failure.

        Args:
            url: Target URL.
            payload: Form fields sent as the request body.
            retries: Maximum number of attempts; values below 1 are treated as
                1 so that a request is always made.
            timeout: Per-request timeout in seconds.

        Returns:
            The ``requests.Response`` of the first attempt with a 2xx/3xx
            status.

        Raises:
            The exception of the last failed attempt.
        """
        attempts = max(1, retries)
        for attempt in range(1, attempts + 1):
            try:
                resp = requests.post(
                    url,
                    headers=headers,
                    data=payload,
                    timeout=timeout,
                )
                resp.raise_for_status()
                return resp
            except Exception as e:
                if attempt >= attempts:
                    logger.error("请求失败,已达最大重试次数(%s): %s", attempts, e)
                    raise
                logger.warning("请求失败,第%s/%s次重试: %s", attempt, attempts, e)
                # Back off 2s, 4s, then at most 5s between attempts.
                time.sleep(min(2 * attempt, 5))

    def _shop_payload(self, enterprise_id):
        """Return the form fields for the enterprise-qualification request."""
        return {
            "traderName": "yaoex_pc",
            "trader": "pc",
            "closesignature": "yes",
            "signature_method": "md5",
            "signature": "****",
            "timestamp": self._timestamp_ms(),
            "token": self.token,
            "userToken": self.token,
            "enterpriseId": enterprise_id,
        }

    def _list_payload(self, keyword, page):
        """Return the form fields for one search-list page (20 items/page)."""
        return {
            "traderName": "yaoex_pc",
            "trader": "pc",
            "closesignature": "yes",
            "signature_method": "md5",
            "signature": "****",
            "timestamp": self._timestamp_ms(),
            "token": self.token,
            "userToken": self.token,
            "userId": self.user_id,
            "roleId": "101",
            "userType": "下游客户",
            "buyerCode": self.user_id,
            "nowPage": str(page),
            "per": "20",
            "keyword": keyword,
            "catSearchId": "",
            "specs": "",
            "factoryIds": "",
            "sellerCodes": "",
            "sellerFilterMode": "0",
            "sortColumn": "default",
            "sortMode": "default",
            "ver": "1",
            "stock_mode": "1",
            "showExtendCard": "true",
            "needDinnerPrice": "true",
            "limitStart": "",
            "limitEnd": "",
            "deadLineStart": "",
            "deadLineEnd": "",
            "filterDtos": "",
            "showWholePurchase": "true",
        }

    def fetch_list_page(self, keyword, page):
        """Fetch one search-result page.

        Returns:
            The ``data.shopProducts`` list of the reply, or ``[]`` when
            ``data`` or ``shopProducts`` is missing or null.

        Raises:
            The last request error from ``_post_with_retry``.
        """
        list_url = "https://gateway-b2b.fangkuaiyi.com/home/search/homeSearchList"
        resp = self._post_with_retry(list_url, self._list_payload(keyword, page))
        # "data" may be present but null; .get's default does not cover that.
        data = resp.json().get("data") or {}
        return data.get("shopProducts") or []

    def fetch_shop(self, seller_code):
        """Fetch a seller's qualification info.

        Returns:
            ``(address, enterprise_name)`` taken from ``data.baseInfo``; a
            missing key yields "" and a missing or null section yields
            ``("", "")``.

        Raises:
            The last request error from ``_post_with_retry``.
        """
        detail_url = "https://gateway-b2b.fangkuaiyi.com/ycapp/shop/enterpriseQualification"
        resp = self._post_with_retry(detail_url, self._shop_payload(seller_code))
        # Either level may be present but null.
        shop_res = resp.json().get("data") or {}
        base_info = shop_res.get("baseInfo") or {}
        return base_info.get("address", ""), base_info.get("enterpriseName", "")

    def _get_shop_info(self, seller_code):
        """Return cached ``(address, enterprise_name)`` for a seller.

        A failed lookup is logged and cached as ``("", "")``, so a seller is
        requested at most once per crawler instance.
        """
        if seller_code in self._shop_cache:
            return self._shop_cache[seller_code]
        try:
            shop_info = self.fetch_shop(seller_code)
        except Exception as e:
            logger.warning("fetch_shop 失败 seller_code=%s: %s", seller_code, e)
            shop_info = ("", "")
        self._shop_cache[seller_code] = shop_info
        return shop_info

    def _current_url(self):
        """Return the browser's current URL, or "" when it cannot be read."""
        try:
            return self.driver.url or ""
        except Exception:
            return ""

    def _url_has_product(self, spu_code, seller_code):
        """Tell whether the current URL refers to the given product.

        With both codes given, both must occur in the URL; with only
        ``spu_code`` given, it alone must occur; without ``spu_code`` the
        result is False.
        """
        url = self._current_url()
        spu_code = str(spu_code or "")
        seller_code = str(seller_code or "")
        if spu_code and seller_code:
            return spu_code in url and seller_code in url
        return bool(spu_code and spu_code in url)

    def _wait_detail_ready(self, spu_code, seller_code, timeout=DETAIL_URL_WAIT):
        """Poll until the detail page for the product is rendered.

        Returns:
            True once the URL matches and the detail content element exists;
            False when ``timeout`` seconds pass first.

        NOTE(review): no caller of this method is visible in this file.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            if self._url_has_product(spu_code, seller_code):
                if self.driver.ele(DETAIL_CONTENT_XPATH, timeout=1):
                    time.sleep(0.3)
                    return True
            time.sleep(0.4)
        return False

    def _build_detail_url(self, item):
        """Derive the detail-page URL and codes for a list item.

        An item without ``productId`` but with ``groupBuyProductDto`` is
        replaced by that nested dict and gets a group-buying URL; any other
        item gets the regular product URL.

        Returns:
            ``(item, detail_url, spu_code, seller_code)`` where ``item`` is
            the (possibly replaced) dict to parse further.
        """
        if not item.get("productId") and item.get("groupBuyProductDto"):
            item = item.get("groupBuyProductDto") or {}
            spu_code = item.get("spuCode", "")
            seller_code = item.get("sellerCode", "")
            group_buying_id = item.get("groupBuyingId", "")
            p_json = json.dumps(
                {"id": group_buying_id, "s": seller_code, "sp": spu_code},
                separators=(",", ":"),
            )
            detail_url = (
                f"https://mall.yaoex.com/groupBuying/#/productDetail?p={quote(p_json)}"
            )
        else:
            seller_code = item.get("sellerCode")
            spu_code = item.get("spuCode")
            detail_url = (
                f"https://mall.yaoex.com/v2/product/#/spuCode/{spu_code}/sellerCode/{seller_code}"
            )
        return item, detail_url, spu_code, seller_code

    def _goto_detail_page(self, detail_url, spu_code, seller_code):
        """Open the detail URL, then refresh once so the SPA re-renders it.

        Up to ``DETAIL_NAV_RETRIES`` attempts are made; an attempt counts as
        successful when neither ``get`` nor ``refresh`` raises.

        Returns:
            True on the first successful attempt, False when all fail.
        """
        for attempt in range(1, DETAIL_NAV_RETRIES + 1):
            try:
                self.driver.get(detail_url, timeout=DETAIL_GET_TIMEOUT)
                time.sleep(0.5)
                self.driver.refresh()
                time.sleep(2)
                return True
            except Exception as e:
                logger.warning(
                    "跳转详情异常 spu=%s seller=%s attempt=%s: %s",
                    spu_code,
                    seller_code,
                    attempt,
                    e,
                )
                time.sleep(random.uniform(0.8, 1.5))
        return False

    def _take_snapshot(self, upload_key):
        """Screenshot the detail content (or the whole page) and upload it.

        Args:
            upload_key: Key passed (as str) to the OSS uploader.

        Returns:
            The URL returned by the uploader, or "" when the screenshot is
            empty, the upload fails, or no URL is returned.
        """
        time.sleep(1)
        try:
            detail_ele = self.driver.ele(DETAIL_CONTENT_XPATH, timeout=2)
            if detail_ele:
                jpg_bytes = detail_ele.get_screenshot(as_bytes="jpg")
            else:
                # Fall back to a page screenshot when the content is missing.
                jpg_bytes = self.driver.get_screenshot(as_bytes="jpg")
            if not jpg_bytes:
                logger.warning("截图为空 upload_key=%s", upload_key)
                return ""
            img_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
        except Exception:
            logger.exception("截图或 OSS 上传失败 upload_key=%s", upload_key)
            return ""

        if not img_url:
            logger.warning("OSS 未返回有效地址 upload_key=%s", upload_key)
            return ""
        logger.info("截图上传完成 upload_key=%s url=%s", upload_key, img_url)
        time.sleep(random.uniform(1, 2))
        return img_url

    def parse_product(self, item, detail_url, snap_url):
        """Build the storage record for one list item.

        ``item_id`` is the MD5 of ``"<seller_code>_<spu_code>_<price>"`` using
        the decrypted price.

        Raises:
            Whatever ``decrypt_price`` raises for a malformed price.
        """
        seller_code = item.get("sellerCode")
        spu_code = item.get("spuCode")

        name_part = (item.get("productName") or "").strip()
        short_part = (item.get("shortName") or "").strip()
        product_name = f"{name_part} {short_part}".strip()

        shop_url = f"https://mall.yaoex.com/v2/store/#/detail/{seller_code}/home"
        company_address, company_name = self._get_shop_info(seller_code)

        address = item.get("cityName", "")
        city_id = province_id = city = province = ""
        if address:
            # Only the text before the first "市" is passed to the lookup.
            city_id, province_id, city, province = get_city(address.split("市")[0])

        price = self.decrypt_price(item.get("price"))
        hash_text = f"{seller_code}_{spu_code}_{price}"
        item_id = hashlib.md5(hash_text.encode("utf-8")).hexdigest()

        is_sold_out = 1 if "商品已售罄" in (item.get("statusDescription") or "") else 0

        shop_name = item.get("storeName") or item.get("shopName")
        anonymous_store_name = ""
        if shop_name == "预约配送中心":
            anonymous_store_name = item.get("supplyName", "")

        inventory = item.get("currentInventory") or item.get("stockCount")
        now = time.strftime("%Y-%m-%d %H:%M:%S")

        return {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": product_name,
            "spec": item.get("spec"),
            "one_price": "",
            "detail_url": detail_url,
            "shop_name": shop_name,
            "anonymous_store_name": anonymous_store_name,
            "shop_url": shop_url,
            "city_name": city,
            "city_id": city_id,
            "province_name": province,
            "province_id": province_id,
            "shipment_city_name": "",
            "shipment_city_id": "",
            "shipment_province_name": "",
            "shipment_province_id": "",
            "area_info": company_address or "",
            "factory_name": item.get("factoryName"),
            "scrape_date": time.strftime("%Y-%m-%d"),
            "price": price,
            "sales": "",
            "stock_count": inventory,
            "snapshot_url": snap_url,
            "approval_num": item.get("approvalNum"),
            "produced_time": item.get("productionTime"),
            "deadline": item.get("deadLine"),
            "update_time": now,
            "insert_time": now,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": company_name,
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": is_sold_out,
        }

    def search(self):
        """Crawl the configured page range for the task's product.

        Logs in when needed, then for every list item whose name contains
        both the product name and the brand: opens the detail page, takes a
        snapshot, parses the record and stores it. Stops early on an empty
        page or when too many consecutive items did not match.

        Returns:
            False when login fails, True otherwise.
        """
        self.driver.get("https://mall.yaoex.com/", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        if not self._is_logged_in():
            if not self.login():
                logger.error("登录失败")
                return False

        keyword = self.product
        if self.brand:
            keyword = (self.brand + " " + self.product).strip()
        if self.product_desc:
            keyword = (keyword + " " + self.product_desc).strip()

        # The task may carry product_brand=None; `None in str` raises
        # TypeError, so an absent brand is treated as "" (matches every name).
        brand = self.brand or ""

        for page in range(self.start_page, self.end_page + 1):
            logger.info("正在爬取 %s %s,第%s页", self.brand, self.product, page)
            page_items = self.fetch_list_page(keyword=keyword, page=page)
            if not page_items:
                logger.info("第%s页无数据,停止", page)
                break

            for item in page_items:
                item, detail_url, spu_code, seller_code = self._build_detail_url(item)
                name_part = (item.get("productName") or "").strip()
                short_part = (item.get("shortName") or "").strip()
                product_name = f"{name_part} {short_part}".strip()

                if self.product not in product_name or brand not in product_name:
                    self.is_not_product += 1
                    continue
                self.is_not_product = 0

                if not self._goto_detail_page(detail_url, spu_code, seller_code):
                    logger.warning(
                        "详情页跳转失败,跳过 spu=%s seller=%s url=%s",
                        spu_code,
                        seller_code,
                        detail_url,
                    )
                    continue

                upload_key = hashlib.md5(detail_url.encode("utf-8")).hexdigest()
                snap_url = self._take_snapshot(upload_key)
                product = self.parse_product(item, detail_url, snap_url)
                if not product.get("item_id"):
                    continue

                try:
                    self.pipeline.storge_data(product)
                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
                except Exception as e:
                    logger.exception("写入数据库失败: %s", e)
                time.sleep(random.uniform(1, 2))

            # Evaluated once per page: a matching item resets the counter, so
            # it only exceeds the limit after a trailing run of mismatches.
            if self.is_not_product > NOT_PRODUCT_BREAK:
                logger.info("连续不匹配商品过多,停止搜索")
                break
            time.sleep(random.uniform(1, 3))
        return True

    def run(self):
        """Start the browser, run the search, and always close the browser.

        Any exception is logged and swallowed; the result of ``search`` is
        not propagated.
        """
        try:
            self.init_browser()
            self.search()
        except Exception as e:
            logger.exception("运行异常: %s", e)
        finally:
            self._quit_browser()