169 lines
6.0 KiB
Python
169 lines
6.0 KiB
Python
|
||
from loguru import logger
|
||
import mysql.connector
|
||
import redis
|
||
|
||
class DBConnectionManager:
|
||
"""数据库连接管理器,负责管理MySQL和Redis连接池"""
|
||
|
||
_instance = None
|
||
|
||
@classmethod
|
||
def get_instance(cls, mysql_config=None, redis_config=None):
|
||
"""获取单例实例
|
||
|
||
Args:
|
||
mysql_config: MySQL配置
|
||
redis_config: Redis配置
|
||
|
||
Returns:
|
||
DBConnectionManager实例
|
||
"""
|
||
if cls._instance is None:
|
||
cls._instance = cls(mysql_config, redis_config)
|
||
elif mysql_config or redis_config:
|
||
# 如果已经有实例但又传入了配置,则更新配置
|
||
if mysql_config and not cls._instance.mysql_pool:
|
||
cls._instance.init_mysql_pool(mysql_config)
|
||
if redis_config and not cls._instance.redis_pool:
|
||
cls._instance.init_redis_pool(redis_config)
|
||
return cls._instance
|
||
|
||
def __init__(self, mysql_config=None, redis_config=None):
|
||
"""初始化数据库连接管理器
|
||
|
||
Args:
|
||
mysql_config: MySQL配置
|
||
redis_config: Redis配置
|
||
"""
|
||
self.LOG = logger
|
||
self.mysql_pool = None
|
||
self.redis_pool = None
|
||
# 保留原始配置快照,方便后台健康看板、安全校验等辅助能力读取基础元信息。
|
||
# 注意这里只存轻量配置引用,不持久化敏感信息,也不改变现有连接池行为。
|
||
self.mysql_config = dict(mysql_config or {})
|
||
self.redis_config = dict(redis_config or {})
|
||
|
||
# 初始化MySQL连接池
|
||
if mysql_config:
|
||
self.init_mysql_pool(mysql_config)
|
||
|
||
# 初始化Redis连接池
|
||
if redis_config:
|
||
self.init_redis_pool(redis_config)
|
||
|
||
def init_mysql_pool(self, config):
|
||
"""初始化MySQL连接池
|
||
|
||
Args:
|
||
config: MySQL配置,包含host, port, user, password, database等
|
||
"""
|
||
try:
|
||
if not config:
|
||
self.LOG.warning("MySQL配置为空,跳过初始化")
|
||
return
|
||
# 记录最新配置,供运行态摘要读取当前 database / 阈值等基础信息。
|
||
self.mysql_config = dict(config or {})
|
||
|
||
# 准备连接池配置
|
||
pool_config = {
|
||
'pool_name': 'wechat_robot_pool',
|
||
'pool_size': 30, # 连接池大小
|
||
'host': config.get('host', 'localhost'),
|
||
'port': config.get('port', 3306),
|
||
'user': config.get('user', 'root'),
|
||
'password': config.get('password', ''),
|
||
'database': config.get('database', ''),
|
||
'charset': config.get('charset', 'utf8mb4'),
|
||
'use_pure': True, # 使用纯Python实现
|
||
'autocommit': True
|
||
}
|
||
|
||
# 创建连接池
|
||
self.mysql_pool = mysql.connector.pooling.MySQLConnectionPool(**pool_config)
|
||
self.LOG.info("MySQL连接池初始化成功")
|
||
except Exception as e:
|
||
self.LOG.error(f"MySQL连接池初始化失败: {e}")
|
||
self.mysql_pool = None
|
||
|
||
def init_redis_pool(self, config):
|
||
"""初始化Redis连接池
|
||
|
||
Args:
|
||
config: Redis配置,包含host, port, password, db等
|
||
"""
|
||
try:
|
||
if not config:
|
||
self.LOG.warning("Redis配置为空,跳过初始化")
|
||
return
|
||
# Redis db 序号等信息首页会直接展示,因此这里同步保存配置快照。
|
||
self.redis_config = dict(config or {})
|
||
|
||
self.redis_pool = redis.ConnectionPool(
|
||
host=config.get('host', 'localhost'),
|
||
port=config.get('port', 6379),
|
||
password=config.get('password', None),
|
||
db=config.get('db', 0),
|
||
decode_responses=config.get('decode_responses', True),
|
||
max_connections=config.get('max_connections', 30)
|
||
)
|
||
self.LOG.info("Redis连接池初始化成功")
|
||
except Exception as e:
|
||
self.LOG.error(f"Redis连接池初始化失败: {e}")
|
||
self.redis_pool = None
|
||
|
||
def get_mysql_connection(self):
|
||
"""获取MySQL连接
|
||
|
||
Returns:
|
||
MySQL连接
|
||
|
||
Raises:
|
||
Exception: MySQL连接池未初始化
|
||
"""
|
||
if self.mysql_pool is None:
|
||
raise Exception("MySQL连接池未初始化")
|
||
|
||
return self.mysql_pool.get_connection()
|
||
|
||
def get_redis_connection(self):
|
||
"""获取Redis连接
|
||
|
||
Returns:
|
||
Redis连接
|
||
|
||
Raises:
|
||
Exception: Redis连接池未初始化
|
||
"""
|
||
if self.redis_pool is None:
|
||
raise Exception("Redis连接池未初始化")
|
||
|
||
return redis.Redis(connection_pool=self.redis_pool)
|
||
|
||
def get_mysql_database_name(self) -> str:
|
||
"""返回当前 MySQL 连接配置中的数据库名。"""
|
||
return str((self.mysql_config or {}).get('database') or '').strip()
|
||
|
||
def get_slow_query_threshold_ms(self, default: int = 300) -> int:
|
||
"""返回慢 SQL 阈值(毫秒)。
|
||
|
||
兼容说明:
|
||
1. 新版部分分支会在配置里维护 `slow_query_threshold_ms`;
|
||
2. 旧版没有时直接回退默认值,避免后台健康接口因辅助字段缺失而报错。
|
||
"""
|
||
try:
|
||
value = (self.mysql_config or {}).get('slow_query_threshold_ms', default)
|
||
return int(float(value))
|
||
except (TypeError, ValueError):
|
||
return int(default)
|
||
|
||
def close(self):
|
||
"""关闭所有连接池"""
|
||
# MySQL连接池会自动管理连接的关闭
|
||
self.mysql_pool = None
|
||
|
||
# 关闭Redis连接池
|
||
if self.redis_pool:
|
||
self.redis_pool.disconnect()
|
||
self.redis_pool = None
|