from datetime import datetime, timedelta from typing import Dict, List, Optional, Union, Tuple from db.base import BaseDBOperator from db.connection import DBConnectionManager class StatsDBOperator(BaseDBOperator): """统计数据库操作类""" def __init__(self, db_manager: DBConnectionManager): super().__init__(db_manager) self._group_user_tracker_ready = False def _ensure_group_user_tracker_table(self) -> bool: """确保群维度唯一用户追踪表存在""" if self._group_user_tracker_ready: return True sql = """ CREATE TABLE IF NOT EXISTS t_group_command_user_stats ( id BIGINT AUTO_INCREMENT PRIMARY KEY, group_id VARCHAR(50) NOT NULL COMMENT '群组ID', plugin_name VARCHAR(50) NOT NULL COMMENT '插件名称', command VARCHAR(50) NOT NULL COMMENT '触发命令', user_id VARCHAR(50) NOT NULL COMMENT '用户ID', first_used_at DATETIME NOT NULL COMMENT '首次触发时间', last_used_at DATETIME NOT NULL COMMENT '最近触发时间', UNIQUE KEY uk_group_plugin_command_user (group_id, plugin_name, command, user_id), INDEX idx_group_plugin_command (group_id, plugin_name, command), INDEX idx_last_used_at (last_used_at) ) COMMENT='群命令用户去重追踪表' """ ok = self.execute_update(sql) if ok: self._group_user_tracker_ready = True return ok def record_plugin_call(self, plugin_name: str, command: str, user_id: str, group_id: Optional[str], success: bool, process_time_ms: float) -> bool: """记录插件调用信息 Args: plugin_name: 插件名称 command: 触发的命令 user_id: 用户ID group_id: 群组ID,私聊为None success: 是否调用成功 process_time_ms: 处理时间(毫秒) Returns: 是否成功记录 """ # 1. 更新插件统计汇总表 self._update_plugin_stats(plugin_name, command, success, process_time_ms, True if group_id else False) # 2. 更新用户使用统计表 self._update_user_stats(user_id, plugin_name, command, success) # 3. 如果是群聊,更新群组使用统计表 if group_id: self._update_group_stats(group_id, plugin_name, command, user_id, success) return True def record_error(self, plugin_name: str, command: str, user_id: str, group_id: Optional[str], error_message: str, stack_trace: Optional[str] = None) -> bool: """记录错误信息 Args: plugin_name: 插件名称 command: 触发的命令 user_id: 用户ID group_id: 群组ID,私聊为None error_message: 错误信息 stack_trace: 堆栈跟踪 Returns: 是否成功记录 """ sql = """ INSERT INTO t_error_logs (plugin_name, command, user_id, group_id, error_message, stack_trace) VALUES (%s, %s, %s, %s, %s, %s) """ params = (plugin_name, command, user_id, group_id, error_message, stack_trace) return self.execute_update(sql, params) def _update_plugin_stats(self, plugin_name: str, command: str, success: bool, process_time_ms: float, is_group: bool) -> bool: """更新插件统计汇总表 Args: plugin_name: 插件名称 command: 触发的命令 success: 是否调用成功 process_time_ms: 处理时间(毫秒) is_group: 是否群聊 Returns: 是否成功更新 """ today = datetime.now().strftime("%Y-%m-%d") # 先查询当前记录 query_sql = """ SELECT id, total_calls, success_calls, failed_calls, group_calls, private_calls, avg_process_time FROM t_plugin_stats WHERE plugin_name = %s AND command = %s AND stat_date = %s """ query_params = (plugin_name, command, today) result = self.execute_query(query_sql, query_params, fetch_one=True) if result: # 更新现有记录 total_calls = result['total_calls'] + 1 success_calls = result['success_calls'] + (1 if success else 0) failed_calls = result['failed_calls'] + (0 if success else 1) group_calls = result['group_calls'] + (1 if is_group else 0) private_calls = result['private_calls'] + (0 if is_group else 1) # 计算新的平均处理时间 old_avg = result['avg_process_time'] new_avg = ((old_avg * (total_calls - 1)) + process_time_ms) / total_calls update_sql = """ UPDATE t_plugin_stats SET total_calls = %s, success_calls = %s, failed_calls = %s, group_calls = %s, private_calls = %s, avg_process_time = %s WHERE id = %s """ update_params = (total_calls, success_calls, failed_calls, group_calls, private_calls, new_avg, result['id']) return self.execute_update(update_sql, update_params) else: # 插入新记录 insert_sql = """ INSERT INTO t_plugin_stats (plugin_name, command, stat_date, total_calls, success_calls, failed_calls, group_calls, private_calls, avg_process_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ insert_params = ( plugin_name, command, today, 1, 1 if success else 0, 0 if success else 1, 1 if is_group else 0, 0 if is_group else 1, process_time_ms ) return self.execute_update(insert_sql, insert_params) def _update_user_stats(self, user_id: str, plugin_name: str, command: str, success: bool) -> bool: """更新用户使用统计表 Args: user_id: 用户ID plugin_name: 插件名称 command: 触发的命令 success: 是否调用成功 Returns: 是否成功更新 """ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 查询是否存在记录 query_sql = """ SELECT id, total_calls, success_calls, failed_calls, first_used_at FROM t_user_stats WHERE user_id = %s AND plugin_name = %s AND command = %s """ query_params = (user_id, plugin_name, command) result = self.execute_query(query_sql, query_params, fetch_one=True) if result: # 更新现有记录 update_sql = """ UPDATE t_user_stats SET total_calls = total_calls + 1, success_calls = success_calls + %s, failed_calls = failed_calls + %s, last_used_at = %s WHERE id = %s """ update_params = (1 if success else 0, 0 if success else 1, now, result['id']) return self.execute_update(update_sql, update_params) else: # 插入新记录 insert_sql = """ INSERT INTO t_user_stats (user_id, plugin_name, command, total_calls, success_calls, failed_calls, first_used_at, last_used_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ insert_params = ( user_id, plugin_name, command, 1, 1 if success else 0, 0 if success else 1, now, now ) return self.execute_update(insert_sql, insert_params) def _update_group_stats(self, group_id: str, plugin_name: str, command: str, user_id: str, success: bool) -> bool: """更新群组使用统计表 Args: group_id: 群组ID plugin_name: 插件名称 command: 触发的命令 user_id: 用户ID success: 是否调用成功 Returns: 是否成功更新 """ now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 查询是否存在记录 query_sql = """ SELECT id, total_calls, success_calls, failed_calls, unique_users, first_used_at FROM t_group_stats WHERE group_id = %s AND plugin_name = %s AND command = %s """ query_params = (group_id, plugin_name, command) result = self.execute_query(query_sql, query_params, fetch_one=True) # 记录“群 + 插件 + 命令 + 用户”去重关系,再计算该群该命令唯一用户数 unique_users = 1 if self._ensure_group_user_tracker_table(): upsert_unique_user_sql = """ INSERT INTO t_group_command_user_stats (group_id, plugin_name, command, user_id, first_used_at, last_used_at) VALUES (%s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE last_used_at = VALUES(last_used_at) """ self.execute_update( upsert_unique_user_sql, (group_id, plugin_name, command, user_id, now, now) ) user_query_sql = """ SELECT COUNT(*) as user_count FROM t_group_command_user_stats WHERE group_id = %s AND plugin_name = %s AND command = %s """ user_query_params = (group_id, plugin_name, command) user_result = self.execute_query(user_query_sql, user_query_params, fetch_one=True) unique_users = user_result['user_count'] if user_result and user_result.get('user_count') is not None else 1 elif result: unique_users = max(result.get('unique_users') or 0, 1) if result: # 更新现有记录 update_sql = """ UPDATE t_group_stats SET total_calls = total_calls + 1, success_calls = success_calls + %s, failed_calls = failed_calls + %s, unique_users = %s, last_used_at = %s WHERE id = %s """ update_params = (1 if success else 0, 0 if success else 1, unique_users, now, result['id']) return self.execute_update(update_sql, update_params) else: # 插入新记录 insert_sql = """ INSERT INTO t_group_stats (group_id, plugin_name, command, total_calls, success_calls, failed_calls, unique_users, first_used_at, last_used_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ insert_params = ( group_id, plugin_name, command, 1, 1 if success else 0, 0 if success else 1, unique_users, now, now ) return self.execute_update(insert_sql, insert_params) def get_plugin_stats(self, days: int = 7) -> List[Dict]: """获取插件使用统计 Args: days: 统计天数 Returns: 插件统计列表 """ sql = """ SELECT plugin_name, command, SUM(total_calls) as total_calls, SUM(success_calls) as success_calls, SUM(failed_calls) as failed_calls, AVG(avg_process_time) as avg_process_time FROM t_plugin_stats WHERE stat_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY plugin_name, command ORDER BY total_calls DESC """ return self.execute_query(sql, (days,)) or [] def get_user_stats(self, days: int = 7, limit: int = 20) -> List[Dict]: """获取用户使用统计 Args: days: 统计天数 limit: 返回记录数量限制 Returns: 用户统计列表 """ sql = """ SELECT user_id, SUM(total_calls) as total_calls, SUM(success_calls) as success_calls, SUM(failed_calls) as failed_calls, COUNT(DISTINCT plugin_name) as used_plugins FROM t_user_stats WHERE last_used_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY user_id ORDER BY total_calls DESC LIMIT %s """ return self.execute_query(sql, (days, limit)) or [] def get_group_stats(self, days: int = 7, limit: int = 20) -> List[Dict]: """获取群组使用统计 Args: days: 统计天数 limit: 返回记录数量限制 Returns: 群组统计列表 """ sql = """ SELECT group_id, SUM(total_calls) as total_calls, SUM(success_calls) as success_calls, SUM(failed_calls) as failed_calls, COUNT(DISTINCT plugin_name) as used_plugins, MAX(unique_users) as unique_users FROM t_group_stats WHERE last_used_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY) GROUP BY group_id ORDER BY total_calls DESC LIMIT %s """ return self.execute_query(sql, (days, limit)) or [] def get_error_logs(self, days: int = 7, page: int = 1, limit: int = 20) -> Tuple[List[Dict], int]: """获取错误日志 Args: days: 统计天数 page: 页码 limit: 每页数量 Returns: (日志列表, 总数) """ # 1. 获取总数 count_sql = """ SELECT COUNT(*) as total FROM t_error_logs WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY) """ count_result = self.execute_query(count_sql, (days,), fetch_one=True) total = count_result['total'] if count_result else 0 # 2. 获取分页数据 offset = (page - 1) * limit sql = """ SELECT id, plugin_name, command, user_id, group_id, error_message, stack_trace, created_at FROM t_error_logs WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY) ORDER BY created_at DESC LIMIT %s OFFSET %s """ logs = self.execute_query(sql, (days, limit, offset)) or [] return logs, total def get_error_detail(self, error_id: int) -> Optional[Dict]: """获取错误详情 Args: error_id: 错误ID Returns: 错误详情 """ sql = """ SELECT id, plugin_name, command, user_id, group_id, error_message, stack_trace, created_at FROM t_error_logs WHERE id = %s """ return self.execute_query(sql, (error_id,), fetch_one=True) def get_dashboard_summary(self, days: int = 7) -> Dict: """获取仪表盘摘要数据 Args: days: 统计天数 Returns: 仪表盘摘要数据 """ # 获取时间范围 end_date = datetime.now() start_date = end_date - timedelta(days=days) start_date_str = start_date.strftime("%Y-%m-%d") # 1. 总调用次数 total_calls_sql = """ SELECT SUM(total_calls) as total_calls FROM t_plugin_stats WHERE stat_date >= %s """ total_calls_result = self.execute_query(total_calls_sql, (start_date_str,), fetch_one=True) total_calls = total_calls_result['total_calls'] if total_calls_result and total_calls_result['total_calls'] else 0 # 2. 成功率 success_rate_sql = """ SELECT SUM(success_calls) as success_calls, SUM(total_calls) as total_calls FROM t_plugin_stats WHERE stat_date >= %s """ success_rate_result = self.execute_query(success_rate_sql, (start_date_str,), fetch_one=True) success_rate = 0.0 if success_rate_result and success_rate_result['total_calls']: success_rate = float((success_rate_result['success_calls'] / success_rate_result['total_calls']) * 100) # 3. 活跃用户数 active_users_sql = """ SELECT COUNT(DISTINCT user_id) as active_users FROM t_user_stats WHERE last_used_at >= %s """ active_users_result = self.execute_query(active_users_sql, (start_date_str,), fetch_one=True) active_users = active_users_result['active_users'] if active_users_result else 0 # 3.1 新增用户(统计窗口内首次触发) new_users_sql = """ SELECT COUNT(DISTINCT user_id) as new_users FROM t_user_stats WHERE first_used_at >= %s """ new_users_result = self.execute_query(new_users_sql, (start_date_str,), fetch_one=True) new_users = new_users_result['new_users'] if new_users_result else 0 # 4. 活跃群组数 active_groups_sql = """ SELECT COUNT(DISTINCT group_id) as active_groups FROM t_group_stats WHERE last_used_at >= %s """ active_groups_result = self.execute_query(active_groups_sql, (start_date_str,), fetch_one=True) active_groups = active_groups_result['active_groups'] if active_groups_result else 0 # 5. 错误数量 error_count_sql = """ SELECT COUNT(*) as error_count FROM t_error_logs WHERE created_at >= %s """ error_count_result = self.execute_query(error_count_sql, (start_date_str,), fetch_one=True) error_count = error_count_result['error_count'] if error_count_result else 0 # 6. 平均响应时间 avg_response_time_sql = """ SELECT AVG(avg_process_time) as avg_response_time FROM t_plugin_stats WHERE stat_date >= %s """ avg_response_time_result = self.execute_query(avg_response_time_sql, (start_date_str,), fetch_one=True) avg_response_time = float( avg_response_time_result['avg_response_time'] ) if avg_response_time_result and avg_response_time_result['avg_response_time'] else 0.0 # 6.1 群渗透率(活跃群内,触发人数 / 群成员数 的均值) group_penetration_sql = """ SELECT AVG(group_penetration) AS avg_group_penetration FROM ( SELECT gs.group_id, CASE WHEN gm.member_count > 0 THEN (LEAST(gs.unique_users, gm.member_count) / gm.member_count) * 100 ELSE NULL END AS group_penetration FROM ( SELECT group_id, MAX(unique_users) AS unique_users FROM t_group_stats WHERE last_used_at >= %s GROUP BY group_id ) gs LEFT JOIN ( SELECT chatroom_id AS group_id, COUNT(*) AS member_count FROM t_chatroom_member WHERE status = 1 OR status IS NULL GROUP BY chatroom_id ) gm ON gs.group_id = gm.group_id ) t WHERE group_penetration IS NOT NULL """ group_penetration_result = self.execute_query(group_penetration_sql, (start_date_str,), fetch_one=True) avg_group_penetration = 0 if group_penetration_result and group_penetration_result.get('avg_group_penetration') is not None: avg_group_penetration = float(group_penetration_result['avg_group_penetration']) # 6.2 群健康分(成功率 + 响应速度融合评分) # 响应时间评分: 500ms 及以内为满分, 5000ms 及以上趋近0分 latency_score = 0 if avg_response_time <= 500: latency_score = 100 elif avg_response_time >= 5000: latency_score = 0 else: latency_score = max(0, min(100, ((5000 - avg_response_time) / 4500) * 100)) group_health_score = (success_rate * 0.7) + (latency_score * 0.3) # 7. 最常用的插件(优先统计窗口,无数据时回退全量) top_plugins_sql = """ SELECT plugin_name, SUM(total_calls) as total_calls FROM t_plugin_stats WHERE stat_date >= %s GROUP BY plugin_name ORDER BY total_calls DESC LIMIT 5 """ top_plugins = self.execute_query(top_plugins_sql, (start_date_str,)) or [] if not top_plugins: top_plugins_all_sql = """ SELECT plugin_name, SUM(total_calls) as total_calls FROM t_plugin_stats GROUP BY plugin_name ORDER BY total_calls DESC LIMIT 5 """ top_plugins = self.execute_query(top_plugins_all_sql) or [] # 8. 最活跃的用户(优先统计窗口,无数据时回退全量) top_users_sql = """ SELECT user_id, SUM(total_calls) as total_calls FROM t_user_stats WHERE last_used_at >= %s GROUP BY user_id ORDER BY total_calls DESC LIMIT 5 """ top_users = self.execute_query(top_users_sql, (start_date_str,)) or [] if not top_users: top_users_all_sql = """ SELECT user_id, SUM(total_calls) as total_calls FROM t_user_stats GROUP BY user_id ORDER BY total_calls DESC LIMIT 5 """ top_users = self.execute_query(top_users_all_sql) or [] # 9. 最活跃的群组(优先统计窗口,无数据时回退全量) top_groups_sql = """ SELECT group_id, SUM(total_calls) as total_calls FROM t_group_stats WHERE last_used_at >= %s GROUP BY group_id ORDER BY total_calls DESC LIMIT 5 """ top_groups = self.execute_query(top_groups_sql, (start_date_str,)) or [] if not top_groups: top_groups_all_sql = """ SELECT group_id, SUM(total_calls) as total_calls FROM t_group_stats GROUP BY group_id ORDER BY total_calls DESC LIMIT 5 """ top_groups = self.execute_query(top_groups_all_sql) or [] # 返回汇总数据 return { "total_calls": total_calls, "success_rate": success_rate, "active_users": active_users, "new_users": new_users, "active_groups": active_groups, "error_count": error_count, "avg_response_time": avg_response_time, "avg_group_penetration": round(avg_group_penetration, 2), "group_health_score": round(group_health_score, 2), "top_plugins": top_plugins, "top_users": top_users, "top_groups": top_groups } def get_plugin_trend(self, plugin_name: str = "", days: int = 7) -> List[Dict]: """获取插件使用趋势数据 Args: plugin_name: 插件名称,为空则获取所有插件 days: 统计天数 Returns: 插件使用趋势数据 """ # 获取时间范围内的每一天 end_date = datetime.now().date() start_date = end_date - timedelta(days=days-1) # 包含今天,所以减1 if plugin_name: # 获取特定插件的趋势 sql = """ SELECT stat_date, SUM(total_calls) as total_calls, SUM(success_calls) as success_calls, SUM(failed_calls) as failed_calls FROM t_plugin_stats WHERE plugin_name = %s AND stat_date >= %s GROUP BY stat_date ORDER BY stat_date """ params = (plugin_name, start_date) else: # 获取所有插件的趋势 sql = """ SELECT stat_date, SUM(total_calls) as total_calls, SUM(success_calls) as success_calls, SUM(failed_calls) as failed_calls FROM t_plugin_stats WHERE stat_date >= %s GROUP BY stat_date ORDER BY stat_date """ params = (start_date,) results = self.execute_query(sql, params) or [] # 将结果转换为按日期的字典 trend_by_date = {r['stat_date'].strftime('%Y-%m-%d'): r for r in results} # 确保每一天都有数据 trend_data = [] current_date = start_date while current_date <= end_date: date_str = current_date.strftime('%Y-%m-%d') if date_str in trend_by_date: data = trend_by_date[date_str] trend_data.append({ 'date': date_str, 'total_calls': data['total_calls'], 'success_calls': data['success_calls'], 'failed_calls': data['failed_calls'], 'success_rate': (data['success_calls'] / data['total_calls'] * 100) if data['total_calls'] > 0 else 0 }) else: trend_data.append({ 'date': date_str, 'total_calls': 0, 'success_calls': 0, 'failed_calls': 0, 'success_rate': 0 }) current_date += timedelta(days=1) return trend_data def get_group_plugin_stats(self, group_id: str, days: int = 30, limit: int = 10) -> List[Dict]: """获取指定群的插件调用统计""" sql = """ SELECT plugin_name, command, SUM(total_calls) AS total_calls, SUM(success_calls) AS success_calls, SUM(failed_calls) AS failed_calls, MAX(unique_users) AS unique_users, MIN(first_used_at) AS first_used_at, MAX(last_used_at) AS last_used_at FROM t_group_stats WHERE group_id = %s AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY plugin_name, command ORDER BY total_calls DESC, last_used_at DESC LIMIT %s """ rows = self.execute_query(sql, (group_id, days, limit)) or [] for row in rows: for key in ("first_used_at", "last_used_at"): dt = row.get(key) if isinstance(dt, datetime): row[key] = dt.strftime("%Y-%m-%d %H:%M:%S") return rows def get_group_plugin_summary(self, group_id: str, days: int = 30) -> Dict: """获取指定群的插件调用摘要""" sql = """ SELECT SUM(total_calls) AS total_calls, SUM(success_calls) AS success_calls, SUM(failed_calls) AS failed_calls, COUNT(DISTINCT plugin_name) AS plugin_count, MAX(last_used_at) AS last_used_at FROM t_group_stats WHERE group_id = %s AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY) """ result = self.execute_query(sql, (group_id, days), fetch_one=True) or {} dt = result.get("last_used_at") if isinstance(dt, datetime): result["last_used_at"] = dt.strftime("%Y-%m-%d %H:%M:%S") return { "total_calls": int(result.get("total_calls") or 0), "success_calls": int(result.get("success_calls") or 0), "failed_calls": int(result.get("failed_calls") or 0), "plugin_count": int(result.get("plugin_count") or 0), "last_used_at": result.get("last_used_at") or "", } def get_user_plugin_stats(self, user_id: str, days: int = 30, limit: int = 10) -> List[Dict]: """获取指定用户的插件调用统计""" sql = """ SELECT plugin_name, command, SUM(total_calls) AS total_calls, SUM(success_calls) AS success_calls, SUM(failed_calls) AS failed_calls, MIN(first_used_at) AS first_used_at, MAX(last_used_at) AS last_used_at FROM t_user_stats WHERE user_id = %s AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY plugin_name, command ORDER BY total_calls DESC, last_used_at DESC LIMIT %s """ rows = self.execute_query(sql, (user_id, days, limit)) or [] for row in rows: for key in ("first_used_at", "last_used_at"): dt = row.get(key) if isinstance(dt, datetime): row[key] = dt.strftime("%Y-%m-%d %H:%M:%S") return rows def get_user_plugin_summary(self, user_id: str, days: int = 30) -> Dict: """获取指定用户的插件调用摘要""" sql = """ SELECT SUM(total_calls) AS total_calls, SUM(success_calls) AS success_calls, SUM(failed_calls) AS failed_calls, COUNT(DISTINCT plugin_name) AS plugin_count, MAX(last_used_at) AS last_used_at FROM t_user_stats WHERE user_id = %s AND last_used_at >= DATE_SUB(NOW(), INTERVAL %s DAY) """ result = self.execute_query(sql, (user_id, days), fetch_one=True) or {} dt = result.get("last_used_at") if isinstance(dt, datetime): result["last_used_at"] = dt.strftime("%Y-%m-%d %H:%M:%S") return { "total_calls": int(result.get("total_calls") or 0), "success_calls": int(result.get("success_calls") or 0), "failed_calls": int(result.get("failed_calls") or 0), "plugin_count": int(result.get("plugin_count") or 0), "last_used_at": result.get("last_used_at") or "", }