import copy
import logging
import os
import sys
import threading
from datetime import datetime
from functools import wraps
import time

from db.message_storage import MessageStorageDB
from db.stats_db import StatsDBOperator
from utils.wechat.contact_manager import ContactManager

# Add the project root to sys.path so that project modules can be imported.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
# toml is used to read and write the dashboard configuration file.
import toml


class DashboardServer:
    """Statistics dashboard web server.

    Wraps a Flask application that exposes login-protected HTML pages and JSON
    APIs for usage statistics, error logs, system information and per-group
    robot permission management. The server is driven by ``run()`` (blocking)
    and terminated by ``stop()``.
    """

    # Fallback configuration, used when config.toml is missing or unreadable.
    _DEFAULT_CONFIG = {
        "server": {"host": "0.0.0.0", "port": 8888},
        "auth": {"username": "admin", "password": "admin123"},
    }

    # SECURITY: historical hard-coded session key. Kept only as a fallback so
    # existing deployments keep working; set ``auth.secret_key`` in config.toml
    # to a random value to override it.
    _LEGACY_SECRET_KEY = "stats_dashboard_secret_key"

    def __init__(self, host: str = None, port: int = None, username: str = None, password: str = None,
                 robot_instance=None):
        """Build the dashboard on top of a running robot instance.

        Args:
            host: Bind address; falls back to ``server.host`` from the config.
            port: Bind port; falls back to ``server.port`` from the config.
            username: Login name; falls back to ``auth.username``.
            password: Login password; falls back to ``auth.password``.
            robot_instance: Object providing ``db_manager`` and
                ``contact_manager`` attributes. Required.

        Raises:
            ValueError: If ``robot_instance`` is not provided.
        """
        # The logger must exist before the config is loaded, because the
        # loader reports failures through it.
        self.logger = logging.getLogger("DashboardServer")

        self.config = self._load_dashboard_config()
        server_cfg = self.config.get("server", {})
        auth_cfg = self.config.get("auth", {})

        # Explicit arguments win over values from the configuration file.
        self.host = host or server_cfg.get("host", "0.0.0.0")
        self.port = port or server_cfg.get("port", 8888)
        self.username = username or auth_cfg.get("username", "admin")
        self.password = password or auth_cfg.get("password", "admin123")

        self.logger.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}")

        if not robot_instance:
            self.logger.error("未提供Robot实例,Dashboard无法正常工作")
            raise ValueError("必须提供Robot实例")

        # Reuse the robot's own database and contact managers.
        self.db_manager = robot_instance.db_manager
        self.stats_db = StatsDBOperator(self.db_manager)
        self.message_storage = MessageStorageDB(self.db_manager)
        self.contact_manager = robot_instance.contact_manager
        self.logger.info("使用Robot实例的对象进行初始化")

        self.app = self._create_app()
        self._stop_event = threading.Event()
        self._server = None  # werkzeug server instance, set by run()

    def _load_dashboard_config(self):
        """Load ``config.toml`` located next to this module.

        If the file does not exist, a default configuration is written to it
        and returned. Any failure (unreadable file, parse error, unwritable
        directory) is logged and the built-in defaults are returned instead.

        Returns:
            dict: The parsed (or default) configuration.
        """
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.toml')
        try:
            if os.path.exists(config_path):
                with open(config_path, 'r', encoding='utf-8') as f:
                    return toml.load(f)

            # No config file yet: create one holding the defaults.
            default_config = copy.deepcopy(self._DEFAULT_CONFIG)
            with open(config_path, 'w', encoding='utf-8') as f:
                toml.dump(default_config, f)
            return default_config
        except Exception as e:
            self.logger.error(f"加载Dashboard配置文件失败: {e}")
            # Hand out a copy so callers cannot mutate the class-level defaults.
            return copy.deepcopy(self._DEFAULT_CONFIG)

    # ------------------------------------------------------------------
    # Small helpers shared by the route handlers
    # ------------------------------------------------------------------

    @staticmethod
    def _login_required(view):
        """Decorate *view* so anonymous visitors are redirected to the login page."""
        @wraps(view)  # keeps __name__, which Flask uses as the endpoint name
        def decorated_function(*args, **kwargs):
            if not session.get('logged_in'):
                return redirect(url_for('login'))
            return view(*args, **kwargs)
        return decorated_function

    @staticmethod
    def _json_body():
        """Return the request's JSON object, or ``{}`` if absent or not an object."""
        data = request.get_json(silent=True)
        return data if isinstance(data, dict) else {}

    def _api_failure(self, log_message, exc, status=500):
        """Log ``"<log_message>: <exc>"`` and build the standard JSON error response."""
        self.logger.error(f"{log_message}: {exc}")
        return jsonify({"success": False, "error": str(exc)}), status

    # ------------------------------------------------------------------
    # Flask application assembly
    # ------------------------------------------------------------------

    def _create_app(self) -> Flask:
        """Create the Flask application and register every route on it."""
        app = Flask(__name__)

        secret_key = self.config.get("auth", {}).get("secret_key")
        if not secret_key:
            self.logger.warning("未在配置中设置 auth.secret_key,使用内置默认密钥;建议在 config.toml 中配置随机密钥")
            secret_key = self._LEGACY_SECRET_KEY
        app.secret_key = secret_key

        self._register_static_routes(app)
        self._register_auth_routes(app)
        self._register_page_routes(app)
        self._register_robot_api(app)
        self._register_stats_api(app)
        return app

    def _register_static_routes(self, app):
        """Register routes that serve static assets and the favicon."""
        static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')

        @app.route('/static/<path:filename>')
        def serve_static(filename):
            return send_from_directory(static_folder, filename)

        @app.route('/favicon.ico')
        def favicon():
            return send_from_directory(os.path.join(app.root_path, 'static'),
                                       'favicon.ico', mimetype='image/vnd.microsoft.icon')

    def _register_auth_routes(self, app):
        """Register the login and logout routes."""

        @app.route('/login', methods=['GET', 'POST'])
        def login():
            error = None
            if request.method == 'POST':
                # .get() so a malformed form shows the login error instead of a bare 400.
                username = request.form.get('username', '')
                password = request.form.get('password', '')
                if username == self.username and password == self.password:
                    session['logged_in'] = True
                    return redirect(url_for('index'))
                error = '用户名或密码错误'
            return render_template('login.html', error=error)

        @app.route('/logout')
        def logout():
            session.pop('logged_in', None)
            return redirect(url_for('login'))

    def _register_page_routes(self, app):
        """Register the login-protected HTML pages."""
        login_required = self._login_required

        @app.route('/')
        @login_required
        def index():
            return render_template('index.html')

        @app.route('/plugins')
        @login_required
        def plugins():
            return render_template('plugins.html')

        @app.route('/users')
        @login_required
        def users_page():
            return render_template('users.html')

        @app.route('/groups')
        @login_required
        def groups():
            return render_template('groups.html')

        @app.route('/errors')
        @login_required
        def errors():
            return render_template('errors.html')

        @app.route('/robot_management')
        @login_required
        def robot_management():
            return render_template('robot_management.html')

    def _register_robot_api(self, app):
        """Register the JSON APIs used for robot / group management."""
        login_required = self._login_required

        @app.route('/api/robot/groups')
        @login_required
        def api_robot_groups():
            try:
                groups = GroupBotManager.get_group_list()

                # Fall back to the local cache when the manager returns nothing.
                if groups is None and hasattr(GroupBotManager, "local_cache"):
                    groups = GroupBotManager.local_cache.get("group_list", set())
                if groups is None:
                    groups = set()

                self.logger.info(f"获取到 {len(groups)} 个群组")

                group_data = []
                for group_id in groups:
                    try:
                        group_name = self.contact_manager.get_nickname(group_id) or f"未知群组({group_id})"

                        # A failed status lookup must not drop the group from the list.
                        try:
                            robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
                        except Exception:
                            robot_status = PermissionStatus.UNKNOWN

                        group_data.append({
                            "group_id": group_id,
                            "group_name": group_name,
                            "robot_status": robot_status.value if robot_status else "unknown"
                        })
                    except Exception as e:
                        self.logger.warning(f"处理群组 {group_id} 信息时出错: {e}")
                        # Emit a placeholder so one bad group does not break the whole list.
                        group_data.append({
                            "group_id": group_id,
                            "group_name": "获取失败",
                            "robot_status": "unknown"
                        })

                return jsonify({"success": True, "data": group_data})
            except Exception as e:
                return self._api_failure("获取群组列表失败", e, 500)

        @app.route('/api/robot/group/<group_id>/permissions')
        @login_required
        def api_robot_group_permissions(group_id):
            try:
                permissions = GroupBotManager.list_group_permissions(group_id)
                permission_data = [
                    {
                        "feature_id": feature.value,
                        "feature_name": feature.name,
                        "feature_description": feature.description,
                        "status": status.value
                    }
                    for feature, status in permissions.items()
                ]
                return jsonify({"success": True, "data": permission_data})
            except Exception as e:
                return self._api_failure("获取群组权限失败", e, 500)

        @app.route('/api/robot/group/<group_id>/permissions', methods=['POST'])
        @login_required
        def api_update_robot_permissions(group_id):
            # Update one feature permission of a group.
            data = self._json_body()
            feature_id = data.get('feature_id')
            status = data.get('status')

            try:
                feature = Feature(int(feature_id))
                new_status = PermissionStatus(status)

                # The ROBOT feature additionally controls membership of the group list.
                if feature == Feature.ROBOT:
                    r = self.db_manager.get_redis_connection()
                    if new_status == PermissionStatus.ENABLED:
                        GroupBotManager.local_cache["group_list"].add(group_id)
                        r.sadd("group:list", group_id)
                    else:
                        # discard(): disabling a group that is not cached is not an error.
                        GroupBotManager.local_cache["group_list"].discard(group_id)
                        r.srem("group:list", group_id)

                GroupBotManager.set_group_permission(group_id, feature, new_status)
                return jsonify({"success": True})
            except Exception as e:
                return self._api_failure("更新群组权限失败", e, 400)

        @app.route('/api/robot/batch_operation', methods=['POST'])
        @login_required
        def api_robot_batch_operation():
            # Batch operation endpoint; only 'remove_groups' is supported.
            data = self._json_body()
            operation = data.get('operation')
            group_ids = data.get('group_ids', [])

            try:
                if operation != 'remove_groups':
                    return jsonify({"success": False, "error": "不支持的操作类型"}), 400

                results = {group_id: GroupBotManager.remove_group(group_id) for group_id in group_ids}
                return jsonify({"success": True, "results": results})
            except Exception as e:
                return self._api_failure("批量操作失败", e, 400)

        @app.route('/api/robot/group/<group_id>/message_trend')
        @login_required
        def api_group_message_trend(group_id):
            try:
                days = request.args.get('days', 7, type=int)
                trend_data = self.message_storage.get_message_trend(group_id, days)

                # Reshape rows into the parallel arrays returned to the client.
                dates = []
                counts = []
                for item in trend_data:
                    day = item['date']
                    dates.append(day.strftime('%Y-%m-%d') if isinstance(day, datetime) else str(day))
                    counts.append(item['message_count'])

                return jsonify({
                    'success': True,
                    'data': {
                        'dates': dates,
                        'counts': counts
                    }
                })
            except Exception as e:
                return self._api_failure("获取群组消息趋势数据出错", e, 500)

        @app.route('/api/robot/add_group', methods=['POST'])
        @login_required
        def api_add_group():
            try:
                data = self._json_body()
                group_id = data.get('group_id')

                # Reject missing, non-string and blank IDs alike.
                if not isinstance(group_id, str) or not group_id.strip():
                    return jsonify({"success": False, "error": "群组ID不能为空"}), 400

                group_id = group_id.strip()

                if group_id in GroupBotManager.local_cache["group_list"]:
                    return jsonify({"success": False, "error": "该群组已存在"}), 400

                # Add the group to both the local cache and Redis, then enable the robot.
                GroupBotManager.local_cache["group_list"].add(group_id)
                r = self.db_manager.get_redis_connection()
                r.sadd("group:list", group_id)
                GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)

                group_name = self.contact_manager.get_nickname(group_id)

                return jsonify({
                    "success": True,
                    "message": f"群组 {group_id} 已成功添加",
                    "group": {
                        "group_id": group_id,
                        "group_name": group_name,
                        "robot_status": "enabled"
                    }
                })
            except Exception as e:
                return self._api_failure("添加群组失败", e, 500)

    def _register_stats_api(self, app):
        """Register the JSON APIs that expose statistics, logs and system info."""
        login_required = self._login_required

        @app.route('/api/user_stats')
        @login_required
        def api_user_stats():
            try:
                days = request.args.get('days', 7, type=int)
                limit = request.args.get('limit', 10, type=int)
                stats = self.stats_db.get_user_stats(days, limit)

                # Attach a display name to every row carrying a user ID.
                for item in stats:
                    if 'user_id' in item:
                        item['user_name'] = self.contact_manager.get_nickname(item['user_id'])

                return jsonify({"success": True, "data": stats})
            except Exception as e:
                return self._api_failure("获取用户统计失败", e, 500)

        @app.route('/api/group_stats')
        @login_required
        def api_group_stats():
            try:
                days = request.args.get('days', 7, type=int)
                limit = request.args.get('limit', 10, type=int)
                stats = self.stats_db.get_group_stats(days, limit)

                # Attach a display name to every row carrying a group ID.
                for item in stats:
                    if 'group_id' in item:
                        item['group_name'] = self.contact_manager.get_nickname(item['group_id'])

                return jsonify({"success": True, "data": stats})
            except Exception as e:
                return self._api_failure("获取群组统计失败", e, 500)

        @app.route('/api/plugin_stats')
        @login_required
        def api_plugin_stats():
            try:
                days = request.args.get('days', 7, type=int)
                stats = self.stats_db.get_plugin_stats(days)
                return jsonify({"success": True, "data": stats})
            except Exception as e:
                return self._api_failure("获取插件统计失败", e, 500)

        @app.route('/api/error_logs')
        @login_required
        def api_error_logs():
            try:
                days = request.args.get('days', 7, type=int)
                limit = request.args.get('limit', 100, type=int)
                logs = self.stats_db.get_error_logs(days, limit)
                return jsonify({"success": True, "data": logs})
            except Exception as e:
                return self._api_failure("获取错误日志失败", e, 500)

        @app.route('/api/system_info')
        @login_required
        def api_system_info():
            try:
                # Imported lazily so a missing psutil only breaks this endpoint.
                import platform
                import psutil

                system_info = {
                    "os": platform.system(),
                    "os_version": platform.version(),
                    "python_version": platform.python_version(),
                    "cpu_usage": psutil.cpu_percent(),
                    "memory_usage": psutil.virtual_memory().percent,
                    "disk_usage": psutil.disk_usage('/').percent,
                    "uptime": time.time() - psutil.boot_time(),
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }
                return jsonify({"success": True, "data": system_info})
            except Exception as e:
                return self._api_failure("获取系统信息失败", e, 500)

        @app.route('/api/dashboard_summary')
        @login_required
        def api_dashboard_summary():
            try:
                days = request.args.get('days', 7, type=int)
                summary = self.stats_db.get_dashboard_summary(days)

                # Resolve user and group IDs in the "top" lists to display names.
                for user in summary.get('top_users', []) if 'top_users' in summary else []:
                    if 'user_id' in user:
                        user['user_name'] = self.contact_manager.get_nickname(user['user_id'])

                for group in summary.get('top_groups', []) if 'top_groups' in summary else []:
                    if 'group_id' in group:
                        group['group_name'] = self.contact_manager.get_nickname(group['group_id'])

                return jsonify({"success": True, "data": summary})
            except Exception as e:
                return self._api_failure("获取仪表盘摘要数据出错", e, 500)

        @app.route('/api/plugin_trend')
        @login_required
        def api_plugin_trend():
            try:
                days = request.args.get('days', 7, type=int)
                plugin_name = request.args.get('plugin_name', '')
                trend = self.stats_db.get_plugin_trend(plugin_name, days)
                return jsonify({"success": True, "data": trend})
            except Exception as e:
                return self._api_failure("获取插件趋势失败", e, 500)

        # NOTE(review): error_id is passed through as a string; switch to
        # <int:error_id> if get_error_detail requires an integer — confirm.
        @app.route('/api/error_detail/<error_id>')
        @login_required
        def api_error_detail(error_id):
            try:
                detail = self.stats_db.get_error_detail(error_id)
                return jsonify({"success": True, "data": detail})
            except Exception as e:
                return self._api_failure("获取错误详情失败", e, 500)

    # ------------------------------------------------------------------
    # Server lifecycle
    # ------------------------------------------------------------------

    def run(self):
        """Start the HTTP server and block until it is shut down.

        Uses werkzeug's ``make_server`` so that ``stop()`` can later call
        ``shutdown()`` on the stored server. Startup or runtime failures are
        logged and the internal stop event is set.
        """
        from werkzeug.serving import make_server

        self.logger.info(f"启动服务器: {self.host}:{self.port}")
        try:
            self._server = make_server(self.host, self.port, self.app)
            self._server.serve_forever()
        except Exception as e:
            self.logger.error(f"服务器运行失败: {e}")
            self._stop_event.set()

    def stop(self):
        """Signal shutdown and stop the werkzeug server if it was started."""
        self.logger.info("正在停止服务器...")
        self._stop_event.set()

        if self._server:
            self._server.shutdown()
            self.logger.info("服务器已停止")