Files
abot/admin/dashboard/server.py

392 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import sys
import threading
import time
from datetime import datetime
# 添加项目根目录到系统路径,确保可以导入项目模块
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory
from werkzeug.security import check_password_hash, generate_password_hash
from db.connection import DBConnectionManager
from db.message_storage import MessageStorageDB
from db.stats_db import StatsDBOperator
from utils.wechat.contact_manager import ContactManager
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
class DashboardServer:
    """Statistics dashboard web server.

    Builds a Flask application exposing the dashboard pages and their JSON
    APIs behind a session-based login, and serves it with waitress.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8888,
                 username: str = "admin", password: str = "admin123"):
        """Wire up storage/contact helpers and build the Flask app.

        Args:
            host: Interface the HTTP server binds to.
            port: TCP port the HTTP server listens on.
            username: Login name accepted by the ``/login`` form.
            password: Password accepted by the ``/login`` form.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.logger = logging.getLogger("DashboardServer")
        if password == "admin123":
            # The default is public knowledge; make the risk visible in logs.
            self.logger.warning(
                "DashboardServer is using the default password; "
                "pass a custom password in production."
            )
        # Shared (singleton) database connection manager.
        self.db_manager = DBConnectionManager.get_instance()
        self.stats_db = StatsDBOperator(self.db_manager)
        self.message_storage = MessageStorageDB(self.db_manager)
        # Contact manager used to turn user/group ids into display names.
        self.contact_manager = ContactManager.get_instance()
        self.app = self._create_app()
        self._stop_event = threading.Event()
        self._server = None  # waitress server handle, set by run()

    # ------------------------------------------------------------------
    # Application construction
    # ------------------------------------------------------------------
    def _create_app(self) -> Flask:
        """Create the Flask application and register every route on it."""
        import secrets

        app = Flask(__name__)
        # A constant key committed to source lets anyone forge a logged-in
        # session cookie.  Take the key from the environment when provided,
        # otherwise generate a random one (sessions then reset on restart).
        app.secret_key = (
            os.environ.get("DASHBOARD_SECRET_KEY") or secrets.token_hex(32)
        )

        login_required = self._register_auth_routes(app)
        self._register_page_routes(app, login_required)
        self._register_robot_routes(app, login_required)
        self._register_stats_routes(app, login_required)
        return app

    def _credentials_match(self, username: str, password: str) -> bool:
        """Return True when both values equal the configured credentials.

        Uses a constant-time comparison so response timing does not reveal
        how much of a guess was correct.
        """
        import hmac

        user_ok = hmac.compare_digest(
            str(username).encode("utf-8"), str(self.username).encode("utf-8")
        )
        pass_ok = hmac.compare_digest(
            str(password).encode("utf-8"), str(self.password).encode("utf-8")
        )
        return user_ok and pass_ok

    @staticmethod
    def _json_body() -> dict:
        """Return the request's JSON object, or ``{}`` if absent/invalid."""
        body = request.get_json(silent=True)
        return body if isinstance(body, dict) else {}

    def _register_auth_routes(self, app: Flask):
        """Register static/login/logout routes; return the login decorator."""
        from functools import wraps

        # Directory holding the dashboard's static assets.
        static_folder = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), 'static'
        )

        @app.route('/static/<path:filename>')
        def serve_static(filename):
            return send_from_directory(static_folder, filename)

        # Browsers request the favicon from the site root.
        @app.route('/favicon.ico')
        def favicon():
            return send_from_directory(
                os.path.join(app.root_path, 'static'),
                'favicon.ico', mimetype='image/vnd.microsoft.icon')

        @app.route('/login', methods=['GET', 'POST'])
        def login():
            error = None
            if request.method == 'POST':
                # Missing fields are treated as a failed login, not a 400.
                username = request.form.get('username', '')
                password = request.form.get('password', '')
                if self._credentials_match(username, password):
                    session['logged_in'] = True
                    return redirect(url_for('index'))
                error = '用户名或密码错误'
            return render_template('login.html', error=error)

        @app.route('/logout')
        def logout():
            session.pop('logged_in', None)
            return redirect(url_for('login'))

        def login_required(f):
            """Redirect to the login page unless the session is logged in."""
            @wraps(f)  # keeps __name__, which Flask uses as the endpoint
            def decorated_function(*args, **kwargs):
                if not session.get('logged_in'):
                    return redirect(url_for('login'))
                return f(*args, **kwargs)
            return decorated_function

        return login_required

    def _register_page_routes(self, app: Flask, login_required) -> None:
        """Register the HTML page routes (all require login)."""

        @app.route('/')
        @login_required
        def index():
            return render_template('index.html')

        @app.route('/plugins')
        @login_required
        def plugins():
            return render_template('plugins.html')

        @app.route('/users')
        @login_required
        def users_page():
            return render_template('users.html')

        @app.route('/groups')
        @login_required
        def groups():
            return render_template('groups.html')

        @app.route('/errors')
        @login_required
        def errors():
            return render_template('errors.html')

        @app.route('/robot_management')
        @login_required
        def robot_management():
            return render_template('robot_management.html')

    def _register_robot_routes(self, app: Flask, login_required) -> None:
        """Register the ``/api/robot/...`` JSON endpoints."""

        @app.route('/api/robot/groups')
        @login_required
        def api_robot_groups():
            # One entry per known group with its ROBOT feature status.
            group_data = []
            for group_id in GroupBotManager.get_group_list():
                robot_status = GroupBotManager.get_group_permission(
                    group_id, Feature.ROBOT)
                group_data.append({
                    "group_id": group_id,
                    "group_name": self.contact_manager.get_nickname(group_id),
                    "robot_status": robot_status.value
                })
            return jsonify({"success": True, "data": group_data})

        @app.route('/api/robot/group/<group_id>/permissions')
        @login_required
        def api_robot_group_permissions(group_id):
            permissions = GroupBotManager.list_group_permissions(group_id)
            permission_data = [
                {
                    "feature_id": feature.value,
                    "feature_name": feature.name,
                    "feature_description": feature.description,
                    "status": status.value
                }
                for feature, status in permissions.items()
            ]
            return jsonify({"success": True, "data": permission_data})

        @app.route('/api/robot/group/<group_id>/permissions', methods=['POST'])
        @login_required
        def api_update_robot_permissions(group_id):
            # Update one feature permission of a group.
            data = self._json_body()
            feature_id = data.get('feature_id')
            status = data.get('status')
            try:
                feature = Feature(int(feature_id))
                new_status = PermissionStatus(status)
                # The ROBOT feature also controls group-list membership,
                # both in the local cache and in Redis.
                if feature == Feature.ROBOT:
                    r = self.db_manager.get_redis_connection()
                    # Assumed to be a set (the enable path calls .add()).
                    cached_groups = GroupBotManager.local_cache["group_list"]
                    if new_status == PermissionStatus.ENABLED:
                        cached_groups.add(group_id)
                        r.sadd("group:list", group_id)
                    else:
                        # discard(): disabling a group that is not cached
                        # must not abort the update with a KeyError.
                        cached_groups.discard(group_id)
                        r.srem("group:list", group_id)
                GroupBotManager.set_group_permission(
                    group_id, feature, new_status)
                return jsonify({"success": True})
            except Exception as e:
                self.logger.error(f"更新群组权限失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 400

        @app.route('/api/robot/batch_operation', methods=['POST'])
        @login_required
        def api_robot_batch_operation():
            # Batch operations over several groups.
            data = self._json_body()
            operation = data.get('operation')
            group_ids = data.get('group_ids', [])
            results = {}
            try:
                if operation == 'remove_groups':
                    for group_id in group_ids:
                        results[group_id] = GroupBotManager.remove_group(
                            group_id)
                    return jsonify({"success": True, "results": results})
                return jsonify(
                    {"success": False, "error": "不支持的操作类型"}), 400
            except Exception as e:
                self.logger.error(f"批量操作失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 400

        @app.route('/api/robot/group/<group_id>/message_trend',
                   methods=['GET'])
        @login_required
        def get_group_message_trend(group_id):
            """Return per-day message counts for a group."""
            try:
                days = request.args.get('days', default=7, type=int)
                trend_data = self.message_storage.get_message_trend(
                    group_id, days)
                # Reshape rows into the parallel arrays returned as JSON.
                dates = []
                counts = []
                for item in trend_data:
                    day = item['date']
                    if isinstance(day, datetime):
                        dates.append(day.strftime('%Y-%m-%d'))
                    else:
                        dates.append(str(day))
                    counts.append(item['message_count'])
                return jsonify({
                    'success': True,
                    'data': {
                        'dates': dates,
                        'counts': counts
                    }
                })
            except Exception as e:
                # Previously swallowed silently; log it.  The HTTP status is
                # left unchanged so existing clients see the same response.
                self.logger.error(
                    "get_group_message_trend failed for %s: %s", group_id, e)
                return jsonify({
                    'success': False,
                    'error': str(e)
                })

    def _register_stats_routes(self, app: Flask, login_required) -> None:
        """Register the statistics / system-info JSON endpoints."""

        @app.route('/api/user_stats')
        @login_required
        def api_user_stats():
            days = request.args.get('days', 7, type=int)
            limit = request.args.get('limit', 10, type=int)
            stats = self.stats_db.get_user_stats(days, limit)
            # Attach a display name next to every user id.
            for item in stats:
                if 'user_id' in item:
                    item['user_name'] = self.contact_manager.get_nickname(
                        item['user_id'])
            return jsonify({"success": True, "data": stats})

        @app.route('/api/group_stats')
        @login_required
        def api_group_stats():
            days = request.args.get('days', 7, type=int)
            limit = request.args.get('limit', 10, type=int)
            stats = self.stats_db.get_group_stats(days, limit)
            # Attach a display name next to every group id.
            for item in stats:
                if 'group_id' in item:
                    item['group_name'] = self.contact_manager.get_nickname(
                        item['group_id'])
            return jsonify({"success": True, "data": stats})

        @app.route('/api/plugin_stats')
        @login_required
        def api_plugin_stats():
            days = request.args.get('days', 7, type=int)
            stats = self.stats_db.get_plugin_stats(days)
            return jsonify({"success": True, "data": stats})

        @app.route('/api/error_logs')
        @login_required
        def api_error_logs():
            days = request.args.get('days', 7, type=int)
            limit = request.args.get('limit', 100, type=int)
            logs = self.stats_db.get_error_logs(days, limit)
            return jsonify({"success": True, "data": logs})

        @app.route('/api/error_detail/<int:error_id>')
        @login_required
        def api_error_detail(error_id):
            detail = self.stats_db.get_error_detail(error_id)
            return jsonify({"success": True, "data": detail})

        @app.route('/api/system_info')
        @login_required
        def api_system_info():
            # Imported lazily so the module loads without psutil installed.
            import platform
            import psutil
            system_info = {
                "os": platform.system(),
                "os_version": platform.version(),
                "python_version": platform.python_version(),
                "cpu_usage": psutil.cpu_percent(),
                "memory_usage": psutil.virtual_memory().percent,
                "disk_usage": psutil.disk_usage('/').percent,
                "uptime": time.time() - psutil.boot_time(),
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            return jsonify({"success": True, "data": system_info})

        @app.route('/api/dashboard_summary')
        @login_required
        def api_dashboard_summary():
            try:
                days = request.args.get('days', 7, type=int)
                summary = self.stats_db.get_dashboard_summary(days)
                # Attach display names to the top users / top groups.
                if 'top_users' in summary:
                    for user in summary['top_users']:
                        if 'user_id' in user:
                            user['user_name'] = (
                                self.contact_manager.get_nickname(
                                    user['user_id']))
                if 'top_groups' in summary:
                    for group in summary['top_groups']:
                        if 'group_id' in group:
                            group['group_name'] = (
                                self.contact_manager.get_nickname(
                                    group['group_id']))
                self.logger.info(f"看板主页统计数据: {summary}")
                return jsonify({"success": True, "data": summary})
            except Exception as e:
                self.logger.error(f"获取仪表盘摘要数据出错: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/plugin_trend')
        @login_required
        def api_plugin_trend():
            days = request.args.get('days', 7, type=int)
            plugin_name = request.args.get('plugin_name', '')
            trend = self.stats_db.get_plugin_trend(plugin_name, days)
            # Trend rows may carry user/group ids; add display names if so.
            if isinstance(trend, list):
                for item in trend:
                    if 'user_id' in item:
                        item['user_name'] = self.contact_manager.get_nickname(
                            item['user_id'])
                    if 'group_id' in item:
                        item['group_name'] = self.contact_manager.get_nickname(
                            item['group_id'])
            self.logger.info(f"看板主页/api/plugin_trend: {trend}")
            return jsonify({"success": True, "data": trend})

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def run(self):
        """Serve the app with waitress; blocks until the server exits.

        The server object is kept on ``self._server`` so that ``stop()``
        has a handle to close.  On failure the error is logged and the stop
        event is set.
        """
        from waitress.server import create_server

        self.logger.info(f"启动服务器: {self.host}:{self.port}")
        try:
            self._server = create_server(
                self.app, host=self.host, port=self.port)
            self._server.run()
        except Exception as e:
            self.logger.error(f"服务器运行失败: {e}")
            self._stop_event.set()

    def stop(self):
        """Signal shutdown and close the running server, if any."""
        self.logger.info("正在停止服务器...")
        self._stop_event.set()
        server, self._server = self._server, None
        if server is not None:
            try:
                # waitress servers expose close(); the other branches are
                # kept for server objects with a different interface.
                if hasattr(server, 'close'):
                    server.close()
                elif hasattr(server, 'shutdown'):
                    server.shutdown()
                elif hasattr(server, 'socket'):
                    server.socket.close()
            except Exception as e:
                # Best effort: a failed close must not break the caller.
                self.logger.error("Error while closing server: %s", e)
        self.logger.info("服务器已停止")