from datetime import datetime, timedelta from typing import Dict, List, Optional, Union, Tuple from db.base import BaseDBOperator from db.connection import DBConnectionManager class StatsDBOperator(BaseDBOperator): """统计数据库操作类""" def __init__(self, db_manager: DBConnectionManager): super().__init__(db_manager) def record_plugin_call(self, plugin_name: str, command: str, user_id: str, group_id: Optional[str], success: bool, process_time_ms: float) -> bool: """记录插件调用信息 Args: plugin_name: 插件名称 command: 触发的命令 user_id: 用户ID group_id: 群组ID,私聊为None success: 是否调用成功 process_time_ms: 处理时间(毫秒) Returns: 是否成功记录 """ # 1. 更新插件统计汇总表 self._update_plugin_stats(plugin_name, command, success, process_time_ms, True if group_id else False) # 2. 更新用户使用统计表 self._update_user_stats(user_id, plugin_name, command, success) # 3. 如果是群聊,更新群组使用统计表 if group_id: self._update_group_stats(group_id, plugin_name, command, user_id, success) return True def record_error(self, plugin_name: str, command: str, user_id: str, group_id: Optional[str], error_message: str, stack_trace: Optional[str] = None) -> bool: """记录错误信息 Args: plugin_name: 插件名称 command: 触发的命令 user_id: 用户ID group_id: 群组ID,私聊为None error_message: 错误信息 stack_trace: 堆栈跟踪 Returns: 是否成功记录 """ sql = """ INSERT INTO t_error_logs (plugin_name, command, user_id, group_id, error_message, stack_trace) VALUES (%s, %s, %s, %s, %s, %s) """ params = (plugin_name, command, user_id, group_id, error_message, stack_trace) return self.execute_update(sql, params) def _update_plugin_stats(self, plugin_name: str, command: str, success: bool, process_time_ms: float, is_group: bool) -> bool: """更新插件统计汇总表 Args: plugin_name: 插件名称 command: 触发的命令 success: 是否调用成功 process_time_ms: 处理时间(毫秒) is_group: 是否群聊 Returns: 是否成功更新 """ today = datetime.now().strftime("%Y-%m-%d") # 先查询当前记录 query_sql = """ SELECT id, total_calls, success_calls, failed_calls, group_calls, private_calls, avg_process_time FROM t_plugin_stats WHERE plugin_name = %s AND command = %s AND stat_date = %s """ query_params = (plugin_name, command, today) result = self.execute_query(query_sql, query_params, fetch_one=True) if result: # 更新现有记录 total_calls = result['total_calls'] + 1 success_calls = result['success_calls'] + (1 if success else 0) failed_calls = result['failed_calls'] + (0 if success else 1) group_calls = result['group_calls'] + (1 if is_group else 0) private_calls = result['private_calls'] + (0 if is_group else 1) # 计算新的平均处理时间 old_avg = result['avg_process_time'] new_avg = ((old_avg * (total_calls - 1)) + process_time_ms) / total_calls update_sql = """ UPDATE t_plugin_stats SET total_calls = %s, success_calls = %s, failed_calls = %s, group_calls = %s, private_calls = %s, avg_process_time = %s WHERE id = %s """ update_params = (total_calls, success_calls, failed_calls, group_calls, private_calls, new_avg, result['id']) return self.execute_update(update_sql, update_params) else: # 插入新记录 insert_sql = """ INSERT INTO t_plugin_stats (plugin_name, command, stat_date, total_calls, success_calls, failed_calls, group_calls, private_calls, avg_process_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ insert_params = ( plugin_name, command, today, 1, 1 if success else 0, 0 if success else 1, 1 if is_group else 0, 0 if is_group else 1, process_time_ms ) return self.execute_update(insert_sql, insert_params) def _update_user_stats(self, user_id: str, plugin_name: str, command: str, success: bool) -> bool: """更新用户使用统计表 Args: user_id: 用户ID plugin_name: 插件名称 command: 触发的命令 success: 是否调用成功 Returns: 是否成功更新 """ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 查询是否存在记录 query_sql = """ SELECT id, total_calls, success_calls, failed_calls, first_used_at FROM t_user_stats WHERE user_id = %s AND plugin_name = %s AND command = %s """ query_params = (user_id, plugin_name, command) result = self.execute_query(query_sql, query_params, fetch_one=True) if result: # 更新现有记录 update_sql = """ UPDATE t_user_stats SET total_calls = total_calls + 1, success_calls = success_calls + %s, failed_calls = failed_calls + %s, last_used_at = %s WHERE id = %s """ update_params = (1 if success else 0, 0 if success else 1, now, result['id']) return self.execute_update(update_sql, update_params) else: # 插入新记录 insert_sql = """ INSERT INTO t_user_stats (user_id, plugin_name, command, total_calls, success_calls, failed_calls, first_used_at, last_used_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ insert_params = ( user_id, plugin_name, command, 1, 1 if success else 0, 0 if success else 1, now, now ) return self.execute_update(insert_sql, insert_params) def _update_group_stats(self, group_id: str, plugin_name: str, command: str, user_id: str, success: bool) -> bool: """更新群组使用统计表 Args: group_id: 群组ID plugin_name: 插件名称 command: 触发的命令 user_id: 用户ID success: 是否调用成功 Returns: 是否成功更新 """ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 查询是否存在记录 query_sql = """ SELECT id, total_calls, success_calls, failed_calls, unique_users, first_used_at FROM t_group_stats WHERE group_id = %s AND plugin_name = %s AND command = %s """ query_params = (group_id, plugin_name, command) result = self.execute_query(query_sql, query_params, fetch_one=True) # 查询该命令的唯一用户 user_query_sql = """ SELECT COUNT(DISTINCT user_id) as user_count FROM t_user_stats WHERE plugin_name = %s AND command = %s """ user_query_params = (plugin_name, command) user_result = self.execute_query(user_query_sql, user_query_params, fetch_one=True) unique_users = user_result['user_count'] if user_result else 1 if result: # 更新现有记录 update_sql = """ UPDATE t_group_stats SET total_calls = total_calls + 1, success_calls = success_calls + %s, failed_calls = failed_calls + %s, unique_users = %s, last_used_at = %s WHERE id = %s """ update_params = (1 if success else 0, 0 if success else 1, unique_users, now, result['id']) return self.execute_update(update_sql, update_params) else: # 插入新记录 insert_sql = """ INSERT INTO t_group_stats (group_id, plugin_name, command, total_calls, success_calls, failed_calls, unique_users, first_used_at, last_used_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ insert_params = ( group_id, plugin_name, command, 1, 1 if success else 0, 0 if success else 1, unique_users, now, now ) return self.execute_update(insert_sql, insert_params) def get_plugin_stats(self, days: int = 7) -> List[Dict]: """获取插件使用统计 Args: days: 统计天数 Returns: 插件统计列表 """ sql = """ SELECT plugin_name, command, SUM(total_calls) as total_calls, SUM(success_calls) as success_calls, SUM(failed_calls) as failed_calls, AVG(avg_process_time) as avg_process_time FROM t_plugin_stats WHERE stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY plugin_name, command ORDER BY total_calls DESC """ return self.execute_query(sql, (days,)) or [] def get_user_stats(self, days: int = 7, limit: int = 20) -> List[Dict]: """获取用户使用统计 Args: days: 统计天数 limit: 返回记录数量限制 Returns: 用户统计列表 """ sql = """ SELECT user_id, SUM(total_calls) as total_calls, COUNT(DISTINCT plugin_name) as used_plugins FROM t_user_stats WHERE last_used_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY user_id ORDER BY total_calls DESC LIMIT %s """ return self.execute_query(sql, (days, limit)) or [] def get_group_stats(self, days: int = 7, limit: int = 20) -> List[Dict]: """获取群组使用统计 Args: days: 统计天数 limit: 返回记录数量限制 Returns: 群组统计列表 """ sql = """ SELECT group_id, SUM(total_calls) as total_calls, COUNT(DISTINCT plugin_name) as used_plugins, MAX(unique_users) as max_unique_users FROM t_group_stats WHERE last_used_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY group_id ORDER BY total_calls DESC LIMIT %s """ return self.execute_query(sql, (days, limit)) or [] def get_error_logs(self, days: int = 7, limit: int = 100) -> List[Dict]: """获取错误日志 Args: days: 统计天数 limit: 返回记录数量限制 Returns: 错误日志列表 """ sql = """ SELECT id, plugin_name, command, user_id, group_id, error_message, stack_trace, created_at FROM t_error_logs WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY) ORDER BY created_at DESC LIMIT %s """ return self.execute_query(sql, (days, limit)) or [] def get_error_detail(self, error_id: int) -> Optional[Dict]: """获取错误详情 Args: error_id: 错误ID Returns: 错误详情 """ sql = """ SELECT id, plugin_name, command, user_id, group_id, error_message, stack_trace, created_at FROM t_error_logs WHERE id = %s """ return self.execute_query(sql, (error_id,), fetch_one=True) def get_dashboard_summary(self, days: int = 7) -> Dict[str, Any]: """获取仪表盘摘要数据""" try: # 计算日期范围 end_date = datetime.now() start_date = end_date - timedelta(days=days) # 获取总调用次数 total_calls_query = """ SELECT COUNT(*) as total_calls FROM plugin_calls WHERE created_at >= ? AND created_at <= ? """ total_calls_result = self.db_manager.execute_query(total_calls_query, (start_date, end_date)) total_calls = total_calls_result[0]['total_calls'] if total_calls_result else 0 # 获取成功率 success_rate_query = """ SELECT ROUND((SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*)), 2) as success_rate FROM plugin_calls WHERE created_at >= ? AND created_at <= ? """ success_rate_result = self.db_manager.execute_query(success_rate_query, (start_date, end_date)) success_rate = success_rate_result[0]['success_rate'] if success_rate_result and success_rate_result[0]['success_rate'] is not None else 0 # 获取活跃用户数 active_users_query = """ SELECT COUNT(DISTINCT user_id) as active_users FROM plugin_calls WHERE created_at >= ? AND created_at <= ? """ active_users_result = self.db_manager.execute_query(active_users_query, (start_date, end_date)) active_users = active_users_result[0]['active_users'] if active_users_result else 0 # 获取活跃群组数 active_groups_query = """ SELECT COUNT(DISTINCT group_id) as active_groups FROM plugin_calls WHERE created_at >= ? AND created_at <= ? AND group_id != '' """ active_groups_result = self.db_manager.execute_query(active_groups_query, (start_date, end_date)) active_groups = active_groups_result[0]['active_groups'] if active_groups_result else 0 return { "total_calls": total_calls, "success_rate": success_rate, "active_users": active_users, "active_groups": active_groups } except Exception as e: logging.error(f"获取仪表盘摘要数据出错: {e}") return { "total_calls": 0, "success_rate": 0, "active_users": 0, "active_groups": 0 } def get_plugin_trend(self, plugin_name: str = '', days: int = 7) -> List[Dict[str, Any]]: """获取插件调用趋势数据""" try: # 计算日期范围 end_date = datetime.now() start_date = end_date - timedelta(days=days) # 构建查询条件 params = [start_date, end_date] plugin_condition = "" if plugin_name: plugin_condition = "AND plugin_name = ?" params.append(plugin_name) # 按日期分组查询 query = f""" SELECT date(created_at) as date, COUNT(*) as call_count, SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) as success_count FROM plugin_calls WHERE created_at >= ? AND created_at <= ? {plugin_condition} GROUP BY date(created_at) ORDER BY date """ results = self.db_manager.execute_query(query, tuple(params)) # 确保每一天都有数据 date_format = "%Y-%m-%d" trend_data = [] # 创建日期范围内的所有日期 current_date = start_date while current_date <= end_date: date_str = current_date.strftime(date_format) # 查找当前日期的数据 day_data = next((item for item in results if item['date'] == date_str), None) if day_data: trend_data.append({ "date": date_str, "call_count": day_data['call_count'], "success_count": day_data['success_count'], "success_rate": round(day_data['success_count'] * 100 / day_data['call_count'], 2) if day_data['call_count'] > 0 else 0 }) else: trend_data.append({ "date": date_str, "call_count": 0, "success_count": 0, "success_rate": 0 }) current_date += timedelta(days=1) return trend_data except Exception as e: logging.error(f"获取插件趋势数据出错: {e}") return []