| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- import time
- import json
- import random
- import signal
- import sys
- from DrissionPage import ChromiumPage, ChromiumOptions
- import re
- import socket
- from commons.conn_mysql import MySQLPool
- import hashlib
- from commons.Logger import logger
# Upper bound on the number of pages handled per run.
MAX_PAGES = 5

# (min, max) ranges in seconds, intended for random.uniform-style sampling.
# Pause between consecutive pages.
WAIT_BETWEEN_PAGES = (8, 15)
# Pause between scroll steps.
SCROLL_DELAY = (0.3, 0.8)
# Pause around clicks.
CLICK_DELAY = (0.5, 1.2)
# Time spent "browsing" a page.
BROWSE_TIME = (5, 10)

# Location of the Chrome executable handed to DrissionPage (Windows default install path).
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
class TaobaoAutoCrawl:
    """Refresh one Taobao account's cookies through a DrissionPage-driven Chrome.

    An instance launches Chrome with a per-account user-data directory
    (optionally behind a proxy), opens taobao.com, checks whether the session
    shows a logged-in nickname, and writes the cookies and account status to
    the ``accounts_platform`` table through ``MySQLPool``.
    """

    def __init__(self, account_name, ip, key_word):
        """Store the account settings and prepare the DB pool and signal handlers.

        Args:
            account_name: Value matched against ``accounts_platform.nickname``;
                also used as the Chrome user-data sub-directory name.
            ip: Proxy address (``host:port`` or a full URL); falsy for no proxy.
            key_word: Search keyword. Stored as ``self.keyword``; none of the
                methods in this class read it.
        """
        self.driver = None
        self.register_signal_handler()
        self.db = MySQLPool()
        self.account_name = account_name
        self.ip = ip
        self.keyword = key_word

    @staticmethod
    def _get_free_port() -> int:
        """Return a local TCP port that was free at the time of the call.

        The probing socket is closed before returning, so another process may
        grab the port before Chrome binds it (inherent to this technique).
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 asks the OS to pick any unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    @staticmethod
    def _normalize_proxy(ip) -> str:
        """Return a ``--proxy-server`` value for *ip*, or ``""`` when no proxy is set.

        A bare ``host:port`` gets an ``http://`` scheme; a value that already
        carries a scheme (``http://``, ``https://``, ``socks5://`` ...) is
        kept unchanged. ``None`` and whitespace-only values yield ``""``.
        """
        proxy = (ip or "").strip()
        if not proxy:
            return ""
        if "://" not in proxy:
            proxy = f"http://{proxy}"
        return proxy

    def init_drissionpage(self):
        """Launch Chrome on a freshly picked debug port and bind it to ``self.driver``."""
        # auto_port can produce an address without a port in some environments
        # and break ChromiumPage start-up, so pick the port explicitly.
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        # set_local_port is the documented way to choose the debugging port; it
        # updates the address DrissionPage connects to, which a raw
        # --remote-debugging-port argument does not.
        co.set_local_port(debug_port)
        co.set_user_data_path(f"./spider/taobao/{self.account_name}")

        proxy = self._normalize_proxy(self.ip)
        if proxy:
            co.set_argument(f"--proxy-server={proxy}")

        logger.info(f"启动浏览器: account={self.account_name}, debug_port={debug_port}")
        self.driver = ChromiumPage(co)

    def register_signal_handler(self) -> bool:
        """Install SIGINT/SIGTERM handlers that quit the browser and exit.

        Returns:
            True when the handlers were installed; False when called outside
            the main thread, where ``signal.signal`` raises ``ValueError``.
        """

        def signal_handler(signum, frame):
            print("\n⚠️ 收到退出信号,正在保存状态并退出...")
            try:
                if self.driver:
                    self.driver.quit()
            except Exception:
                # A failing browser shutdown must not keep the process alive.
                logger.exception("browser quit failed during signal handling")
            sys.exit(0)

        try:
            signal.signal(signal.SIGINT, signal_handler)
            # SIGTERM may be missing on some platforms (e.g. Windows builds).
            if hasattr(signal, "SIGTERM"):
                signal.signal(signal.SIGTERM, signal_handler)
        except ValueError:
            # Raised when not running in the main thread of the main interpreter.
            return False
        return True

    # ==================== Human-behaviour simulation ====================
    def random_wait(self, min_sec, max_sec=None):
        """Sleep for a uniformly random duration in ``[min_sec, max_sec]``.

        When *max_sec* is omitted the sleep lasts exactly *min_sec* seconds.
        """
        if max_sec is None:
            max_sec = min_sec
        time.sleep(random.uniform(min_sec, max_sec))

    def move_mouse_to_element(self, element):
        """Move the mouse pointer onto *element*, then pause briefly.

        Does nothing (and does not pause) when *element* is falsy.
        """
        if not element:
            return
        try:
            # Preferred DrissionPage call.
            self.driver.actions.move_to(element)
        except Exception:
            # Fallback kept from older code: compute the element centre and
            # move by coordinates; the rect may be attribute- or key-based.
            # NOTE(review): in current DrissionPage `actions.move` takes
            # offsets relative to the pointer position - confirm this
            # fallback still lands on the element.
            box = element.rect
            try:
                center_x = int(box.x + box.width / 2)
                center_y = int(box.y + box.height / 2)
            except Exception:
                center_x = int(box["x"] + box["width"] / 2)
                center_y = int(box["y"] + box["height"] / 2)
            try:
                self.driver.actions.move(center_x, center_y)
            except TypeError:
                # Some versions accept keyword arguments only.
                self.driver.actions.move(offset_x=center_x, offset_y=center_y)
        self.random_wait(0.2, 0.5)

    def human_type(self, element, text):
        """Send *text* to *element* one character at a time with random pauses.

        NOTE(review): relies on ``element.send_keys``; confirm the element
        type returned by the installed DrissionPage version provides it.
        """
        for char in text:
            element.send_keys(char)
            time.sleep(random.uniform(0.1, 0.3))

    def _type_into(self, xpath, text, timeout=30):
        """Find the input at *xpath* and type *text* into it, if present."""
        field = self.driver.ele(xpath, timeout=timeout)
        if field:
            self.move_mouse_to_element(field)
            self.human_type(field, text)
            self.random_wait(1, 3)

    def _click_if_present(self, xpath, timeout):
        """Find the element at *xpath* and click it, if present."""
        target = self.driver.ele(xpath, timeout=timeout)
        if target:
            self.move_mouse_to_element(target)
            target.click()
            self.random_wait(1, 3)

    def login(self, username, password) -> bool:
        """Fill in and submit the Taobao login form.

        Each step is skipped when its element is not found within the timeout.

        Returns:
            True when the logged-in nickname element is present afterwards,
            False otherwise.
        """
        self.driver.get("https://login.taobao.com")
        self.random_wait(5, 8)

        # Account name, password, then the login button.
        self._type_into("xpath=//input[@name='fm-login-id']", username)
        self._type_into("xpath=//input[@name='fm-login-password']", password)
        self._click_if_present("xpath=//button[text()='登录']", timeout=30)
        # Optional "agree" confirmation button.
        self._click_if_present("xpath=//button[text()='同意']", timeout=5)

        # Give the site time to finish the login flow before checking.
        self.random_wait(10, 20)

        user_info = self.driver.ele("xpath=//a[@class='site-nav-login-info-nick']", timeout=10)
        if user_info:
            print("登录成功!")
            return True
        print("登录失败,请检查账号密码或验证码")
        return False

    def get_search(self) -> bool:
        """Open taobao.com and persist the session cookies when logged in.

        Returns:
            True when the nickname element was found and the cookies were
            written to ``accounts_platform``; False when the
            ``baxia-dialog-content`` iframe is present (the account is then
            marked ``status = 0``) or when no nickname element appears.
        """
        url = "https://www.taobao.com"
        self.driver.get(url, timeout=30)
        time.sleep(30)
        time.sleep(random.uniform(3, 8))
        # Refresh once; without it the logged-in state may not be detected.
        self.driver.refresh()
        self.random_wait(5, 10)
        # self.login() is not invoked from here (the call was disabled).

        # Presumably the risk-control / verification dialog - confirm.
        ele_iframe = self.driver.ele("xpath=//iframe[@id='baxia-dialog-content']")
        if ele_iframe:
            update_sql = """ UPDATE `accounts_platform` SET `status`= %s WHERE `nickname` = %s; """
            self.db.execute(update_sql, (0, self.account_name))
            return False

        ele = self.driver.ele('xpath=//*[contains(@class,"site-nav-login-info-nick")]', timeout=30)
        if not ele:
            return False

        cookies_list = self.driver.cookies()
        cookies_dict = {c['name']: c['value'] for c in cookies_list}
        timestamp = int(time.time())
        # Schedule the next refresh one to two hours from now.
        next_update_time = timestamp + random.randint(3600, 7200)
        # Persist the cookies (as JSON) and mark the account active in the database.
        update_sql = """ UPDATE `accounts_platform` SET `cookie_timestamp` = %s, `cookie_str`= %s,`cookie_update_time` = %s, `status`= %s WHERE `nickname` = %s; """
        self.db.execute(update_sql, (timestamp, json.dumps(cookies_dict), next_update_time, 1, self.account_name))
        print(f"{self.account_name},获取 cookie 成功!")
        logger.info(f"{self.account_name},获取 cookie 成功!")
        self.random_wait(3, 5)
        return True

    def run(self) -> bool:
        """Start the browser, refresh the cookies, and always close the browser.

        Exceptions from start-up or from ``get_search`` are logged, not raised.

        Returns:
            True when ``get_search`` reported success, False otherwise.
        """
        bool_login = False
        try:
            self.init_drissionpage()
            bool_login = self.get_search()
        except Exception as e:
            logger.exception(f"{self.account_name} 获取 cookie 异常: {e}")
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception:
                    # A failed shutdown must not mask the outcome of the run.
                    logger.exception(f"{self.account_name} browser quit failed")
                # Drop the reference so the signal handler does not quit it again.
                self.driver = None
        return bool(bool_login)
if __name__ == '__main__':
    # Manual entry point: refresh the cookies of one hard-coded account
    # without a proxy; the keyword is passed through but not used by run().
    crawler = TaobaoAutoCrawl(
        account_name="tb_account10",
        ip="",
        key_word="手机",
    )
    crawler.run()
|