import logging
import os

import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor

logger = logging.getLogger(__name__)

# Shared connection-pool settings (raise maxconnections etc. as needed).
_POOL_COMMON = {
    "maxconnections": 10,
    "mincached": 2,
    "maxcached": 5,
    "blocking": True,
    # Ping before a connection is handed out, so a connection the server has
    # already timed out is not reused (supported by pymysql).
    "ping": 1,
}


def _env_port(name: str, default: str = "3306") -> int:
    """Return the TCP port stored in environment variable *name*.

    Falls back to ``int(default)`` when the variable is unset, blank, not an
    integer, or outside the valid port range 1-65535, so that a bad value
    cannot make ``int()`` raise and prevent the pool from being created.
    A warning is logged for every rejected non-blank value.
    """
    raw = os.environ.get(name)
    if raw is None or not str(raw).strip():
        return int(default)
    try:
        port = int(raw)
    except ValueError:
        port = None
    # Integers such as 0, -1 or 70000 parse fine but are not usable ports.
    if port is None or not 0 < port <= 65535:
        logger.warning("环境变量 %s=%r 不是合法端口,使用默认 %s", name, raw, default)
        return int(default)
    return port


def _create_pool(
    host,
    port,
    user,
    password,
    database,
):
    """Build a ``PooledDB`` of pymysql connections using ``_POOL_COMMON``.

    Connections use ``DictCursor`` and the ``utf8mb4`` charset.

    Raises:
        ValueError: if *host* or *database* is empty or whitespace-only.
    """
    if not host or not str(host).strip():
        raise ValueError("MySQL host 不能为空")
    if not database or not str(database).strip():
        raise ValueError("MySQL database 不能为空")
    return PooledDB(
        creator=pymysql,
        cursorclass=DictCursor,
        charset="utf8mb4",
        host=host,
        port=int(port),
        user=user,
        password=password,
        database=database,
        **_POOL_COMMON,
    )


class _MySQLPoolBase:
    """Connection checkout and read/write helpers shared by the pool classes.

    Keeping the logic here avoids copy-pasting it into every concrete pool.
    """

    def __init__(self, pool):
        self.pool = pool

    def get_conn(self):
        """Check a connection out of the pool."""
        return self.pool.connection()

    def _close(self, conn, cursor) -> None:
        """Close *cursor* and then *conn*; either may be falsy and is skipped."""
        try:
            if cursor:
                cursor.close()
        finally:
            # Runs even when closing the cursor raised.
            if conn:
                conn.close()

    def _open(self):
        """Return a ``(conn, cursor)`` pair.

        If creating the cursor fails, the connection is closed before the
        exception is re-raised, so it is not left checked out.
        """
        conn = self.get_conn()
        try:
            return conn, conn.cursor()
        except Exception:
            self._close(conn, None)
            raise

    def _write(self, fn, *args, **kwargs):
        """Run ``fn(cursor, *args, **kwargs)`` and commit.

        Returns whatever *fn* returns. If *fn* or the commit raises, the
        error is logged with its traceback, a rollback is attempted, and
        ``None`` is returned. The cursor and connection are always closed.
        """
        conn, cursor = self._open()
        try:
            result = fn(cursor, *args, **kwargs)
            conn.commit()
            return result
        except Exception as e:
            logger.error("SQL 执行错误: %s", e, exc_info=True)
            try:
                conn.rollback()
            except Exception:
                # A failing rollback must not replace the None result the
                # caller expects on failure.
                logger.warning("rollback failed", exc_info=True)
            return None
        finally:
            self._close(conn, cursor)

    def execute_many(self, sql, data):
        """Run *sql* once per parameter set in *data* via ``executemany``.

        *data* is materialised into a list first; nothing is executed when
        it is empty.
        """
        rows = list(data)
        if not rows:
            return

        def _do(c, s, d):
            c.executemany(s, d)

        self._write(_do, sql, rows)

    def execute_one(self, sql, data):
        """Run a single parameterised write statement and commit it."""

        def _do(c, s, d):
            c.execute(s, d)

        self._write(_do, sql, data)

    def update_data(self, sql, data):
        """Alias of :meth:`execute_one`."""
        self.execute_one(sql, data)

    def select_data(self, sql, data=None):
        """Run a read-only query; no commit is issued.

        *data* is passed as query parameters only when it is truthy.
        Returns the ``fetchall()`` result, or ``[]`` if the query raises
        (the error is logged with its traceback).
        """
        conn, cursor = self._open()
        try:
            if data:
                cursor.execute(sql, data)
            else:
                cursor.execute(sql)
            return cursor.fetchall()
        except Exception as e:
            logger.error("SQL 查询错误: %s", e, exc_info=True)
            return []
        finally:
            self._close(conn, cursor)

    def execute(self, sql, data=None):
        """General-purpose write; returns the affected row count, 0 on failure."""

        def _do(c, s, d):
            if d is not None:
                return c.execute(s, d)
            return c.execute(s)

        result = self._write(_do, sql, data)
        # _write returns None on failure; normalise that to 0.
        return result if isinstance(result, int) else 0


class MySQLPool(_MySQLPoolBase):
    """Pool for the test2 database.

    Every setting can be overridden through ``MYSQL_TEST2_*`` environment
    variables for deployment; unset variables keep the original defaults.
    """

    def __init__(self):
        # NOTE(review): the fallback credentials below are committed in
        # source; prefer supplying them through the environment only.
        pool = _create_pool(
            host=os.environ.get("MYSQL_TEST2_HOST", "47.119.164.65"),
            port=_env_port("MYSQL_TEST2_PORT", "3306"),
            user=os.environ.get("MYSQL_TEST2_USER", "test_c"),
            password=os.environ.get("MYSQL_TEST2_PASSWORD", "Dfwy@2025"),
            database=os.environ.get("MYSQL_TEST2_DB", "test2"),
        )
        super().__init__(pool)


class MySQLPool39(_MySQLPoolBase):
    """Pool for the drug_retrieve database (default host 39.108.116.125)."""

    def __init__(self):
        # NOTE(review): reads the same MYSQL_DRUG_* variables as
        # MySQLPoolOnline, so setting them points both classes at the same
        # server - confirm this is intended.
        # NOTE(review): the fallback credentials below are committed in
        # source; prefer supplying them through the environment only.
        pool = _create_pool(
            host=os.environ.get("MYSQL_DRUG_HOST", "39.108.116.125"),
            port=_env_port("MYSQL_DRUG_PORT", "3306"),
            user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
            password=os.environ.get("MYSQL_DRUG_PASSWORD", "Pem287cwM58jNpe2"),
            database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
        )
        super().__init__(pool)


class MySQLPoolOnline(_MySQLPoolBase):
    """Pool for the drug_retrieve database (default host 120.24.49.2)."""

    def __init__(self):
        # NOTE(review): reads the same MYSQL_DRUG_* variables as MySQLPool39,
        # so setting them points both classes at the same server - confirm
        # this is intended.
        # NOTE(review): the fallback credentials below are committed in
        # source; prefer supplying them through the environment only.
        pool = _create_pool(
            host=os.environ.get("MYSQL_DRUG_HOST", "120.24.49.2"),
            port=_env_port("MYSQL_DRUG_PORT", "3306"),
            user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
            password=os.environ.get("MYSQL_DRUG_PASSWORD", "ksCt3xm6chzdkafj"),
            database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
        )
        super().__init__(pool)


if __name__ == "__main__":
    # Smoke test: needs a reachable test2 database.
    pool = MySQLPool()
    rows = pool.select_data("SELECT 1 AS one")
    print(rows)