"""Taobao search crawler.

Opens a browser session (the driver returned by ``TaobaoAutoCrawl.run()``),
runs a keyword search on s.taobao.com, captures the mtop search API response
the page issues, filters the returned items against the task's product
definition, uploads a screenshot of each kept item card to OSS and stores the
rows through ``DrugPipeline``.
"""
import hashlib
import json
import random
import re
import time
from decimal import Decimal, InvalidOperation
from urllib.parse import quote

from curl_cffi import requests
from lxml import etree

from commons.Logger import get_spider_logger
from commons.conn_mysql import MySQLPoolOnline
from pipelines.drug_pipelines import DrugPipeline
from spiders.taobao.snapshot_taobao_login import TaobaoAutoCrawl
from area_info.city_name_to_id import get_city
from oss_upload.oss_upload import AliyunOSSUploader

logger = get_spider_logger("taobao")

MTOP_APP_KEY = "12574478"
MTOP_APP_ID = "34385"
SEARCH_MAX_PAGE = 20
REQUEST_RETRY_COUNT = 3
COOKIE_MAX_AGE_SEC = 3600
# Maximum number of captured packets inspected per page while looking for the
# search API response.
SEARCH_RESPONSE_MAX_PACKETS = 5
# Seconds to wait for each captured packet; without a timeout the listener
# would block forever if the page never fires the request.
SEARCH_RESPONSE_WAIT_SEC = 30
# Captured packets whose URL is longer than this are treated as the search
# response (presumably because its query string carries the long signed
# payload -- TODO confirm against live traffic).
SEARCH_RESPONSE_MIN_URL_LEN = 3000
# Stop crawling once more than this many items in a row fail the
# brand/product-name filter.
NO_PRODUCT_STOP_THRESHOLD = 20


def build_taobao_search_url(keyword: str, page: int = 1) -> str:
    """Build a Taobao web search URL.

    Args:
        keyword: Search keyword, e.g. ``"999 玉屏风口服液 10支"``.
        page: 1-based page number.

    Returns:
        The full ``https://s.taobao.com/search?...`` URL. Only the keyword is
        percent-encoded; the fixed parameters are emitted verbatim.
    """
    # quote(..., safe="") already encodes spaces as %20 and escapes "/", "&", "=".
    params = {
        "_input_charset": "utf-8",
        "commend": "all",
        "ie": "utf8",
        "preLoadOrigin": "https://www.taobao.com",
        "search_type": "item",
        "source": "suggest",
        "sourceId": "tb.index",
        "spm": "a21bo.jianhua/a.search_history.d1",
        "ssid": "s5-e",
        "tab": "all",
        "suggest_query": "",
        "q": quote(keyword, safe=""),
        "page": str(page),
    }
    query_string = "&".join(f"{k}={v}" for k, v in params.items())
    return f"https://s.taobao.com/search?{query_string}"


