import logging import pymysql from pymysql.cursors import DictCursor from dbutils.pooled_db import PooledDB import redis class DBConnectionManager: """数据库连接管理器,负责管理MySQL和Redis连接池""" _instance = None @classmethod def get_instance(cls, mysql_config=None, redis_config=None): """获取单例实例 Args: mysql_config: MySQL配置 redis_config: Redis配置 Returns: DBConnectionManager实例 """ if cls._instance is None: cls._instance = cls(mysql_config, redis_config) elif mysql_config or redis_config: # 如果已经有实例但又传入了配置,则更新配置 if mysql_config and not cls._instance.mysql_pool: cls._instance.init_mysql_pool(mysql_config) if redis_config and not cls._instance.redis_pool: cls._instance.init_redis_pool(redis_config) return cls._instance def __init__(self, mysql_config=None, redis_config=None): """初始化数据库连接管理器 Args: mysql_config: MySQL配置 redis_config: Redis配置 """ self.logger = logging.getLogger("DBConnectionManager") self.mysql_pool = None self.redis_pool = None # 初始化MySQL连接池 if mysql_config: self.init_mysql_pool(mysql_config) # 初始化Redis连接池 if redis_config: self.init_redis_pool(redis_config) def init_mysql_pool(self, config): """初始化MySQL连接池 Args: config: MySQL配置,包含host, port, user, password, database等 """ try: if not config: self.logger.warning("MySQL配置为空,跳过初始化") return self.mysql_pool = PooledDB( creator=pymysql, maxconnections=10, # 连接池最大连接数 mincached=2, # 初始化时,连接池中至少创建的空闲的连接 maxcached=5, # 连接池中最多闲置的连接 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表 ping=0, # ping MySQL服务端,检查是否服务可用 host=config.get('host', 'localhost'), port=config.get('port', 3306), user=config.get('user', 'root'), password=config.get('password', ''), database=config.get('database', ''), charset=config.get('charset', 'utf8mb4'), cursorclass=DictCursor ) self.logger.info("MySQL连接池初始化成功") except Exception as e: self.logger.error(f"MySQL连接池初始化失败: {e}") self.mysql_pool = None def init_redis_pool(self, config): """初始化Redis连接池 Args: config: Redis配置,包含host, port, password, db等 """ try: if not config: self.logger.warning("Redis配置为空,跳过初始化") return self.redis_pool = redis.ConnectionPool( host=config.get('host', 'localhost'), port=config.get('port', 6379), password=config.get('password', None), db=config.get('db', 0), decode_responses=config.get('decode_responses', True), max_connections=config.get('max_connections', 10) ) self.logger.info("Redis连接池初始化成功") except Exception as e: self.logger.error(f"Redis连接池初始化失败: {e}") self.redis_pool = None def get_mysql_connection(self): """获取MySQL连接 Returns: MySQL连接 Raises: Exception: MySQL连接池未初始化 """ if self.mysql_pool is None: raise Exception("MySQL连接池未初始化") return self.mysql_pool.connection() def get_redis_connection(self): """获取Redis连接 Returns: Redis连接 Raises: Exception: Redis连接池未初始化 """ if self.redis_pool is None: raise Exception("Redis连接池未初始化") return redis.Redis(connection_pool=self.redis_pool) def close(self): """关闭所有连接池""" # MySQL连接池会自动管理连接的关闭 self.mysql_pool = None # 关闭Redis连接池 if self.redis_pool: self.redis_pool.disconnect() self.redis_pool = None