Files
abot/db/connection.py

169 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import mysql.connector
import redis
class DBConnectionManager:
"""数据库连接管理器负责管理MySQL和Redis连接池"""
_instance = None
@classmethod
def get_instance(cls, mysql_config=None, redis_config=None):
"""获取单例实例
Args:
mysql_config: MySQL配置
redis_config: Redis配置
Returns:
DBConnectionManager实例
"""
if cls._instance is None:
cls._instance = cls(mysql_config, redis_config)
elif mysql_config or redis_config:
# 如果已经有实例但又传入了配置,则更新配置
if mysql_config and not cls._instance.mysql_pool:
cls._instance.init_mysql_pool(mysql_config)
if redis_config and not cls._instance.redis_pool:
cls._instance.init_redis_pool(redis_config)
return cls._instance
def __init__(self, mysql_config=None, redis_config=None):
"""初始化数据库连接管理器
Args:
mysql_config: MySQL配置
redis_config: Redis配置
"""
self.LOG = logger
self.mysql_pool = None
self.redis_pool = None
# 保留原始配置快照,方便后台健康看板、安全校验等辅助能力读取基础元信息。
# 注意这里只存轻量配置引用,不持久化敏感信息,也不改变现有连接池行为。
self.mysql_config = dict(mysql_config or {})
self.redis_config = dict(redis_config or {})
# 初始化MySQL连接池
if mysql_config:
self.init_mysql_pool(mysql_config)
# 初始化Redis连接池
if redis_config:
self.init_redis_pool(redis_config)
def init_mysql_pool(self, config):
"""初始化MySQL连接池
Args:
config: MySQL配置包含host, port, user, password, database等
"""
try:
if not config:
self.LOG.warning("MySQL配置为空跳过初始化")
return
# 记录最新配置,供运行态摘要读取当前 database / 阈值等基础信息。
self.mysql_config = dict(config or {})
# 准备连接池配置
pool_config = {
'pool_name': 'wechat_robot_pool',
'pool_size': 30, # 连接池大小
'host': config.get('host', 'localhost'),
'port': config.get('port', 3306),
'user': config.get('user', 'root'),
'password': config.get('password', ''),
'database': config.get('database', ''),
'charset': config.get('charset', 'utf8mb4'),
'use_pure': True, # 使用纯Python实现
'autocommit': True
}
# 创建连接池
self.mysql_pool = mysql.connector.pooling.MySQLConnectionPool(**pool_config)
self.LOG.info("MySQL连接池初始化成功")
except Exception as e:
self.LOG.error(f"MySQL连接池初始化失败: {e}")
self.mysql_pool = None
def init_redis_pool(self, config):
"""初始化Redis连接池
Args:
config: Redis配置包含host, port, password, db等
"""
try:
if not config:
self.LOG.warning("Redis配置为空跳过初始化")
return
# Redis db 序号等信息首页会直接展示,因此这里同步保存配置快照。
self.redis_config = dict(config or {})
self.redis_pool = redis.ConnectionPool(
host=config.get('host', 'localhost'),
port=config.get('port', 6379),
password=config.get('password', None),
db=config.get('db', 0),
decode_responses=config.get('decode_responses', True),
max_connections=config.get('max_connections', 30)
)
self.LOG.info("Redis连接池初始化成功")
except Exception as e:
self.LOG.error(f"Redis连接池初始化失败: {e}")
self.redis_pool = None
def get_mysql_connection(self):
"""获取MySQL连接
Returns:
MySQL连接
Raises:
Exception: MySQL连接池未初始化
"""
if self.mysql_pool is None:
raise Exception("MySQL连接池未初始化")
return self.mysql_pool.get_connection()
def get_redis_connection(self):
"""获取Redis连接
Returns:
Redis连接
Raises:
Exception: Redis连接池未初始化
"""
if self.redis_pool is None:
raise Exception("Redis连接池未初始化")
return redis.Redis(connection_pool=self.redis_pool)
def get_mysql_database_name(self) -> str:
"""返回当前 MySQL 连接配置中的数据库名。"""
return str((self.mysql_config or {}).get('database') or '').strip()
def get_slow_query_threshold_ms(self, default: int = 300) -> int:
"""返回慢 SQL 阈值(毫秒)。
兼容说明:
1. 新版部分分支会在配置里维护 `slow_query_threshold_ms`
2. 旧版没有时直接回退默认值,避免后台健康接口因辅助字段缺失而报错。
"""
try:
value = (self.mysql_config or {}).get('slow_query_threshold_ms', default)
return int(float(value))
except (TypeError, ValueError):
return int(default)
def close(self):
"""关闭所有连接池"""
# MySQL连接池会自动管理连接的关闭
self.mysql_pool = None
# 关闭Redis连接池
if self.redis_pool:
self.redis_pool.disconnect()
self.redis_pool = None