| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- import hashlib
- import json
- import random
- import re
- import time
- from decimal import Decimal, InvalidOperation
- from curl_cffi import requests
- from lxml import etree
- from commons.Logger import get_spider_logger
- from commons.conn_mysql import MySQLPoolOnline
- from pipelines.drug_pipelines import DrugPipeline
- from spiders.taobao.taobao_login import (TaobaoAutoCrawl)
- from area_info.city_name_to_id import get_city
# Logger used by everything in this module.
logger = get_spider_logger("taobao")

# mtop gateway identifiers: MTOP_APP_KEY is sent as the `appKey` query
# parameter and is part of the request signature; MTOP_APP_ID is sent as
# `appId` inside the JSON payload.
MTOP_APP_KEY = "12574478"
MTOP_APP_ID = "34385"

# Endpoint queried for every search page.
MTOP_URL = "https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/"

# Crawl limits.
SEARCH_MAX_PAGE = 20        # upper bound on result pages fetched per keyword
REQUEST_RETRY_COUNT = 3     # attempts per page before the page is given up
COOKIE_MAX_AGE_SEC = 1800   # stored cookies older than this are refreshed

# Default headers applied to every session.
# NOTE(review): the user-agent advertises Chrome/142 while sec-ch-ua advertises
# 145 and the session is created with impersonate="chrome124" — confirm whether
# these versions are meant to differ.
headers = {
    "accept": "*/*",
    "accept-language": "zh-CN,zh;q=0.9",
    "referer": "https://s.taobao.com/search?page=1&q=999%E6%84%9F%E5%86%92%E7%81%B5&spm=a21bo.jianhua%2Fa.201867-main.d4_first.42f72a89n1ITMs&tab=mall",
    "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "script",
    "sec-fetch-mode": "no-cors",
    "sec-fetch-site": "same-site",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
}
class TaobaoCrawl:
    """Crawl Taobao search results for one collection task and store matching items.

    The crawler loads an account (cookies + optional proxy) from the database,
    queries the mtop search endpoint page by page, filters items by brand /
    product name / keywords found in the title, and hands every match to the
    ``DrugPipeline``.

    Args:
        drug_dict: Task description. ``id``, ``company_id`` and ``product_name``
            are required when the dict is non-empty; every other key is
            optional. An empty/missing dict leaves all task fields at their
            defaults.
    """

    # Patterns compiled once instead of on every call.
    _SHOP_ID_RE = re.compile(r"appUid=(\w+)")
    _JSONP_RE = re.compile(r"^[^(]*\((.*)\)\s*;?\s*$", re.DOTALL)
    _SALES_RE = re.compile(r"(.*?)人付款")

    # Timestamp layout this module writes (see update_time in _build_product).
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    # Outcomes reported by _fetch_page.
    _FETCH_OK = "ok"
    _FETCH_SKIP = "skip"
    _FETCH_ABORT = "abort"

    def __init__(self, drug_dict=None):
        self.cookies = None
        self.db = MySQLPoolOnline()
        self.pipeline = DrugPipeline("taobao")
        self.session = None
        self.proxies = None
        self.account_name = None
        self.ip = None
        self.cookie_stamp = None
        self.platform = 1
        self.task_dict = drug_dict or {}
        self.collect_task_id = None
        self.success = True

        # Defaults so that run()/get_account() do not hit AttributeError when
        # the crawler is created without a task.
        self.task_id = None
        self.company_id = None
        self.product = ""
        self.product_desc = ""
        self.brand = ""
        self.product_keyword = ""
        self.sampling_cycle = ""
        self.sampling_start_time = ""
        self.sampling_end_time = ""
        self.collect_equipment_id = ""
        self.account_id = ""
        self.collect_region_id = ""
        self.collect_round = 1
        self.login_username = ""
        self.login_password = ""

        if self.task_dict:
            self.get_product_data()
        # Number of consecutive items whose title did not match brand/product.
        self.is_no_product = 0

    def get_product_data(self):
        """Copy the task fields from ``self.task_dict`` onto the instance.

        Raises:
            KeyError: if ``id``, ``company_id`` or ``product_name`` is missing.
        """
        task = self.task_dict
        self.task_id = task["id"]
        self.company_id = task["company_id"]
        self.product = task["product_name"]
        self.product_desc = task.get("product_specs", "")
        # The brand is used in substring tests against titles; a None value
        # would raise TypeError there, so normalise it to an empty string.
        self.brand = task.get("product_brand") or ""
        self.product_keyword = task.get("product_keyword", "")
        self.collect_task_id = task.get("collect_task_id", "")
        self.sampling_cycle = task.get("sampling_cycle", "")
        self.sampling_start_time = task.get("sampling_start_time", "")
        self.sampling_end_time = task.get("sampling_end_time", "")
        self.collect_equipment_id = task.get("collect_equipment_id", "")
        self.account_id = task.get("collect_equipment_account_id", "")
        self.collect_region_id = task.get("collect_region_id", "")
        self.collect_round = task.get("collect_round", 1)

    @staticmethod
    def _normalize_url(url):
        """Return ``url`` as a string, adding ``https:`` to protocol-relative URLs."""
        if not url:
            return ""
        url = str(url)
        if url.startswith("//"):
            return "https:" + url
        return url

    @staticmethod
    def _extract_shop_id(shop_url):
        """Return the ``appUid`` value of ``shop_url``, or the URL's MD5 hex digest."""
        if not shop_url:
            return ""
        found = TaobaoCrawl._SHOP_ID_RE.search(shop_url)
        if found:
            return found.group(1)
        return hashlib.md5(shop_url.encode("utf-8")).hexdigest()

    @staticmethod
    def _sql_literal(value):
        """Escape single quotes and backslashes in ``value`` for SQL string literals.

        This is a last-resort guard, not complete injection protection; prefer
        parameterised queries.
        """
        if value is None:
            return ""
        return str(value).replace("'", "''").replace("\\", "\\\\")

    @staticmethod
    def _to_epoch_seconds(stamp):
        """Best-effort conversion of a stored timestamp to integer epoch seconds.

        Accepts numbers, numeric strings, objects exposing ``timestamp()`` and
        ``YYYY-mm-dd HH:MM:SS`` strings. Anything else yields ``0`` so that the
        caller treats the value as stale instead of crashing.
        """
        if not stamp:
            return 0
        if hasattr(stamp, "timestamp"):
            try:
                return int(stamp.timestamp())
            except (OverflowError, OSError, ValueError):
                return 0
        try:
            return int(stamp)
        except (TypeError, ValueError):
            pass
        try:
            return int(float(stamp))
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            parsed = time.strptime(str(stamp), TaobaoCrawl._TIMESTAMP_FORMAT)
            return int(time.mktime(parsed))
        except (ValueError, OverflowError):
            return 0

    @staticmethod
    def _parse_price(raw_price):
        """Convert ``raw_price`` to a two-decimal ``Decimal``; ``0.00`` when unusable."""
        if raw_price in (None, ""):
            return Decimal("0.00")
        try:
            return Decimal(str(raw_price)).quantize(Decimal("0.00"))
        except (InvalidOperation, ValueError):
            return Decimal("0.00")

    @staticmethod
    def _parse_sales(raw_sales):
        """Return the text preceding ``人付款`` in ``raw_sales``, or ``""``."""
        found = TaobaoCrawl._SALES_RE.search(str(raw_sales or ""))
        return found.group(1) if found else ""

    def init_session(self):
        """(Re)create the HTTP session with the current cookies, headers and proxy."""
        previous = self.session
        if previous is not None:
            # init_session is called again after transport errors and cookie
            # refreshes; close the old session so its connections are released.
            try:
                previous.close()
            except Exception as e:
                logger.debug("关闭旧会话失败: %s", e)
        self.session = requests.Session(impersonate="chrome124")
        self.session.cookies.update(self.cookies or {})
        self.session.headers.update(headers)
        if self.proxies:
            self.session.proxies.update(self.proxies)

    @staticmethod
    def _is_transport_error(err):
        """Tell whether ``err``'s message looks like a curl transport failure."""
        msg = str(err or "")
        return ("curl: (16)" in msg) or ("Failed to perform" in msg)

    def get_token(self, t, app_key, data_str):
        """Return the MD5 signature of ``token&t&app_key&data_str``.

        ``token`` is the part of the ``_m_h5_tk`` cookie before the first
        underscore (empty when the cookie is absent).
        """
        _m_h5_tk = (self.cookies or {}).get("_m_h5_tk", "")
        token = _m_h5_tk.split("_")[0] if _m_h5_tk else ""
        text = f"{token}&{t}&{app_key}&{data_str}"
        return hashlib.md5(text.encode()).hexdigest()

    def get_html_content(self, res_html):
        """Return the concatenated text nodes of an HTML fragment.

        Falls back to ``str(res_html)`` when the fragment cannot be parsed, so
        one odd title does not abort the crawl.
        """
        if not res_html:
            return ""
        try:
            root = etree.HTML(res_html)
        except (etree.LxmlError, ValueError):
            return str(res_html)
        if root is None:
            return str(res_html)
        return "".join(root.xpath(".//text()"))

    def _build_search_payload(self, keyword, page, page_size=50):
        """Build the JSON payload sent in the ``data`` query parameter."""
        return {
            "appId": MTOP_APP_ID,
            "params": {
                "device": "HMA-AL00",
                "isBeta": "false",
                "grayHair": "false",
                "from": "nt_history",
                "brand": "HUAWEI",
                "info": "wifi",
                "index": "4",
                "rainbow": "",
                "schemaType": "auction",
                "elderHome": "false",
                "isEnterSrpSearch": "true",
                "newSearch": "false",
                "network": "wifi",
                "subtype": "",
                "hasPreposeFilter": "false",
                "prepositionVersion": "v2",
                "client_os": "Android",
                "gpsEnabled": "false",
                "searchDoorFrom": "srp",
                "debug_rerankNewOpenCard": "false",
                "homePageVersion": "v7",
                "searchElderHomeOpen": "false",
                "search_action": "initiative",
                "sugg": "_4_1",
                "sversion": "13.6",
                "style": "list",
                "ttid": "600000@taobao_pc_10.7.0",
                "needTabs": "true",
                "areaCode": "CN",
                "vm": "nw",
                "countryNum": "156",
                "m": "pc",
                "page": page,
                "n": 48,
                "q": keyword,
                "qSource": "url",
                "pageSource": "",
                "channelSrp": "",
                "tab": "all",
                "pageSize": str(page_size),
                "sourceS": "2",
                "ntoffset": "0",
                "filterTag": "",
                "service": "",
                "prop": "",
                "loc": "",
                "categoryp": "",
                "screenResolution": "1920x1080",
                "viewResolution": "1092x4722",
                "userAgent": headers["user-agent"],
                "couponUnikey": "",
                "subTabId": "",
                "np": "",
                "clientType": "h5",
                "isNewDomainAb": "false",
                "forceOldDomain": "false",
            },
        }

    def _request_search_page(self, keyword, page):
        """Send the signed search request for ``page`` and return the response."""
        t = str(int(time.time() * 1000))
        data = self._build_search_payload(keyword, page)
        # The signature covers the exact serialised payload, so the same
        # string must be signed and sent.
        data_str = json.dumps(data, separators=(",", ":"))
        sign = self.get_token(t, MTOP_APP_KEY, data_str)
        params = {
            "jsv": "2.7.4",
            "appKey": MTOP_APP_KEY,
            "t": t,
            "sign": sign,
            "api": "mtop.relationrecommend.wirelessrecommend.recommend",
            "v": "2.0",
            "timeout": "10000",
            "type": "jsonp",
            "dataType": "jsonp",
            "callback": "",
            "data": data_str,
        }
        return self.session.get(MTOP_URL, params=params, timeout=30)

    def _parse_jsonp_body(self, res_text):
        """Decode a JSON or JSONP (``callback(...)``) response body.

        Raises:
            json.JSONDecodeError: if the (unwrapped) body is not valid JSON.
        """
        res_text = (res_text or "").strip()
        wrapped = self._JSONP_RE.match(res_text)
        json_str = wrapped.group(1) if wrapped else res_text
        return json.loads(json_str)

    def _build_keyword(self):
        """Join brand, product name and spec into the search keyword."""
        keyword = self.product
        if self.brand:
            keyword = (self.brand + " " + self.product).strip()
        if self.product_desc:
            keyword = (keyword + " " + self.product_desc).strip()
        return keyword

    def _fetch_page(self, keyword, page):
        """Fetch and decode one result page with retries.

        Returns:
            ``(outcome, json_data, item_array)`` where ``outcome`` is
            ``_FETCH_OK``, ``_FETCH_SKIP`` (all attempts failed) or
            ``_FETCH_ABORT`` (the cookie refresh failed).
        """
        for attempt in range(1, REQUEST_RETRY_COUNT + 1):
            try:
                if not self.session:
                    self.init_session()
                res = self._request_search_page(keyword, page)
            except Exception as e:
                if self._is_transport_error(e):
                    logger.warning(
                        "检测到网络传输异常(curl),重建会话后重试: %s",
                        e,
                    )
                    self.init_session()
                logger.warning(
                    "请求异常,第%s/%s 次重试: %s",
                    attempt,
                    REQUEST_RETRY_COUNT,
                    e,
                )
                # Linear backoff capped at 10 seconds.
                time.sleep(min(3 * attempt, 10))
                continue

            if res.status_code != 200:
                logger.warning(
                    "请求失败 HTTP %s,第%s/%s 次重试",
                    res.status_code,
                    attempt,
                    REQUEST_RETRY_COUNT,
                )
                time.sleep(random.randint(3, 8))
                continue

            try:
                json_data = self._parse_jsonp_body(res.text)
                item_array = json_data.get("data", {}).get("itemsArray", [])
            except Exception as e:
                # An undecodable body is treated as a logged-out account.
                logger.warning(
                    "解析数据异常,%s 账号可能退出登录,尝试重新登录: %s",
                    self.account_name,
                    e,
                )
                if not self.update_cookie():
                    return self._FETCH_ABORT, None, None
                self.init_session()
                continue
            return self._FETCH_OK, json_data, item_array

        logger.warning("关键词 %s 第 %s 页连续重试失败", keyword, page)
        return self._FETCH_SKIP, None, None

    def _matches_keywords(self, item_title):
        """Tell whether every comma-separated task keyword occurs in the title."""
        if not self.product_keyword:
            return True
        return all(
            keyword.strip() in item_title
            for keyword in self.product_keyword.split(",")
        )

    def _build_product(self, raw, item_id, item_title):
        """Build the row handed to the pipeline for one matching search item."""
        # Guard against explicit nulls in the response.
        price_info = raw.get("priceShow") or {}
        shop_info = raw.get("shopInfo") or {}

        if self.product_desc and self.product_desc in item_title:
            crawl_product_desc = self.product_desc
        else:
            crawl_product_desc = ""

        area_str = (raw.get("procity", "") or "").strip()
        city_id, province_id, city, province = get_city(area_str)

        # Derive both stamps from one clock reading so they cannot disagree.
        now = time.localtime()
        scrape_date = time.strftime("%Y-%m-%d", now)
        update_time = time.strftime(self._TIMESTAMP_FORMAT, now)

        # Fields mirror yaofangwang_crawl; the key order must match
        # commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS.
        return {
            "platform": self.platform,
            "item_id": item_id,
            "enterprise_id": self.company_id,
            "product_name": item_title,
            "spec": crawl_product_desc,
            "one_price": "",
            "detail_url": self._normalize_url(raw.get("auctionURL")),
            "shop_name": shop_info.get("title", ""),
            "anonymous_store_name": "",
            "shop_url": self._normalize_url(shop_info.get("url", "")),
            "city_name": "",
            "city_id": "",
            "province_name": "",
            "province_id": "",
            "shipment_city_name": city,
            "shipment_city_id": city_id,
            "shipment_province_name": province,
            "shipment_province_id": province_id,
            "area_info": area_str,
            "factory_name": "",
            "scrape_date": scrape_date,
            "price": self._parse_price(price_info.get("price", 0)),
            "sales": self._parse_sales(raw.get("realSales")),
            "stock_count": "",
            # NOTE(review): stored empty, as before; the item's pic_path is
            # not persisted — confirm whether it should be.
            "snapshot_url": "",
            "approval_num": "",
            "produced_time": "",
            "deadline": "",
            "update_time": update_time,
            "insert_time": update_time,
            "number": 1,
            "product_brand": self.brand or "",
            "collect_task_id": self.collect_task_id,
            "search_name": self.product,
            "company_name": "",
            "collect_config_info": json.dumps(
                {
                    "sampling_cycle": self.sampling_cycle,
                    "sampling_start_time": self.sampling_start_time,
                    "sampling_end_time": self.sampling_end_time,
                }
            ),
            "account_id": self.account_id,
            "collect_region_id": self.collect_region_id,
            "collect_round": self.collect_round,
            "is_sold_out": 0,
        }

    def _process_item(self, raw):
        """Filter one raw search item and store it when it matches the task."""
        item_id = raw.get("item_id", "")
        if not item_id:
            return
        item_title = self.get_html_content(raw.get("title") or "")
        if (self.brand or "") not in item_title or self.product not in item_title:
            self.is_no_product += 1
            return
        # Titles containing "+" are skipped.
        if "+" in item_title:
            return
        self.is_no_product = 0
        if not self._matches_keywords(item_title):
            return

        product = self._build_product(raw, item_id, item_title)
        try:
            self.pipeline.storge_data(product)
            logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
        except Exception as e:
            logger.exception("写入数据库失败: %s", e)

    @staticmethod
    def _total_pages(json_data):
        """Read ``data.mainInfo.totalPage`` from a response; 50 when unavailable."""
        data = (json_data or {}).get("data") or {}
        total_page = (data.get("mainInfo") or {}).get("totalPage")
        try:
            return int(total_page) if total_page is not None else 50
        except (TypeError, ValueError):
            return 50

    def get_search(self):
        """Crawl the search result pages for the task keyword and store matches.

        Stops when a page has no items, the reported last page is reached,
        more than 20 consecutive items failed the brand/product check, the
        cookie refresh fails, or ``SEARCH_MAX_PAGE`` pages were requested.
        """
        keyword = self._build_keyword()
        for page in range(1, SEARCH_MAX_PAGE + 1):
            logger.info("正在爬取关键词:%s,%s页数据", keyword, page)
            outcome, json_data, item_array = self._fetch_page(keyword, page)
            if outcome == self._FETCH_ABORT:
                # NOTE(review): self.success is left untouched here, as
                # before — confirm whether callers expect False in this case.
                logger.error("账号 %s 重新登录失败,终止本次采集", self.account_name)
                return
            if outcome == self._FETCH_SKIP:
                continue
            if not item_array:
                logger.warning("关键词 %s 第 %s 页未获取到商品数据", keyword, page)
                return

            for raw in item_array:
                self._process_item(raw)
            logger.info(
                "关键词 %s 第 %s 页爬取完成",
                keyword,
                page,
            )

            if page >= self._total_pages(json_data):
                break
            if self.is_no_product > 20:
                break
            if page >= SEARCH_MAX_PAGE:
                # Nothing left to fetch, so do not sleep before returning.
                break
            sleep_second = random.uniform(30, 60)
            logger.info("第 %s 页爬取完成,休息 %.1fs", page, sleep_second)
            time.sleep(sleep_second)

    def update_cookie(self):
        """Re-login the account and reload its cookies from the database.

        Returns:
            True when fresh cookies were loaded into ``self.cookies``.
        """
        taobao_auto = TaobaoAutoCrawl(self.account_name, self.ip, self.product)
        if not taobao_auto.run():
            return False
        # Parameterised like the query in get_account, instead of splicing
        # the account name into the SQL text.
        sql_account = (
            "select * from `retrieve_collect_equipment_account` where `username` = %s"
        )
        account_list = self.db.select_data(sql_account, self.account_name)
        if not account_list:
            logger.error("账号 %s 未查询到 cookie 信息", self.account_name)
            return False
        cookie_str = account_list[0].get("cookie_str")
        if not cookie_str:
            logger.error("账号 %s cookie 为空", self.account_name)
            return False
        try:
            self.cookies = json.loads(cookie_str)
        except Exception as e:
            logger.error("账号 %s cookie 解析失败: %s", self.account_name, e)
            return False
        return True

    def get_account(self):
        """Load the task's account, refresh stale cookies and open a session.

        Returns:
            True when an account with usable cookies is ready.
        """
        sql_account = """
        SELECT * FROM `retrieve_collect_equipment_account` WHERE `id` = %s and `status` = 0
        """
        account_list = self.db.select_data(sql_account, self.account_id)
        if not account_list:
            return False
        # The row is deliberately not printed or logged: it carries the
        # account password and cookie string.
        account_dict = account_list[0]
        cookie_str = account_dict.get("cookie_str")
        self.ip = account_dict.get("ip")
        self.account_name = account_dict.get("username")
        self.login_username = account_dict.get("phone", "")
        self.login_password = account_dict.get("password", "")
        self.cookie_stamp = account_dict.get("update_time")
        if self.ip:
            account_proxy = f"http://{self.ip}"
            self.proxies = {"http": account_proxy, "https": account_proxy}
        else:
            self.proxies = None

        cookie_age = int(time.time()) - self._to_epoch_seconds(self.cookie_stamp)
        need_refresh = not cookie_str or cookie_age > COOKIE_MAX_AGE_SEC
        if need_refresh:
            if not self.update_cookie():
                return False
        else:
            try:
                self.cookies = json.loads(cookie_str)
            except Exception as e:
                logger.error("cookie 解析失败,尝试刷新: %s", e)
                if not self.update_cookie():
                    return False
        logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
        self.init_session()
        return True

    def run(self):
        """Run the task end to end.

        Returns:
            ``(crawl_count, success)``; ``success`` is False when no account
            could be prepared.
        """
        if not self.get_account():
            logger.info("==================当前无账号可用==================")
            self.success = False
            return self.pipeline.crawl_count, self.success
        logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
        self.get_search()
        logger.info(
            "任务id:%s, 任务状态已更新, 产品名称:%s, 爬取数据:%s条",
            self.task_id,
            self.product,
            self.pipeline.crawl_count,
        )
        return self.pipeline.crawl_count, self.success
|