from loguru import logger import mysql.connector import redis class DBConnectionManager: """数据库连接管理器,负责管理MySQL和Redis连接池""" _instance = None @classmethod def get_instance(cls, mysql_config=None, redis_config=None): """获取单例实例 Args: mysql_config: MySQL配置 redis_config: Redis配置 Returns: DBConnectionManager实例 """ if cls._instance is None: cls._instance = cls(mysql_config, redis_config) elif mysql_config or redis_config: # 如果已经有实例但又传入了配置,则更新配置 if mysql_config and not cls._instance.mysql_pool: cls._instance.init_mysql_pool(mysql_config) if redis_config and not cls._instance.redis_pool: cls._instance.init_redis_pool(redis_config) return cls._instance def __init__(self, mysql_config=None, redis_config=None): """初始化数据库连接管理器 Args: mysql_config: MySQL配置 redis_config: Redis配置 """ self.LOG = logger self.mysql_pool = None self.redis_pool = None # 初始化MySQL连接池 if mysql_config: self.init_mysql_pool(mysql_config) # 初始化Redis连接池 if redis_config: self.init_redis_pool(redis_config) def init_mysql_pool(self, config): """初始化MySQL连接池 Args: config: MySQL配置,包含host, port, user, password, database等 """ try: if not config: self.LOG.warning("MySQL配置为空,跳过初始化") return # 准备连接池配置 pool_config = { 'pool_name': 'wechat_robot_pool', 'pool_size': 10, # 连接池大小 'host': config.get('host', 'localhost'), 'port': config.get('port', 3306), 'user': config.get('user', 'root'), 'password': config.get('password', ''), 'database': config.get('database', ''), 'charset': config.get('charset', 'utf8mb4'), 'use_pure': True, # 使用纯Python实现 'autocommit': True } # 创建连接池 self.mysql_pool = mysql.connector.pooling.MySQLConnectionPool(**pool_config) self.LOG.info("MySQL连接池初始化成功") except Exception as e: self.LOG.error(f"MySQL连接池初始化失败: {e}") self.mysql_pool = None def init_redis_pool(self, config): """初始化Redis连接池 Args: config: Redis配置,包含host, port, password, db等 """ try: if not config: self.LOG.warning("Redis配置为空,跳过初始化") return self.redis_pool = redis.ConnectionPool( host=config.get('host', 'localhost'), port=config.get('port', 6379), password=config.get('password', None), db=config.get('db', 0), decode_responses=config.get('decode_responses', True), max_connections=config.get('max_connections', 10) ) self.LOG.info("Redis连接池初始化成功") except Exception as e: self.LOG.error(f"Redis连接池初始化失败: {e}") self.redis_pool = None def get_mysql_connection(self): """获取MySQL连接 Returns: MySQL连接 Raises: Exception: MySQL连接池未初始化 """ if self.mysql_pool is None: raise Exception("MySQL连接池未初始化") return self.mysql_pool.get_connection() def get_redis_connection(self): """获取Redis连接 Returns: Redis连接 Raises: Exception: Redis连接池未初始化 """ if self.redis_pool is None: raise Exception("Redis连接池未初始化") return redis.Redis(connection_pool=self.redis_pool) def close(self): """关闭所有连接池""" # MySQL连接池会自动管理连接的关闭 self.mysql_pool = None # 关闭Redis连接池 if self.redis_pool: self.redis_pool.disconnect() self.redis_pool = None