Files
abot/plugins/stats_dashboard/dashboard_server.py

200 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import functools
import hmac
import logging
import os
import threading
import time
from typing import Dict, Any, Optional

from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory

from contact_manager import ContactManager  # contact manager (ID -> nickname lookups)
from db.connection import DBConnectionManager
from db.stats_db import StatsDBOperator
class DashboardServer:
    """HTTP server for the statistics dashboard.

    Builds a Flask application that serves the dashboard pages plus a small
    JSON API backed by ``StatsDBOperator``. ``run()`` blocks while serving;
    ``stop()`` shuts the server down and is meant to be called from a
    different thread than the one executing ``run()``.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8888,
                 username: str = "admin", password: str = "admin123"):
        """Set up database access, the contact manager and the Flask app.

        Args:
            host: Interface the HTTP server binds to.
            port: TCP port the HTTP server listens on.
            username: User name required by HTTP Basic authentication.
            password: Password required by HTTP Basic authentication.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.logger = logging.getLogger("DashboardServer")
        # The database connection manager is obtained through its singleton accessor.
        self.db_manager = DBConnectionManager.get_instance()
        self.stats_db = StatsDBOperator(self.db_manager)
        # Contact manager used to turn user/group IDs into display names.
        self.contact_manager = ContactManager.get_instance()
        self.app = self._create_app()
        self._stop_event = threading.Event()
        self._server = None  # werkzeug server instance while run() is serving

    def _create_app(self) -> Flask:
        """Create the Flask application and register every route.

        Returns:
            The configured Flask application.
        """
        app = Flask(__name__)
        # Never sign sessions with a constant committed to source control:
        # use an operator-supplied key if present, otherwise a random
        # per-process key (sessions then do not survive a restart).
        app.secret_key = os.environ.get("STATS_DASHBOARD_SECRET_KEY") or os.urandom(32)

        def check_auth() -> bool:
            """Return True when the request carries the configured Basic credentials."""
            auth = request.authorization
            if not auth:
                return False
            # Compare as bytes in constant time; username/password may be
            # None for non-Basic authorization schemes.
            supplied_user = (auth.username or "").encode("utf-8")
            supplied_password = (auth.password or "").encode("utf-8")
            user_ok = hmac.compare_digest(supplied_user, self.username.encode("utf-8"))
            password_ok = hmac.compare_digest(supplied_password, self.password.encode("utf-8"))
            return user_ok and password_ok

        @app.before_request
        def require_auth():
            """Reject every request that is not authenticated."""
            if not check_auth():
                return (
                    "Authentication required",
                    401,
                    {"WWW-Authenticate": 'Basic realm="Stats Dashboard"'},
                )
            return None

        def api_endpoint(failure_message):
            """Decorate an API view so any exception becomes a logged JSON 500."""
            def decorator(view):
                @functools.wraps(view)  # keeps the view name, which Flask uses as the endpoint
                def wrapper(*args, **kwargs):
                    try:
                        return view(*args, **kwargs)
                    except Exception as e:
                        self.logger.error(f"{failure_message}: {e}")
                        return jsonify({"success": False, "error": str(e)}), 500
                return wrapper
            return decorator

        def add_nicknames(rows, id_key, name_key):
            """Store the nickname for ``row[id_key]`` under ``name_key`` in each row that has the ID."""
            for row in rows:
                if id_key in row:
                    row[name_key] = self.contact_manager.get_nickname(row[id_key])

        # Directory holding the static assets, next to this module.
        static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')

        @app.route('/static/<path:filename>')
        def serve_static(filename):
            return send_from_directory(static_folder, filename)

        @app.route('/')
        def index():
            return render_template('index.html')

        @app.route('/plugins')
        def plugins():
            return render_template('plugins.html')

        @app.route('/users')
        def users_page():
            return render_template('users.html')

        @app.route('/groups')
        def groups():
            return render_template('groups.html')

        @app.route('/errors')
        def errors():
            return render_template('errors.html')

        @app.route('/api/plugin_stats')
        @api_endpoint("获取插件统计数据出错")
        def api_plugin_stats():
            days = request.args.get('days', 7, type=int)
            stats = self.stats_db.get_plugin_stats(days)
            return jsonify({"success": True, "data": stats})

        @app.route('/api/user_stats')
        @api_endpoint("获取用户统计数据出错")
        def api_user_stats():
            days = request.args.get('days', 7, type=int)
            limit = request.args.get('limit', 10, type=int)
            stats = self.stats_db.get_user_stats(days, limit)
            # Attach a display name for each user ID.
            add_nicknames(stats, 'user_id', 'user_name')
            return jsonify({"success": True, "data": stats})

        @app.route('/api/group_stats')
        @api_endpoint("获取群组统计数据出错")
        def api_group_stats():
            days = request.args.get('days', 7, type=int)
            limit = request.args.get('limit', 10, type=int)
            stats = self.stats_db.get_group_stats(days, limit)
            # Attach a display name for each group ID.
            add_nicknames(stats, 'group_id', 'group_name')
            return jsonify({"success": True, "data": stats})

        @app.route('/api/error_logs')
        @api_endpoint("获取错误日志出错")
        def api_error_logs():
            days = request.args.get('days', 7, type=int)
            limit = request.args.get('limit', 50, type=int)
            logs = self.stats_db.get_error_logs(days, limit)
            return jsonify({"success": True, "data": logs})

        @app.route('/api/error_detail/<int:error_id>')
        @api_endpoint("获取错误详情出错")
        def api_error_detail(error_id):
            detail = self.stats_db.get_error_detail(error_id)
            return jsonify({"success": True, "data": detail})

        @app.route('/api/dashboard_summary')
        @api_endpoint("获取仪表盘摘要数据出错")
        def api_dashboard_summary():
            days = request.args.get('days', 7, type=int)
            summary = self.stats_db.get_dashboard_summary(days)
            # Attach display names to the top-user and top-group entries.
            if 'top_users' in summary:
                add_nicknames(summary['top_users'], 'user_id', 'user_name')
            if 'top_groups' in summary:
                add_nicknames(summary['top_groups'], 'group_id', 'group_name')
            self.logger.info(f"看板主页统计数据: {summary}")
            return jsonify({"success": True, "data": summary})

        @app.route('/api/plugin_trend')
        @api_endpoint("获取插件趋势数据出错")
        def api_plugin_trend():
            days = request.args.get('days', 7, type=int)
            plugin_name = request.args.get('plugin_name', '')
            trend = self.stats_db.get_plugin_trend(plugin_name, days)
            # Trend rows may also carry user or group IDs; translate those too.
            if isinstance(trend, list):
                add_nicknames(trend, 'user_id', 'user_name')
                add_nicknames(trend, 'group_id', 'group_name')
            self.logger.info(f"看板主页/api/plugin_trend: {trend}")
            return jsonify({"success": True, "data": trend})

        return app

    def run(self) -> None:
        """Serve the dashboard until ``stop()`` is called or the server fails.

        Blocks the calling thread. Errors are logged rather than raised. The
        listening socket is always closed before this method returns.
        """
        server = None
        try:
            self.logger.info(f"启动统计看板服务器,地址: {self.host}:{self.port}")
            # A werkzeug server object (rather than app.run) is used so that
            # stop() can shut it down from another thread.
            from werkzeug.serving import make_server
            server = make_server(self.host, self.port, self.app)
            self._server = server
            server.serve_forever()
        except Exception as e:
            self.logger.error(f"运行统计看板服务器出错: {e}")
        finally:
            if server is not None:
                # shutdown() only ends the serve loop; the socket itself must
                # be released explicitly.
                server.server_close()
                if self._server is server:
                    self._server = None

    def stop(self) -> None:
        """Stop the server started by ``run()``.

        Raises:
            Exception: Any error raised while shutting down is logged and
                re-raised.
        """
        try:
            self.logger.info("正在停止统计看板服务器...")
            self._stop_event.set()
            # Work on a local reference: run() may clear self._server as soon
            # as its serve loop exits.
            server = self._server
            if server is not None:
                server.shutdown()
                self._server = None
            # Give in-flight handler threads a moment to finish.
            time.sleep(0.5)
            self.logger.info("统计看板服务器已完全停止")
        except Exception as e:
            self.logger.error(f"停止统计看板服务器出错: {e}")
            raise

    def __del__(self):
        """Best-effort cleanup: stop the server if it is still registered."""
        try:
            if hasattr(self, '_server') and self._server:
                self.stop()
        except Exception as e:
            if hasattr(self, 'logger'):
                self.logger.error(f"DashboardServer 析构时出错: {e}")