| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- import random
- import signal
- import socket
- import sys
- import time
- from commons.conn_mysql import MySQLPoolOnline
- from DrissionPage import ChromiumPage, ChromiumOptions
- from commons.Logger import logger
- from oss_upload.oss_upload import AliyunOSSUploader
- from commons.config import YSB_ACCOUNT
- from spiders.yaoshibang.yidun_slider_captcha import solve_slider_captcha
- import json
# Absolute path of the Chrome executable handed to ChromiumOptions.set_browser_path()
# in YaoShiBangLogin.init_browser(). This is a Windows install location; adjust it
# when running on a machine where Chrome lives elsewhere.
chrome_path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
class YaoShiBangLogin:
    """Drive a Chrome session that logs in to dian.ysbang.cn and stores its cookies.

    Typical use is ``YaoShiBangLogin().run()``: pick an account row from the
    ``accounts_platform`` table, open a browser on a per-account profile
    directory, make sure the session is logged in (solving the slider captcha
    if a login is needed) and write the session cookies back to the database.

    Note: constructing an instance installs process-wide SIGINT/SIGTERM
    handlers (see ``_register_signal_handler``).
    """

    def __init__(self, product=None):
        """Create the DB/OSS helpers and register the shutdown signal handlers.

        Args:
            product: Stored on the instance as-is; not used by this class itself.
        """
        self.product = product
        self.driver = None  # ChromiumPage, created by init_browser()
        self.account_name = None  # filled in by get_account()
        self.platform = 5  # same value as the `platform` filter in get_account()'s query
        self.phone = None
        self.password = None
        self.db_online = MySQLPoolOnline()
        self.ossuploader = AliyunOSSUploader()
        self._register_signal_handler()

    def _register_signal_handler(self):
        """Close the browser and exit with status 0 on SIGINT (and SIGTERM if available)."""

        def handler(signum, frame):
            logger.info("收到退出信号,正在关闭浏览器...")
            self._quit_browser()
            sys.exit(0)

        signal.signal(signal.SIGINT, handler)
        if hasattr(signal, "SIGTERM"):
            signal.signal(signal.SIGTERM, handler)

    def _quit_browser(self):
        """Quit the browser if one is open and reset ``self.driver`` to ``None``.

        Best-effort: a failure while quitting is logged, never raised, so this
        is safe to call from the signal handler and from ``finally`` blocks.
        """
        if not self.driver:
            return
        try:
            self.driver.quit()
        except Exception as e:
            # Keep shutdown best-effort, but leave a trace instead of hiding the error.
            logger.warning("关闭浏览器时出错: %s", e)
        self.driver = None

    @staticmethod
    def _get_free_port():
        """Return a local TCP port that is free right now, for Chrome's debug port.

        The probing socket is closed before returning, so the port is only
        known to have been free at the time of the check.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Port 0 asks the OS to pick any unused port.
            s.bind(("127.0.0.1", 0))
            return s.getsockname()[1]

    def init_browser(self):
        """Start Chrome on a fresh debug port and store the page in ``self.driver``.

        The user-data directory is derived from ``self.account_name``, so
        ``get_account()`` should have populated it before this is called.
        """
        co = ChromiumOptions().set_browser_path(chrome_path)
        debug_port = self._get_free_port()
        co.set_user_data_path(f"./spiders/yaoshibang/{self.account_name}")
        co.set_local_port(debug_port)
        co.set_argument(f"--remote-debugging-port={debug_port}")
        co.set_argument("--remote-debugging-address=127.0.0.1")
        # co.set_argument("--disable-blink-features=AutomationControlled")
        co.set_argument("--disable-dev-shm-usage")
        co.set_argument("--start-maximized")
        co.set_argument("--no-first-run")  # avoid the first-run dialog
        co.set_argument("--no-default-browser-check")  # avoid the default-browser prompt
        self.driver = ChromiumPage(co)

    def _is_logged_in(self):
        """Return True if the page shows the ``<span class='logout'>`` element.

        Waits up to 8 seconds for the element to appear.
        """
        logout_ele = self.driver.ele(
            "xpath=//span[@class='logout']",
            timeout=8,
        )
        return bool(logout_ele)

    def login(self, account_dict):
        """Submit the login form, solve the slider captcha and save the cookies.

        Args:
            account_dict: Mapping with at least the keys ``"phone"`` and
                ``"password"``.

        Returns:
            True once the logged-in marker is detected (cookies are saved
            before returning); False if a form element is missing or the
            marker is still absent after three captcha attempts.
        """
        logger.info("开始登录药师帮")
        self.driver.get("https://dian.ysbang.cn/#/login", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)

        input_name = self.driver.ele("xpath://input[@name='userAccount']", timeout=5)
        if not input_name:
            logger.error("未找到账号输入框")
            return False
        input_name.input(account_dict["phone"])
        time.sleep(random.uniform(1.5, 2.5))  # randomized pause between inputs

        input_pass = self.driver.ele("xpath://input[@name='password']", timeout=5)
        if not input_pass:
            logger.error("未找到密码输入框")
            return False
        input_pass.input(account_dict["password"])
        time.sleep(random.uniform(1.5, 2.5))

        login_btn = self.driver.ele("xpath://button[text()='登录']", timeout=5)
        if not login_btn:
            logger.error("未找到登录按钮")
            return False
        login_btn.click()
        time.sleep(3)

        # Up to three captcha attempts; stop as soon as the session is logged in.
        for _ in range(3):
            solve_slider_captcha(self.driver)
            time.sleep(3)
            if self._is_logged_in():
                time.sleep(3)
                logger.info("登录成功")
                cookies_list = self.driver.cookies()
                self.save_cookies(cookies_list)
                return True

        logger.error("登录后未检测到目标店铺名,登录可能失败")
        return False

    def save_cookies(self, cookies_list):
        """Store the cookies for ``self.account_name`` in ``accounts_platform``.

        Args:
            cookies_list: Iterable of mappings that each have ``'name'`` and
                ``'value'`` keys; they are flattened to a name->value dict and
                stored as JSON.
        """
        cookies_dict = {c['name']: c['value'] for c in cookies_list}
        timestamp = int(time.time())
        # Schedule the next refresh 1-2 hours from now, at a random offset.
        next_update_time = timestamp + random.randint(3600, 7200)
        # Persist the cookies to the database and set the row's status to 1.
        update_sql = """ UPDATE `accounts_platform` SET `cookie_timestamp` = %s, `cookie_str`= %s,`cookie_update_time` = %s, `status`= %s WHERE `name` = %s; """
        self.db_online.execute(
            update_sql,
            (timestamp, json.dumps(cookies_dict), next_update_time, 1, self.account_name),
        )
        logger.info("cookie已保存成功")

    def search(self, account_dict):
        """Open the home page and make sure the session is logged in.

        If the session is already logged in, the current cookies are saved;
        otherwise ``login()`` is attempted (which saves cookies on success).

        Args:
            account_dict: Account mapping forwarded to ``login()``.

        Returns:
            True when the session is logged in, False when the login failed.
        """
        self.driver.get("https://dian.ysbang.cn/#/home", timeout=15)
        self.driver.wait.doc_loaded(timeout=10)
        time.sleep(2)
        if self._is_logged_in():
            cookies_list = self.driver.cookies()
            self.save_cookies(cookies_list)
            return True
        return bool(self.login(account_dict))

    def get_account(self):
        """Fetch the account row with the oldest ``cookie_timestamp``.

        Only rows with platform 5, status 1 and equipment_id 3 are considered.
        On success the name, phone and password are also copied onto the
        instance.

        Returns:
            The account row as returned by ``select_data``, or ``{}`` when no
            row matches.
        """
        sql_account = """ select `id`,`name`,`ip`,`phone`,`password`,`cookie_timestamp`,`cookie_str` from `accounts_platform` where `platform`=5 and `status`=1 and `equipment_id`=3 order by `cookie_timestamp` asc limit 1 """
        account_list = self.db_online.select_data(sql_account)
        if not account_list:
            return {}
        account_dict = account_list[0]
        self.account_name = account_dict["name"]
        self.phone = account_dict["phone"]
        self.password = account_dict["password"]
        return account_dict

    def run(self):
        """Pick an account, open the browser, refresh the login, then close the browser.

        Returns early (without starting a browser) when no account is
        available. Any exception raised while the browser is in use is logged
        and swallowed; the browser is always closed afterwards.
        """
        account_dict = self.get_account()
        if not account_dict:
            logger.error("ysb无账号可用")
            # Nothing to log in with: starting the browser would only fail later.
            return
        # Log the account name only; the row also contains the password.
        logger.info("使用账号: %s", self.account_name)
        try:
            self.init_browser()
            self.search(account_dict)
        except Exception as e:
            logger.exception("运行异常: %s", e)
        finally:
            self._quit_browser()
# Script entry point: run one login/cookie-refresh cycle when executed directly.
if __name__ == "__main__":
    YaoShiBangLogin().run()
|