Files
abot/db/connection.py
2025-09-22 13:57:25 +08:00

143 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import mysql.connector
import redis
class DBConnectionManager:
    """Singleton manager that owns the MySQL and Redis connection pools.

    Use :meth:`get_instance` to obtain the shared instance. Pools are created
    from plain ``dict`` configurations; a pool whose configuration is missing
    or whose creation fails is left as ``None`` and the failure is logged
    rather than raised.
    """

    # Shared instance, created lazily by get_instance().
    _instance = None

    # Defaults applied when the MySQL config does not provide these keys.
    DEFAULT_MYSQL_POOL_NAME = 'wechat_robot_pool'
    DEFAULT_MYSQL_POOL_SIZE = 30

    @classmethod
    def get_instance(cls, mysql_config=None, redis_config=None):
        """Return the shared manager, creating it on first use.

        If the instance already exists, a supplied configuration is only used
        to initialise a pool that is currently missing; an existing pool is
        never replaced.

        Args:
            mysql_config: MySQL configuration dict, or None.
            redis_config: Redis configuration dict, or None.

        Returns:
            The shared DBConnectionManager instance.
        """
        instance = cls._instance
        if instance is None:
            instance = cls(mysql_config, redis_config)
            cls._instance = instance
            return instance

        # Instance exists: fill in only the pools that are still missing.
        if mysql_config and instance.mysql_pool is None:
            instance.init_mysql_pool(mysql_config)
        if redis_config and instance.redis_pool is None:
            instance.init_redis_pool(redis_config)
        return instance

    def __init__(self, mysql_config=None, redis_config=None):
        """Create the manager and initialise whichever pools are configured.

        Args:
            mysql_config: MySQL configuration dict, or None to skip MySQL.
            redis_config: Redis configuration dict, or None to skip Redis.
        """
        self.LOG = logger
        self.mysql_pool = None
        self.redis_pool = None

        if mysql_config:
            self.init_mysql_pool(mysql_config)
        if redis_config:
            self.init_redis_pool(redis_config)

    @staticmethod
    def _build_mysql_pool_config(config):
        """Translate a user config dict into MySQLConnectionPool kwargs.

        Args:
            config: Mapping that may contain host, port, user, password,
                database, charset, pool_name and pool_size.

        Returns:
            A dict of keyword arguments with defaults filled in.
        """
        return {
            'pool_name': config.get(
                'pool_name', DBConnectionManager.DEFAULT_MYSQL_POOL_NAME),
            'pool_size': config.get(
                'pool_size', DBConnectionManager.DEFAULT_MYSQL_POOL_SIZE),
            'host': config.get('host', 'localhost'),
            'port': config.get('port', 3306),
            'user': config.get('user', 'root'),
            'password': config.get('password', ''),
            'database': config.get('database', ''),
            'charset': config.get('charset', 'utf8mb4'),
            'use_pure': True,  # force the pure-Python implementation
            'autocommit': True,
        }

    def init_mysql_pool(self, config):
        """Create the MySQL connection pool from ``config``.

        Any error raised while building the pool is logged and leaves
        ``self.mysql_pool`` set to None; it is not propagated.

        Args:
            config: MySQL configuration dict (host, port, user, password,
                database, charset, and optionally pool_name / pool_size).
        """
        try:
            if not config:
                self.LOG.warning("MySQL配置为空跳过初始化")
                return

            pool_config = self._build_mysql_pool_config(config)
            self.mysql_pool = mysql.connector.pooling.MySQLConnectionPool(**pool_config)
            self.LOG.info("MySQL连接池初始化成功")
        except Exception as e:
            # Best-effort initialisation: report the failure and carry on
            # without a MySQL pool.
            self.LOG.error(f"MySQL连接池初始化失败: {e}")
            self.mysql_pool = None

    def init_redis_pool(self, config):
        """Create the Redis connection pool from ``config``.

        Any error raised while building the pool is logged and leaves
        ``self.redis_pool`` set to None; it is not propagated.

        Args:
            config: Redis configuration dict (host, port, password, db, and
                optionally decode_responses / max_connections).
        """
        try:
            if not config:
                self.LOG.warning("Redis配置为空跳过初始化")
                return

            self.redis_pool = redis.ConnectionPool(
                host=config.get('host', 'localhost'),
                port=config.get('port', 6379),
                password=config.get('password', None),
                db=config.get('db', 0),
                decode_responses=config.get('decode_responses', True),
                max_connections=config.get('max_connections', 30),
            )
            self.LOG.info("Redis连接池初始化成功")
        except Exception as e:
            # Best-effort initialisation: report the failure and carry on
            # without a Redis pool.
            self.LOG.error(f"Redis连接池初始化失败: {e}")
            self.redis_pool = None

    def get_mysql_connection(self):
        """Fetch a connection from the MySQL pool.

        Returns:
            Whatever ``self.mysql_pool.get_connection()`` returns.

        Raises:
            RuntimeError: The MySQL pool has not been initialised.
        """
        if self.mysql_pool is None:
            raise RuntimeError("MySQL连接池未初始化")
        return self.mysql_pool.get_connection()

    def get_redis_connection(self):
        """Build a Redis client bound to the shared Redis pool.

        Returns:
            A ``redis.Redis`` client using ``self.redis_pool``.

        Raises:
            RuntimeError: The Redis pool has not been initialised.
        """
        if self.redis_pool is None:
            raise RuntimeError("Redis连接池未初始化")
        return redis.Redis(connection_pool=self.redis_pool)

    def close(self):
        """Release both pools held by this manager.

        The Redis pool is disconnected explicitly. The MySQL pool reference
        is only dropped here; no explicit close is issued on it.
        """
        self.mysql_pool = None

        if self.redis_pool is not None:
            self.redis_pool.disconnect()
            self.redis_pool = None