- 为数据库公共层增加慢 SQL 阈值配置与统一耗时日志记录 - 为消息表补充群成员回溯、群类型过滤和待处理媒体扫描等关键索引 - 将多处按日期查询改为时间范围查询,减少 DATE(timestamp) 导致的索引失效 - 修正消息存储层重复定义的日期范围方法,并更新工程优化文档中的 7.4 当前进展
174 lines
6.0 KiB
Python
174 lines
6.0 KiB
Python
|
||
from loguru import logger
|
||
import mysql.connector
|
||
import redis
|
||
|
||
class DBConnectionManager:
|
||
"""数据库连接管理器,负责管理MySQL和Redis连接池"""
|
||
|
||
_instance = None
|
||
|
||
@classmethod
|
||
def get_instance(cls, mysql_config=None, redis_config=None):
|
||
"""获取单例实例
|
||
|
||
Args:
|
||
mysql_config: MySQL配置
|
||
redis_config: Redis配置
|
||
|
||
Returns:
|
||
DBConnectionManager实例
|
||
"""
|
||
if cls._instance is None:
|
||
cls._instance = cls(mysql_config, redis_config)
|
||
elif mysql_config or redis_config:
|
||
# 如果已经有实例但又传入了配置,则更新配置
|
||
if mysql_config and not cls._instance.mysql_pool:
|
||
cls._instance.init_mysql_pool(mysql_config)
|
||
if redis_config and not cls._instance.redis_pool:
|
||
cls._instance.init_redis_pool(redis_config)
|
||
return cls._instance
|
||
|
||
def __init__(self, mysql_config=None, redis_config=None):
|
||
"""初始化数据库连接管理器
|
||
|
||
Args:
|
||
mysql_config: MySQL配置
|
||
redis_config: Redis配置
|
||
"""
|
||
self.LOG = logger
|
||
self.mysql_pool = None
|
||
self.redis_pool = None
|
||
# 保存原始配置快照,供慢 SQL 阈值、库名探测等公共能力复用:
|
||
# 1. BaseDBOperator 需要读取数据库名,去 information_schema 中检查索引;
|
||
# 2. 慢 SQL 记录需要统一读取阈值配置,而不是每个 DB Operator 各自硬编码;
|
||
# 3. 这里做浅拷贝即可,避免后续外部修改传入 dict 时影响内部状态。
|
||
self.mysql_config = dict(mysql_config or {})
|
||
self.redis_config = dict(redis_config or {})
|
||
|
||
# 初始化MySQL连接池
|
||
if mysql_config:
|
||
self.init_mysql_pool(mysql_config)
|
||
|
||
# 初始化Redis连接池
|
||
if redis_config:
|
||
self.init_redis_pool(redis_config)
|
||
|
||
def init_mysql_pool(self, config):
|
||
"""初始化MySQL连接池
|
||
|
||
Args:
|
||
config: MySQL配置,包含host, port, user, password, database等
|
||
"""
|
||
try:
|
||
if not config:
|
||
self.LOG.warning("MySQL配置为空,跳过初始化")
|
||
return
|
||
|
||
self.mysql_config = dict(config or {})
|
||
|
||
# 准备连接池配置
|
||
pool_config = {
|
||
'pool_name': 'wechat_robot_pool',
|
||
'pool_size': 30, # 连接池大小
|
||
'host': config.get('host', 'localhost'),
|
||
'port': config.get('port', 3306),
|
||
'user': config.get('user', 'root'),
|
||
'password': config.get('password', ''),
|
||
'database': config.get('database', ''),
|
||
'charset': config.get('charset', 'utf8mb4'),
|
||
'use_pure': True, # 使用纯Python实现
|
||
'autocommit': True
|
||
}
|
||
|
||
# 创建连接池
|
||
self.mysql_pool = mysql.connector.pooling.MySQLConnectionPool(**pool_config)
|
||
self.LOG.info("MySQL连接池初始化成功")
|
||
except Exception as e:
|
||
self.LOG.error(f"MySQL连接池初始化失败: {e}")
|
||
self.mysql_pool = None
|
||
|
||
def init_redis_pool(self, config):
|
||
"""初始化Redis连接池
|
||
|
||
Args:
|
||
config: Redis配置,包含host, port, password, db等
|
||
"""
|
||
try:
|
||
if not config:
|
||
self.LOG.warning("Redis配置为空,跳过初始化")
|
||
return
|
||
|
||
self.redis_config = dict(config or {})
|
||
|
||
self.redis_pool = redis.ConnectionPool(
|
||
host=config.get('host', 'localhost'),
|
||
port=config.get('port', 6379),
|
||
password=config.get('password', None),
|
||
db=config.get('db', 0),
|
||
decode_responses=config.get('decode_responses', True),
|
||
max_connections=config.get('max_connections', 30)
|
||
)
|
||
self.LOG.info("Redis连接池初始化成功")
|
||
except Exception as e:
|
||
self.LOG.error(f"Redis连接池初始化失败: {e}")
|
||
self.redis_pool = None
|
||
|
||
def get_mysql_connection(self):
|
||
"""获取MySQL连接
|
||
|
||
Returns:
|
||
MySQL连接
|
||
|
||
Raises:
|
||
Exception: MySQL连接池未初始化
|
||
"""
|
||
if self.mysql_pool is None:
|
||
raise Exception("MySQL连接池未初始化")
|
||
|
||
return self.mysql_pool.get_connection()
|
||
|
||
def get_mysql_database_name(self) -> str:
|
||
"""返回当前 MySQL 目标库名。"""
|
||
return str(self.mysql_config.get('database', '') or '').strip()
|
||
|
||
def get_slow_query_threshold_ms(self) -> int:
|
||
"""读取慢 SQL 阈值,默认 500ms。"""
|
||
try:
|
||
threshold = int(self.mysql_config.get('slow_query_threshold_ms', 500) or 500)
|
||
return threshold if threshold > 0 else 500
|
||
except (TypeError, ValueError):
|
||
return 500
|
||
|
||
def is_slow_query_log_enabled(self) -> bool:
|
||
"""是否启用慢 SQL 日志。"""
|
||
raw_value = self.mysql_config.get('enable_slow_query_log', True)
|
||
if isinstance(raw_value, str):
|
||
normalized = raw_value.strip().lower()
|
||
return normalized not in {'0', 'false', 'off', 'no'}
|
||
return bool(raw_value)
|
||
|
||
def get_redis_connection(self):
|
||
"""获取Redis连接
|
||
|
||
Returns:
|
||
Redis连接
|
||
|
||
Raises:
|
||
Exception: Redis连接池未初始化
|
||
"""
|
||
if self.redis_pool is None:
|
||
raise Exception("Redis连接池未初始化")
|
||
|
||
return redis.Redis(connection_pool=self.redis_pool)
|
||
|
||
def close(self):
|
||
"""关闭所有连接池"""
|
||
# MySQL连接池会自动管理连接的关闭
|
||
self.mysql_pool = None
|
||
|
||
# 关闭Redis连接池
|
||
if self.redis_pool:
|
||
self.redis_pool.disconnect()
|
||
self.redis_pool = None
|