conn_mysql.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import logging
  2. import os
  3. import pymysql
  4. from dbutils.pooled_db import PooledDB
  5. from pymysql.cursors import DictCursor
  6. logger = logging.getLogger(__name__)
  7. # 连接池公共参数(可自行调大 maxconnections 等)
  8. _POOL_COMMON = {
  9. "maxconnections": 10,
  10. "mincached": 2,
  11. "maxcached": 5,
  12. "blocking": True,
  13. }
  14. def _create_pool(
  15. host,
  16. port,
  17. user,
  18. password,
  19. database,
  20. ):
  21. return PooledDB(
  22. creator=pymysql,
  23. cursorclass=DictCursor,
  24. charset="utf8mb4",
  25. host=host,
  26. port=port,
  27. user=user,
  28. password=password,
  29. database=database,
  30. **_POOL_COMMON,
  31. )
  32. class _MySQLPoolBase:
  33. """共用的取连接、写库、读库逻辑,避免两套类复制粘贴。"""
  34. def __init__(self, pool):
  35. self.pool = pool
  36. def get_conn(self):
  37. return self.pool.connection()
  38. def _close(self, conn, cursor) -> None:
  39. try:
  40. if cursor:
  41. cursor.close()
  42. finally:
  43. if conn:
  44. conn.close()
  45. def _write(self, fn, *args, **kwargs):
  46. conn = self.get_conn()
  47. cursor = conn.cursor()
  48. try:
  49. result = fn(cursor, *args, **kwargs)
  50. conn.commit()
  51. return result
  52. except Exception as e:
  53. logger.error("SQL 执行错误: %s", e)
  54. conn.rollback()
  55. return None
  56. finally:
  57. self._close(conn, cursor)
  58. def execute_many(self, sql, data):
  59. rows = list(data)
  60. if not rows:
  61. return
  62. def _do(c, s, d):
  63. c.executemany(s, d)
  64. self._write(_do, sql, rows)
  65. def execute_one(self, sql, data):
  66. def _do(c, s, d):
  67. c.execute(s, d)
  68. self._write(_do, sql, data)
  69. def update_data(self, sql, data):
  70. self.execute_one(sql, data)
  71. def select_data(self, sql,data=None):
  72. """只读查询,不应 commit。"""
  73. conn = self.get_conn()
  74. cursor = conn.cursor()
  75. try:
  76. if data:
  77. cursor.execute(sql,data)
  78. return cursor.fetchall()
  79. else:
  80. cursor.execute(sql)
  81. return cursor.fetchall()
  82. except Exception as e:
  83. logger.error("SQL 查询错误: %s", e)
  84. return []
  85. finally:
  86. self._close(conn, cursor)
  87. def execute(self, sql, data=None):
  88. """通用写操作,返回受影响行数;失败返回 0。"""
  89. def _do(c, s, d):
  90. if d is not None:
  91. return c.execute(s, d)
  92. return c.execute(s)
  93. result = self._write(_do, sql, data)
  94. return result if isinstance(result, int) else 0
  95. class MySQLPool(_MySQLPoolBase):
  96. """test2 库(可通过环境变量覆盖,便于部署;未设置则沿用原默认)。"""
  97. def __init__(self):
  98. pool = _create_pool(
  99. host=os.environ.get("MYSQL_TEST2_HOST", "47.119.164.65"),
  100. port=int(os.environ.get("MYSQL_TEST2_PORT", "3306")),
  101. user=os.environ.get("MYSQL_TEST2_USER", "test_c"),
  102. password=os.environ.get("MYSQL_TEST2_PASSWORD", "Dfwy@2025"),
  103. database=os.environ.get("MYSQL_TEST2_DB", "test2"),
  104. )
  105. super().__init__(pool)
  106. class MySQLPool39(_MySQLPoolBase):
  107. """drug_retrieve 库。"""
  108. def __init__(self):
  109. pool = _create_pool(
  110. host=os.environ.get("MYSQL_DRUG_HOST", "39.108.116.125"),
  111. port=int(os.environ.get("MYSQL_DRUG_PORT", "3306")),
  112. user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
  113. password=os.environ.get("MYSQL_DRUG_PASSWORD", "Pem287cwM58jNpe2"),
  114. database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
  115. )
  116. super().__init__(pool)
  117. class MySQLPoolOnline(_MySQLPoolBase):
  118. """drug_retrieve 库。"""
  119. def __init__(self):
  120. pool = _create_pool(
  121. host=os.environ.get("MYSQL_DRUG_HOST", "120.25.48.236"),
  122. port=int(os.environ.get("MYSQL_DRUG_PORT", "3306")),
  123. user=os.environ.get("MYSQL_DRUG_USER", "drug_retrieve"),
  124. password=os.environ.get("MYSQL_DRUG_PASSWORD", "ksCt3xm6chzdkafj"),
  125. database=os.environ.get("MYSQL_DRUG_DB", "drug_retrieve"),
  126. )
  127. super().__init__(pool)
  128. if __name__ == "__main__":
  129. pool = MySQLPool()
  130. rows = pool.select_data("SELECT 1 AS one")
  131. print(rows)