Files
abot/admin/dashboard/server.py
2025-04-02 11:17:22 +08:00

636 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import functools
import hmac
import logging
import os
import sys
import threading
import time
import xml.etree.ElementTree as ET
from collections import deque
from datetime import datetime
from typing import Optional

# toml is used for the dashboard configuration file
import toml
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory

from db.message_storage import MessageStorageDB
from db.stats_db import StatsDBOperator
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
# Message formatting helper
from utils.message_formatter import format_quote_message
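# Add the project root to sys.path so that project modules can be imported.
# NOTE(review): this statement runs after the project-level imports above, so it
# only affects imports performed later (e.g. lazily inside functions).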
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
class DashboardServer:
    """Statistics dashboard web server built on Flask.

    The server reuses the database manager and contact manager of an existing
    robot instance, exposes HTML pages plus JSON APIs behind a simple
    session-based login, and is served through a werkzeug WSGI server that can
    be stopped from another thread via :meth:`stop`.
    """

    def __init__(self, host: Optional[str] = None, port: Optional[int] = None,
                 username: Optional[str] = None, password: Optional[str] = None,
                 robot_instance=None):
        """Initialise the dashboard.

        Args:
            host: Bind address; falls back to ``[server].host`` in config.toml.
            port: Bind port; falls back to ``[server].port`` in config.toml.
            username: Login user; falls back to ``[auth].username``.
            password: Login password; falls back to ``[auth].password``.
            robot_instance: Object providing ``db_manager`` and
                ``contact_manager`` attributes. Required.

        Raises:
            ValueError: If no robot instance is supplied.
        """
        # The logger must exist before the configuration is loaded, because
        # the loader reports failures through self.logger.
        self.logger = logging.getLogger("DashboardServer")

        self.config = self._load_dashboard_config()
        server_cfg = self.config.get("server", {})
        auth_cfg = self.config.get("auth", {})

        # Explicit arguments win over values from the configuration file.
        self.host = host or server_cfg.get("host", "0.0.0.0")
        self.port = port or server_cfg.get("port", 8888)
        self.username = username or auth_cfg.get("username", "admin")
        self.password = password or auth_cfg.get("password", "admin123")
        self.logger.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}")

        if not robot_instance:
            self.logger.error("未提供Robot实例Dashboard无法正常工作")
            raise ValueError("必须提供Robot实例")

        # Share the robot's database and contact objects instead of creating new ones.
        self.db_manager = robot_instance.db_manager
        self.stats_db = StatsDBOperator(self.db_manager)
        self.message_storage = MessageStorageDB(self.db_manager)
        self.contact_manager = robot_instance.contact_manager
        self.logger.info("使用Robot实例的对象进行初始化")

        self.app = self._create_app()
        self._stop_event = threading.Event()
        self._server = None  # werkzeug server instance, set by run()

    @staticmethod
    def _default_config() -> dict:
        """Return a fresh copy of the built-in default configuration."""
        return {
            "server": {"host": "0.0.0.0", "port": 8888},
            "auth": {"username": "admin", "password": "admin123"},
        }

    def _load_dashboard_config(self):
        """Load ``config.toml`` located next to this module.

        If the file does not exist, a default configuration file is created.
        Any failure is logged and the built-in defaults are returned instead.

        Returns:
            dict: The parsed (or default) configuration.
        """
        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.toml')
        try:
            if os.path.exists(config_path):
                with open(config_path, 'r', encoding='utf-8') as f:
                    return toml.load(f)

            # No config file yet: create one with default values. Serialise
            # before opening the file so that a serialisation failure does not
            # leave an empty config.toml behind.
            default_config = self._default_config()
            serialized = toml.dumps(default_config)
            with open(config_path, 'w', encoding='utf-8') as f:
                f.write(serialized)
            return default_config
        except Exception as e:
            self.logger.error(f"加载Dashboard配置文件失败: {e}")
            return self._default_config()

    def _create_app(self) -> "Flask":
        """Create the Flask application and register every page and API route.

        Returns:
            Flask: The configured application.
        """
        app = Flask(__name__)

        # Session signing key. Prefer a deployment-specific key from
        # [auth].secret_key or the DASHBOARD_SECRET_KEY environment variable;
        # the historical constant is kept only as a backward-compatible
        # fallback because anyone who knows it can forge session cookies.
        secret_key = (self.config.get("auth", {}).get("secret_key")
                      or os.environ.get("DASHBOARD_SECRET_KEY"))
        if not secret_key:
            self.logger.warning("未配置secret_key, 使用内置默认值; 请在config.toml的[auth]中设置secret_key")
            secret_key = "stats_dashboard_secret_key"
        app.secret_key = secret_key

        module_dir = os.path.dirname(os.path.abspath(__file__))

        # Static files shipped with the dashboard package.
        static_folder = os.path.join(module_dir, 'static')

        @app.route('/static/<path:filename>')
        def serve_static(filename):
            return send_from_directory(static_folder, filename)

        # Images live in <project root>/static/images (three levels above this file).
        project_root = os.path.dirname(os.path.dirname(module_dir))
        images_dir = os.path.join(project_root, "static", "images")
        os.makedirs(images_dir, exist_ok=True)

        @app.route('/static/images/<path:filename>')
        def serve_images(filename):
            return send_from_directory(images_dir, filename)

        @app.route('/favicon.ico')
        def favicon():
            return send_from_directory(os.path.join(app.root_path, 'static'),
                                       'favicon.ico', mimetype='image/vnd.microsoft.icon')

        def _json_body() -> dict:
            """Return the request's JSON object, or {} if missing/invalid/non-object."""
            payload = request.get_json(silent=True)
            return payload if isinstance(payload, dict) else {}

        def _same(supplied: str, expected) -> bool:
            """Constant-time string comparison (bytes, so non-ASCII input is accepted)."""
            return hmac.compare_digest(str(supplied).encode('utf-8'),
                                       str(expected).encode('utf-8'))

        # ------------------------------------------------------------------
        # Authentication
        # ------------------------------------------------------------------
        @app.route('/login', methods=['GET', 'POST'])
        def login():
            error = None
            if request.method == 'POST':
                username = request.form.get('username', '')
                password = request.form.get('password', '')
                # Evaluate both comparisons so timing does not reveal which field was wrong.
                user_ok = _same(username, self.username)
                pass_ok = _same(password, self.password)
                if user_ok and pass_ok:
                    session['logged_in'] = True
                    return redirect(url_for('index'))
                error = '用户名或密码错误'
            return render_template('login.html', error=error)

        @app.route('/logout')
        def logout():
            session.pop('logged_in', None)
            return redirect(url_for('login'))

        def login_required(f):
            """Redirect to the login page unless the session is authenticated."""
            @functools.wraps(f)  # keeps __name__, which Flask uses as the endpoint name
            def decorated_function(*args, **kwargs):
                if not session.get('logged_in'):
                    return redirect(url_for('login'))
                return f(*args, **kwargs)
            return decorated_function

        # ------------------------------------------------------------------
        # HTML pages
        # ------------------------------------------------------------------
        @app.route('/')
        @login_required
        def index():
            return render_template('index.html')

        @app.route('/plugins')
        @login_required
        def plugins():
            return render_template('plugins.html')

        @app.route('/users')
        @login_required
        def users_page():
            return render_template('users.html')

        @app.route('/groups')
        @login_required
        def groups():
            return render_template('groups.html')

        @app.route('/errors')
        @login_required
        def errors():
            return render_template('errors.html')

        @app.route('/messages')
        @login_required
        def message_list_page():
            """Message list page."""
            return render_template('message_list.html')

        @app.route('/wx_logs')
        @login_required
        def wx_logs():
            return render_template('wx_logs.html')

        @app.route('/robot_management')
        @login_required
        def robot_management():
            return render_template('robot_management.html')

        # ------------------------------------------------------------------
        # Robot / group management APIs
        # ------------------------------------------------------------------
        @app.route('/api/robot/groups')
        @login_required
        def api_robot_groups():
            try:
                group_ids = GroupBotManager.get_group_list()
                # Fall back to the in-process cache when the manager returns None.
                if group_ids is None and hasattr(GroupBotManager, "local_cache"):
                    group_ids = GroupBotManager.local_cache.get("group_list", set())
                if group_ids is None:
                    group_ids = set()
                self.logger.info(f"获取到 {len(group_ids)} 个群组")

                group_data = []
                for group_id in group_ids:
                    try:
                        group_name = self.contact_manager.get_nickname(group_id)
                        if not group_name:
                            group_name = f"未知群组({group_id})"
                        # A failing permission lookup must not drop the group from the list.
                        try:
                            robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
                        except Exception:
                            robot_status = PermissionStatus.UNKNOWN
                        group_data.append({
                            "group_id": group_id,
                            "group_name": group_name,
                            "robot_status": robot_status.value if robot_status else "unknown"
                        })
                    except Exception as e:
                        self.logger.warning(f"处理群组 {group_id} 信息时出错: {e}")
                        # Keep a placeholder so one bad group does not break the whole list.
                        group_data.append({
                            "group_id": group_id,
                            "group_name": "获取失败",
                            "robot_status": "unknown"
                        })
                return jsonify({"success": True, "data": group_data})
            except Exception as e:
                self.logger.error(f"获取群组列表失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/robot/group/<group_id>/permissions')
        @login_required
        def api_robot_group_permissions(group_id):
            try:
                permissions = GroupBotManager.list_group_permissions(group_id)
                permission_data = [
                    {
                        "feature_id": feature.value,
                        "feature_name": feature.name,
                        "feature_description": feature.description,
                        "status": status.value
                    }
                    for feature, status in permissions.items()
                ]
                return jsonify({"success": True, "data": permission_data})
            except Exception as e:
                self.logger.error(f"获取群组权限失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/robot/group/<group_id>/permissions', methods=['POST'])
        @login_required
        def api_update_robot_permissions(group_id):
            # Update one feature permission of a group.
            data = _json_body()
            feature_id = data.get('feature_id')
            status = data.get('status')
            try:
                feature = Feature(int(feature_id))
                new_status = PermissionStatus(status)
                # The ROBOT feature also controls membership in the managed group list.
                if feature == Feature.ROBOT:
                    r = self.db_manager.get_redis_connection()
                    if new_status == PermissionStatus.ENABLED:
                        GroupBotManager.local_cache["group_list"].add(group_id)
                        r.sadd("group:list", group_id)
                    else:
                        # discard(): disabling a group that is not cached is not an error.
                        GroupBotManager.local_cache["group_list"].discard(group_id)
                        r.srem("group:list", group_id)
                GroupBotManager.set_group_permission(group_id, feature, new_status)
                return jsonify({"success": True})
            except Exception as e:
                self.logger.error(f"更新群组权限失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 400

        @app.route('/api/robot/batch_operation', methods=['POST'])
        @login_required
        def api_robot_batch_operation():
            # Batch operations on several groups at once.
            data = _json_body()
            operation = data.get('operation')
            group_ids = data.get('group_ids', [])
            results = {}
            try:
                if operation != 'remove_groups':
                    return jsonify({"success": False, "error": "不支持的操作类型"}), 400
                for group_id in group_ids:
                    results[group_id] = GroupBotManager.remove_group(group_id)
                return jsonify({"success": True, "results": results})
            except Exception as e:
                self.logger.error(f"批量操作失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 400

        # ------------------------------------------------------------------
        # Statistics APIs
        # ------------------------------------------------------------------
        @app.route('/api/user_stats')
        @login_required
        def api_user_stats():
            try:
                days = request.args.get('days', 7, type=int)
                limit = request.args.get('limit', 10, type=int)
                stats = self.stats_db.get_user_stats(days, limit)
                # Attach a display name for every user id.
                for item in stats:
                    if 'user_id' in item:
                        item['user_name'] = self.contact_manager.get_nickname(item['user_id'])
                return jsonify({"success": True, "data": stats})
            except Exception as e:
                self.logger.error(f"获取用户统计失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/group_stats')
        @login_required
        def api_group_stats():
            try:
                days = request.args.get('days', 7, type=int)
                limit = request.args.get('limit', 10, type=int)
                stats = self.stats_db.get_group_stats(days, limit)
                # Attach a display name for every group id.
                for item in stats:
                    if 'group_id' in item:
                        item['group_name'] = self.contact_manager.get_nickname(item['group_id'])
                return jsonify({"success": True, "data": stats})
            except Exception as e:
                self.logger.error(f"获取群组统计失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/plugin_stats')
        @login_required
        def api_plugin_stats():
            try:
                days = request.args.get('days', 7, type=int)
                stats = self.stats_db.get_plugin_stats(days)
                return jsonify({"success": True, "data": stats})
            except Exception as e:
                self.logger.error(f"获取插件统计失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/error_logs')
        @login_required
        def api_error_logs():
            try:
                days = request.args.get('days', 7, type=int)
                limit = request.args.get('limit', 100, type=int)
                logs = self.stats_db.get_error_logs(days, limit)
                return jsonify({"success": True, "data": logs})
            except Exception as e:
                self.logger.error(f"获取错误日志失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/system_info')
        @login_required
        def api_system_info():
            try:
                # Imported lazily so the dashboard still starts without psutil installed.
                import platform
                import psutil
                system_info = {
                    "os": platform.system(),
                    "os_version": platform.version(),
                    "python_version": platform.python_version(),
                    "cpu_usage": psutil.cpu_percent(),
                    "memory_usage": psutil.virtual_memory().percent,
                    "disk_usage": psutil.disk_usage('/').percent,
                    "uptime": time.time() - psutil.boot_time(),
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }
                return jsonify({"success": True, "data": system_info})
            except Exception as e:
                self.logger.error(f"获取系统信息失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/dashboard_summary')
        @login_required
        def api_dashboard_summary():
            try:
                days = request.args.get('days', 7, type=int)
                summary = self.stats_db.get_dashboard_summary(days)
                # Resolve user and group ids to display names.
                for user in summary.get('top_users', []) if 'top_users' in summary else []:
                    if 'user_id' in user:
                        user['user_name'] = self.contact_manager.get_nickname(user['user_id'])
                for group in summary.get('top_groups', []) if 'top_groups' in summary else []:
                    if 'group_id' in group:
                        group['group_name'] = self.contact_manager.get_nickname(group['group_id'])
                return jsonify({"success": True, "data": summary})
            except Exception as e:
                self.logger.error(f"获取仪表盘摘要数据出错: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/plugin_trend')
        @login_required
        def api_plugin_trend():
            try:
                days = request.args.get('days', 7, type=int)
                plugin_name = request.args.get('plugin_name', '')
                trend = self.stats_db.get_plugin_trend(plugin_name, days)
                return jsonify({"success": True, "data": trend})
            except Exception as e:
                self.logger.error(f"获取插件趋势失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/robot/group/<group_id>/message_trend')
        @login_required
        def api_group_message_trend(group_id):
            try:
                days = request.args.get('days', 7, type=int)
                trend_data = self.message_storage.get_message_trend(group_id, days)
                # Split into parallel date/count lists, which is what the front end charts.
                dates = []
                counts = []
                for item in trend_data:
                    day = item['date']
                    dates.append(day.strftime('%Y-%m-%d') if isinstance(day, datetime) else str(day))
                    counts.append(item['message_count'])
                return jsonify({
                    'success': True,
                    'data': {
                        'dates': dates,
                        'counts': counts
                    }
                })
            except Exception as e:
                self.logger.error(f"获取群组消息趋势数据出错: {e}")
                return jsonify({'success': False, 'error': str(e)}), 500

        @app.route('/api/error_detail/<int:error_id>')
        @login_required
        def api_error_detail(error_id):
            try:
                detail = self.stats_db.get_error_detail(error_id)
                return jsonify({"success": True, "data": detail})
            except Exception as e:
                self.logger.error(f"获取错误详情失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        @app.route('/api/robot/add_group', methods=['POST'])
        @login_required
        def api_add_group():
            try:
                group_id = _json_body().get('group_id')
                # Reject missing, blank and non-string ids as a client error.
                if not isinstance(group_id, str) or not group_id.strip():
                    return jsonify({"success": False, "error": "群组ID不能为空"}), 400
                group_id = group_id.strip()

                if group_id in GroupBotManager.local_cache["group_list"]:
                    return jsonify({"success": False, "error": "该群组已存在"}), 400

                # Register the group in both the local cache and Redis, then enable the robot.
                GroupBotManager.local_cache["group_list"].add(group_id)
                r = self.db_manager.get_redis_connection()
                r.sadd("group:list", group_id)
                GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)

                group_name = self.contact_manager.get_nickname(group_id)
                return jsonify({
                    "success": True,
                    "message": f"群组 {group_id} 已成功添加",
                    "group": {
                        "group_id": group_id,
                        "group_name": group_name,
                        "robot_status": "enabled"
                    }
                })
            except Exception as e:
                self.logger.error(f"添加群组失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        # ------------------------------------------------------------------
        # Message browsing APIs
        # ------------------------------------------------------------------
        @app.route('/api/messages', methods=['GET'])
        @login_required
        def get_messages():
            """Return a filtered, paginated message list."""
            try:
                today = datetime.now().strftime('%Y-%m-%d')
                group_id = request.args.get('group_id')
                start_date = request.args.get('start_date', today)
                end_date = request.args.get('end_date', today)
                search_text = request.args.get('search_text')
                # type=int falls back to the default on malformed input instead of raising.
                page = request.args.get('page', 1, type=int)
                page_size = request.args.get('page_size', 20, type=int)

                result = self.message_storage.get_messages_by_filter(
                    group_id=group_id,
                    start_date=start_date,
                    end_date=end_date,
                    search_text=search_text,
                    page=page,
                    page_size=page_size
                )

                for msg in result['messages']:
                    # Show nicknames where known, raw ids otherwise.
                    msg['group_name'] = self.contact_manager.get_nickname(msg['group_id']) or msg['group_id']
                    msg['sender_name'] = self.contact_manager.get_nickname(msg['sender']) or msg['sender']

                    # Type "49" messages carry XML; reduce them to readable text.
                    if msg['message_type'] == "49" and msg['content']:
                        try:
                            if '<refermsg>' in msg['content']:
                                # Quoted (reply) message.
                                msg['content'] = format_quote_message(msg['content'])
                            else:
                                # Other app messages: show the <title> element if present.
                                root = ET.fromstring(msg['content'])
                                title_elem = root.find('.//title')
                                if title_elem is not None:
                                    msg['content'] = title_elem.text
                        except Exception as e:
                            # Best effort: keep the raw content when parsing fails.
                            self.logger.error(f"解析消息类型49出错: {e}")
                return jsonify(result)
            except Exception as e:
                self.logger.error(f"获取消息列表失败: {e}")
                return jsonify({'error': str(e)}), 500

        @app.route('/api/groups', methods=['GET'])
        @login_required
        def get_groups():
            """Return every contact whose id contains '@chatroom'."""
            try:
                group_list = [
                    {
                        'group_id': contact_id,
                        'group_name': self.contact_manager.get_nickname(contact_id) or contact_id
                    }
                    for contact_id in self.contact_manager.get_contacts()
                    if '@chatroom' in contact_id
                ]
                return jsonify({'groups': group_list})
            except Exception as e:
                self.logger.error(f"获取群组列表失败: {e}")
                return jsonify({'error': str(e)}), 500

        @app.route('/api_wx_logs')
        @login_required
        def api_wx_logs():
            try:
                log_type = request.args.get('type', 'info')  # defaults to the info log
                lines = request.args.get('lines', 100, type=int)  # defaults to the last 100 lines
                # deque rejects a negative maxlen, so clamp to zero.
                lines = max(lines, 0)

                # Log files sit in the project root (three levels above this file).
                base_dir = os.path.dirname(os.path.dirname(module_dir))
                log_name = 'wx_error.log' if log_type == 'error' else 'wx_info.log'
                log_file = os.path.join(base_dir, log_name)

                log_content = []
                if os.path.exists(log_file):
                    with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
                        # A bounded deque keeps only the last N lines while streaming the file.
                        log_content = list(deque(f, lines))
                return jsonify({
                    "success": True,
                    "data": {
                        "log_type": log_type,
                        "log_file": log_file,
                        "content": log_content,
                        "lines": len(log_content)
                    }
                })
            except Exception as e:
                self.logger.error(f"获取微信日志失败: {e}")
                return jsonify({"success": False, "error": str(e)}), 500

        return app

    def run(self):
        """Serve the dashboard until :meth:`stop` is called (blocking).

        Failures are logged and flagged on the stop event rather than raised.
        """
        from werkzeug.serving import make_server
        self.logger.info(f"启动服务器: {self.host}:{self.port}")
        try:
            # make_server gives us a handle whose shutdown() can be called from another thread.
            self._server = make_server(self.host, self.port, self.app)
            self._server.serve_forever()
        except Exception as e:
            self.logger.error(f"服务器运行失败: {e}")
            self._stop_event.set()
        finally:
            # Release the listening socket once the serve loop has ended.
            if self._server is not None:
                self._server.server_close()

    def stop(self):
        """Signal the stop event and shut the werkzeug server down if it is running."""
        self.logger.info("正在停止服务器...")
        self._stop_event.set()
        if self._server:
            self._server.shutdown()
        self.logger.info("服务器已停止")