Browse Source

淘宝快照

zhuoyuncheng 1 week ago
parent
commit
454e5117c1

+ 110 - 0
snapshot_start_run_taobao.py

@@ -0,0 +1,110 @@
+import json
+import random
+import time
+import requests
+from commons.Logger import logger
+from commons.conn_mysql import MySQLPoolOnline, MySQLPool39
+from spiders.taobao.snapshot_taobao_crawl import TaobaoCrawl
+import schedule
+from commons.feishu_webhook import send_text
+from commons.config import TB_DEVICE_ID
+
+platform_name = "淘宝"
+
+
+class TaobaoMain:
+    def __init__(self):
+        # self.db_online = MySQLPool39()
+        self.db_online = MySQLPoolOnline()
+        self.crawl_count = ""
+        self.task_id = ""
+        self.task_dict = None
+
+    def get_status(self, status):
+        if status not in (2, 3, 4):
+            logger.warning(f"未知状态值: {status}, 跳过状态上报")
+            return
+        if status == 2:
+            parmas = {
+                "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0,
+                "start_time": int(time.time())
+            }
+        if status == 3:
+            parmas = {
+                "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 1,
+                "real_count": self.crawl_count, "end_time": int(time.time()),
+            }
+        if status == 4:
+            parmas = {
+                "collect_task_allocate_id": self.task_id, "status": status, "finish_status": 0,
+                "end_time": int(time.time())}
+        # url = "http://scheduletest.dfwy.tech/api/collect_equipment_execute/result_report"
+        url = "http://scheduleapi.findit.ltd/api/collect_equipment_execute/result_report"
+
+        try:
+            res = requests.get(url, params=parmas, timeout=20)
+            res.raise_for_status()
+            logger.info("状态上报: %s", res.text[:500])
+        except Exception as e:
+            logger.warning(f"状态上报失败: {e}")
+
+    def heartbeat_task(self):
+        url = "https://scheduleapi.findit.ltd/api/collect_equipment_execute/heartbeat"
+        params = {
+            "collect_task_allocate_id": self.task_id,
+        }
+        try:
+            res = requests.get(url, params=params, timeout=20)
+            logger.info("心跳任务上报成功")
+        except Exception as e:
+            logger.info(f"心跳任务上报失败{str(e)}")
+
+    def get_task(self):
+        """获取当前设备绑定的京东待执行任务。"""
+        sql = """
+            SELECT t.*
+            FROM `retrieve_collect_task_allocate` t
+            INNER JOIN `retrieve_collect_equipment_account` a
+                ON t.`collect_equipment_account_id` = a.`id`
+            WHERE a.`device_id` = %s
+              AND t.`platform` = 1
+              AND t.`status` = 1
+            LIMIT 1
+        """
+        task_list = self.db_online.select_data(sql, (TB_DEVICE_ID,))
+
+        if not task_list:
+            return {}
+
+        task_dict = task_list[0]
+        self.task_id = task_dict["id"]
+        return task_dict
+
+    def run(self):
+        self.task_dict = self.get_task()
+        if not self.task_dict:
+            logger.info(f"{platform_name}暂无任务")
+            return
+        # logger.info(self.task_dict)
+        self.get_status(2)
+        self.crawl_count, is_success = TaobaoCrawl(self.task_dict).run()
+        self.heartbeat_task()
+
+        send_text(
+            f"""{str(time.strftime("%Y-%m-%d %H:%M:%S"))} 通知:\n平台: {platform_name}, 药品: {self.task_dict.get("product_name")}, 爬取数据: {self.crawl_count}条""")
+
+        if is_success:
+            self.get_status(3)
+        else:
+            self.get_status(4)
+
+
+if __name__ == '__main__':
+    # 每10分钟执行一次
+    while True:
+        TaobaoMain().run()
+        interval_time = random.randint(1200, 1800)
+        interval_time = random.randint(200, 300)
+
+        logger.info(f"程序睡眠{interval_time}秒后继续执行")
+        time.sleep(interval_time)

+ 609 - 0
spiders/taobao/snapshot_taobao_crawl.py

