"""MySQL connection-pool helpers built on DBUtils ``PooledDB`` and PyMySQL."""

import logging
import os
from typing import Any, Callable, Iterable, List, Optional, Tuple, TypeVar

import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor

logger = logging.getLogger(__name__)

T = TypeVar("T")

# Pool parameters shared by every pool created here (raise maxconnections
# etc. as needed).
_POOL_COMMON = {
    "maxconnections": 10,
    "mincached": 2,
    "maxcached": 5,
    "blocking": True,
}


def _create_pool(
    host: str,
    port: int,
    user: str,
    password: str,
    database: str,
) -> PooledDB:
    """Build a ``PooledDB`` of PyMySQL connections for one database.

    Connections use ``DictCursor`` and the ``utf8mb4`` charset; sizing and
    blocking behaviour come from ``_POOL_COMMON``.
    """
    return PooledDB(
        creator=pymysql,
        cursorclass=DictCursor,
        charset="utf8mb4",
        host=host,
        port=port,
        user=user,
        password=password,
        database=database,
        **_POOL_COMMON,
    )


def _pool_from_env(
    prefix: str,
    *,
    host: str,
    user: str,
    password: str,
    database: str,
    port: str = "3306",
) -> PooledDB:
    """Create a pool whose settings come from ``<prefix>_*`` environment variables.

    The variables read are ``<prefix>_HOST``, ``<prefix>_PORT``,
    ``<prefix>_USER``, ``<prefix>_PASSWORD`` and ``<prefix>_DB``; each one
    falls back to the corresponding keyword argument when unset.

    Raises:
        ValueError: if the resolved port is not an integer string.
    """
    env = os.environ.get
    return _create_pool(
        host=env(f"{prefix}_HOST", host),
        port=int(env(f"{prefix}_PORT", port)),
        user=env(f"{prefix}_USER", user),
        password=env(f"{prefix}_PASSWORD", password),
        database=env(f"{prefix}_DB", database),
    )


class _MySQLPoolBase:
    """Shared connect / write / read logic so the concrete pools do not duplicate it."""

    def __init__(self, pool: PooledDB) -> None:
        self.pool = pool

    def get_conn(self):
        """Check a connection out of the pool."""
        return self.pool.connection()

    def _open(self) -> Tuple[Any, Any]:
        """Return ``(conn, cursor)``; never leave a connection open on failure.

        If creating the cursor raises, the connection that was already
        checked out is closed before the exception propagates, so a failed
        cursor creation cannot leak a pool slot.
        """
        conn = self.get_conn()
        try:
            return conn, conn.cursor()
        except Exception:
            conn.close()
            raise

    def _close(self, conn, cursor) -> None:
        """Close the cursor, then the connection; either may be ``None``.

        The connection is closed even when closing the cursor raises.
        """
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()

    def _write(self, fn: Callable[..., T], *args, **kwargs) -> Optional[T]:
        """Run ``fn(cursor, *args, **kwargs)`` and commit.

        Returns:
            Whatever ``fn`` returned, or ``None`` when ``fn`` or the commit
            raised (the error is logged with its traceback and the
            transaction is rolled back).

        Errors raised while obtaining the connection or cursor are not
        caught here and propagate to the caller.
        """
        conn, cursor = self._open()
        try:
            result = fn(cursor, *args, **kwargs)
            conn.commit()
            return result
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception("SQL 执行错误: %s", e)
            conn.rollback()
            return None
        finally:
            self._close(conn, cursor)

    def execute_many(self, sql: str, data: Iterable) -> None:
        """Run ``sql`` once per parameter set in ``data`` via ``executemany``.

        ``data`` is materialized into a list first; an empty input is a
        no-op and does not touch the pool.
        """
        rows = list(data)
        if not rows:
            return

        def _run(cursor, statement, params):
            cursor.executemany(statement, params)

        self._write(_run, sql, rows)

    def execute_one(self, sql: str, data) -> None:
        """Run a single parameterized write statement and commit it."""

        def _run(cursor, statement, params):
            cursor.execute(statement, params)

        self._write(_run, sql, data)

    def update_data(self, sql: str, data) -> None:
        """Alias of :meth:`execute_one`."""
        self.execute_one(sql, data)

    def select_data(self, sql: str, data: Any = None) -> List[Any]:
        """Run a read-only query and return all rows; never commits.

        Args:
            sql: The query text, optionally containing driver placeholders.
            data: Optional parameters bound to the placeholders. Prefer this
                over formatting values into ``sql`` by hand. ``None`` (the
                default) runs ``sql`` as-is.

        Returns:
            The result of ``cursor.fetchall()``, or ``[]`` when the query
            raised (the error is logged with its traceback).
        """
        conn, cursor = self._open()
        try:
            cursor.execute(sql, data)
            return cursor.fetchall()
        except Exception as e:
            logger.exception("SQL 查询错误: %s", e)
            return []
        finally:
            self._close(conn, cursor)

    def execute(self, sql: str, data: Any = None) -> int:
        """General write operation; return the affected row count, or 0 on failure."""

        def _run(cursor, statement, params):
            if params is None:
                return cursor.execute(statement)
            return cursor.execute(statement, params)

        result = self._write(_run, sql, data)
        # _write yields None on failure; normalize anything non-int to 0.
        return result if isinstance(result, int) else 0


# SECURITY NOTE(review): the fallback host/user/password values in the classes
# below are credentials committed to source. They are kept so that existing
# deployments without environment variables keep working; consider rotating
# them and requiring the environment variables instead.


class MySQLPool(_MySQLPoolBase):
    """Pool for the test2 database.

    Settings can be overridden through ``MYSQL_TEST2_*`` environment
    variables for deployment; when unset, the original defaults are used.
    """

    def __init__(self) -> None:
        super().__init__(
            _pool_from_env(
                "MYSQL_TEST2",
                host="47.119.164.65",
                user="test_c",
                password="Dfwy@2025",
                database="test2",
            )
        )


class MySQLPool39(_MySQLPoolBase):
    """Pool for the drug_retrieve database (``MYSQL_DRUG_*`` overrides)."""

    def __init__(self) -> None:
        super().__init__(
            _pool_from_env(
                "MYSQL_DRUG",
                host="39.108.116.125",
                user="drug_retrieve",
                password="Pem287cwM58jNpe2",
                database="drug_retrieve",
            )
        )


class MySQLPoolOnline(_MySQLPoolBase):
    """Pool for the drug_retrieve database on a different default host.

    NOTE(review): this class reads the same ``MYSQL_DRUG_*`` variables as
    ``MySQLPool39``, so setting them points both pools at the same server.
    Confirm whether a separate prefix was intended.
    """

    def __init__(self) -> None:
        super().__init__(
            _pool_from_env(
                "MYSQL_DRUG",
                host="120.24.49.2",
                user="drug_retrieve",
                password="ksCt3xm6chzdkafj",
                database="drug_retrieve",
            )
        )


if __name__ == "__main__":
    # Connectivity smoke test against the test2 pool.
    pool = MySQLPool()
    rows = pool.select_data("SELECT 1 AS one")
    print(rows)