import random import signal import socket import sys import time from commons.conn_mysql import MySQLPoolOnline from DrissionPage import ChromiumPage, ChromiumOptions from commons.Logger import logger from oss_upload.oss_upload import AliyunOSSUploader from commons.config import YSB_ACCOUNT from spiders.yaoshibang.yidun_slider_captcha import solve_slider_captcha import json chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe" class YaoShiBangLogin: def __init__(self, product=None): self.product = product self.driver = None self.account_name = None self.platform = 5 self.phone = None self.password = None self.db_online = MySQLPoolOnline() self.ossuploader = AliyunOSSUploader() self._register_signal_handler() def _register_signal_handler(self): def handler(signum, frame): logger.info("收到退出信号,正在关闭浏览器...") self._quit_browser() sys.exit(0) signal.signal(signal.SIGINT, handler) if hasattr(signal, "SIGTERM"): signal.signal(signal.SIGTERM, handler) def _quit_browser(self): if self.driver: try: self.driver.quit() except Exception: pass self.driver = None @staticmethod def _get_free_port(): """获取一个当前可用的本地端口,供 Chrome 调试使用。""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] def init_browser(self): co = ChromiumOptions().set_browser_path(chrome_path) debug_port = self._get_free_port() co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}") co.set_local_port(debug_port) co.set_argument(f"--remote-debugging-port={debug_port}") co.set_argument("--remote-debugging-address=127.0.0.1") # co.set_argument("--disable-blink-features=AutomationControlled") co.set_argument("--disable-dev-shm-usage") co.set_argument("--start-maximized") co.set_argument("--no-first-run") # 避免首次运行弹窗 co.set_argument("--no-default-browser-check") # 避免默认浏览器检查 self.driver = ChromiumPage(co) def _is_logged_in(self): # 与当前账号店铺展示文案一致;换店后需同步修改或改为配置项 title = self.driver.ele( "xpath=//span[@class='logout']", timeout=8, ) return bool(title) def login(self, account_dict): logger.info("开始登录药师帮") self.driver.get("https://dian.ysbang.cn/#/login", timeout=15) self.driver.wait.doc_loaded(timeout=10) time.sleep(2) input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5) if not input_name: logger.error("未找到账号输入框") return False input_name.input(account_dict["phone"]) time.sleep(random.uniform(1.5, 2.5)) input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5) if not input_pass: logger.error("未找到密码输入框") return False input_pass.input(account_dict["password"]) time.sleep(random.uniform(1.5, 2.5)) login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5) if not login_btn: logger.error("未找到登录按钮") return False login_btn.click() time.sleep(3) for i in range(3): solve_slider_captcha(self.driver) time.sleep(3) if self._is_logged_in(): time.sleep(3) logger.info("登录成功") cookies_list = self.driver.cookies() self.save_cookies(cookies_list) return True logger.error("登录后未检测到目标店铺名,登录可能失败") return False def save_cookies(self, cookies_list): cookies_dict = {c['name']: c['value'] for c in cookies_list} timestamp = int(time.time()) next_update_time = timestamp + random.randint(3600, 7200) # 保存 cookie 到文件 update_sql = f""" UPDATE `accounts_platform` SET `cookie_timestamp` = %s, `cookie_str`= %s,`cookie_update_time` = %s, `status`= %s WHERE `name` = %s; """ self.db_online.execute(update_sql, (timestamp, json.dumps(cookies_dict), next_update_time, 1, self.account_name)) logger.info("cookie已保存成功") def search(self, account_dict): self.driver.get("https://dian.ysbang.cn/#/home", timeout=15) self.driver.wait.doc_loaded(timeout=10) time.sleep(2) if not self._is_logged_in(): if not self.login(account_dict): return False else: cookies_list = self.driver.cookies() self.save_cookies(cookies_list) def get_account(self): sql_account = f""" select `id`,`name`,`ip`,`phone`,`password`,`cookie_timestamp`,`cookie_str` from `accounts_platform` where `platform`=5 and `status`=1 and `equipment_id`=3 order by `cookie_timestamp` asc limit 1 """ account_list = self.db_online.select_data(sql_account) if not account_list: return {} account_dict = account_list[0] self.account_name = account_dict["name"] self.phone = account_dict["phone"] self.password = account_dict["password"] return account_dict def run(self): account_dict = self.get_account() if not account_dict: logger.error("ysb无账号可用") print(account_dict) try: self.init_browser() self.search(account_dict) except Exception as e: logger.exception("运行异常: %s", e) finally: self._quit_browser() if __name__ == "__main__": YaoShiBangLogin().run()