def extract_item_data(item_element):
    """Extract the listing fields of one search-result card from the DOM.

    ``item_element`` is a browser element exposing ``ele(locator)``,
    ``attr(name)`` and ``.text`` (presumably a DrissionPage element); missing
    sub-elements are expected to be falsy.

    Returns:
        dict shaped like an mtop ``itemsArray`` entry, with keys ``item_id``,
        ``title``, ``price``, ``realSales``, ``shopInfo`` (``{"title": ...}``),
        ``procity`` and ``auctionURL``. Fields that cannot be found stay "".
    """
    result = {
        "item_id": "",
        "title": "",
        "price": "",
        "realSales": "",
        "shopInfo": {"title": ""},
        "procity": "",
        "auctionURL": "",
    }

    # 1. item_id: taken from the id attribute ("item_id_<id>") of the <a> tag.
    a_elem = item_element.ele('xpath=.//a[contains(@id, "item_id_")]')
    if a_elem:
        item_id_full = a_elem.attr('id')
        if item_id_full:
            result["item_id"] = item_id_full.replace("item_id_", "")

    # 2. title: the div's title attribute, falling back to its text content.
    title_elem = item_element.ele('xpath=.//div[contains(@class, "title--")]')
    if title_elem:
        title = title_elem.attr('title')
        if not title:
            title = title_elem.text
        result["title"] = title

    # 3. price: integer part + fractional part. The fractional element may be
    #    absent (or empty), in which case only the integer part is used.
    price_int = item_element.ele('xpath=.//div[contains(@class, "priceInt--")]')
    price_float = item_element.ele('xpath=.//div[contains(@class, "priceFloat--")]')
    if price_int:
        fraction = price_float.text.replace('.', '') if price_float else ""
        result["price"] = f"{price_int.text}.{fraction}" if fraction else price_int.text

    # 4. realSales (sales text).
    sales_elem = item_element.ele('xpath=.//span[contains(@class, "realSales--")]')
    if sales_elem:
        result["realSales"] = sales_elem.text

    # 5. shopInfo.title (shop name).
    shop_elem = item_element.ele('xpath=.//span[contains(@class, "shopNameText--")]')
    if shop_elem:
        result["shopInfo"]["title"] = shop_elem.text

    # 6. procity (shipping location).
    procity_elem = item_element.ele('xpath=.//div[contains(@class, "procity--")]/span')
    if procity_elem:
        result["procity"] = procity_elem.text

    # 7. auctionURL: the <a> href, or a URL built from the item id.
    if a_elem:
        href = a_elem.attr('href')
        if href:
            result["auctionURL"] = href
        elif result["item_id"]:
            result["auctionURL"] = f"https://item.taobao.com/item.htm?id={result['item_id']}"

    return result


# NOTE(review): user-agent says Chrome/142 while sec-ch-ua says 145 and the
# curl_cffi session impersonates chrome124 -- confirm this mismatch is intended.
headers = {
    "accept": "*/*",
    "accept-language": "zh-CN,zh;q=0.9",
    "referer": "https://s.taobao.com/search?page=1&q=999%E6%84%9F%E5%86%92%E7%81%B5&spm=a21bo.jianhua%2Fa.201867-main.d4_first.42f72a89n1ITMs&tab=mall",
    "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "script",
    "sec-fetch-mode": "no-cors",
    "sec-fetch-site": "same-site",
    "user-agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
    ),
}

MTOP_URL = (
    "https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/"
)


