import logging from typing import Dict, Any, Optional import threading import time import os from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory from db.connection import DBConnectionManager from db.stats_db import StatsDBOperator from utils.wechat.contact_manager import ContactManager class DashboardServer: """统计看板服务器""" def __init__(self, host: str = "0.0.0.0", port: int = 8888, username: str = "admin", password: str = "admin123"): self.host = host self.port = port self.username = username self.password = password self.logger = logging.getLogger("DashboardServer") # 修正:使用单例模式获取数据库连接 self.db_manager = DBConnectionManager.get_instance() self.stats_db = StatsDBOperator(self.db_manager) # 获取联系人管理器实例 self.contact_manager = ContactManager.get_instance() self.app = self._create_app() self._stop_event = threading.Event() self._server = None # 添加:存储服务器实例 def _create_app(self) -> Flask: """创建Flask应用""" app = Flask(__name__) app.secret_key = "stats_dashboard_secret_key" # 添加:实现基本的身份验证 def check_auth(): auth = request.authorization if not auth or auth.username != self.username or auth.password != self.password: return False return True # 静态文件目录 static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') @app.route('/static/') def serve_static(filename): return send_from_directory(static_folder, filename) @app.route('/') def index(): return render_template('index.html') @app.route('/plugins') def plugins(): return render_template('plugins.html') @app.route('/users') def users_page(): return render_template('users.html') @app.route('/groups') def groups(): return render_template('groups.html') @app.route('/errors') def errors(): return render_template('errors.html') @app.route('/api/plugin_stats') def api_plugin_stats(): days = request.args.get('days', 7, type=int) stats = self.stats_db.get_plugin_stats(days) return jsonify({"success": True, "data": stats}) @app.route('/api/user_stats') def api_user_stats(): days = request.args.get('days', 7, type=int) limit = request.args.get('limit', 10, type=int) stats = self.stats_db.get_user_stats(days, limit) # 将用户ID转换为名称 for item in stats: if 'user_id' in item: user_id = item['user_id'] item['user_name'] = self.contact_manager.get_nickname(user_id) return jsonify({"success": True, "data": stats}) @app.route('/api/group_stats') def api_group_stats(): days = request.args.get('days', 7, type=int) limit = request.args.get('limit', 10, type=int) stats = self.stats_db.get_group_stats(days, limit) # 将群ID转换为名称 for item in stats: if 'group_id' in item: group_id = item['group_id'] item['group_name'] = self.contact_manager.get_nickname(group_id) return jsonify({"success": True, "data": stats}) @app.route('/api/error_logs') def api_error_logs(): days = request.args.get('days', 7, type=int) limit = request.args.get('limit', 50, type=int) logs = self.stats_db.get_error_logs(days, limit) return jsonify({"success": True, "data": logs}) @app.route('/api/error_detail/') def api_error_detail(error_id): detail = self.stats_db.get_error_detail(error_id) return jsonify({"success": True, "data": detail}) # 修改:添加错误处理的API路由示例 @app.route('/api/dashboard_summary') def api_dashboard_summary(): try: days = request.args.get('days', 7, type=int) summary = self.stats_db.get_dashboard_summary(days) # 转换用户和群组ID为名称 if 'top_users' in summary: for user in summary['top_users']: if 'user_id' in user: user['user_name'] = self.contact_manager.get_nickname(user['user_id']) if 'top_groups' in summary: for group in summary['top_groups']: if 'group_id' in group: group['group_name'] = self.contact_manager.get_nickname(group['group_id']) self.logger.info(f"看板主页统计数据: {summary}") return jsonify({"success": True, "data": summary}) except Exception as e: self.logger.error(f"获取仪表盘摘要数据出错: {e}") return jsonify({"success": False, "error": str(e)}), 500 @app.route('/api/plugin_trend') def api_plugin_trend(): days = request.args.get('days', 7, type=int) plugin_name = request.args.get('plugin_name', '') trend = self.stats_db.get_plugin_trend(plugin_name, days) # 如果趋势数据中包含用户或群组ID,也进行转换 if isinstance(trend, list): for item in trend: if 'user_id' in item: item['user_name'] = self.contact_manager.get_nickname(item['user_id']) if 'group_id' in item: item['group_name'] = self.contact_manager.get_nickname(item['group_id']) self.logger.info(f"看板主页/api/plugin_trend: {trend}") return jsonify({"success": True, "data": trend}) return app def run(self) -> None: """运行服务器""" try: self.logger.info(f"启动统计看板服务器,地址: {self.host}:{self.port}") # 修改:使用线程安全的方式运行服务器 from werkzeug.serving import make_server self._server = make_server(self.host, self.port, self.app) self._server.serve_forever() except Exception as e: self.logger.error(f"运行统计看板服务器出错: {e}") def stop(self) -> None: """停止服务器""" try: self.logger.info("正在停止统计看板服务器...") self._stop_event.set() # 关闭服务器 if self._server: self._server.shutdown() self._server = None # 等待所有线程完成 time.sleep(0.5) # 给线程一些时间来完成 self.logger.info("统计看板服务器已完全停止") except Exception as e: self.logger.error(f"停止统计看板服务器出错: {e}") raise def __del__(self): """析构函数,确保资源被释放""" try: if hasattr(self, '_server') and self._server: self.stop() except Exception as e: if hasattr(self, 'logger'): self.logger.error(f"DashboardServer 析构时出错: {e}")