@@ -0,0 +1,609 @@
+import hashlib
+import json
+import random
+import re
+import time
+from decimal import Decimal, InvalidOperation
+from curl_cffi import requests
+from lxml import etree
+from commons.Logger import get_spider_logger
+from commons.conn_mysql import MySQLPoolOnline
+from pipelines.drug_pipelines import DrugPipeline
+from spiders.taobao.snapshot_taobao_login import (TaobaoAutoCrawl)
+from area_info.city_name_to_id import get_city
+from oss_upload.oss_upload import AliyunOSSUploader
+
+logger = get_spider_logger("taobao")
+from urllib.parse import quote
+
+
+
+MTOP_APP_KEY = "12574478"
+MTOP_APP_ID = "34385"
+SEARCH_MAX_PAGE = 20
+REQUEST_RETRY_COUNT = 3
+COOKIE_MAX_AGE_SEC = 3600
+
+
+def build_taobao_search_url(keyword: str, page: int = 1) -> str:
+    """
+    构建淘宝搜索URL
+
+    参数:
+        keyword: 搜索关键词 (例如: "999 玉屏风口服液 10支")
+        page: 页码,从1开始
+
+    返回:
+        完整的淘宝搜索URL字符串
+    """
+    # 对关键词进行URL编码(空格转为%20)
+    encoded_keyword = quote(keyword, safe='').replace(' ', '%20')
+
+    # 固定参数
+    fixed_params = {
+        "_input_charset": "utf-8",
+        "commend": "all",
+        "ie": "utf8",
+        "preLoadOrigin": "https://www.taobao.com",
+        "search_type": "item",
+        "source": "suggest",
+        "sourceId": "tb.index",
+        "spm": "a21bo.jianhua/a.search_history.d1",
+        "ssid": "s5-e",
+        "tab": "all",
+        "suggest_query": "",
+    }
+
+    # 动态参数
+    dynamic_params = {
+        "q": encoded_keyword,
+        "page": str(page),
+    }
+
+    # 合并参数
+    all_params = {**fixed_params, **dynamic_params}
+
+    # 构建查询字符串并返回完整URL
+    query_string = "&".join([f"{k}={v}" for k, v in all_params.items()])
+    return f"https://s.taobao.com/search?{query_string}"
+
+def extract_item_data(item_element):
+    """
+    从商品元素中提取数据
+    """
+    result = {
+        "item_id": "",
+        "title": "",
+        "price": "",
+        "realSales": "",
+        "shopInfo": {"title": ""},
+        "procity": "",
+        "auctionURL": ""
+    }
+
+    # 1. 提取 item_id - 从 a 标签的 id 属性
+    a_elem = item_element.ele('xpath=.//a[contains(@id, "item_id_")]')
+    if a_elem:
+        item_id_full = a_elem.attr('id')
+        if item_id_full:
+            result["item_id"] = item_id_full.replace("item_id_", "")
+
+    # 2. 提取 title - 从 div 的 title 属性
+    title_elem = item_element.ele('xpath=.//div[contains(@class, "title--")]')
+    if title_elem:
+        title = title_elem.attr('title')
+        if not title:
+            # 如果没有 title 属性,取文本内容
+            title = title_elem.text
+        result["title"] = title
+
+    # 3. 提取 price - 整数部分 + 小数部分
+    price_int = item_element.ele('xpath=.//div[contains(@class, "priceInt--")]')
+    price_float = item_element.ele('xpath=.//div[contains(@class, "priceFloat--")]')
+    if price_int and price_float:
+        result["price"] = f"{price_int.text}.{price_float.text.replace('.', '')}"
+
+    # 4. 提取 realSales (销量)
+    sales_elem = item_element.ele('xpath=.//span[contains(@class, "realSales--")]')
+    if sales_elem:
+        result["realSales"] = sales_elem.text
+
+    # 5. 提取 shopInfo.title (店铺名称)
+    shop_elem = item_element.ele('xpath=.//span[contains(@class, "shopNameText--")]')
+    if shop_elem:
+        result["shopInfo"]["title"] = shop_elem.text
+
+    # 6. 提取 procity (发货地)
+    procity_elem = item_element.ele('xpath=.//div[contains(@class, "procity--")]/span')
+    if procity_elem:
+        result["procity"] = procity_elem.text
+
+    # 7. 提取 auctionURL
+    if a_elem:
+        href = a_elem.attr('href')
+        if href:
+            result["auctionURL"] = href
+        elif result["item_id"]:
+            result["auctionURL"] = f"https://item.taobao.com/item.htm?id={result['item_id']}"
+
+    return result
+
+
+headers = {
+    "accept": "*/*",
+    "accept-language": "zh-CN,zh;q=0.9",
+    "referer": "https://s.taobao.com/search?page=1&q=999%E6%84%9F%E5%86%92%E7%81%B5&spm=a21bo.jianhua%2Fa.201867-main.d4_first.42f72a89n1ITMs&tab=mall",
+    "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
+    "sec-ch-ua-mobile": "?0",
+    "sec-ch-ua-platform": '"Windows"',
+    "sec-fetch-dest": "script",
+    "sec-fetch-mode": "no-cors",
+    "sec-fetch-site": "same-site",
+    "user-agent": (
+        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
+        "(KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
+    ),
+}
+
+MTOP_URL = (
+    "https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/"
+)
+
+
+
+
+
+class TaobaoCrawl:
+    def __init__(self, drug_dict=None):
+        self.cookies = None
+        self.db = MySQLPoolOnline()
+        self.pipeline = DrugPipeline("taobao")
+        self.session = None
+        self.proxies = None
+        self.account_name = None
+        self.ip = None
+        self.cookie_stamp = None
+        self.platform = 1
+        self.task_dict = drug_dict or {}
+        self.collect_task_id = None
+        self.success = True
+        if self.task_dict:
+            self.get_product_data()
+        self.is_no_product = 0
+        self.driver=''
+        self.ossuploader = AliyunOSSUploader()
+
+    def get_product_data(self):
+        self.task_id = self.task_dict["id"]
+        self.company_id = self.task_dict["company_id"]
+        self.product = self.task_dict["product_name"]
+        self.product_desc = self.task_dict.get("product_specs", "")
+        self.brand = self.task_dict.get("product_brand", "")
+        self.product_keyword = self.task_dict.get("product_keyword", "")
+        self.collect_task_id = self.task_dict.get("collect_task_id", "")
+        self.sampling_cycle = self.task_dict.get("sampling_cycle", "")
+        self.sampling_start_time = self.task_dict.get("sampling_start_time", "")
+        self.sampling_end_time = self.task_dict.get("sampling_end_time", "")
+        self.collect_equipment_id = self.task_dict.get("collect_equipment_id", "")
+        self.account_id = self.task_dict.get("collect_equipment_account_id", "")
+        self.collect_region_id = self.task_dict.get("collect_region_id", "")
+        self.collect_round = self.task_dict.get("collect_round", 1)
+
+    @staticmethod
+    def _normalize_url(url):
+        if not url:
+            return ""
+        url = str(url)
+        if url.startswith("//"):
+            return "https:" + url
+        return url
+
+    @staticmethod
+    def _extract_shop_id(shop_url):
+        if not shop_url:
+            return ""
+        shop_re = re.search(r"appUid=(\w+)", shop_url)
+        if shop_re:
+            return shop_re.group(1)
+        return hashlib.md5(shop_url.encode("utf-8")).hexdigest()
+
+    @staticmethod
+    def _sql_literal(value):
+        """避免拼接账号名时单引号打断 SQL(非完整防注入,仅兜底)。"""
+        if value is None:
+            return ""
+        return str(value).replace("'", "''").replace("\\", "\\\\")
+
+    def init_session(self):
+        self.session = requests.Session(impersonate="chrome124")
+        self.session.cookies.update(self.cookies or {})
+        self.session.headers.update(headers)
+        if self.proxies:
+            self.session.proxies.update(self.proxies)
+
+    @staticmethod
+    def _is_transport_error(err):
+        msg = str(err or "")
+        return ("curl: (16)" in msg) or ("Failed to perform" in msg)
+
+    def get_token(self, t, app_key, data_str):
+        _m_h5_tk = (self.cookies or {}).get("_m_h5_tk", "")
+        token = _m_h5_tk.split("_")[0] if _m_h5_tk else ""
+        text = f"{token}&{t}&{app_key}&{data_str}"
+        return hashlib.md5(text.encode()).hexdigest()
+
+    def get_html_content(self, res_html):
+        if not res_html:
+            return ""
+        ele_html = etree.HTML(res_html)
+        if ele_html is None:
+            return str(res_html)
+        text_list = ele_html.xpath(".//text()")
+        return "".join(text_list)
+
+    def _build_search_payload(self, keyword, page, page_size=50):
+        return {
+            "appId": MTOP_APP_ID,
+            "params": {
+                "device": "HMA-AL00",
+                "isBeta": "false",
+                "grayHair": "false",
+                "from": "nt_history",
+                "brand": "HUAWEI",
+                "info": "wifi",
+                "index": "4",
+                "rainbow": "",
+                "schemaType": "auction",
+                "elderHome": "false",
+                "isEnterSrpSearch": "true",
+                "newSearch": "false",
+                "network": "wifi",
+                "subtype": "",
+                "hasPreposeFilter": "false",
+                "prepositionVersion": "v2",
+                "client_os": "Android",
+                "gpsEnabled": "false",
+                "searchDoorFrom": "srp",
+                "debug_rerankNewOpenCard": "false",
+                "homePageVersion": "v7",
+                "searchElderHomeOpen": "false",
+                "search_action": "initiative",
+                "sugg": "_4_1",
+                "sversion": "13.6",
+                "style": "list",
+                "ttid": "600000@taobao_pc_10.7.0",
+                "needTabs": "true",
+                "areaCode": "CN",
+                "vm": "nw",
+                "countryNum": "156",
+                "m": "pc",
+                "page": page,
+                "n": 48,
+                "q": keyword,
+                "qSource": "url",
+                "pageSource": "",
+                "channelSrp": "",
+                "tab": "all",
+                "pageSize": str(page_size),
+                "sourceS": "2",
+                "ntoffset": "0",
+                "filterTag": "",
+                "service": "",
+                "prop": "",
+                "loc": "",
+                "categoryp": "",
+                "screenResolution": "1920x1080",
+                "viewResolution": "1092x4722",
+                "userAgent": headers["user-agent"],
+                "couponUnikey": "",
+                "subTabId": "",
+                "np": "",
+                "clientType": "h5",
+                "isNewDomainAb": "false",
+                "forceOldDomain": "false",
+            },
+        }
+
+    def _request_search_page(self, keyword, page):
+        t = str(int(time.time() * 1000))
+        data = self._build_search_payload(keyword, page)
+        data_str = json.dumps(data, separators=(",", ":"))
+        sign = self.get_token(t, MTOP_APP_KEY, data_str)
+        params = {
+            "jsv": "2.7.4",
+            "appKey": MTOP_APP_KEY,
+            "t": t,
+            "sign": sign,
+            "api": "mtop.relationrecommend.wirelessrecommend.recommend",
+            "v": "2.0",
+            "timeout": "10000",
+            "type": "jsonp",
+            "dataType": "jsonp",
+            "callback": "",
+            "data": data_str,
+        }
+        return self.session.get(MTOP_URL, params=params, timeout=30)
+
+    def _parse_jsonp_body(self, res_text):
+        res_text = (res_text or "").strip()
+        json_str = res_text
+        m = re.match(r"^[^(]*\((.*)\)\s*;?\s*$", res_text, re.DOTALL)
+        if m:
+            json_str = m.group(1)
+        return json.loads(json_str)
+
+    def get_search(self):
+        keyword = self.product
+        if self.brand:
+            keyword = (self.brand + " " + self.product).strip()
+
+        if self.product_desc:
+            keyword = (keyword + " " + self.product_desc).strip()
+
+        for page in range(1, SEARCH_MAX_PAGE + 1):
+            logger.info(f"正在爬取关键词:{keyword},{page}页数据")
+
+            # input_box = self.driver.ele('xpath=//*[@id="q"]')
+            # input_box.input(keyword)
+            # time.sleep(1)
+            # button = self.driver.ele('xpath=//*[@id="J_TSearchForm"]/div[2]/button')
+            # button.click()
+            # time.sleep(2)
+            base_url = build_taobao_search_url(keyword)
+            tab = self.driver.latest_tab
+
+            tab.listen.start('https://h5api.m.taobao.com/h5/mtop.relationrecommend.wirelessrecommend.recommend/2.0/')  # 开始监听,指定获取包含该文本的数据包
+            if page==1:
+                tab.get(base_url)
+            else:
+                next_btn = tab.ele('xpath=//button[contains(@aria-label, "下一页")]')
+                if not next_btn.click():
+                    break
+            for _ in range(5):
+                response = tab.listen.wait()  # 等待并获取一个数据包
+                if len(response.url)>3000:
+                    res = response.response.raw_body
+                    break
+                else:
+                    continue
+            time.sleep(1.5)
+            try:
+                json_data = self._parse_jsonp_body(res)
+                item_array = json_data.get("data", {}).get("itemsArray", [])
+            except Exception as e:
+                logger.warning(
+                    "解析数据异常,%s 账号可能退出登录,尝试重新登录: %s",
+                    self.account_name,
+                    e,
+                )
+                break
+
+            if not item_array:
+                logger.warning("关键词 %s 第 %s 页未获取到商品数据", keyword, page)
+                return
+            elems = tab.eles('xpath=//*[@id="content_items_wrapper"]/div')
+
+            for s,raw in enumerate(item_array):
+                try:
+                    item_id = raw.get("item_id", "")
+
+                    if not item_id:
+                        continue
+
+                    item_title = self.get_html_content(raw.get("title") or "")
+
+                    if self.brand not in item_title:
+                        self.is_no_product += 1
+                        continue
+                    if self.product not in item_title:
+                        self.is_no_product += 1
+                        continue
+                    if "+" in item_title:
+                        continue
+
+                    if self.product_desc:
+                        if self.product_desc in item_title:
+                            crawl_product_desc = self.product_desc
+                        else:
+                            crawl_product_desc = ""
+                    else:
+                        crawl_product_desc = ""
+                    self.is_no_product = 0
+                    status = 1
+                    if self.product_keyword:
+                        search_keyword_list = self.product_keyword.split(",")
+                        for search_keyword in search_keyword_list:
+                            if search_keyword.strip() not in item_title:
+                                status = 0
+                    if status == 0:
+                        continue
+
+                    item_price = raw.get("price")
+                    item_price_show = raw.get("priceShow", {}).get("price", 0)
+                    item_sales = raw.get("realSales") or ""
+
+                    sale_num = ""
+                    sales_m = re.search(r"(.*?)人付款", item_sales)
+                    if sales_m:
+                        sale_num = sales_m.group(1)
+
+                    item_url = self._normalize_url(raw.get("auctionURL"))
+                    shop_name = raw.get("shopInfo", {}).get("title", "")
+
+                    area_str = (raw.get("procity", "") or "").strip()
+                    city_id, province_id, city, province = get_city(area_str)
+
+                    shop_url = self._normalize_url(
+                        raw.get("shopInfo", {}).get("url", "")
+                    )
+
+                    structured_list = raw.get("structuredUSPInfo",{})
+                    for structured in structured_list:
+                        if structured.get("propertyName","") == "规格":
+                            crawl_product_desc = structured.get("propertyValueName","")
+
+                    pic_path = raw.get("pic_path", "")
+                    raw_price = item_price_show
+                    if raw_price in (None, ""):
+                        price = Decimal("0.00")
+                    else:
+                        try:
+                            price = Decimal(str(raw_price)).quantize(Decimal("0.00"))
+                        except (InvalidOperation, ValueError):
+                            price = Decimal("0.00")
+                    upload_key = hashlib.md5(item_url.encode("utf-8")).hexdigest()
+                    try:
+                        jpg_bytes = elems[s].get_screenshot(as_bytes="jpg")
+                        snapshot_url = self.ossuploader.upload_from_bytes(jpg_bytes, str(upload_key))
+                    except:
+                        snapshot_url=''
+                    scrape_date = time.strftime("%Y-%m-%d")
+                    update_time = time.strftime("%Y-%m-%d %H:%M:%S")
+                    #snapshot_url = self._normalize_url(pic_path) if pic_path else ""
+
+                    # 字段与 yaofangwang_crawl 对齐;键顺序须与 commons.sql_data.RETRIEVE_SCRAPE_INSERT_COLUMNS 一致
+                    product = {
+                        "platform": self.platform,
+                        "item_id": item_id,
+                        "enterprise_id": self.company_id,
+                        "product_name": item_title,
+                        "spec": crawl_product_desc,
+                        "one_price": "",
+                        "detail_url": item_url,
+                        "shop_name": shop_name,
+                        "anonymous_store_name": "",
+                        "shop_url": shop_url,
+                        "city_name": "",
+                        "city_id": "",
+                        "province_name": "",
+                        "province_id": "",
+                        "shipment_city_name": city,
+                        "shipment_city_id": city_id,
+                        "shipment_province_name": province,
+                        "shipment_province_id": province_id,
+                        "area_info": area_str,
+                        "factory_name": "",
+                        "scrape_date": scrape_date,
+                        "price": price,
+                        "sales": sale_num,
+                        "stock_count": "",
+                        "snapshot_url": snapshot_url,
+                        "approval_num": "",
+                        "produced_time": "",
+                        "deadline": "",
+                        "update_time": update_time,
+                        "insert_time": update_time,
+                        "number": 1,
+                        "product_brand": self.brand or "",
+                        "collect_task_id": self.collect_task_id,
+                        "search_name": self.product,
+                        "company_name": "",
+                        "collect_config_info": json.dumps(
+                            {
+                                "sampling_cycle": self.sampling_cycle,
+                                "sampling_start_time": self.sampling_start_time,
+                                "sampling_end_time": self.sampling_end_time,
+                            }
+                        ),
+                        "account_id": self.account_id,
+                        "collect_region_id": self.collect_region_id,
+                        "collect_round": self.collect_round,
+                        "is_sold_out": 0
+
+                    }
+
+                    try:
+                        self.pipeline.storge_data(product)
+                        logger.info("%s", json.dumps(product, ensure_ascii=False, default=str))
+                    except Exception as e:
+                        logger.exception("写入数据库失败: %s", e)
+                except:
+                    continue
+
+
+            logger.info(
+                "关键词 %s 第 %s 页爬取完成",
+                keyword,
+                page,
+            )
+
+            total_page = (
+                (json_data or {}).get("data", {}).get("mainInfo", {}).get("totalPage")
+            )
+
+            try:
+                total_page_int = int(total_page) if total_page is not None else 50
+            except (TypeError, ValueError):
+                total_page_int = 50
+
+            if page >= total_page_int:
+                break
+            if self.is_no_product > 20:
+                break
+            sleep_second = random.uniform(30, 60)
+            logger.info("第 %s 页爬取完成,休息 %.1fs", page, sleep_second)
+            time.sleep(sleep_second)
+
+
+    def update_cookie(self):
+        taobao_auto = TaobaoAutoCrawl(self.account_name, self.ip, self.product)
+        self.driver=taobao_auto.run()
+        if not self.driver :
+            return False
+
+        return True
+
+    def get_account(self):
+
+        sql_account = """
+                      SELECT * FROM `retrieve_collect_equipment_account` WHERE `id` = %s and `status` = 0
+                      """
+
+        account_list = self.db.select_data(sql_account,self.account_id)
+        if not account_list:
+            return False
+
+        account_dict = account_list[0]
+        self.ip = account_dict.get("ip")
+        cookie_str = account_dict.get("cookie_str")
+
+        self.ip = account_dict.get("ip")
+        self.account_name = account_dict.get("username")
+        self.login_username = account_dict.get("phone", "")
+        self.login_password = account_dict.get("password", "")
+        self.cookie_stamp = account_dict.get("update_time")
+        if self.ip:
+            account_proxy = f"http://{self.ip}"
+            self.proxies = {"http": account_proxy, "https": account_proxy}
+        else:
+            self.proxies = None
+
+        need_refresh = (
+                not cookie_str
+                or int(time.time()) - int(self.cookie_stamp or 0) > COOKIE_MAX_AGE_SEC
+        )
+        if 1:
+            if not self.update_cookie():
+                return False
+
+
+        logger.info("获取到账号: %s, ip: %s", self.account_name, self.ip)
+        return True
+
+    def run(self):
+        if not self.get_account():
+            logger.info("==================当前无账号可用==================")
+            self.success = False
+            return self.pipeline.crawl_count, self.success
+
+        logger.info("获取到账号:%s,代理ip:%s", self.account_name, self.ip)
+
+        self.get_search()
+        self.driver.quit()
+        logger.info(
+            "任务id:%s, 任务状态已更新, 产品名称:%s, 爬取数据:%s条",
+            self.task_id,
+            self.product,
+            self.pipeline.crawl_count,
+        )
+        return self.pipeline.crawl_count, self.success