class TaobaoCrawl:
    """Crawl Taobao search results for one product-monitoring task.

    ``drug_dict`` is the task row; the keys read from it are listed in
    ``get_product_data``. Call ``run()`` to execute the task.
    """

    def __init__(self, drug_dict=None):
        self.cookies = None
        self.db = MySQLPoolOnline()
        self.pipeline = DrugPipeline("taobao")
        self.session = None
        self.proxies = None
        self.account_name = None
        self.ip = None
        self.cookie_stamp = None
        self.platform = 1
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.success = True
        if self.task_dict:
            self.get_product_data()
        # Items rejected by the brand/product-name filter since the last
        # accepted item; the crawl stops once this exceeds
        # NO_PRODUCT_STOP_THRESHOLD.
        self.is_no_product = 0
        # Browser handle returned by TaobaoAutoCrawl.run(); "" until logged in.
        self.driver = ''
        self.ossuploader = AliyunOSSUploader()

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        ``id``, ``company_id`` and ``product_name`` are required (KeyError if
        missing); the other keys default to "" (``collect_round`` to 1).
        """
        self.task_id = self.task_dict["id"]
        self.company_id = self.task_dict["company_id"]
        self.product = self.task_dict["product_name"]
        self.product_desc = self.task_dict.get("product_specs", "")
        self.brand = self.task_dict.get("product_brand", "")
        self.product_keyword = self.task_dict.get("product_keyword", "")
        self.collect_task_id = self.task_dict.get("collect_task_id", "")
        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
        self.collect_region_id = self.task_dict.get("collect_region_id", "")
        self.collect_round = self.task_dict.get("collect_round", 1)

    @staticmethod
    def _normalize_url(url):
        """Return ``url`` as a string, prefixing protocol-relative URLs with ``https:``."""
        if not url:
            return ""
        url = str(url)
        if url.startswith("//"):
            return "https:" + url
        return url

    @staticmethod
    def _extract_shop_id(shop_url):
        """Return the ``appUid`` query value of ``shop_url``, else the URL's MD5 hex digest."""
        if not shop_url:
            return ""
        shop_re = re.search(r"appUid=(\w+)", shop_url)
        if shop_re:
            return shop_re.group(1)
        return hashlib.md5(shop_url.encode("utf-8")).hexdigest()

    @staticmethod
    def _sql_literal(value):
        """Escape quotes/backslashes so an account name cannot break a SQL literal.

        This is only a fallback, not complete injection protection; prefer
        parameterized queries.
        """
        if value is None:
            return ""
        return str(value).replace("'", "''").replace("\\", "\\\\")

    def init_session(self):
        """Create a curl_cffi session with the stored cookies, headers and proxy."""
        self.session = requests.Session(impersonate="chrome124")
        self.session.cookies.update(self.cookies or {})
        self.session.headers.update(headers)
        if self.proxies:
            self.session.proxies.update(self.proxies)

    @staticmethod
    def _is_transport_error(err):
        """Return True when ``err`` looks like a curl transport failure."""
        msg = str(err or "")
        return ("curl: (16)" in msg) or ("Failed to perform" in msg)

    def get_token(self, t, app_key, data_str):
        """Compute the mtop ``sign``: MD5 of ``"<token>&<t>&<app_key>&<data_str>"``.

        ``token`` is the part of the ``_m_h5_tk`` cookie before the first "_"
        (empty when the cookie is missing).
        """
        _m_h5_tk = (self.cookies or {}).get("_m_h5_tk", "")
        token = _m_h5_tk.split("_")[0] if _m_h5_tk else ""
        text = f"{token}&{t}&{app_key}&{data_str}"
        return hashlib.md5(text.encode()).hexdigest()

    def get_html_content(self, res_html):
        """Strip HTML markup from ``res_html`` and return the concatenated text."""
        if not res_html:
            return ""
        ele_html = etree.HTML(res_html)
        if ele_html is None:
            return str(res_html)
        text_list = ele_html.xpath(".//text()")
        return "".join(text_list)

    def _build_search_payload(self, keyword, page, page_size=50):
        """Build the ``data`` object for the mtop search request."""
        return {
            "appId": MTOP_APP_ID,
            "params": {
                "device": "HMA-AL00",
                "isBeta": "false",
                "grayHair": "false",
                "from": "nt_history",
                "brand": "HUAWEI",
                "info": "wifi",
                "index": "4",
                "rainbow": "",
                "schemaType": "auction",
                "elderHome": "false",
                "isEnterSrpSearch": "true",
                "newSearch": "false",
                "network": "wifi",
                "subtype": "",
                "hasPreposeFilter": "false",
                "prepositionVersion": "v2",
                "client_os": "Android",
                "gpsEnabled": "false",
                "searchDoorFrom": "srp",
                "debug_rerankNewOpenCard": "false",
                "homePageVersion": "v7",
                "searchElderHomeOpen": "false",
                "search_action": "initiative",
                "sugg": "_4_1",
                "sversion": "13.6",
                "style": "list",
                "ttid": "600000@taobao_pc_10.7.0",
                "needTabs": "true",
                "areaCode": "CN",
                "vm": "nw",
                "countryNum": "156",
                "m": "pc",
                "page": page,
                "n": 48,
                "q": keyword,
                "qSource": "url",
                "pageSource": "",
                "channelSrp": "",
                "tab": "all",
                "pageSize": str(page_size),
                "sourceS": "2",
                "ntoffset": "0",
                "filterTag": "",
                "service": "",
                "prop": "",
                "loc": "",
                "categoryp": "",
                "screenResolution": "1920x1080",
                "viewResolution": "1092x4722",
                "userAgent": headers["user-agent"],
                "couponUnikey": "",
                "subTabId": "",
                "np": "",
                "clientType": "h5",
                "isNewDomainAb": "false",
                "forceOldDomain": "false",
            },
        }

    def _request_search_page(self, keyword, page):
        """Send a signed mtop search request through ``self.session`` and return the response."""
        t = str(int(time.time() * 1000))
        data = self._build_search_payload(keyword, page)
        data_str = json.dumps(data, separators=(",", ":"))
        sign = self.get_token(t, MTOP_APP_KEY, data_str)
        params = {
            "jsv": "2.7.4",
            "appKey": MTOP_APP_KEY,
            "t": t,
            "sign": sign,
            "api": "mtop.relationrecommend.wirelessrecommend.recommend",
            "v": "2.0",
            "timeout": "10000",
            "type": "jsonp",
            "dataType": "jsonp",
            "callback": "",
            "data": data_str,
        }
        return self.session.get(MTOP_URL, params=params, timeout=30)

    def _parse_jsonp_body(self, res_text):
        """Parse a JSON or JSONP (``callback({...});``) body into Python objects.

        Raises:
            json.JSONDecodeError: when the (unwrapped) body is not valid JSON.
        """
        res_text = (res_text or "").strip()
        json_str = res_text
        m = re.match(r"^[^(]*\((.*)\)\s*;?\s*$", res_text, re.DOTALL)
        if m:
            json_str = m.group(1)
        return json.loads(json_str)

    def _build_search_keyword(self):
        """Return the search query "<brand> <product> <specs>", skipping empty parts."""
        keyword = self.product
        if self.brand:
            keyword = (self.brand + " " + self.product).strip()
        if self.product_desc:
            keyword = (keyword + " " + self.product_desc).strip()
        return keyword

    def _wait_search_body(self, tab):
        """Return the raw body of the captured search API response, or None.

        Inspects at most SEARCH_RESPONSE_MAX_PACKETS packets captured by
        ``tab.listen``. A falsy result from ``listen.wait`` (timeout) ends the
        wait early.
        """
        for _ in range(SEARCH_RESPONSE_MAX_PACKETS):
            packet = tab.listen.wait(timeout=SEARCH_RESPONSE_WAIT_SEC)
            if not packet:
                break
            if len(packet.url) > SEARCH_RESPONSE_MIN_URL_LEN:
                return packet.response.raw_body
        return None

    @staticmethod
    def _to_price(raw_price):
        """Convert a price value to a Decimal with 2 places; empty/invalid -> 0.00."""
        if raw_price in (None, ""):
            return Decimal("0.00")
        try:
            value = Decimal(str(raw_price))
            if not value.is_finite():
                return Decimal("0.00")
            return value.quantize(Decimal("0.00"))
        except (InvalidOperation, ValueError):
            return Decimal("0.00")

    def _snapshot(self, elems, index, item_url):
        """Screenshot the DOM card ``elems[index]`` and upload it to OSS.

        Assumes the DOM cards appear in the same order as ``itemsArray``
        (TODO confirm). Best effort: returns "" when the card is missing or
        the screenshot/upload fails.
        """
        upload_key = hashlib.md5(item_url.encode("utf-8")).hexdigest()
        try:
            jpg_bytes = elems[index].get_screenshot(as_bytes="jpg")
            return self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
        except Exception as e:
            logger.warning("商品快照上传失败 %s: %s", item_url, e)
            return ''

    def _build_product(self, raw, elems, index):
        """Filter one ``itemsArray`` entry and map it to a pipeline row.

        Returns None when the item is rejected: missing id, brand or product
        name not in the title, a "+" bundle, or a ``product_keyword`` term not
        in the title. A brand/product-name rejection increments
        ``self.is_no_product``; passing that check resets it to 0.
        """
        item_id = raw.get("item_id", "")
        if not item_id:
            return None
        item_title = self.get_html_content(raw.get("title") or "")
        if (self.brand or "") not in item_title or self.product not in item_title:
            self.is_no_product += 1
            return None
        if "+" in item_title:
            return None

        if self.product_desc and self.product_desc in item_title:
            crawl_product_desc = self.product_desc
        else:
            crawl_product_desc = ""
        self.is_no_product = 0

        # Every comma-separated product_keyword term must appear in the title.
        if self.product_keyword:
            required_terms = self.product_keyword.split(",")
            if not all(term.strip() in item_title for term in required_terms):
                return None

        item_price_show = (raw.get("priceShow") or {}).get("price", 0)
        sale_num = ""
        sales_m = re.search(r"(.*?)人付款", str(raw.get("realSales") or ""))
        if sales_m:
            sale_num = sales_m.group(1)
        item_url = self._normalize_url(raw.get("auctionURL"))
        shop_info = raw.get("shopInfo") or {}
        shop_name = shop_info.get("title", "")
        area_str = (raw.get("procity", "") or "").strip()
        city_id, province_id, city, province = get_city(area_str)
        shop_url = self._normalize_url(shop_info.get("url", ""))

        # A "规格" (specification) entry in the structured USP info overrides
        # the spec matched from the title.
        for structured in raw.get("structuredUSPInfo") or []:
            if isinstance(structured, dict) and structured.get("propertyName", "") == "规格":
                crawl_product_desc = structured.get("propertyValueName", "")

        price = self._to_price(item_price_show)
        snapshot_url = self._snapshot(elems, index, item_url)
        scrape_date = time.strftime("%Y-%m-%d")
        update_time = time.strftime("%Y-%m-%d %H:%M:%S")

        # Fields mirror yaofangwang_crawl; key order must match
        # commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS.
        return {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item_title,
            "spec": crawl_product_desc,
            "one_price": "",
            "detail_url": item_url,
            "shop_name": shop_name,
            "anonymous_store_name": "",
            "shop_url": shop_url,
            "city_name": "",
            "city_id": "",
            "province_name": "",
            "province_id": "",
            "shipment_city_name": city,
            "shipment_city_id": city_id,
            "shipment_province_name": province,
            "shipment_province_id": province_id,
            "area_info": area_str,
            "factory_name": "",
            "scrape_date": scrape_date,
            "price": price,
            "sales": sale_num,
            "stock_count": "",
            "snapshot_url": snapshot_url,
            "approval_num": "",
            "produced_time": "",
            "deadline": "",
            "update_time": update_time,
            "insert_time": update_time,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 0,
        }

    def get_search(self):
        """Search Taobao for the task product and store every matching item.

        Walks up to SEARCH_MAX_PAGE result pages in ``self.driver``'s latest
        tab. Page 1 is opened via ``build_taobao_search_url``; later pages are
        reached by clicking the "下一页" button. For each page the mtop search
        response is captured, its ``itemsArray`` filtered, and each kept item
        written via ``self.pipeline.storge_data``.

        Stops early when:
        - the response is missing or unparseable;
        - a page has no items;
        - the last page is reached or the next button is missing;
        - more than NO_PRODUCT_STOP_THRESHOLD items in a row fail the name filter.
        """
        keyword = self._build_search_keyword()
        for page in range(1, SEARCH_MAX_PAGE + 1):
            logger.info("正在爬取关键词:%s,%s页数据", keyword, page)
            tab = self.driver.latest_tab
            # (Re)start capturing search API packets before the navigation or
            # click that triggers them.
            tab.listen.start(MTOP_URL)
            if page == 1:
                tab.get(build_taobao_search_url(keyword))
            else:
                next_btn = tab.ele('xpath=//button[contains(@aria-label, "下一页")]')
                if not next_btn or not next_btn.click():
                    break

            # Reset per page so a missing response never re-uses the previous
            # page's body.
            res = self._wait_search_body(tab)
            if res is None:
                logger.warning("关键词 %s 第 %s 页未捕获到搜索接口数据", keyword, page)
                break
            time.sleep(1.5)

            try:
                json_data = self._parse_jsonp_body(res)
                item_array = (json_data.get("data") or {}).get("itemsArray") or []
            except Exception as e:
                logger.warning(
                    "解析数据异常,%s 账号可能退出登录,尝试重新登录: %s",
                    self.account_name,
                    e,
                )
                break

            if not item_array:
                logger.warning("关键词 %s 第 %s 页未获取到商品数据", keyword, page)
                return

            elems = tab.eles('xpath=//*[@id="content_items_wrapper"]/div')
            for index, raw in enumerate(item_array):
                try:
                    product = self._build_product(raw, elems, index)
                except Exception:
                    logger.exception("关键词 %s 第 %s 页第 %s 个商品解析失败,跳过", keyword, page, index)
                    continue
                if product is None:
                    continue
                try:
                    self.pipeline.storge_data(product)
                    logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
                except Exception as e:
                    logger.exception("写入数据库失败: %s", e)

            logger.info(
                "关键词 %s 第 %s 页爬取完成",
                keyword,
                page,
            )
            main_info = (json_data.get("data") or {}).get("mainInfo") or {}
            total_page = main_info.get("totalPage")
            try:
                total_page_int = int(total_page) if total_page is not None else 50
            except (TypeError, ValueError):
                total_page_int = 50
            if page >= total_page_int:
                break
            if self.is_no_product > NO_PRODUCT_STOP_THRESHOLD:
                break
            sleep_second = random.uniform(30, 60)
            logger.info("第 %s 页爬取完成,休息 %.1fs", page, sleep_second)
            time.sleep(sleep_second)

    def update_cookie(self):
        """Log in via TaobaoAutoCrawl and keep the returned browser driver.

        Returns:
            bool: True when a driver was obtained, False otherwise.
        """
        taobao_auto = TaobaoAutoCrawl(self.account_name, self.ip, self.product)
        self.driver = taobao_auto.run()
        return bool(self.driver)

    def get_account(self):
        """Load the task's crawl account and open a logged-in browser for it.

        Reads the ``retrieve_collect_equipment_account`` row with
        ``id == self.account_id`` and ``status == 0``, sets the account/ip/
        proxy attributes, then calls ``update_cookie``.

        Returns:
            bool: False when no such account exists or the login failed.
        """
        sql_account = """
            SELECT * FROM `retrieve_collect_equipment_account`
            WHERE `id` = %s and `status` = 0
        """
        account_list = self.db.select_data(sql_account, self.account_id)
        if not account_list:
            return False
        account_dict = account_list[0]
        self.ip = account_dict.get("ip")
        self.account_name = account_dict.get("username")
        self.login_username = account_dict.get("phone", "")
        self.login_password = account_dict.get("password", "")
        self.cookie_stamp = account_dict.get("update_time")
        if self.ip:
            account_proxy = f"http://{self.ip}"
            self.proxies = {"http": account_proxy, "https": account_proxy}
        else:
            self.proxies = None
        # Always log in through the browser: get_search drives self.driver,
        # which only update_cookie creates, so the stored cookie string and
        # its age are not used to skip this step.
        if not self.update_cookie():
            return False
        logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
        return True

    def run(self):
        """Execute the task: acquire an account, crawl, and close the browser.

        Returns:
            tuple: ``(self.pipeline.crawl_count, self.success)``; ``success``
            is set to False only when no usable account/login was available.
        """
        if not self.get_account():
            logger.info("==================当前无账号可用==================")
            self.success = False
            return self.pipeline.crawl_count, self.success
        logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
        try:
            self.get_search()
        finally:
            # Release the browser even when the crawl raises.
            try:
                self.driver.quit()
            except Exception as e:
                logger.warning("关闭浏览器失败: %s", e)
        logger.info(
            "任务id:%s, 任务状态已更新, 产品名称:%s, 爬取数据:%s条",
            self.task_id,
            self.product,
            self.pipeline.crawl_count,
        )
        return self.pipeline.crawl_count, self.success