"""Taobao search crawler.

Signs and sends mtop search requests through a ``curl_cffi`` session, filters
the returned items by brand / product / keywords and hands the matching
records to ``DrugPipeline`` for storage.
"""

import hashlib
import json
import random
import re
import time
from decimal import Decimal, InvalidOperation

from curl_cffi import requests
from lxml import etree

from commons.Logger import get_spider_logger
from commons.conn_mysql import MySQLPoolOnline
from pipelines.drug_pipelines import DrugPipeline
from spiders.taobao.taobao_login import TaobaoAutoCrawl
from area_info.city_name_to_id import get_city

logger = get_spider_logger("taobao")

MTOP_APP_KEY = "12574478"
MTOP_APP_ID = "34385"
SEARCH_MAX_PAGE = 20
REQUEST_RETRY_COUNT = 3
COOKIE_MAX_AGE_SEC = 3600

headers = {
    "accept": "*/*",
    "accept-language": "zh-CN,zh;q=0.9",
    "referer": "https://s.taobao.com/search?page=1&q=999%E6%84%9F%E5%86%92%E7%81%B5&spm=a21bo.jianhua%2Fa.201867-main.d4_first.42f72a89n1ITMs&tab=mall",
    "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "script",
    "sec-fetch-mode": "no-cors",
    "sec-fetch-site": "same-site",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
    ),
}

MTOP_URL = (
    "https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/"
)

# Patterns are compiled once because they are applied to every item/response.
_SHOP_ID_RE = re.compile(r"appUid=(\w+)")
_SALES_RE = re.compile(r"(.*?)人付款")
_JSONP_RE = re.compile(r"^[^(]*\((.*)\)\s*;?\s*$", re.DOTALL)

# Price stored when the source value is missing or not a finite number.
_ZERO_PRICE = Decimal("0.00")


