conn_mysql.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import logging
  2. import os
  3. from typing import Any, Callable, Iterable, List, Optional, TypeVar
  4. import pymysql
  5. from dbutils.pooled_db import PooledDB
  6. from pymysql.cursors import DictCursor
  7. logger = logging.getLogger(__name__)
  8. T = TypeVar("T")
  9. # 连接池公共参数(可自行调大 maxconnections 等)
  10. _POOL_COMMON = {
  11. "maxconnections": 10,
  12. "mincached": 2,
  13. "maxcached": 5,
  14. "blocking": True,
  15. }
  16. def _create_pool(
  17. host: str,
  18. port: int,
  19. user: str,
  20. password: str,
  21. database: str,
  22. ) -> PooledDB:
  23. return PooledDB(
  24. creator=pymysql,
  25. cursorclass=DictCursor,
  26. charset="utf8mb4",
  27. host=host,
  28. port=port,
  29. user=user,
  30. password=password,
  31. database=database,
  32. **_POOL_COMMON,
  33. )
  34. class _MySQLPoolBase:
  35. """共用的取连接、写库、读库逻辑,避免两套类复制粘贴。"""
  36. def __init__(self, pool: PooledDB) -> None:
  37. self.pool = pool
  38. def get_conn(self):
  39. return self.pool.connection()
  40. def _close(self, conn, cursor) -> None:
  41. try:
  42. if cursor:
  43. cursor.close()
  44. finally:
  45. if conn:
  46. conn.close()
  47. def _write(self, fn: Callable[..., T], *args, **kwargs) -> Optional[T]:
  48. conn = self.get_conn()
  49. cursor = conn.cursor()
  50. try:
  51. result = fn(cursor, *args, **kwargs)
  52. conn.commit()
  53. return result
  54. except Exception as e:
  55. logger.error("SQL 执行错误: %s", e)
  56. conn.rollback()
  57. return None
  58. finally:
  59. self._close(conn, cursor)
  60. def execute_many(self, sql: str, data: Iterable) -> None:
  61. rows = list(data)
  62. if not rows:
  63. return
  64. def _do(c, s, d):
  65. c.executemany(s, d)
  66. self._write(_do, sql, rows)
  67. def execute_one(self, sql: str, data) -> None:
  68. def _do(c, s, d):
  69. c.execute(s, d)
  70. self._write(_do, sql, data)
  71. def update_data(self, sql: str, data) -> None:
  72. self.execute_one(sql, data)
  73. def select_data(self, sql: str) -> List[Any]:
  74. """只读查询,不应 commit。"""
  75. conn = self.get_conn()
  76. cursor = conn.cursor()
  77. try:
  78. cursor.execute(sql)
  79. return cursor.fetchall()
  80. except Exception as e:
  81. logger.error("SQL 查询错误: %s", e)
  82. return []
  83. finally:
  84. self._close(conn, cursor)
  85. def execute(self, sql: str, data: Any = None) -> int:
  86. """通用写操作,返回受影响行数;失败返回 0。"""
  87. def _do(c, s, d):
  88. if d is not None:
  89. return c.execute(s, d)
  90. return c.execute(s)
  91. result = self._write(_do, sql, data)
  92. return result if isinstance(result, int) else 0
  93. class MySQLPool(_MySQLPoolBase):
  94. """test2 库(可通过环境变量覆盖,便于部署;未设置则沿用原默认)。"""
  95. def __init__(self) -> None:
  96. pool = _create_pool(
  97. host=os.environ.get("MYSQL_TEST2_HOST", "47.119.164.65"),
  98. port=int(os.environ.get("MYSQL_TEST2_PORT", "3306")),
  99. user=os.environ.get("MYSQL_TEST2_USER", "test_c"),
  100. password=os.environ.get("MYSQL_TEST2_PASSWORD", "Dfwy@2025"),
  101. database=os.environ.get("MYSQL_TEST2_DB", "test2"),
  102. )
  103. super().__init__(pool)
  104. class MySQLPool39(_MySQLPoolBase):
  105. """drug_retrieve 库。"""
  106. def __init__(self) -> None:
  107. pool = _create_pool(
  108. host=os.environ.get("MYSQL_DRUG_HOST", "39.108.116.125"),
  109. port=int(os.environ.get("MYSQL_DRUG_PORT", "3306")),
  110. user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
  111. password=os.environ.get("MYSQL_DRUG_PASSWORD", "Pem287cwM58jNpe2"),
  112. database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
  113. )
  114. super().__init__(pool)
  115. class MySQLPoolOnline(_MySQLPoolBase):
  116. """drug_retrieve 库。"""
  117. def __init__(self) -> None:
  118. pool = _create_pool(
  119. host=os.environ.get("MYSQL_DRUG_HOST", "120.24.49.2"),
  120. port=int(os.environ.get("MYSQL_DRUG_PORT", "3306")),
  121. user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
  122. password=os.environ.get("MYSQL_DRUG_PASSWORD", "ksCt3xm6chzdkafj"),
  123. database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
  124. )
  125. super().__init__(pool)
  126. if __name__ == "__main__":
  127. pool = MySQLPool()
  128. rows = pool.select_data("SELECT 1 AS one")
  129. print(rows)