"""Refresh Taobao login cookies for one account.

A Chrome instance is driven through DrissionPage using a per-account user
data directory; when the logged-in nickname element is found on the Taobao
home page, the browser cookies are written to the
``retrieve_collect_equipment_account`` table through ``MySQLPoolOnline``.
"""

import hashlib
import json
import random
import re
import signal
import socket
import sys
import time

from DrissionPage import ChromiumPage, ChromiumOptions

from commons.Logger import logger
from commons.conn_mysql import MySQLPoolOnline

MAX_PAGES = 5
WAIT_BETWEEN_PAGES = (8, 15)  # range of the wait between pages (seconds)
SCROLL_DELAY = (0.3, 0.8)  # range of the scroll delay
CLICK_DELAY = (0.5, 1.2)  # range of the click delay
BROWSE_TIME = (5, 10)  # range of the browsing time
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"


class TaobaoAutoCrawl:
    """Drive a Chrome profile for one Taobao account and persist its cookies."""

    def __init__(self, account_name, ip, key_word):
        """Store the account settings, install signal handlers, open the DB pool.

        Args:
            account_name: Account identifier; used as the browser profile
                directory name and as the key in the SQL ``WHERE`` clauses.
            ip: Optional proxy address (``host:port`` or a full http(s) URL).
                A falsy value means no proxy is configured.
            key_word: Search keyword, stored as ``self.keyword``.
        """
        # Plain attributes are assigned first so the signal handler never
        # sees a half-initialised instance.
        self.driver = None
        self.account_name = account_name
        self.ip = ip
        self.keyword = key_word
        self.register_signal_handler()
        self.db = MySQLPoolOnline()

    @staticmethod
    def _get_free_port() -> int:
        """Return a local TCP port that is free right now, for Chrome debugging.

        The probing socket is closed before returning, so the port is only
        known to have been free at the time of the call.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_drissionpage(self) -> None:
        """Start Chrome with the account's profile and assign ``self.driver``."""
        # A port is picked by hand instead of using auto_port, which in some
        # environments produced an address without a port and made
        # ChromiumPage initialisation fail.
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # Record the port on the options object as well, so the address
        # DrissionPage connects to matches the port Chrome is started with.
        co.set_local_port(debug_port)
        # Raw argument kept from the original implementation.
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_user_data_path(f"./spider/taobao/{self.account_name}")
        if self.ip:
            proxy = self.ip.strip()
            if not proxy.startswith(("http://", "https://")):
                proxy = f"http://{proxy}"
            co.set_argument(f"--proxy-server={proxy}")
        logger.info(f"启动浏览器: account={self.account_name}, debug_port={debug_port}")
        self.driver = ChromiumPage(co)

    def _quit_driver(self) -> None:
        """Close the browser at most once; never raise.

        ``self.driver`` is cleared before quitting so that a second call
        (for example from the signal handler and then from ``run``) is a
        no-op, and a failing ``quit()`` cannot mask another exception.
        """
        driver, self.driver = self.driver, None
        if driver is None:
            return
        try:
            driver.quit()
        except Exception:
            logger.exception(f"{self.account_name} failed to close the browser")

    def register_signal_handler(self) -> None:
        """Install SIGINT/SIGTERM handlers that close the browser and exit."""

        def signal_handler(signum, frame):
            print("\n⚠️ 收到退出信号,正在保存状态并退出...")
            self._quit_driver()
            sys.exit(0)

        signals_to_handle = [signal.SIGINT]
        # SIGTERM may be unavailable on Windows, so it is registered only
        # when the signal module exposes it.
        if hasattr(signal, "SIGTERM"):
            signals_to_handle.append(signal.SIGTERM)
        try:
            for signum in signals_to_handle:
                signal.signal(signum, signal_handler)
        except ValueError:
            # signal.signal() may only be called from the main thread; when
            # the crawler is built in a worker thread, skip registration
            # instead of failing construction.
            logger.warning(
                "Signal handlers not installed: not running in the main thread"
            )

    # ==================== Human-behaviour simulation ====================
    def random_wait(self, min_sec, max_sec=None) -> None:
        """Sleep for a random duration between ``min_sec`` and ``max_sec``.

        When ``max_sec`` is omitted the sleep lasts exactly ``min_sec``.
        """
        if max_sec is None:
            max_sec = min_sec
        time.sleep(random.uniform(min_sec, max_sec))

    def move_mouse_to_element(self, element) -> None:
        """Move the mouse pointer onto ``element``, then pause briefly.

        Does nothing when ``element`` is falsy.
        """
        if not element:
            return
        try:
            # Preferred: let DrissionPage move to the element itself.
            self.driver.actions.move_to(element)
        except Exception:
            # Fallback kept from older logic: move using the element's
            # centre point; the rect may expose attributes or mapping keys.
            box = element.rect
            try:
                center_x = int(box.x + box.width / 2)
                center_y = int(box.y + box.height / 2)
            except Exception:
                center_x = int(box["x"] + box["width"] / 2)
                center_y = int(box["y"] + box["height"] / 2)
            try:
                self.driver.actions.move(center_x, center_y)
            except TypeError:
                # Some versions only accept keyword arguments.
                self.driver.actions.move(offset_x=center_x, offset_y=center_y)
        self.random_wait(0.2, 0.5)

    def human_type(self, element, text) -> None:
        """Type ``text`` into ``element`` one character at a time with pauses."""
        # NOTE(review): confirm that the element type returned by
        # self.driver.ele() provides send_keys(); DrissionPage documents
        # input() for typing into elements.
        for char in text:
            element.send_keys(char)
            time.sleep(random.uniform(0.1, 0.3))

    def login(self, username, password) -> bool:
        """Fill in and submit the Taobao login form.

        Args:
            username: Value typed into the ``fm-login-id`` input.
            password: Value typed into the ``fm-login-password`` input.

        Returns:
            True when the logged-in nickname link is found afterwards,
            otherwise False.
        """
        self.driver.get("https://login.taobao.com")
        self.random_wait(5, 8)

        # Enter the account name.
        login_name = self.driver.ele("xpath=//input[@name='fm-login-id']", timeout=30)
        if login_name:
            self.move_mouse_to_element(login_name)
            self.human_type(login_name, username)
            self.random_wait(1, 3)

        # Enter the password.
        login_pass = self.driver.ele("xpath=//input[@name='fm-login-password']", timeout=30)
        if login_pass:
            self.move_mouse_to_element(login_pass)
            self.human_type(login_pass, password)
            self.random_wait(1, 3)

        # Click the login button.
        login_button = self.driver.ele("xpath=//button[text()='登录']", timeout=30)
        if login_button:
            self.move_mouse_to_element(login_button)
            login_button.click()
            self.random_wait(1, 3)

        # Click the "agree" button if one is shown.
        login_agree = self.driver.ele("xpath=//button[text()='同意']", timeout=5)
        if login_agree:
            self.move_mouse_to_element(login_agree)
            login_agree.click()
            self.random_wait(1, 3)

        # Wait for the login result.
        self.random_wait(10, 20)

        # Check whether the login succeeded.
        user_info = self.driver.ele("xpath=//a[@class='site-nav-login-info-nick']", timeout=10)
        if user_info:
            print("登录成功!")
            return True
        print("登录失败,请检查账号密码或验证码")
        return False

    def get_search(self) -> bool:
        """Open the Taobao home page and store the cookies if logged in.

        Returns:
            True when the nickname element was found and the cookies were
            written to the database. False when the ``baxia-dialog-content``
            iframe is present (the account row is updated with status 1) or
            when the nickname element does not appear.
        """
        url = "https://www.taobao.com"
        self.driver.get(url, timeout=30)
        time.sleep(30)
        time.sleep(random.uniform(3, 8))
        # Refresh once; otherwise the logged-in state may not be found.
        self.driver.refresh()
        self.random_wait(5, 10)
        # NOTE: a fallback that called self.login() when the login form was
        # present used to be sketched here as commented-out code; it is not
        # active.

        ele_iframe = self.driver.ele("xpath=//iframe[@id='baxia-dialog-content']")
        if ele_iframe:
            # NOTE(review): this statement filters on `nickname` while the
            # cookie update below filters on `username`, both with
            # self.account_name. Confirm that both columns hold that value.
            # presumably status=1 flags the account as needing verification;
            # TODO confirm against the table definition.
            update_sql = """ UPDATE `retrieve_collect_equipment_account` SET `status`= %s WHERE `nickname` = %s; """
            self.db.execute(update_sql, (1, self.account_name))
            return False

        ele = self.driver.ele('xpath=//*[contains(@class,"site-nav-login-info-nick")]', timeout=30)
        if not ele:
            return False

        cookies_list = self.driver.cookies()
        cookies_dict = {c['name']: c['value'] for c in cookies_list}
        timestamp = int(time.time())
        # Persist the cookies as JSON in the account row.
        update_sql = """ UPDATE `retrieve_collect_equipment_account` SET `update_time` = %s, `cookie_str`= %s,`status`= %s WHERE `username` = %s; """
        self.db.execute(update_sql, (timestamp, json.dumps(cookies_dict), 0, self.account_name))
        print(f"{self.account_name},获取 cookie 成功!")
        logger.info(f"{self.account_name},获取 cookie 成功!")
        self.random_wait(3, 5)
        return True

    def run(self) -> bool:
        """Start the browser, try to capture cookies, and always close it.

        Exceptions derived from ``Exception`` are logged and reported as a
        False result.

        Returns:
            True when ``get_search`` reported success, otherwise False.
        """
        logged_in = False
        try:
            self.init_drissionpage()
            logged_in = self.get_search()
        except Exception as e:
            logger.exception(f"{self.account_name} 获取 cookie 异常: {e}")
        finally:
            self._quit_driver()
        # The return sits outside ``finally`` so that SystemExit (raised by
        # the signal handler) and KeyboardInterrupt keep propagating.
        return bool(logged_in)