Files
abot/plugins/stats_dashboard/dashboard_server.py
2025-03-27 10:15:13 +08:00

372 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import threading
import time
import os
from datetime import datetime
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory
from db.connection import DBConnectionManager
from db.message_storage import MessageStorageDB
from db.stats_db import StatsDBOperator
from utils.wechat.contact_manager import ContactManager
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
class DashboardServer:
"""统计看板服务器"""
def __init__(self, host: str = "0.0.0.0", port: int = 8888,
username: str = "admin", password: str = "admin123"):
self.host = host
self.port = port
self.username = username
self.password = password
self.logger = logging.getLogger("DashboardServer")
# 修正:使用单例模式获取数据库连接
self.db_manager = DBConnectionManager.get_instance()
self.stats_db = StatsDBOperator(self.db_manager)
self.message_storage = MessageStorageDB(self.db_manager)
# 获取联系人管理器实例
self.contact_manager = ContactManager.get_instance()
self.app = self._create_app()
self._stop_event = threading.Event()
self._server = None # 添加:存储服务器实例
def _create_app(self) -> Flask:
"""创建Flask应用"""
app = Flask(__name__)
app.secret_key = "stats_dashboard_secret_key"
# 添加:实现基本的身份验证
def check_auth():
auth = request.authorization
if not auth or auth.username != self.username or auth.password != self.password:
return False
return True
# 静态文件目录
static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
@app.route('/static/<path:filename>')
def serve_static(filename):
return send_from_directory(static_folder, filename)
# 添加一个路由处理favicon请求
@app.route('/favicon.ico')
def favicon():
return send_from_directory(os.path.join(app.root_path, 'static'),
'favicon.ico', mimetype='image/vnd.microsoft.icon')
@app.route('/')
def index():
return render_template('index.html')
@app.route('/plugins')
def plugins():
return render_template('plugins.html')
@app.route('/users')
def users_page():
return render_template('users.html')
@app.route('/groups')
def groups():
return render_template('groups.html')
@app.route('/errors')
def errors():
return render_template('errors.html')
# 在_create_app方法中添加新的路由
@app.route('/robot_management')
def robot_management():
return render_template('robot_management.html')
@app.route('/api/robot/groups')
def api_robot_groups():
# 获取所有群组列表
groups = GroupBotManager.get_group_list()
group_data = []
for group_id in groups:
group_name = self.contact_manager.get_nickname(group_id)
robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
group_data.append({
"group_id": group_id,
"group_name": group_name,
"robot_status": robot_status.value
})
return jsonify({"success": True, "data": group_data})
@app.route('/api/robot/group/<group_id>/permissions')
def api_robot_group_permissions(group_id):
permissions = GroupBotManager.list_group_permissions(group_id)
permission_data = []
for feature, status in permissions.items():
permission_data.append({
"feature_id": feature.value,
"feature_name": feature.name,
"feature_description": feature.description,
"status": status.value
})
return jsonify({"success": True, "data": permission_data})
@app.route('/api/robot/group/<group_id>/permissions', methods=['POST'])
def api_update_robot_permissions(group_id):
# 更新群组功能权限
data = request.json
feature_id = data.get('feature_id')
status = data.get('status')
try:
feature = Feature(int(feature_id))
new_status = PermissionStatus(status)
# 特殊处理ROBOT功能
if feature == Feature.ROBOT:
r = self.db_manager.get_redis_connection()
if new_status == PermissionStatus.ENABLED:
GroupBotManager.local_cache["group_list"].add(group_id)
r.sadd("group:list", group_id)
else:
GroupBotManager.local_cache["group_list"].remove(group_id)
r.srem("group:list", group_id)
GroupBotManager.set_group_permission(group_id, feature, new_status)
return jsonify({"success": True})
except Exception as e:
self.logger.error(f"更新群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@app.route('/api/robot/batch_operation', methods=['POST'])
def api_robot_batch_operation():
# 批量操作接口
data = request.json
operation = data.get('operation')
group_ids = data.get('group_ids', [])
results = {}
try:
if operation == 'remove_groups':
for group_id in group_ids:
result = GroupBotManager.remove_group(group_id)
results[group_id] = result
return jsonify({"success": True, "results": results})
else:
return jsonify({"success": False, "error": "不支持的操作类型"}), 400
except Exception as e:
self.logger.error(f"批量操作失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
# 添加手动添加群组的API接口
@app.route('/api/robot/add_group', methods=['POST'])
def api_add_group():
try:
data = request.json
group_id = data.get('group_id')
if not group_id or not group_id.strip():
return jsonify({"success": False, "error": "群组ID不能为空"}), 400
group_id = group_id.strip()
# 检查群组是否已存在
if group_id in GroupBotManager.local_cache["group_list"]:
return jsonify({"success": False, "error": "该群组已存在"}), 400
# 添加群组到列表并启用机器人功能
GroupBotManager.local_cache["group_list"].add(group_id)
r = self.db_manager.get_redis_connection()
r.sadd("group:list", group_id)
# 设置ROBOT功能为启用状态
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
# 获取群组名称(如果可能)
group_name = self.contact_manager.get_nickname(group_id)
return jsonify({
"success": True,
"message": f"群组 {group_id} 已成功添加",
"group": {
"group_id": group_id,
"group_name": group_name,
"robot_status": "enabled"
}
})
except Exception as e:
self.logger.error(f"添加群组失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/plugin_stats')
def api_plugin_stats():
days = request.args.get('days', 7, type=int)
stats = self.stats_db.get_plugin_stats(days)
return jsonify({"success": True, "data": stats})
@app.route('/api/user_stats')
def api_user_stats():
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = self.stats_db.get_user_stats(days, limit)
# 将用户ID转换为名称
for item in stats:
if 'user_id' in item:
user_id = item['user_id']
item['user_name'] = self.contact_manager.get_nickname(user_id)
return jsonify({"success": True, "data": stats})
@app.route('/api/group_stats')
def api_group_stats():
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = self.stats_db.get_group_stats(days, limit)
# 将群ID转换为名称
for item in stats:
if 'group_id' in item:
group_id = item['group_id']
item['group_name'] = self.contact_manager.get_nickname(group_id)
return jsonify({"success": True, "data": stats})
@app.route('/api/error_logs')
def api_error_logs():
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 50, type=int)
logs = self.stats_db.get_error_logs(days, limit)
return jsonify({"success": True, "data": logs})
@app.route('/api/error_detail/<int:error_id>')
def api_error_detail(error_id):
detail = self.stats_db.get_error_detail(error_id)
return jsonify({"success": True, "data": detail})
# 修改添加错误处理的API路由示例
@app.route('/api/dashboard_summary')
def api_dashboard_summary():
try:
days = request.args.get('days', 7, type=int)
summary = self.stats_db.get_dashboard_summary(days)
# 转换用户和群组ID为名称
if 'top_users' in summary:
for user in summary['top_users']:
if 'user_id' in user:
user['user_name'] = self.contact_manager.get_nickname(user['user_id'])
if 'top_groups' in summary:
for group in summary['top_groups']:
if 'group_id' in group:
group['group_name'] = self.contact_manager.get_nickname(group['group_id'])
self.logger.info(f"看板主页统计数据: {summary}")
return jsonify({"success": True, "data": summary})
except Exception as e:
self.logger.error(f"获取仪表盘摘要数据出错: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/plugin_trend')
def api_plugin_trend():
days = request.args.get('days', 7, type=int)
plugin_name = request.args.get('plugin_name', '')
trend = self.stats_db.get_plugin_trend(plugin_name, days)
# 如果趋势数据中包含用户或群组ID也进行转换
if isinstance(trend, list):
for item in trend:
if 'user_id' in item:
item['user_name'] = self.contact_manager.get_nickname(item['user_id'])
if 'group_id' in item:
item['group_name'] = self.contact_manager.get_nickname(item['group_id'])
self.logger.info(f"看板主页/api/plugin_trend: {trend}")
return jsonify({"success": True, "data": trend})
@app.route('/api/robot/group/<group_id>/message_trend', methods=['GET'])
def get_group_message_trend(group_id):
"""获取群组消息趋势数据"""
try:
days = request.args.get('days', default=7, type=int)
# 获取消息存储实例
trend_data = self.message_storage.get_message_trend(group_id, days)
# 格式化数据为前端需要的格式
dates = []
counts = []
for item in trend_data:
# 将日期转换为字符串
if isinstance(item['date'], datetime):
date_str = item['date'].strftime('%Y-%m-%d')
else:
date_str = str(item['date'])
dates.append(date_str)
counts.append(item['message_count'])
return jsonify({
'success': True,
'data': {
'dates': dates,
'counts': counts
}
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
return app
def run(self) -> None:
"""运行服务器"""
try:
self.logger.info(f"启动统计看板服务器,地址: {self.host}:{self.port}")
# 修改:使用线程安全的方式运行服务器
from werkzeug.serving import make_server
self._server = make_server(self.host, self.port, self.app)
self._server.serve_forever()
except Exception as e:
self.logger.error(f"运行统计看板服务器出错: {e}")
def stop(self) -> None:
"""停止服务器"""
try:
self.logger.info("正在停止统计看板服务器...")
self._stop_event.set()
# 关闭服务器
if self._server:
self._server.shutdown()
self._server = None
# 等待所有线程完成
time.sleep(0.5) # 给线程一些时间来完成
self.logger.info("统计看板服务器已完全停止")
except Exception as e:
self.logger.error(f"停止统计看板服务器出错: {e}")
raise
def __del__(self):
"""析构函数,确保资源被释放"""
try:
if hasattr(self, '_server') and self._server:
self.stop()
except Exception as e:
if hasattr(self, 'logger'):
self.logger.error(f"DashboardServer 析构时出错: {e}")