from loguru import logger import mysql.connector import redis class DBConnectionManager: """数据库连接管理器,负责管理MySQL和Redis连接池""" _instance = None @classmethod def get_instance(cls, mysql_config=None, redis_config=None): """获取单例实例 Args: mysql_config: MySQL配置 redis_config: Redis配置 Returns: DBConnectionManager实例 """ if cls._instance is None: cls._instance = cls(mysql_config, redis_config) elif mysql_config or redis_config: # 如果已经有实例但又传入了配置,则更新配置 if mysql_config and not cls._instance.mysql_pool: cls._instance.init_mysql_pool(mysql_config) if redis_config and not cls._instance.redis_pool: cls._instance.init_redis_pool(redis_config) return cls._instance def __init__(self, mysql_config=None, redis_config=None): """初始化数据库连接管理器 Args: mysql_config: MySQL配置 redis_config: Redis配置 """ self.LOG = logger self.mysql_pool = None self.redis_pool = None # 保留原始配置快照,方便后台健康看板、安全校验等辅助能力读取基础元信息。 # 注意这里只存轻量配置引用,不持久化敏感信息,也不改变现有连接池行为。 self.mysql_config = dict(mysql_config or {}) self.redis_config = dict(redis_config or {}) # 初始化MySQL连接池 if mysql_config: self.init_mysql_pool(mysql_config) # 初始化Redis连接池 if redis_config: self.init_redis_pool(redis_config) def init_mysql_pool(self, config): """初始化MySQL连接池 Args: config: MySQL配置,包含host, port, user, password, database等 """ try: if not config: self.LOG.warning("MySQL配置为空,跳过初始化") return # 记录最新配置,供运行态摘要读取当前 database / 阈值等基础信息。 self.mysql_config = dict(config or {}) # 准备连接池配置 pool_config = { 'pool_name': 'wechat_robot_pool', 'pool_size': 30, # 连接池大小 'host': config.get('host', 'localhost'), 'port': config.get('port', 3306), 'user': config.get('user', 'root'), 'password': config.get('password', ''), 'database': config.get('database', ''), 'charset': config.get('charset', 'utf8mb4'), 'use_pure': True, # 使用纯Python实现 'autocommit': True } # 创建连接池 self.mysql_pool = mysql.connector.pooling.MySQLConnectionPool(**pool_config) self.LOG.info("MySQL连接池初始化成功") except Exception as e: self.LOG.error(f"MySQL连接池初始化失败: {e}") self.mysql_pool = None def init_redis_pool(self, config): """初始化Redis连接池 Args: config: Redis配置,包含host, port, password, db等 """ try: if not config: self.LOG.warning("Redis配置为空,跳过初始化") return # Redis db 序号等信息首页会直接展示,因此这里同步保存配置快照。 self.redis_config = dict(config or {}) self.redis_pool = redis.ConnectionPool( host=config.get('host', 'localhost'), port=config.get('port', 6379), password=config.get('password', None), db=config.get('db', 0), decode_responses=config.get('decode_responses', True), max_connections=config.get('max_connections', 30) ) self.LOG.info("Redis连接池初始化成功") except Exception as e: self.LOG.error(f"Redis连接池初始化失败: {e}") self.redis_pool = None def get_mysql_connection(self): """获取MySQL连接 Returns: MySQL连接 Raises: Exception: MySQL连接池未初始化 """ if self.mysql_pool is None: raise Exception("MySQL连接池未初始化") return self.mysql_pool.get_connection() def get_redis_connection(self): """获取Redis连接 Returns: Redis连接 Raises: Exception: Redis连接池未初始化 """ if self.redis_pool is None: raise Exception("Redis连接池未初始化") return redis.Redis(connection_pool=self.redis_pool) def get_mysql_database_name(self) -> str: """返回当前 MySQL 连接配置中的数据库名。""" return str((self.mysql_config or {}).get('database') or '').strip() def get_slow_query_threshold_ms(self, default: int = 300) -> int: """返回慢 SQL 阈值(毫秒)。 兼容说明: 1. 新版部分分支会在配置里维护 `slow_query_threshold_ms`; 2. 旧版没有时直接回退默认值,避免后台健康接口因辅助字段缺失而报错。 """ try: value = (self.mysql_config or {}).get('slow_query_threshold_ms', default) return int(float(value)) except (TypeError, ValueError): return int(default) def close(self): """关闭所有连接池""" # MySQL连接池会自动管理连接的关闭 self.mysql_pool = None # 关闭Redis连接池 if self.redis_pool: self.redis_pool.disconnect() self.redis_pool = None