+ 198 - 0
spiders/taobao/snapshot_taobao_login.py

@@ -0,0 +1,198 @@
+import time
+import json
+import random
+import signal
+import sys
+from DrissionPage import ChromiumPage, ChromiumOptions
+import re
+import socket
+from commons.conn_mysql import MySQLPoolOnline
+import hashlib
+from commons.Logger import logger
+
+MAX_PAGES = 5
+WAIT_BETWEEN_PAGES = (8, 15)  # 页间等待时间范围(秒)
+SCROLL_DELAY = (0.3, 0.8)  # 滚动延迟范围
+CLICK_DELAY = (0.5, 1.2)  # 点击延迟范围
+BROWSE_TIME = (5, 10)  # 浏览时间范围
+chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
+
+
+class TaobaoAutoCrawl:
+    def __init__(self, account_name, ip, key_word):
+        self.driver = None
+        self.register_signal_handler()
+        self.db = MySQLPoolOnline()
+        self.account_name = account_name
+        self.ip = ip
+        self.keyword = key_word
+
+    @staticmethod
+    def _get_free_port():
+        """获取一个当前可用的本地端口,供 Chrome 调试使用。"""
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.bind(("127.0.0.1", 0))
+            return s.getsockname()[1]
+
+    def init_drissionpage(self):
+        # 避免 auto_port 在部分环境下生成异常地址(无端口)导致 ChromiumPage 初始化失败
+        co = ChromiumOptions().set_browser_path(chrome_path)
+        debug_port = self._get_free_port()
+        co.set_argument(f"--remote-debugging-port={debug_port}")
+        co.set_user_data_path(f"./spider/taobao/{self.account_name}")
+        if self.ip:
+            proxy = self.ip.strip()
+            if not proxy.startswith(("http://", "https://")):
+                proxy = f"http://{proxy}"
+            co.set_argument(f"--proxy-server={proxy}")
+        logger.info(f"启动浏览器: account={self.account_name}, debug_port={debug_port}")
+        self.driver = ChromiumPage(co)
+
+    def register_signal_handler(self):
+        """ 非常必要,注册信号处理,确保状态保存"""
+
+        def signal_handler(signum, frame):
+            print("\n⚠️ 收到退出信号,正在保存状态并退出...")
+            if self.driver:
+                self.driver.quit()
+            sys.exit(0)
+
+        signal.signal(signal.SIGINT, signal_handler)
+        # Windows 上可能不支持 SIGTERM,做兼容处理
+        if hasattr(signal, "SIGTERM"):
+            signal.signal(signal.SIGTERM, signal_handler)
+
+    # ==================== 人工行为模拟 ====================
+    def random_wait(self, min_sec, max_sec=None):
+        """随机等待"""
+        if max_sec is None:
+            max_sec = min_sec
+        time.sleep(random.uniform(min_sec, max_sec))
+
+    def move_mouse_to_element(self, element):
+        """移动鼠标到元素"""
+        if not element:
+            return
+        try:
+            # 优先使用 DrissionPage 推荐方式
+            self.driver.actions.move_to(element)
+        except Exception:
+            # 兼容旧逻辑:按元素中心点移动(不同版本 move 参数可能不同)
+            box = element.rect
+            try:
+                center_x = int(box.x + box.width / 2)
+                center_y = int(box.y + box.height / 2)
+            except Exception:
+                center_x = int(box["x"] + box["width"] / 2)
+                center_y = int(box["y"] + box["height"] / 2)
+            try:
+                self.driver.actions.move(center_x, center_y)
+            except TypeError:
+                # 某些版本仅支持关键字参数
+                self.driver.actions.move(offset_x=center_x, offset_y=center_y)
+        self.random_wait(0.2, 0.5)
+
+    def human_type(self, element, text):
+        """模拟人类输入"""
+        for char in text:
+            element.send_keys(char)
+            time.sleep(random.uniform(0.1, 0.3))
+
+    def login(self, username, password):
+        self.driver.get("https://login.taobao.com")
+        self.random_wait(5, 8)
+
+        # 输入账号
+        login_name = self.driver.ele("xpath=//input[@name='fm-login-id']", timeout=30)
+        if login_name:
+            self.move_mouse_to_element(login_name)
+            self.human_type(login_name, username)
+            self.random_wait(1, 3)
+
+        # 输入密码
+        login_pass = self.driver.ele("xpath=//input[@name='fm-login-password']", timeout=30)
+        if login_pass:
+            self.move_mouse_to_element(login_pass)
+            self.human_type(login_pass, password)
+            self.random_wait(1, 3)
+
+        # 点击登录
+        login_button = self.driver.ele("xpath=//button[text()='登录']", timeout=30)
+        if login_button:
+            self.move_mouse_to_element(login_button)
+            login_button.click()
+            self.random_wait(1, 3)
+
+        # 处理同意按钮
+        login_agree = self.driver.ele("xpath=//button[text()='同意']", timeout=5)
+        if login_agree:
+            self.move_mouse_to_element(login_agree)
+            login_agree.click()
+            self.random_wait(1, 3)
+
+        # 等待登录结果
+        self.random_wait(10, 20)
+        # 检查是否登录成功
+        user_info = self.driver.ele("xpath=//a[@class='site-nav-login-info-nick']", timeout=10)
+        if user_info:
+            print("登录成功!")
+        else:
+            print("登录失败,请检查账号密码或验证码")
+
+    def get_search(self):
+        url = "https://www.taobao.com"
+        self.driver.get(url, timeout=30)
+        time.sleep(30)
+        time.sleep(random.uniform(3, 8))
+
+        # 刷新一次,否则可能未找到登录状态
+        self.driver.refresh()
+        self.random_wait(5, 10)
+        # login_name = self.driver.ele("xpath=//input[@name='fm-login-id']")
+        # if login_name:
+        #     self.login("aqwwer","wewetrv")
+        #     self.driver.refresh()
+        #     self.random_wait(5, 10)
+
+        ele_iframe = self.driver.ele("xpath=//iframe[@id='baxia-dialog-content']")
+        if ele_iframe:
+            update_sql = f""" UPDATE `retrieve_collect_equipment_account` SET `status`= %s WHERE `nickname` = %s; """
+            self.db.execute(update_sql, (1, self.account_name))
+            return False
+
+        ele = self.driver.ele('xpath=//*[contains(@class,"site-nav-login-info-nick")]', timeout=30)
+
+        if ele:
+            cookies_list = self.driver.cookies()
+            cookies_dict = {c['name']: c['value'] for c in cookies_list}
+            timestamp = int(time.time())
+            # 保存 cookie 到文件
+            update_sql = f""" UPDATE `retrieve_collect_equipment_account` SET `update_time` = %s, `cookie_str`= %s,`status`= %s WHERE `username` = %s; """
+            self.db.execute(update_sql, (timestamp, json.dumps(cookies_dict), 0, self.account_name))
+            print(f"{self.account_name},获取 cookie 成功!")
+            logger.info(f"{self.account_name},获取 cookie 成功!")
+
+            self.random_wait(3, 5)
+            return True
+        else:
+            return False
+
+    def run(self):
+        bool_login = False
+        try:
+            self.init_drissionpage()
+            bool_login = self.get_search()
+        except Exception as e:
+            logger.exception(f"{self.account_name} 获取 cookie 异常: {e}")
+        finally:
+            if self.driver:
+                return self.driver
+        return bool(bool_login)
+
+
+if __name__ == '__main__':
+    account_name = "tb_account10"
+    ip = ""
+    keyword = "手机"
+    taobao_crawl = TaobaoAutoCrawl(account_name, ip, keyword)
+    taobao_crawl.run()