class TaobaoCrawl:
    """Crawl Taobao search results for one collection task.

    Args:
        drug_dict: Task description. When non-empty it must contain ``id``,
            ``company_id`` and ``product_name``; the remaining keys read in
            ``get_product_data`` are optional. When empty or ``None`` the task
            attributes keep neutral defaults so the instance is still usable.
    """

    def __init__(self, drug_dict=None):
        self.cookies = None
        self.db = MySQLPoolOnline()
        self.pipeline = DrugPipeline("taobao")
        self.session = None
        self.proxies = None
        self.account_name = None
        self.ip = None
        self.cookie_stamp = None
        self.platform = 1
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.success = True
        # Count of consecutive items whose title lacks the brand/product.
        self.is_no_product = 0

        # Neutral defaults: without them run()/get_search() would raise
        # AttributeError when no task dict is supplied.
        self.task_id = None
        self.company_id = None
        self.product = ""
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.sampling_cycle = ""
        self.sampling_start_time = ""
        self.sampling_end_time = ""
        self.collect_equipment_id = ""
        self.account_id = ""
        self.collect_region_id = ""
        self.collect_round = 1

        if self.task_dict:
            self.get_product_data()

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: if ``id``, ``company_id`` or ``product_name`` is missing.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        self.brand = task.get("product_brand", "")
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    @staticmethod
    def _normalize_url(url):
        """Return ``url`` as a string, prefixing ``https:`` to ``//host/...`` URLs.

        Falsy input yields an empty string.
        """
        if not url:
            return ""
        url = str(url)
        if url.startswith("//"):
            return "https:" + url
        return url

    @staticmethod
    def _extract_shop_id(shop_url):
        """Return the ``appUid`` query value of ``shop_url``.

        Falls back to the MD5 hex digest of the URL when no ``appUid`` is
        present, and to an empty string for falsy input.
        """
        if not shop_url:
            return ""
        match = _SHOP_ID_RE.search(shop_url)
        if match:
            return match.group(1)
        return hashlib.md5(shop_url.encode("utf-8")).hexdigest()

    @staticmethod
    def _sql_literal(value):
        """Escape single quotes and backslashes for embedding in a SQL literal.

        This is a fallback only, not complete injection protection; prefer
        parameterised queries. ``None`` yields an empty string.
        """
        if value is None:
            return ""
        return str(value).replace("'", "''").replace("\\", "\\\\")

    def init_session(self):
        """(Re)create the HTTP session with current cookies, headers and proxies.

        Any previous session is closed first so repeated rebuilds (after
        transport errors or cookie refreshes) do not leak handles.
        """
        previous = self.session
        if previous is not None:
            try:
                previous.close()
            except Exception as e:  # best effort: a broken session must not block the rebuild
                logger.debug("closing previous session failed: %s", e)
        self.session = requests.Session(impersonate="chrome124")
        self.session.cookies.update(self.cookies or {})
        self.session.headers.update(headers)
        if self.proxies:
            self.session.proxies.update(self.proxies)

    @staticmethod
    def _is_transport_error(err):
        """Return True when the error text carries one of the known curl transport markers."""
        msg = str(err or "")
        return ("curl: (16)" in msg) or ("Failed to perform" in msg)

    def get_token(self, t, app_key, data_str):
        """Return the MD5 request signature for an mtop call.

        The signed text is ``token&t&app_key&data_str`` where ``token`` is the
        part of the ``_m_h5_tk`` cookie before the first underscore (empty
        when the cookie is absent).
        """
        _m_h5_tk = (self.cookies or {}).get("_m_h5_tk", "")
        token = _m_h5_tk.split("_")[0] if _m_h5_tk else ""
        text = f"{token}&{t}&{app_key}&{data_str}"
        return hashlib.md5(text.encode()).hexdigest()

    def get_html_content(self, res_html):
        """Return the concatenated text nodes of an HTML fragment.

        Falsy input yields ``""``. Input that lxml cannot parse is returned
        as ``str(res_html)`` instead of raising.
        """
        if not res_html:
            return ""
        try:
            ele_html = etree.HTML(res_html)
        except (etree.LxmlError, ValueError):
            return str(res_html)
        if ele_html is None:
            return str(res_html)
        return "".join(ele_html.xpath(".//text()"))

    def _build_search_payload(self, keyword, page, page_size=50):
        """Build the ``data`` payload of the search request for one result page."""
        return {
            "appId": MTOP_APP_ID,
            "params": {
                "device": "HMA-AL00",
                "isBeta": "false",
                "grayHair": "false",
                "from": "nt_history",
                "brand": "HUAWEI",
                "info": "wifi",
                "index": "4",
                "rainbow": "",
                "schemaType": "auction",
                "elderHome": "false",
                "isEnterSrpSearch": "true",
                "newSearch": "false",
                "network": "wifi",
                "subtype": "",
                "hasPreposeFilter": "false",
                "prepositionVersion": "v2",
                "client_os": "Android",
                "gpsEnabled": "false",
                "searchDoorFrom": "srp",
                "debug_rerankNewOpenCard": "false",
                "homePageVersion": "v7",
                "searchElderHomeOpen": "false",
                "search_action": "initiative",
                "sugg": "_4_1",
                "sversion": "13.6",
                "style": "list",
                "ttid": "600000@taobao_pc_10.7.0",
                "needTabs": "true",
                "areaCode": "CN",
                "vm": "nw",
                "countryNum": "156",
                "m": "pc",
                "page": page,
                "n": 48,
                "q": keyword,
                "qSource": "url",
                "pageSource": "",
                "channelSrp": "",
                "tab": "all",
                "pageSize": str(page_size),
                "sourceS": "2",
                "ntoffset": "0",
                "filterTag": "",
                "service": "",
                "prop": "",
                "loc": "",
                "categoryp": "",
                "screenResolution": "1920x1080",
                "viewResolution": "1092x4722",
                "userAgent": headers["user-agent"],
                "couponUnikey": "",
                "subTabId": "",
                "np": "",
                "clientType": "h5",
                "isNewDomainAb": "false",
                "forceOldDomain": "false",
            },
        }

    def _request_search_page(self, keyword, page):
        """Send the signed search request for ``page`` and return the raw response."""
        t = str(int(time.time() * 1000))
        data = self._build_search_payload(keyword, page)
        # Compact separators: the exact string that is signed is also the one sent.
        data_str = json.dumps(data, separators=(",", ":"))
        sign = self.get_token(t, MTOP_APP_KEY, data_str)
        params = {
            "jsv": "2.7.4",
            "appKey": MTOP_APP_KEY,
            "t": t,
            "sign": sign,
            "api": "mtop.relationrecommend.wirelessrecommend.recommend",
            "v": "2.0",
            "timeout": "10000",
            "type": "jsonp",
            "dataType": "jsonp",
            "callback": "",
            "data": data_str,
        }
        return self.session.get(MTOP_URL, params=params, timeout=30)

    def _parse_jsonp_body(self, res_text):
        """Decode a JSON or JSONP (``callback(...)``) response body.

        Raises:
            json.JSONDecodeError: if the (unwrapped) body is not valid JSON.
        """
        res_text = (res_text or "").strip()
        match = _JSONP_RE.match(res_text)
        json_str = match.group(1) if match else res_text
        return json.loads(json_str)

    def _build_keyword(self):
        """Return the search keyword: brand, product name and spec joined by spaces."""
        keyword = self.product
        if self.brand:
            keyword = (self.brand + " " + self.product).strip()
        if self.product_desc:
            keyword = (keyword + " " + self.product_desc).strip()
        return keyword

    @staticmethod
    def _parse_price(raw_price):
        """Convert a raw price to a two-decimal ``Decimal``; invalid input gives 0.00."""
        if raw_price in (None, ""):
            return _ZERO_PRICE
        try:
            price = Decimal(str(raw_price))
            if not price.is_finite():
                # NaN would pass through quantize() unchanged.
                return _ZERO_PRICE
            return price.quantize(_ZERO_PRICE)
        except (InvalidOperation, ValueError):
            return _ZERO_PRICE

    def _build_product(self, raw):
        """Filter one raw search item and build its storage record.

        Maintains ``self.is_no_product``: incremented when the title lacks the
        brand or product name, reset to 0 once a title contains both and is
        not a combo listing.

        Returns:
            The record dict, or ``None`` when the item is filtered out.
        """
        if not isinstance(raw, dict):
            return None
        item_id = raw.get("item_id", "")
        if not item_id:
            return None

        item_title = self.get_html_content(raw.get("title") or "")
        brand = self.brand or ""
        product_desc = self.product_desc or ""
        if brand not in item_title or self.product not in item_title:
            self.is_no_product += 1
            return None
        # Titles containing "+" are skipped without touching the counter.
        if "+" in item_title:
            return None

        crawl_product_desc = (
            product_desc if product_desc and product_desc in item_title else ""
        )
        self.is_no_product = 0

        # Every comma-separated keyword must appear in the title.
        if self.product_keyword:
            required = (kw.strip() for kw in self.product_keyword.split(","))
            if not all(kw in item_title for kw in required):
                return None

        # `or {}` / `or []` guard against keys that are present but null.
        price_info = raw.get("priceShow") or {}
        shop_info = raw.get("shopInfo") or {}

        sale_num = ""
        sales_match = _SALES_RE.search(str(raw.get("realSales") or ""))
        if sales_match:
            sale_num = sales_match.group(1)

        item_url = self._normalize_url(raw.get("auctionURL"))
        shop_name = shop_info.get("title", "")
        shop_url = self._normalize_url(shop_info.get("url", ""))
        area_str = (raw.get("procity", "") or "").strip()
        city_id, province_id, city, province = get_city(area_str)

        # A structured "规格" property overrides the spec taken from the task.
        for structured in raw.get("structuredUSPInfo") or []:
            if (
                isinstance(structured, dict)
                and structured.get("propertyName", "") == "规格"
            ):
                crawl_product_desc = structured.get("propertyValueName", "")

        price = self._parse_price(price_info.get("price", 0))

        # One clock read so both stamps agree even across midnight.
        now = time.localtime()
        scrape_date = time.strftime("%Y-%m-%d", now)
        update_time = time.strftime("%Y-%m-%d %H:%M:%S", now)

        # NOTE(review): the original computed a normalized `pic_path` URL into
        # a local `snapshot_url` but always stored "" — confirm whether the
        # picture URL was meant to be persisted. Stored value left unchanged.

        # Fields mirror yaofangwang_crawl; key order must match
        # commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS.
        return {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item_title,
            "spec": crawl_product_desc,
            "one_price": "",
            "detail_url": item_url,
            "shop_name": shop_name,
            "anonymous_store_name": "",
            "shop_url": shop_url,
            "city_name": "",
            "city_id": "",
            "province_name": "",
            "province_id": "",
            "shipment_city_name": city,
            "shipment_city_id": city_id,
            "shipment_province_name": province,
            "shipment_province_id": province_id,
            "area_info": area_str,
            "factory_name": "",
            "scrape_date": scrape_date,
            "price": price,
            "sales": sale_num,
            "stock_count": "",
            "snapshot_url": "",
            "approval_num": "",
            "produced_time": "",
            "deadline": "",
            "update_time": update_time,
            "insert_time": update_time,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 0,
        }

    def get_search(self):
        """Crawl up to ``SEARCH_MAX_PAGE`` result pages and store matching items.

        Each page is requested up to ``REQUEST_RETRY_COUNT`` times. A response
        that cannot be parsed triggers a cookie refresh; if that refresh fails
        the crawl stops. Paging also stops on an empty page, on the reported
        last page, or after more than 20 consecutive non-matching items.
        """
        keyword = self._build_keyword()
        for page in range(1, SEARCH_MAX_PAGE + 1):
            logger.info("正在爬取关键词:%s,%s页数据", keyword, page)
            json_data = None
            item_array = None
            for attempt in range(1, REQUEST_RETRY_COUNT + 1):
                try:
                    if not self.session:
                        self.init_session()
                    res = self._request_search_page(keyword, page)
                    if res.status_code != 200:
                        logger.warning(
                            "请求失败 HTTP %s,第%s/%s 次重试",
                            res.status_code,
                            attempt,
                            REQUEST_RETRY_COUNT,
                        )
                        time.sleep(random.randint(3, 8))
                        continue
                except Exception as e:
                    if self._is_transport_error(e):
                        logger.warning(
                            "检测到网络传输异常(curl),重建会话后重试: %s",
                            e,
                        )
                        self.init_session()
                    logger.warning(
                        "请求异常,第%s/%s 次重试: %s",
                        attempt,
                        REQUEST_RETRY_COUNT,
                        e,
                    )
                    # Linear back-off capped at 10s to avoid hammering after a
                    # burst of transient failures.
                    time.sleep(min(3 * attempt, 10))
                    continue
                try:
                    json_data = self._parse_jsonp_body(res.text)
                    item_array = json_data.get("data", {}).get("itemsArray", [])
                    break
                except Exception as e:
                    logger.warning(
                        "解析数据异常,%s 账号可能退出登录,尝试重新登录: %s",
                        self.account_name,
                        e,
                    )
                    if self.update_cookie():
                        self.init_session()
                    else:
                        return
            else:
                # Every attempt failed: give up on this page, try the next one.
                logger.warning("关键词 %s 第 %s 页连续重试失败", keyword, page)
                continue

            if not item_array:
                logger.warning("关键词 %s 第 %s 页未获取到商品数据", keyword, page)
                return

            for raw in item_array:
                product = self._build_product(raw)
                if product is None:
                    continue
                try:
                    self.pipeline.storge_data(product)
                    logger.info(
                        "%s", json.dumps(product, ensure_ascii=False, default=str)
                    )
                except Exception as e:
                    logger.exception("写入数据库失败: %s", e)

            logger.info(
                "关键词 %s 第 %s 页爬取完成",
                keyword,
                page,
            )

            main_info = ((json_data or {}).get("data") or {}).get("mainInfo") or {}
            total_page = main_info.get("totalPage")
            try:
                total_page_int = int(total_page) if total_page is not None else 50
            except (TypeError, ValueError):
                total_page_int = 50
            if page >= total_page_int:
                break
            if self.is_no_product > 20:
                break
            if page >= SEARCH_MAX_PAGE:
                # Last iteration: no point sleeping before leaving the loop.
                break
            sleep_second = random.uniform(30, 60)
            logger.info("第 %s 页爬取完成,休息 %.1fs", page, sleep_second)
            time.sleep(sleep_second)

    def update_cookie(self):
        """Re-login through ``TaobaoAutoCrawl`` and reload cookies from the DB.

        Returns:
            True when fresh cookies were loaded into ``self.cookies``; False
            when the login, the lookup or the cookie JSON decoding failed.
        """
        taobao_auto = TaobaoAutoCrawl(self.account_name, self.ip, self.product)
        if not taobao_auto.run():
            return False
        # Parameterised like get_account() instead of interpolating the
        # account name into the SQL text.
        sql_account = (
            "select * from `retrieve_collect_equipment_account` where `username` = %s"
        )
        username = "" if self.account_name is None else str(self.account_name)
        account_list = self.db.select_data(sql_account, username)
        if not account_list:
            logger.error("账号 %s 未查询到 cookie 信息", self.account_name)
            return False
        cookie_str = account_list[0].get("cookie_str")
        if not cookie_str:
            logger.error("账号 %s cookie 为空", self.account_name)
            return False
        try:
            self.cookies = json.loads(cookie_str)
        except Exception as e:
            logger.error("账号 %s cookie 解析失败: %s", self.account_name, e)
            return False
        return True

    def _cookie_is_stale(self, cookie_str):
        """Return True when the stored cookie is missing or older than ``COOKIE_MAX_AGE_SEC``."""
        if not cookie_str:
            return True
        try:
            stamp = int(self.cookie_stamp or 0)
        except (TypeError, ValueError):
            # A stamp that is not an epoch number cannot be aged; refresh to be safe.
            return True
        return int(time.time()) - stamp > COOKIE_MAX_AGE_SEC

    def get_account(self):
        """Load the task's account row, refresh cookies if needed, open a session.

        Returns:
            True when an enabled account (``status`` = 0) was found and usable
            cookies are loaded; False otherwise.
        """
        sql_account = """
            SELECT * FROM `retrieve_collect_equipment_account`
            WHERE `id` = %s and `status` = 0
        """
        account_list = self.db.select_data(sql_account, self.account_id)
        if not account_list:
            return False
        account_dict = account_list[0]
        # The row is deliberately not printed/logged: it holds the password
        # and the session cookies.
        cookie_str = account_dict.get("cookie_str")
        self.ip = account_dict.get("ip")
        self.account_name = account_dict.get("username")
        self.login_username = account_dict.get("phone", "")
        self.login_password = account_dict.get("password", "")
        self.cookie_stamp = account_dict.get("update_time")
        if self.ip:
            account_proxy = f"http://{self.ip}"
            self.proxies = {"http": account_proxy, "https": account_proxy}
        else:
            self.proxies = None

        if self._cookie_is_stale(cookie_str):
            if not self.update_cookie():
                return False
        else:
            try:
                self.cookies = json.loads(cookie_str)
            except Exception as e:
                logger.error("cookie 解析失败,尝试刷新: %s", e)
                if not self.update_cookie():
                    return False
        logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
        self.init_session()
        return True

    def run(self):
        """Run the task: acquire an account, then crawl the search results.

        Returns:
            ``(crawl_count, success)`` where ``crawl_count`` is read from the
            pipeline and ``success`` is False when no account was available.
        """
        if not self.get_account():
            logger.info("==================当前无账号可用==================")
            self.success = False
            return self.pipeline.crawl_count, self.success
        logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
        self.get_search()
        logger.info(
            "任务id:%s, 任务状态已更新, 产品名称:%s, 爬取数据:%s条",
            self.task_id,
            self.product,
            self.pipeline.crawl_count,
        )
        return self.pipeline.crawl_count, self.success