conn_mysql.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import logging
  2. import os
  3. import pymysql
  4. from dbutils.pooled_db import PooledDB
  5. from pymysql.cursors import DictCursor
  6. logger = logging.getLogger(__name__)
  7. # 连接池公共参数(可自行调大 maxconnections 等)
  8. _POOL_COMMON = {
  9. "maxconnections": 10,
  10. "mincached": 2,
  11. "maxcached": 5,
  12. "blocking": True,
  13. # 取出连接前 ping,避免服务端超时断开后仍使用死连接(pymysql 支持)
  14. "ping": 1,
  15. }
  16. def _env_port(name, default="3306"):
  17. """环境变量端口;空串/非数字时回退默认值,避免 int() 抛错导致池子无法创建。"""
  18. raw = os.environ.get(name)
  19. if raw is None or not str(raw).strip():
  20. return int(default)
  21. try:
  22. return int(raw)
  23. except ValueError:
  24. logger.warning("环境变量 %s=%r 不是合法端口,使用默认 %s", name, raw, default)
  25. return int(default)
  26. def _create_pool(
  27. host,
  28. port,
  29. user,
  30. password,
  31. database,
  32. ):
  33. if not host or not str(host).strip():
  34. raise ValueError("MySQL host 不能为空")
  35. if not database or not str(database).strip():
  36. raise ValueError("MySQL database 不能为空")
  37. return PooledDB(
  38. creator=pymysql,
  39. cursorclass=DictCursor,
  40. charset="utf8mb4",
  41. host=host,
  42. port=int(port),
  43. user=user,
  44. password=password,
  45. database=database,
  46. **_POOL_COMMON,
  47. )
  48. class _MySQLPoolBase:
  49. """共用的取连接、写库、读库逻辑,避免两套类复制粘贴。"""
  50. def __init__(self, pool):
  51. self.pool = pool
  52. def get_conn(self):
  53. return self.pool.connection()
  54. def _close(self, conn, cursor) -> None:
  55. try:
  56. if cursor:
  57. cursor.close()
  58. finally:
  59. if conn:
  60. conn.close()
  61. def _write(self, fn, *args, **kwargs):
  62. conn = self.get_conn()
  63. cursor = conn.cursor()
  64. try:
  65. result = fn(cursor, *args, **kwargs)
  66. conn.commit()
  67. return result
  68. except Exception as e:
  69. logger.error("SQL 执行错误: %s", e)
  70. conn.rollback()
  71. return None
  72. finally:
  73. self._close(conn, cursor)
  74. def execute_many(self, sql, data):
  75. rows = list(data)
  76. if not rows:
  77. return
  78. def _do(c, s, d):
  79. c.executemany(s, d)
  80. self._write(_do, sql, rows)
  81. def execute_one(self, sql, data):
  82. def _do(c, s, d):
  83. c.execute(s, d)
  84. self._write(_do, sql, data)
  85. def update_data(self, sql, data):
  86. self.execute_one(sql, data)
  87. def select_data(self, sql,data=None):
  88. """只读查询,不应 commit。"""
  89. conn = self.get_conn()
  90. cursor = conn.cursor()
  91. try:
  92. if data:
  93. cursor.execute(sql,data)
  94. return cursor.fetchall()
  95. else:
  96. cursor.execute(sql)
  97. return cursor.fetchall()
  98. except Exception as e:
  99. logger.error("SQL 查询错误: %s", e)
  100. return []
  101. finally:
  102. self._close(conn, cursor)
  103. def execute(self, sql, data=None):
  104. """通用写操作,返回受影响行数;失败返回 0。"""
  105. def _do(c, s, d):
  106. if d is not None:
  107. return c.execute(s, d)
  108. return c.execute(s)
  109. result = self._write(_do, sql, data)
  110. return result if isinstance(result, int) else 0
  111. class MySQLPool(_MySQLPoolBase):
  112. """test2 库(可通过环境变量覆盖,便于部署;未设置则沿用原默认)。"""
  113. def __init__(self):
  114. pool = _create_pool(
  115. host=os.environ.get("MYSQL_TEST2_HOST", "47.119.164.65"),
  116. port=_env_port("MYSQL_TEST2_PORT", "3306"),
  117. user=os.environ.get("MYSQL_TEST2_USER", "test_c"),
  118. password=os.environ.get("MYSQL_TEST2_PASSWORD", "Dfwy@2025"),
  119. database=os.environ.get("MYSQL_TEST2_DB", "test2"),
  120. )
  121. super().__init__(pool)
  122. class MySQLPool39(_MySQLPoolBase):
  123. """drug_retrieve 库。"""
  124. def __init__(self):
  125. pool = _create_pool(
  126. host=os.environ.get("MYSQL_DRUG_HOST", "39.108.116.125"),
  127. port=_env_port("MYSQL_DRUG_PORT", "3306"),
  128. user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
  129. password=os.environ.get("MYSQL_DRUG_PASSWORD", "Pem287cwM58jNpe2"),
  130. database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
  131. )
  132. super().__init__(pool)
  133. class MySQLPoolOnline(_MySQLPoolBase):
  134. """drug_retrieve 库。"""
  135. def __init__(self):
  136. pool = _create_pool(
  137. host=os.environ.get("MYSQL_DRUG_HOST", "120.24.49.2"),
  138. port=_env_port("MYSQL_DRUG_PORT", "3306"),
  139. user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
  140. password=os.environ.get("MYSQL_DRUG_PASSWORD", "ksCt3xm6chzdkafj"),
  141. database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
  142. )
  143. super().__init__(pool)
  144. if __name__ == "__main__":
  145. pool = MySQLPool()
  146. rows = pool.select_data("SELECT 1 AS one")
  147. print(rows)