Files
abot/admin/dashboard/server.py

412 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import sys
import threading
import time
from datetime import datetime
# 添加项目根目录到系统路径,确保可以导入项目模块
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory
from werkzeug.security import check_password_hash, generate_password_hash
from db.connection import DBConnectionManager
from db.message_storage import MessageStorageDB
from db.stats_db import StatsDBOperator
from utils.wechat.contact_manager import ContactManager
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
class DashboardServer:
"""统计看板服务器"""
def __init__(self, host: str = "0.0.0.0", port: int = 8888,
username: str = "admin", password: str = "admin123"):
self.host = host
self.port = port
self.username = username
self.password = password
self.logger = logging.getLogger("DashboardServer")
# 使用单例模式获取数据库连接
self.db_manager = DBConnectionManager.get_instance()
self.stats_db = StatsDBOperator(self.db_manager)
self.message_storage = MessageStorageDB(self.db_manager)
# 获取联系人管理器实例
self.contact_manager = ContactManager.get_instance()
self.app = self._create_app()
self._stop_event = threading.Event()
self._server = None # 存储服务器实例
def _create_app(self) -> Flask:
"""创建Flask应用"""
app = Flask(__name__)
app.secret_key = "stats_dashboard_secret_key"
# 实现基本的身份验证
def check_auth():
auth = request.authorization
if not auth or auth.username != self.username or auth.password != self.password:
return False
return True
# 静态文件目录
static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
@app.route('/static/<path:filename>')
def serve_static(filename):
return send_from_directory(static_folder, filename)
# 添加一个路由处理favicon请求
@app.route('/favicon.ico')
def favicon():
return send_from_directory(os.path.join(app.root_path, 'static'),
'favicon.ico', mimetype='image/vnd.microsoft.icon')
# 登录页面
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if username == self.username and password == self.password:
session['logged_in'] = True
return redirect(url_for('index'))
else:
error = '用户名或密码错误'
return render_template('login.html', error=error)
# 登出
@app.route('/logout')
def logout():
session.pop('logged_in', None)
return redirect(url_for('login'))
# 登录检查装饰器
def login_required(f):
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('login'))
return f(*args, **kwargs)
decorated_function.__name__ = f.__name__
return decorated_function
@app.route('/')
@login_required
def index():
return render_template('index.html')
@app.route('/plugins')
@login_required
def plugins():
return render_template('plugins.html')
@app.route('/users')
@login_required
def users_page():
return render_template('users.html')
@app.route('/groups')
@login_required
def groups():
return render_template('groups.html')
@app.route('/errors')
@login_required
def errors():
return render_template('errors.html')
# 在_create_app方法中添加新的路由
@app.route('/robot_management')
@login_required
def robot_management():
return render_template('robot_management.html')
@app.route('/api/robot/groups')
@login_required
def api_robot_groups():
# 获取所有群组列表
groups = GroupBotManager.get_group_list()
group_data = []
for group_id in groups:
group_name = self.contact_manager.get_nickname(group_id)
robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
group_data.append({
"group_id": group_id,
"group_name": group_name,
"robot_status": robot_status.value
})
return jsonify({"success": True, "data": group_data})
@app.route('/api/robot/group/<group_id>/permissions')
@login_required
def api_robot_group_permissions(group_id):
permissions = GroupBotManager.list_group_permissions(group_id)
permission_data = []
for feature, status in permissions.items():
permission_data.append({
"feature_id": feature.value,
"feature_name": feature.name,
"feature_description": feature.description,
"status": status.value
})
return jsonify({"success": True, "data": permission_data})
@app.route('/api/robot/group/<group_id>/permissions', methods=['POST'])
@login_required
def api_update_robot_permissions(group_id):
# 更新群组功能权限
data = request.json
feature_id = data.get('feature_id')
status = data.get('status')
try:
feature = Feature(int(feature_id))
new_status = PermissionStatus(status)
# 特殊处理ROBOT功能
if feature == Feature.ROBOT:
r = self.db_manager.get_redis_connection()
if new_status == PermissionStatus.ENABLED:
GroupBotManager.local_cache["group_list"].add(group_id)
r.sadd("group:list", group_id)
else:
GroupBotManager.local_cache["group_list"].remove(group_id)
r.srem("group:list", group_id)
GroupBotManager.set_group_permission(group_id, feature, new_status)
return jsonify({"success": True})
except Exception as e:
self.logger.error(f"更新群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@app.route('/api/robot/batch_operation', methods=['POST'])
@login_required
def api_robot_batch_operation():
# 批量操作接口
data = request.json
operation = data.get('operation')
group_ids = data.get('group_ids', [])
results = {}
try:
if operation == 'remove_groups':
for group_id in group_ids:
result = GroupBotManager.remove_group(group_id)
results[group_id] = result
return jsonify({"success": True, "results": results})
else:
return jsonify({"success": False, "error": "不支持的操作类型"}), 400
except Exception as e:
self.logger.error(f"批量操作失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@app.route('/api/user_stats')
@login_required
def api_user_stats():
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = self.stats_db.get_user_stats(days, limit)
# 将用户ID转换为名称
for item in stats:
if 'user_id' in item:
user_id = item['user_id']
item['user_name'] = self.contact_manager.get_nickname(user_id)
return jsonify({"success": True, "data": stats})
@app.route('/api/group_stats')
@login_required
def api_group_stats():
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = self.stats_db.get_group_stats(days, limit)
# 将群ID转换为名称
for item in stats:
if 'group_id' in item:
group_id = item['group_id']
item['group_name'] = self.contact_manager.get_nickname(group_id)
return jsonify({"success": True, "data": stats})
@app.route('/api/plugin_stats')
@login_required
def api_plugin_stats():
days = request.args.get('days', 7, type=int)
stats = self.stats_db.get_plugin_stats(days)
return jsonify({"success": True, "data": stats})
@app.route('/api/error_logs')
@login_required
def api_error_logs():
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 100, type=int)
logs = self.stats_db.get_error_logs(days, limit)
return jsonify({"success": True, "data": logs})
@app.route('/api/system_info')
@login_required
def api_system_info():
# 获取系统信息
import platform
import psutil
system_info = {
"os": platform.system(),
"os_version": platform.version(),
"python_version": platform.python_version(),
"cpu_usage": psutil.cpu_percent(),
"memory_usage": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent,
"uptime": time.time() - psutil.boot_time(),
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
return jsonify({"success": True, "data": system_info})
@app.route('/api/dashboard_summary')
@login_required
def api_dashboard_summary():
try:
days = request.args.get('days', 7, type=int)
summary = self.stats_db.get_dashboard_summary(days)
# 转换用户和群组ID为名称
if 'top_users' in summary:
for user in summary['top_users']:
if 'user_id' in user:
user['user_name'] = self.contact_manager.get_nickname(user['user_id'])
if 'top_groups' in summary:
for group in summary['top_groups']:
if 'group_id' in group:
group['group_name'] = self.contact_manager.get_nickname(group['group_id'])
return jsonify({"success": True, "data": summary})
except Exception as e:
self.logger.error(f"获取仪表盘摘要数据出错: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/plugin_trend')
@login_required
def api_plugin_trend():
days = request.args.get('days', 7, type=int)
plugin_name = request.args.get('plugin_name', '')
trend = self.stats_db.get_plugin_trend(plugin_name, days)
return jsonify({"success": True, "data": trend})
@app.route('/api/robot/group/<group_id>/message_trend')
@login_required
def api_group_message_trend(group_id):
try:
days = request.args.get('days', 7, type=int)
trend_data = self.message_storage.get_message_trend(group_id, days)
# 格式化数据为前端需要的格式
dates = []
counts = []
for item in trend_data:
# 将日期转换为字符串
if isinstance(item['date'], datetime):
date_str = item['date'].strftime('%Y-%m-%d')
else:
date_str = str(item['date'])
dates.append(date_str)
counts.append(item['message_count'])
return jsonify({
'success': True,
'data': {
'dates': dates,
'counts': counts
}
})
except Exception as e:
self.logger.error(f"获取群组消息趋势数据出错: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/error_detail/<int:error_id>')
@login_required
def api_error_detail(error_id):
detail = self.stats_db.get_error_detail(error_id)
return jsonify({"success": True, "data": detail})
@app.route('/api/robot/add_group', methods=['POST'])
@login_required
def api_add_group():
try:
data = request.json
group_id = data.get('group_id')
if not group_id or not group_id.strip():
return jsonify({"success": False, "error": "群组ID不能为空"}), 400
group_id = group_id.strip()
# 检查群组是否已存在
if group_id in GroupBotManager.local_cache["group_list"]:
return jsonify({"success": False, "error": "该群组已存在"}), 400
# 添加群组到列表并启用机器人功能
GroupBotManager.local_cache["group_list"].add(group_id)
r = self.db_manager.get_redis_connection()
r.sadd("group:list", group_id)
# 设置ROBOT功能为启用状态
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
# 获取群组名称(如果可能)
group_name = self.contact_manager.get_nickname(group_id)
return jsonify({
"success": True,
"message": f"群组 {group_id} 已成功添加",
"group": {
"group_id": group_id,
"group_name": group_name,
"robot_status": "enabled"
}
})
except Exception as e:
self.logger.error(f"添加群组失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
return app
def run(self):
"""运行服务器"""
from werkzeug.serving import make_server
self.logger.info(f"启动服务器: {self.host}:{self.port}")
try:
# 修改:使用线程安全的方式运行服务器
self._server = make_server(self.host, self.port, self.app)
self._server.serve_forever()
except Exception as e:
self.logger.error(f"服务器运行失败: {e}")
self._stop_event.set()
def stop(self):
"""停止服务器"""
self.logger.info("正在停止服务器...")
self._stop_event.set()
# 修改使用werkzeug服务器的关闭方法
if self._server:
self._server.shutdown()
self.logger.info("服务器已停止")