管理后台 server 使用蓝图模式,降低维护成本,降低修改功能时对其他模块的影响

This commit is contained in:
liuwei
2025-04-03 11:41:10 +08:00
parent 6a50faba7f
commit 4cd1008f3a
9 changed files with 738 additions and 633 deletions

View File

@@ -0,0 +1 @@
# 蓝图包初始化文件

View File

@@ -0,0 +1,39 @@
from flask import Blueprint, render_template, request, redirect, url_for, session, current_app
from functools import wraps
# 创建认证蓝图
auth_bp = Blueprint('auth', __name__)
# 登录检查装饰器
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('auth.login'))
return f(*args, **kwargs)
return decorated_function
# 登录页面
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
error = None
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
# 从应用上下文获取服务器实例
server = current_app.dashboard_server
if username == server.username and password == server.password:
session['logged_in'] = True
return redirect(url_for('main.index'))
else:
error = '用户名或密码错误'
return render_template('login.html', error=error)
# 登出
@auth_bp.route('/logout')
def logout():
session.pop('logged_in', None)
return redirect(url_for('auth.login'))

View File

@@ -0,0 +1,127 @@
from flask import Blueprint, render_template, jsonify, request
from .auth import login_required
import logging
# 创建联系人管理蓝图
contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts')
logger = logging.getLogger("ContactsBlueprint")
# 联系人管理页面
@contacts_bp.route('/')
@login_required
def contacts_management():
"""通讯录管理页面"""
return render_template('contacts_management.html')
# API路由
@contacts_bp.route('/api/all', methods=['GET'])
@login_required
def api_contacts_all():
"""获取所有联系人信息API"""
try:
server = contacts_bp.server
contacts = server.contact_manager.get_contacts()
return jsonify({
"success": True,
"data": {
"contacts": contacts
}
})
except Exception as e:
logger.error(f"获取所有联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/statistics', methods=['GET'])
@login_required
def api_contacts_statistics():
"""获取联系人统计信息API"""
try:
server = contacts_bp.server
# 使用新的联系人分类方法获取统计信息
total, groups, personal, public, official = server.contact_manager.get_contact_statistics()
return jsonify({
"success": True,
"data": {
"total": total,
"groups": groups,
"personal": personal,
"public": public,
"official": official
}
})
except Exception as e:
logger.error(f"获取联系人统计信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/groups', methods=['GET'])
@login_required
def api_contacts_groups():
"""获取群组联系人信息API"""
try:
server = contacts_bp.server
group_contacts = server.contact_manager.get_group_contacts()
return jsonify({
"success": True,
"data": {
"groups": group_contacts
}
})
except Exception as e:
logger.error(f"获取群组联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/personal', methods=['GET'])
@login_required
def api_contacts_personal():
"""获取个人联系人信息API"""
try:
server = contacts_bp.server
personal_contacts = server.contact_manager.get_personal_contacts()
return jsonify({
"success": True,
"data": {
"personal": personal_contacts
}
})
except Exception as e:
logger.error(f"获取个人联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/official', methods=['GET'])
@login_required
def api_contacts_official():
"""获取公众号联系人信息API"""
try:
server = contacts_bp.server
official_accounts = server.contact_manager.get_official_accounts()
return jsonify({
"success": True,
"data": {
"official": official_accounts
}
})
except Exception as e:
logger.error(f"获取公众号联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@contacts_bp.route('/api/public', methods=['GET'])
@login_required
def api_contacts_public():
"""获取公共好友信息API"""
try:
server = contacts_bp.server
public_contacts = server.contact_manager.get_public_contacts()
return jsonify({
"success": True,
"data": {
"public": public_contacts
}
})
except Exception as e:
logger.error(f"获取公共好友信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500

View File

@@ -0,0 +1,10 @@
from flask import Blueprint, render_template
from .auth import login_required
# 创建主页蓝图
main_bp = Blueprint('main', __name__)
@main_bp.route('/')
@login_required
def index():
return render_template('index.html')

View File

@@ -0,0 +1,91 @@
from flask import Blueprint, render_template, jsonify, request
from .auth import login_required
import logging
import xml.etree.ElementTree as ET
from datetime import datetime
from utils.message_formatter import format_quote_message
# 创建消息管理蓝图
messages_bp = Blueprint('messages', __name__, url_prefix='/messages')
logger = logging.getLogger("MessagesBlueprint")
# 消息列表页面
@messages_bp.route('/')
@login_required
def message_list_page():
"""消息列表页面"""
return render_template('message_list.html')
# API路由
@messages_bp.route('/api', methods=['GET'])
@login_required
def get_messages():
"""获取消息列表API"""
try:
server = messages_bp.server
# 获取查询参数
group_id = request.args.get('group_id')
start_date = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d'))
end_date = request.args.get('end_date', datetime.now().strftime('%Y-%m-%d'))
search_text = request.args.get('search_text')
page = int(request.args.get('page', 1))
page_size = int(request.args.get('page_size', 20))
# 调用数据库方法获取消息
result = server.message_storage.get_messages_by_filter(
group_id=group_id,
start_date=start_date,
end_date=end_date,
search_text=search_text,
page=page,
page_size=page_size
)
# 处理消息数据,添加群组名称和发送者昵称,并格式化引用消息
for msg in result['messages']:
# 获取群组名称
msg['group_name'] = server.contact_manager.get_nickname(msg['group_id']) or msg['group_id']
# 获取发送者昵称
msg['sender_name'] = server.contact_manager.get_nickname(msg['sender']) or msg['sender']
# 处理消息内容,格式化引用消息
if msg['message_type'] == "49" and msg['content']: # 应用消息类型
try:
# 检查是否为引用消息
if '<refermsg>' in msg['content']:
# 使用格式化工具处理引用消息
msg['content'] = format_quote_message(msg['content'])
else:
# 其他类型的应用消息,解析 XML 提取标题
root = ET.fromstring(msg['content'])
title_elem = root.find('.//title')
if title_elem is not None:
msg['content'] = title_elem.text
except Exception as e:
logger.error(f"解析消息类型49出错: {e}")
return jsonify(result)
except Exception as e:
logger.error(f"获取消息列表失败: {e}")
return jsonify({'error': str(e)}), 500
@messages_bp.route('/api/groups', methods=['GET'])
@login_required
def get_groups():
"""获取群组列表API"""
try:
server = messages_bp.server
# 获取机器人管理的群组列表
groups = []
for group_id in server.contact_manager.get_contacts():
if '@chatroom' in group_id:
groups.append({
'group_id': group_id,
'group_name': server.contact_manager.get_nickname(group_id) or group_id
})
return jsonify({'groups': groups})
except Exception as e:
logger.error(f"获取群组列表失败: {e}")
return jsonify({'error': str(e)}), 500

View File

@@ -0,0 +1,212 @@
from flask import Blueprint, render_template, jsonify, request
from .auth import login_required
import logging
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
from datetime import datetime
# 创建机器人管理蓝图
robot_bp = Blueprint('robot', __name__, url_prefix='/robot')
logger = logging.getLogger("RobotBlueprint")
# 机器人管理页面
@robot_bp.route('/')
@login_required
def robot_management():
return render_template('robot_management.html')
# API路由
@robot_bp.route('/api/groups')
@login_required
def api_robot_groups():
try:
server = robot_bp.server
# 获取所有群组列表
groups = GroupBotManager.get_group_list()
# 如果方法返回None或发生异常使用本地缓存
if groups is None and hasattr(GroupBotManager, "local_cache"):
groups = GroupBotManager.local_cache.get("group_list", set())
# 如果仍然为None则初始化为空集合
if groups is None:
groups = set()
logger.info(f"获取到 {len(groups)} 个群组")
group_data = []
for group_id in groups:
try:
# 获取群名称,如果失败则使用默认值
group_name = server.contact_manager.get_nickname(group_id)
if not group_name:
group_name = f"未知群组({group_id})"
# 获取机器人状态,如果失败则使用默认值
try:
robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
except:
robot_status = PermissionStatus.UNKNOWN
group_data.append({
"group_id": group_id,
"group_name": group_name,
"robot_status": robot_status.value if robot_status else "unknown"
})
except Exception as e:
logger.warning(f"处理群组 {group_id} 信息时出错: {e}")
# 添加基本信息,避免单个群组错误影响整个列表
group_data.append({
"group_id": group_id,
"group_name": "获取失败",
"robot_status": "unknown"
})
return jsonify({"success": True, "data": group_data})
except Exception as e:
logger.error(f"获取群组列表失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@robot_bp.route('/api/group/<group_id>/permissions')
@login_required
def api_robot_group_permissions(group_id):
try:
permissions = GroupBotManager.list_group_permissions(group_id)
permission_data = []
for feature, status in permissions.items():
permission_data.append({
"feature_id": feature.value,
"feature_name": feature.name,
"feature_description": feature.description,
"status": status.value
})
return jsonify({"success": True, "data": permission_data})
except Exception as e:
logger.error(f"获取群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@robot_bp.route('/api/group/<group_id>/permissions', methods=['POST'])
@login_required
def api_update_robot_permissions(group_id):
# 更新群组功能权限
server = robot_bp.server
data = request.json
feature_id = data.get('feature_id')
status = data.get('status')
try:
feature = Feature(int(feature_id))
new_status = PermissionStatus(status)
# 特殊处理ROBOT功能
if feature == Feature.ROBOT:
r = server.db_manager.get_redis_connection()
if new_status == PermissionStatus.ENABLED:
GroupBotManager.local_cache["group_list"].add(group_id)
r.sadd("group:list", group_id)
else:
GroupBotManager.local_cache["group_list"].remove(group_id)
r.srem("group:list", group_id)
GroupBotManager.set_group_permission(group_id, feature, new_status)
return jsonify({"success": True})
except Exception as e:
logger.error(f"更新群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@robot_bp.route('/api/batch_operation', methods=['POST'])
@login_required
def api_robot_batch_operation():
# 批量操作接口
data = request.json
operation = data.get('operation')
group_ids = data.get('group_ids', [])
results = {}
try:
if operation == 'remove_groups':
for group_id in group_ids:
result = GroupBotManager.remove_group(group_id)
results[group_id] = result
return jsonify({"success": True, "results": results})
else:
return jsonify({"success": False, "error": "不支持的操作类型"}), 400
except Exception as e:
logger.error(f"批量操作失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@robot_bp.route('/api/add_group', methods=['POST'])
@login_required
def api_add_group():
try:
server = robot_bp.server
data = request.json
group_id = data.get('group_id')
if not group_id or not group_id.strip():
return jsonify({"success": False, "error": "群组ID不能为空"}), 400
group_id = group_id.strip()
# 检查群组是否已存在
if group_id in GroupBotManager.local_cache["group_list"]:
return jsonify({"success": False, "error": "该群组已存在"}), 400
# 添加群组到列表并启用机器人功能
GroupBotManager.local_cache["group_list"].add(group_id)
r = server.db_manager.get_redis_connection()
r.sadd("group:list", group_id)
# 设置ROBOT功能为启用状态
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
# 获取群组名称(如果可能)
group_name = server.contact_manager.get_nickname(group_id)
return jsonify({
"success": True,
"message": f"群组 {group_id} 已成功添加",
"group": {
"group_id": group_id,
"group_name": group_name,
"robot_status": "enabled"
}
})
except Exception as e:
logger.error(f"添加群组失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@robot_bp.route('/api/group/<group_id>/message_trend')
@login_required
def api_group_message_trend(group_id):
try:
server = robot_bp.server
days = request.args.get('days', 7, type=int)
trend_data = server.message_storage.get_message_trend(group_id, days)
# 格式化数据为前端需要的格式
dates = []
counts = []
for item in trend_data:
# 将日期转换为字符串
if isinstance(item['date'], datetime):
date_str = item['date'].strftime('%Y-%m-%d')
else:
date_str = str(item['date'])
dates.append(date_str)
counts.append(item['message_count'])
return jsonify({
'success': True,
'data': {
'dates': dates,
'counts': counts
}
})
except Exception as e:
logger.error(f"获取群组消息趋势数据出错: {e}")
return jsonify({'success': False, 'error': str(e)}), 500

View File

@@ -0,0 +1,144 @@
from flask import Blueprint, render_template, jsonify, request
from .auth import login_required
import logging
from datetime import datetime
from flask import current_app
# 创建统计数据蓝图
stats_bp = Blueprint('stats', __name__)
logger = logging.getLogger("StatsBlueprint")
# 页面路由
@stats_bp.route('/plugins')
@login_required
def plugins():
return render_template('plugins.html')
@stats_bp.route('/users')
@login_required
def users_page():
return render_template('users.html')
@stats_bp.route('/groups')
@login_required
def groups():
return render_template('groups.html')
@stats_bp.route('/errors')
@login_required
def errors():
return render_template('errors.html')
# API路由
@stats_bp.route('/api/user_stats')
@login_required
def api_user_stats():
try:
server = stats_bp.server
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = server.stats_db.get_user_stats(days, limit)
# 将用户ID转换为名称
for item in stats:
if 'user_id' in item:
user_id = item['user_id']
item['user_name'] = server.contact_manager.get_nickname(user_id)
return jsonify({"success": True, "data": stats})
except Exception as e:
logger.error(f"获取用户统计失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@stats_bp.route('/api/group_stats')
@login_required
def api_group_stats():
try:
server = stats_bp.server
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = server.stats_db.get_group_stats(days, limit)
# 将群ID转换为名称
for item in stats:
if 'group_id' in item:
group_id = item['group_id']
item['group_name'] = server.contact_manager.get_nickname(group_id)
return jsonify({"success": True, "data": stats})
except Exception as e:
logger.error(f"获取群组统计失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@stats_bp.route('/api/plugin_stats')
@login_required
def api_plugin_stats():
try:
server = stats_bp.server
days = request.args.get('days', 7, type=int)
stats = server.stats_db.get_plugin_stats(days)
return jsonify({"success": True, "data": stats})
except Exception as e:
logger.error(f"获取插件统计失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@stats_bp.route('/api/error_logs')
@login_required
def api_error_logs():
try:
server = stats_bp.server
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 100, type=int)
logs = server.stats_db.get_error_logs(days, limit)
return jsonify({"success": True, "data": logs})
except Exception as e:
logger.error(f"获取错误日志失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@stats_bp.route('/api/dashboard_summary')
@login_required
def api_dashboard_summary():
try:
server = current_app.dashboard_server
days = request.args.get('days', 7, type=int)
summary = server.stats_db.get_dashboard_summary(days)
# 转换用户和群组ID为名称
if 'top_users' in summary:
for user in summary['top_users']:
if 'user_id' in user:
user['user_name'] = server.contact_manager.get_nickname(user['user_id'])
if 'top_groups' in summary:
for group in summary['top_groups']:
if 'group_id' in group:
group['group_name'] = server.contact_manager.get_nickname(group['group_id'])
return jsonify({"success": True, "data": summary})
except Exception as e:
logger.error(f"获取仪表盘摘要数据出错: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@stats_bp.route('/api/plugin_trend')
@login_required
def api_plugin_trend():
try:
server = stats_bp.server
days = request.args.get('days', 7, type=int)
plugin_name = request.args.get('plugin_name', '')
trend = server.stats_db.get_plugin_trend(plugin_name, days)
return jsonify({"success": True, "data": trend})
except Exception as e:
logger.error(f"获取插件趋势失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@stats_bp.route('/api/error_detail/<int:error_id>')
@login_required
def api_error_detail(error_id):
try:
server = stats_bp.server
detail = server.stats_db.get_error_detail(error_id)
return jsonify({"success": True, "data": detail})
except Exception as e:
logger.error(f"获取错误详情失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500

View File

@@ -0,0 +1,75 @@
from flask import Blueprint, render_template, jsonify, request, send_from_directory
from .auth import login_required
import logging
import os
import time
from datetime import datetime
import platform
import psutil
from collections import deque
# 创建系统信息蓝图
system_bp = Blueprint('system', __name__)
logger = logging.getLogger("SystemBlueprint")
# 页面路由
@system_bp.route('/wx_logs')
@login_required
def wx_logs():
return render_template('wx_logs.html')
# API路由
@system_bp.route('/api/system_info')
@login_required
def api_system_info():
try:
# 获取系统信息
system_info = {
"os": platform.system(),
"os_version": platform.version(),
"python_version": platform.python_version(),
"cpu_usage": psutil.cpu_percent(),
"memory_usage": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent,
"uptime": time.time() - psutil.boot_time(),
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
return jsonify({"success": True, "data": system_info})
except Exception as e:
logger.error(f"获取系统信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@system_bp.route('/api/wx_logs')
@login_required
def api_wx_logs():
try:
log_type = request.args.get('type', 'info') # 默认显示info日志
lines = request.args.get('lines', 100, type=int) # 默认显示最后100行
# 确定日志文件路径
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if log_type == 'error':
log_file = os.path.join(base_dir, 'wx_error.log')
else:
log_file = os.path.join(base_dir, 'wx_info.log')
# 读取日志文件
log_content = []
if os.path.exists(log_file):
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
# 使用deque获取最后N行
log_content = list(deque(f, lines))
return jsonify({
"success": True,
"data": {
"log_type": log_type,
"log_file": log_file,
"content": log_content,
"lines": len(log_content)
}
})
except Exception as e:
logger.error(f"获取微信日志失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500

View File

@@ -1,23 +1,21 @@
# -*- coding: utf-8 -*-
"""
统计看板服务器 - 使用Flask蓝图重构版
"""
import logging import logging
import os import os
import sys import sys
import threading import threading
from datetime import datetime
import time import time
import xml.etree.ElementTree as ET from datetime import datetime
from db.message_storage import MessageStorageDB from db.message_storage import MessageStorageDB
from db.stats_db import StatsDBOperator from db.stats_db import StatsDBOperator
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory from flask import Flask, send_from_directory
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
# 导入消息格式化工具
from utils.message_formatter import format_quote_message
# 导入toml用于配置文件
import toml import toml
# 添加项目根目录到系统路径,确保可以导入项目模块 # 添加项目根目录到系统路径,确保可以导入项目模块
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
class DashboardServer: class DashboardServer:
"""统计看板服务器""" """统计看板服务器"""
@@ -78,18 +76,20 @@ class DashboardServer:
def _create_app(self) -> Flask: def _create_app(self) -> Flask:
"""创建Flask应用""" """创建Flask应用"""
app = Flask(__name__) # 指定模板文件夹路径
template_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
app = Flask(__name__, template_folder=template_folder)
app.secret_key = "stats_dashboard_secret_key" app.secret_key = "stats_dashboard_secret_key"
# 禁用模板缓存使修改HTML文件后立即生效 # 禁用模板缓存使修改HTML文件后立即生效
app.config['TEMPLATES_AUTO_RELOAD'] = True app.config['TEMPLATES_AUTO_RELOAD'] = True
# 静态文件目录
# 配置静态文件访问
static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
@app.route('/static/<path:filename>') @app.route('/static/<path:filename>')
def serve_static(filename): def serve_static(filename):
return send_from_directory(static_folder, filename) return send_from_directory(static_folder, filename)
# 配置静态文件访问
# 获取项目根目录下的static/images目录 # 获取项目根目录下的static/images目录
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
images_dir = os.path.join(project_root, "static", "images") images_dir = os.path.join(project_root, "static", "images")
@@ -107,629 +107,35 @@ class DashboardServer:
return send_from_directory(os.path.join(app.root_path, 'static'), return send_from_directory(os.path.join(app.root_path, 'static'),
'favicon.ico', mimetype='image/vnd.microsoft.icon') 'favicon.ico', mimetype='image/vnd.microsoft.icon')
# 登录页面 # 注册蓝图
@app.route('/login', methods=['GET', 'POST']) self._register_blueprints(app)
def login():
error = None
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if username == self.username and password == self.password:
session['logged_in'] = True
return redirect(url_for('index'))
else:
error = '用户名或密码错误'
return render_template('login.html', error=error)
# 登出
@app.route('/logout')
def logout():
session.pop('logged_in', None)
return redirect(url_for('login'))
# 登录检查装饰器
def login_required(f):
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
return redirect(url_for('login'))
return f(*args, **kwargs)
decorated_function.__name__ = f.__name__
return decorated_function
@app.route('/')
@login_required
def index():
return render_template('index.html')
@app.route('/plugins')
@login_required
def plugins():
return render_template('plugins.html')
@app.route('/users')
@login_required
def users_page():
return render_template('users.html')
@app.route('/groups')
@login_required
def groups():
return render_template('groups.html')
@app.route('/errors')
@login_required
def errors():
return render_template('errors.html')
@app.route('/messages')
@login_required
def message_list_page():
"""消息列表页面"""
return render_template('message_list.html')
# 在路由部分添加
@app.route('/wx_logs')
@login_required
def wx_logs():
return render_template('wx_logs.html')
# 在_create_app方法中添加新的路由
@app.route('/robot_management')
@login_required
def robot_management():
return render_template('robot_management.html')
# 添加通讯录管理页面路由
@app.route('/contacts')
@login_required
def contacts_management():
"""通讯录管理页面"""
return render_template('contacts_management.html')
# 添加通讯录相关API
@app.route('/api/contacts/all', methods=['GET'])
@login_required
def api_contacts_all():
"""获取所有联系人信息API"""
try:
contacts = self.contact_manager.get_contacts()
return jsonify({
"success": True,
"data": {
"contacts": contacts
}
})
except Exception as e:
self.logger.error(f"获取所有联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
# 修改联系人统计信息API
@app.route('/api/contacts/statistics', methods=['GET'])
@login_required
def api_contacts_statistics():
"""获取联系人统计信息API"""
try:
# 使用新的联系人分类方法获取统计信息
total, groups, personal, public, official = self.contact_manager.get_contact_statistics()
return jsonify({
"success": True,
"data": {
"total": total,
"groups": groups,
"personal": personal,
"public": public,
"official": official
}
})
except Exception as e:
self.logger.error(f"获取联系人统计信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
# 修改群组联系人API使用新的分类方法
@app.route('/api/contacts/groups', methods=['GET'])
@login_required
def api_contacts_groups():
"""获取群组联系人信息API"""
try:
group_contacts = self.contact_manager.get_group_contacts()
return jsonify({
"success": True,
"data": {
"groups": group_contacts
}
})
except Exception as e:
self.logger.error(f"获取群组联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
# 修改个人联系人API使用新的分类方法
@app.route('/api/contacts/personal', methods=['GET'])
@login_required
def api_contacts_personal():
"""获取个人联系人信息API"""
try:
personal_contacts = self.contact_manager.get_personal_contacts()
return jsonify({
"success": True,
"data": {
"personal": personal_contacts
}
})
except Exception as e:
self.logger.error(f"获取个人联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
# 添加公众号联系人API
@app.route('/api/contacts/official', methods=['GET'])
@login_required
def api_contacts_official():
"""获取公众号联系人信息API"""
try:
official_accounts = self.contact_manager.get_official_accounts()
return jsonify({
"success": True,
"data": {
"official": official_accounts
}
})
except Exception as e:
self.logger.error(f"获取公众号联系人信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
# 添加公共好友API
@app.route('/api/contacts/public', methods=['GET'])
@login_required
def api_contacts_public():
"""获取公共好友信息API"""
try:
public_contacts = self.contact_manager.get_public_contacts()
return jsonify({
"success": True,
"data": {
"public": public_contacts
}
})
except Exception as e:
self.logger.error(f"获取公共好友信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/robot/groups')
@login_required
def api_robot_groups():
try:
# 获取所有群组列表
groups = GroupBotManager.get_group_list()
# 如果方法返回None或发生异常使用本地缓存
if groups is None and hasattr(GroupBotManager, "local_cache"):
groups = GroupBotManager.local_cache.get("group_list", set())
# 如果仍然为None则初始化为空集合
if groups is None:
groups = set()
self.logger.info(f"获取到 {len(groups)} 个群组")
group_data = []
for group_id in groups:
try:
# 获取群名称,如果失败则使用默认值
group_name = self.contact_manager.get_nickname(group_id)
if not group_name:
group_name = f"未知群组({group_id})"
# 获取机器人状态,如果失败则使用默认值
try:
robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
except:
robot_status = PermissionStatus.UNKNOWN
group_data.append({
"group_id": group_id,
"group_name": group_name,
"robot_status": robot_status.value if robot_status else "unknown"
})
except Exception as e:
self.logger.warning(f"处理群组 {group_id} 信息时出错: {e}")
# 添加基本信息,避免单个群组错误影响整个列表
group_data.append({
"group_id": group_id,
"group_name": "获取失败",
"robot_status": "unknown"
})
return jsonify({"success": True, "data": group_data})
except Exception as e:
self.logger.error(f"获取群组列表失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/robot/group/<group_id>/permissions')
@login_required
def api_robot_group_permissions(group_id):
try:
permissions = GroupBotManager.list_group_permissions(group_id)
permission_data = []
for feature, status in permissions.items():
permission_data.append({
"feature_id": feature.value,
"feature_name": feature.name,
"feature_description": feature.description,
"status": status.value
})
return jsonify({"success": True, "data": permission_data})
except Exception as e:
self.logger.error(f"获取群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/robot/group/<group_id>/permissions', methods=['POST'])
@login_required
def api_update_robot_permissions(group_id):
# 更新群组功能权限
data = request.json
feature_id = data.get('feature_id')
status = data.get('status')
try:
feature = Feature(int(feature_id))
new_status = PermissionStatus(status)
# 特殊处理ROBOT功能
if feature == Feature.ROBOT:
r = self.db_manager.get_redis_connection()
if new_status == PermissionStatus.ENABLED:
GroupBotManager.local_cache["group_list"].add(group_id)
r.sadd("group:list", group_id)
else:
GroupBotManager.local_cache["group_list"].remove(group_id)
r.srem("group:list", group_id)
GroupBotManager.set_group_permission(group_id, feature, new_status)
return jsonify({"success": True})
except Exception as e:
self.logger.error(f"更新群组权限失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@app.route('/api/robot/batch_operation', methods=['POST'])
@login_required
def api_robot_batch_operation():
# 批量操作接口
data = request.json
operation = data.get('operation')
group_ids = data.get('group_ids', [])
results = {}
try:
if operation == 'remove_groups':
for group_id in group_ids:
result = GroupBotManager.remove_group(group_id)
results[group_id] = result
return jsonify({"success": True, "results": results})
else:
return jsonify({"success": False, "error": "不支持的操作类型"}), 400
except Exception as e:
self.logger.error(f"批量操作失败: {e}")
return jsonify({"success": False, "error": str(e)}), 400
@app.route('/api/user_stats')
@login_required
def api_user_stats():
try:
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = self.stats_db.get_user_stats(days, limit)
# 将用户ID转换为名称
for item in stats:
if 'user_id' in item:
user_id = item['user_id']
item['user_name'] = self.contact_manager.get_nickname(user_id)
return jsonify({"success": True, "data": stats})
except Exception as e:
self.logger.error(f"获取用户统计失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/group_stats')
@login_required
def api_group_stats():
try:
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
stats = self.stats_db.get_group_stats(days, limit)
# 将群ID转换为名称
for item in stats:
if 'group_id' in item:
group_id = item['group_id']
item['group_name'] = self.contact_manager.get_nickname(group_id)
return jsonify({"success": True, "data": stats})
except Exception as e:
self.logger.error(f"获取群组统计失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/plugin_stats')
@login_required
def api_plugin_stats():
try:
days = request.args.get('days', 7, type=int)
stats = self.stats_db.get_plugin_stats(days)
return jsonify({"success": True, "data": stats})
except Exception as e:
self.logger.error(f"获取插件统计失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/error_logs')
@login_required
def api_error_logs():
try:
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 100, type=int)
logs = self.stats_db.get_error_logs(days, limit)
return jsonify({"success": True, "data": logs})
except Exception as e:
self.logger.error(f"获取错误日志失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/system_info')
@login_required
def api_system_info():
try:
# 获取系统信息
import platform
import psutil
system_info = {
"os": platform.system(),
"os_version": platform.version(),
"python_version": platform.python_version(),
"cpu_usage": psutil.cpu_percent(),
"memory_usage": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent,
"uptime": time.time() - psutil.boot_time(),
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
return jsonify({"success": True, "data": system_info})
except Exception as e:
self.logger.error(f"获取系统信息失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/dashboard_summary')
@login_required
def api_dashboard_summary():
try:
days = request.args.get('days', 7, type=int)
summary = self.stats_db.get_dashboard_summary(days)
# 转换用户和群组ID为名称
if 'top_users' in summary:
for user in summary['top_users']:
if 'user_id' in user:
user['user_name'] = self.contact_manager.get_nickname(user['user_id'])
if 'top_groups' in summary:
for group in summary['top_groups']:
if 'group_id' in group:
group['group_name'] = self.contact_manager.get_nickname(group['group_id'])
return jsonify({"success": True, "data": summary})
except Exception as e:
self.logger.error(f"获取仪表盘摘要数据出错: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/plugin_trend')
@login_required
def api_plugin_trend():
try:
days = request.args.get('days', 7, type=int)
plugin_name = request.args.get('plugin_name', '')
trend = self.stats_db.get_plugin_trend(plugin_name, days)
return jsonify({"success": True, "data": trend})
except Exception as e:
self.logger.error(f"获取插件趋势失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/robot/group/<group_id>/message_trend')
@login_required
def api_group_message_trend(group_id):
try:
days = request.args.get('days', 7, type=int)
trend_data = self.message_storage.get_message_trend(group_id, days)
# 格式化数据为前端需要的格式
dates = []
counts = []
for item in trend_data:
# 将日期转换为字符串
if isinstance(item['date'], datetime):
date_str = item['date'].strftime('%Y-%m-%d')
else:
date_str = str(item['date'])
dates.append(date_str)
counts.append(item['message_count'])
return jsonify({
'success': True,
'data': {
'dates': dates,
'counts': counts
}
})
except Exception as e:
self.logger.error(f"获取群组消息趋势数据出错: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/error_detail/<int:error_id>')
@login_required
def api_error_detail(error_id):
try:
detail = self.stats_db.get_error_detail(error_id)
return jsonify({"success": True, "data": detail})
except Exception as e:
self.logger.error(f"获取错误详情失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/robot/add_group', methods=['POST'])
@login_required
def api_add_group():
try:
data = request.json
group_id = data.get('group_id')
if not group_id or not group_id.strip():
return jsonify({"success": False, "error": "群组ID不能为空"}), 400
group_id = group_id.strip()
# 检查群组是否已存在
if group_id in GroupBotManager.local_cache["group_list"]:
return jsonify({"success": False, "error": "该群组已存在"}), 400
# 添加群组到列表并启用机器人功能
GroupBotManager.local_cache["group_list"].add(group_id)
r = self.db_manager.get_redis_connection()
r.sadd("group:list", group_id)
# 设置ROBOT功能为启用状态
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
# 获取群组名称(如果可能)
group_name = self.contact_manager.get_nickname(group_id)
return jsonify({
"success": True,
"message": f"群组 {group_id} 已成功添加",
"group": {
"group_id": group_id,
"group_name": group_name,
"robot_status": "enabled"
}
})
except Exception as e:
self.logger.error(f"添加群组失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/messages', methods=['GET'])
@login_required
def get_messages():
"""获取消息列表API"""
try:
# 获取查询参数
group_id = request.args.get('group_id')
start_date = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d'))
end_date = request.args.get('end_date', datetime.now().strftime('%Y-%m-%d'))
search_text = request.args.get('search_text')
page = int(request.args.get('page', 1))
page_size = int(request.args.get('page_size', 20))
# 调用数据库方法获取消息
result = self.message_storage.get_messages_by_filter(
group_id=group_id,
start_date=start_date,
end_date=end_date,
search_text=search_text,
page=page,
page_size=page_size
)
# 处理消息数据,添加群组名称和发送者昵称,并格式化引用消息
for msg in result['messages']:
# 获取群组名称
msg['group_name'] = self.contact_manager.get_nickname(msg['group_id']) or msg['group_id']
# 获取发送者昵称
msg['sender_name'] = self.contact_manager.get_nickname(msg['sender']) or msg['sender']
# 处理消息内容,格式化引用消息
if msg['message_type'] == "49" and msg['content']: # 应用消息类型
try:
# 检查是否为引用消息
if '<refermsg>' in msg['content']:
# 使用格式化工具处理引用消息
msg['content'] = format_quote_message(msg['content'])
else:
# 其他类型的应用消息,解析 XML 提取标题
root = ET.fromstring(msg['content'])
title_elem = root.find('.//title')
if title_elem is not None:
msg['content'] = title_elem.text
except Exception as e:
self.logger.error(f"解析消息类型49出错: {e}")
return jsonify(result)
except Exception as e:
self.logger.error(f"获取消息列表失败: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/groups', methods=['GET'])
@login_required
def get_groups():
"""获取群组列表API"""
try:
# 获取机器人管理的群组列表
groups = []
for group_id in self.contact_manager.get_contacts():
if '@chatroom' in group_id:
groups.append({
'group_id': group_id,
'group_name': self.contact_manager.get_nickname(group_id) or group_id
})
return jsonify({'groups': groups})
except Exception as e:
self.logger.error(f"获取群组列表失败: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/wx_logs')
@login_required
def api_wx_logs():
try:
log_type = request.args.get('type', 'info') # 默认显示info日志
lines = request.args.get('lines', 100, type=int) # 默认显示最后100行
# 确定日志文件路径
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
if log_type == 'error':
log_file = os.path.join(base_dir, 'wx_error.log')
else:
log_file = os.path.join(base_dir, 'wx_info.log')
# 读取日志文件
log_content = []
if os.path.exists(log_file):
from collections import deque # 添加这一行导入deque
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
# 使用deque获取最后N行
log_content = list(deque(f, lines))
return jsonify({
"success": True,
"data": {
"log_type": log_type,
"log_file": log_file,
"content": log_content,
"lines": len(log_content)
}
})
except Exception as e:
self.logger.error(f"获取微信日志失败: {e}")
return jsonify({"success": False, "error": str(e)}), 500
return app return app
def _register_blueprints(self, app):
"""注册所有蓝图"""
# 在函数内部导入蓝图,避免循环导入
from admin.dashboard.blueprints.auth import auth_bp
from admin.dashboard.blueprints.contacts import contacts_bp
from admin.dashboard.blueprints.robot import robot_bp
from admin.dashboard.blueprints.messages import messages_bp
from admin.dashboard.blueprints.stats import stats_bp
from admin.dashboard.blueprints.system import system_bp
from admin.dashboard.blueprints.main import main_bp
# 将服务器实例存储在应用上下文中
app.dashboard_server = self
# 注册蓝图
app.register_blueprint(auth_bp)
app.register_blueprint(main_bp)
app.register_blueprint(contacts_bp, url_prefix='/contacts')
app.register_blueprint(robot_bp, url_prefix='/robot')
app.register_blueprint(messages_bp, url_prefix='/messages')
app.register_blueprint(stats_bp)
app.register_blueprint(system_bp)
self.logger.info("所有蓝图已注册")
def run(self): def run(self):
"""运行服务器""" """运行服务器"""