722 lines
30 KiB
Python
722 lines
30 KiB
Python
import logging
|
||
import os
|
||
import sys
|
||
import threading
|
||
from datetime import datetime
|
||
import time
|
||
import xml.etree.ElementTree as ET
|
||
from db.message_storage import MessageStorageDB
|
||
from db.stats_db import StatsDBOperator
|
||
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, send_from_directory
|
||
from robot_cmd.robot_command import GroupBotManager, Feature, PermissionStatus
|
||
# 导入消息格式化工具
|
||
from utils.message_formatter import format_quote_message
|
||
# 导入toml用于配置文件
|
||
import toml
|
||
|
||
# 添加项目根目录到系统路径,确保可以导入项目模块
|
||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
|
||
|
||
|
||
class DashboardServer:
|
||
"""统计看板服务器"""
|
||
|
||
def __init__(self, host: str = None, port: int = None,
|
||
username: str = None, password: str = None,
|
||
robot_instance=None):
|
||
# 加载配置文件
|
||
self.config = self._load_dashboard_config()
|
||
|
||
# 优先使用传入的参数,其次使用配置文件中的参数
|
||
self.host = host or self.config.get("server", {}).get("host", "0.0.0.0")
|
||
self.port = port or self.config.get("server", {}).get("port", 8888)
|
||
self.username = username or self.config.get("auth", {}).get("username", "admin")
|
||
self.password = password or self.config.get("auth", {}).get("password", "admin123")
|
||
|
||
self.logger = logging.getLogger("DashboardServer")
|
||
self.logger.info(f"Dashboard配置加载完成: 服务器将运行在 {self.host}:{self.port}")
|
||
|
||
# 如果提供了robot实例,则使用其对象
|
||
if robot_instance:
|
||
self.db_manager = robot_instance.db_manager
|
||
self.stats_db = StatsDBOperator(self.db_manager)
|
||
self.message_storage = MessageStorageDB(self.db_manager)
|
||
# 获取联系人管理器实例
|
||
self.contact_manager = robot_instance.contact_manager
|
||
self.logger.info("使用Robot实例的对象进行初始化")
|
||
else:
|
||
self.logger.error("未提供Robot实例,Dashboard无法正常工作")
|
||
raise ValueError("必须提供Robot实例")
|
||
|
||
self.app = self._create_app()
|
||
self._stop_event = threading.Event()
|
||
self._server = None # 存储服务器实例
|
||
|
||
def _load_dashboard_config(self):
|
||
"""加载Dashboard配置文件"""
|
||
try:
|
||
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.toml')
|
||
if os.path.exists(config_path):
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
return toml.load(f)
|
||
else:
|
||
# 如果配置文件不存在,创建默认配置
|
||
default_config = {
|
||
"server": {"host": "0.0.0.0", "port": 8888},
|
||
"auth": {"username": "admin", "password": "admin123"}
|
||
}
|
||
with open(config_path, 'w', encoding='utf-8') as f:
|
||
toml.dump(default_config, f)
|
||
return default_config
|
||
except Exception as e:
|
||
self.logger.error(f"加载Dashboard配置文件失败: {e}")
|
||
# 返回默认配置
|
||
return {
|
||
"server": {"host": "0.0.0.0", "port": 8888},
|
||
"auth": {"username": "admin", "password": "admin123"}
|
||
}
|
||
|
||
def _create_app(self) -> Flask:
|
||
"""创建Flask应用"""
|
||
app = Flask(__name__)
|
||
app.secret_key = "stats_dashboard_secret_key"
|
||
# 禁用模板缓存,使修改HTML文件后立即生效
|
||
app.config['TEMPLATES_AUTO_RELOAD'] = True
|
||
# 静态文件目录
|
||
static_folder = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
|
||
|
||
@app.route('/static/<path:filename>')
|
||
def serve_static(filename):
|
||
return send_from_directory(static_folder, filename)
|
||
|
||
# 配置静态文件访问
|
||
# 获取项目根目录下的static/images目录
|
||
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
images_dir = os.path.join(project_root, "static", "images")
|
||
|
||
# 确保目录存在
|
||
os.makedirs(images_dir, exist_ok=True)
|
||
|
||
@app.route('/static/images/<path:filename>')
|
||
def serve_images(filename):
|
||
return send_from_directory(images_dir, filename)
|
||
|
||
# 添加一个路由处理favicon请求
|
||
@app.route('/favicon.ico')
|
||
def favicon():
|
||
return send_from_directory(os.path.join(app.root_path, 'static'),
|
||
'favicon.ico', mimetype='image/vnd.microsoft.icon')
|
||
|
||
# 登录页面
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def login():
|
||
error = None
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
|
||
if username == self.username and password == self.password:
|
||
session['logged_in'] = True
|
||
return redirect(url_for('index'))
|
||
else:
|
||
error = '用户名或密码错误'
|
||
|
||
return render_template('login.html', error=error)
|
||
|
||
# 登出
|
||
@app.route('/logout')
|
||
def logout():
|
||
session.pop('logged_in', None)
|
||
return redirect(url_for('login'))
|
||
|
||
# 登录检查装饰器
|
||
def login_required(f):
|
||
def decorated_function(*args, **kwargs):
|
||
if not session.get('logged_in'):
|
||
return redirect(url_for('login'))
|
||
return f(*args, **kwargs)
|
||
|
||
decorated_function.__name__ = f.__name__
|
||
return decorated_function
|
||
|
||
@app.route('/')
|
||
@login_required
|
||
def index():
|
||
return render_template('index.html')
|
||
|
||
@app.route('/plugins')
|
||
@login_required
|
||
def plugins():
|
||
return render_template('plugins.html')
|
||
|
||
@app.route('/users')
|
||
@login_required
|
||
def users_page():
|
||
return render_template('users.html')
|
||
|
||
@app.route('/groups')
|
||
@login_required
|
||
def groups():
|
||
return render_template('groups.html')
|
||
|
||
@app.route('/errors')
|
||
@login_required
|
||
def errors():
|
||
return render_template('errors.html')
|
||
|
||
@app.route('/messages')
|
||
@login_required
|
||
def message_list_page():
|
||
"""消息列表页面"""
|
||
return render_template('message_list.html')
|
||
|
||
# 在路由部分添加
|
||
@app.route('/wx_logs')
|
||
@login_required
|
||
def wx_logs():
|
||
return render_template('wx_logs.html')
|
||
# 在_create_app方法中添加新的路由
|
||
@app.route('/robot_management')
|
||
@login_required
|
||
def robot_management():
|
||
return render_template('robot_management.html')
|
||
|
||
# 添加通讯录管理页面路由
|
||
@app.route('/contacts')
|
||
@login_required
|
||
def contacts_management():
|
||
"""通讯录管理页面"""
|
||
return render_template('contacts_management.html')
|
||
|
||
# 添加通讯录相关API
|
||
@app.route('/api/contacts/all', methods=['GET'])
|
||
@login_required
|
||
def api_contacts_all():
|
||
"""获取所有联系人信息API"""
|
||
try:
|
||
contacts = self.contact_manager.get_contacts()
|
||
return jsonify({
|
||
"success": True,
|
||
"data": {
|
||
"contacts": contacts
|
||
}
|
||
})
|
||
except Exception as e:
|
||
self.logger.error(f"获取所有联系人信息失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/contacts/groups', methods=['GET'])
|
||
@login_required
|
||
def api_contacts_groups():
|
||
"""获取群组联系人信息API"""
|
||
try:
|
||
contacts = self.contact_manager.get_contacts()
|
||
group_contacts = {wxid: name for wxid, name in contacts.items() if '@@' in wxid or '@chatroom' in wxid}
|
||
|
||
return jsonify({
|
||
"success": True,
|
||
"data": {
|
||
"groups": group_contacts
|
||
}
|
||
})
|
||
except Exception as e:
|
||
self.logger.error(f"获取群组联系人信息失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/contacts/personal', methods=['GET'])
|
||
@login_required
|
||
def api_contacts_personal():
|
||
"""获取个人联系人信息API"""
|
||
try:
|
||
contacts = self.contact_manager.get_contacts()
|
||
personal_contacts = {wxid: name for wxid, name in contacts.items()
|
||
if '@@' not in wxid and '@chatroom' not in wxid}
|
||
|
||
return jsonify({
|
||
"success": True,
|
||
"data": {
|
||
"personal": personal_contacts
|
||
}
|
||
})
|
||
except Exception as e:
|
||
self.logger.error(f"获取个人联系人信息失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/contacts/statistics', methods=['GET'])
|
||
@login_required
|
||
def api_contacts_statistics():
|
||
"""获取联系人统计信息API"""
|
||
try:
|
||
contacts = self.contact_manager.get_contacts()
|
||
group_contacts = {wxid: name for wxid, name in contacts.items()
|
||
if '@@' in wxid or '@chatroom' in wxid}
|
||
personal_contacts = {wxid: name for wxid, name in contacts.items()
|
||
if '@@' not in wxid and '@chatroom' not in wxid}
|
||
|
||
return jsonify({
|
||
"success": True,
|
||
"data": {
|
||
"total": len(contacts),
|
||
"groups": len(group_contacts),
|
||
"personal": len(personal_contacts)
|
||
}
|
||
})
|
||
except Exception as e:
|
||
self.logger.error(f"获取联系人统计信息失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/robot/groups')
|
||
@login_required
|
||
def api_robot_groups():
|
||
try:
|
||
# 获取所有群组列表
|
||
groups = GroupBotManager.get_group_list()
|
||
|
||
# 如果方法返回None或发生异常,使用本地缓存
|
||
if groups is None and hasattr(GroupBotManager, "local_cache"):
|
||
groups = GroupBotManager.local_cache.get("group_list", set())
|
||
|
||
# 如果仍然为None,则初始化为空集合
|
||
if groups is None:
|
||
groups = set()
|
||
|
||
self.logger.info(f"获取到 {len(groups)} 个群组")
|
||
group_data = []
|
||
|
||
for group_id in groups:
|
||
try:
|
||
# 获取群名称,如果失败则使用默认值
|
||
group_name = self.contact_manager.get_nickname(group_id)
|
||
if not group_name:
|
||
group_name = f"未知群组({group_id})"
|
||
|
||
# 获取机器人状态,如果失败则使用默认值
|
||
try:
|
||
robot_status = GroupBotManager.get_group_permission(group_id, Feature.ROBOT)
|
||
except:
|
||
robot_status = PermissionStatus.UNKNOWN
|
||
|
||
group_data.append({
|
||
"group_id": group_id,
|
||
"group_name": group_name,
|
||
"robot_status": robot_status.value if robot_status else "unknown"
|
||
})
|
||
except Exception as e:
|
||
self.logger.warning(f"处理群组 {group_id} 信息时出错: {e}")
|
||
# 添加基本信息,避免单个群组错误影响整个列表
|
||
group_data.append({
|
||
"group_id": group_id,
|
||
"group_name": "获取失败",
|
||
"robot_status": "unknown"
|
||
})
|
||
|
||
return jsonify({"success": True, "data": group_data})
|
||
except Exception as e:
|
||
self.logger.error(f"获取群组列表失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/robot/group/<group_id>/permissions')
|
||
@login_required
|
||
def api_robot_group_permissions(group_id):
|
||
try:
|
||
permissions = GroupBotManager.list_group_permissions(group_id)
|
||
permission_data = []
|
||
|
||
for feature, status in permissions.items():
|
||
permission_data.append({
|
||
"feature_id": feature.value,
|
||
"feature_name": feature.name,
|
||
"feature_description": feature.description,
|
||
"status": status.value
|
||
})
|
||
|
||
return jsonify({"success": True, "data": permission_data})
|
||
except Exception as e:
|
||
self.logger.error(f"获取群组权限失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/robot/group/<group_id>/permissions', methods=['POST'])
|
||
@login_required
|
||
def api_update_robot_permissions(group_id):
|
||
# 更新群组功能权限
|
||
data = request.json
|
||
feature_id = data.get('feature_id')
|
||
status = data.get('status')
|
||
|
||
try:
|
||
feature = Feature(int(feature_id))
|
||
new_status = PermissionStatus(status)
|
||
|
||
# 特殊处理ROBOT功能
|
||
if feature == Feature.ROBOT:
|
||
r = self.db_manager.get_redis_connection()
|
||
if new_status == PermissionStatus.ENABLED:
|
||
GroupBotManager.local_cache["group_list"].add(group_id)
|
||
r.sadd("group:list", group_id)
|
||
else:
|
||
GroupBotManager.local_cache["group_list"].remove(group_id)
|
||
r.srem("group:list", group_id)
|
||
|
||
GroupBotManager.set_group_permission(group_id, feature, new_status)
|
||
return jsonify({"success": True})
|
||
except Exception as e:
|
||
self.logger.error(f"更新群组权限失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 400
|
||
|
||
@app.route('/api/robot/batch_operation', methods=['POST'])
|
||
@login_required
|
||
def api_robot_batch_operation():
|
||
# 批量操作接口
|
||
data = request.json
|
||
operation = data.get('operation')
|
||
group_ids = data.get('group_ids', [])
|
||
|
||
results = {}
|
||
|
||
try:
|
||
if operation == 'remove_groups':
|
||
for group_id in group_ids:
|
||
result = GroupBotManager.remove_group(group_id)
|
||
results[group_id] = result
|
||
|
||
return jsonify({"success": True, "results": results})
|
||
else:
|
||
return jsonify({"success": False, "error": "不支持的操作类型"}), 400
|
||
except Exception as e:
|
||
self.logger.error(f"批量操作失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 400
|
||
|
||
@app.route('/api/user_stats')
|
||
@login_required
|
||
def api_user_stats():
|
||
try:
|
||
days = request.args.get('days', 7, type=int)
|
||
limit = request.args.get('limit', 10, type=int)
|
||
stats = self.stats_db.get_user_stats(days, limit)
|
||
|
||
# 将用户ID转换为名称
|
||
for item in stats:
|
||
if 'user_id' in item:
|
||
user_id = item['user_id']
|
||
item['user_name'] = self.contact_manager.get_nickname(user_id)
|
||
|
||
return jsonify({"success": True, "data": stats})
|
||
except Exception as e:
|
||
self.logger.error(f"获取用户统计失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/group_stats')
|
||
@login_required
|
||
def api_group_stats():
|
||
try:
|
||
days = request.args.get('days', 7, type=int)
|
||
limit = request.args.get('limit', 10, type=int)
|
||
stats = self.stats_db.get_group_stats(days, limit)
|
||
|
||
# 将群ID转换为名称
|
||
for item in stats:
|
||
if 'group_id' in item:
|
||
group_id = item['group_id']
|
||
item['group_name'] = self.contact_manager.get_nickname(group_id)
|
||
|
||
return jsonify({"success": True, "data": stats})
|
||
except Exception as e:
|
||
self.logger.error(f"获取群组统计失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/plugin_stats')
|
||
@login_required
|
||
def api_plugin_stats():
|
||
try:
|
||
days = request.args.get('days', 7, type=int)
|
||
stats = self.stats_db.get_plugin_stats(days)
|
||
return jsonify({"success": True, "data": stats})
|
||
except Exception as e:
|
||
self.logger.error(f"获取插件统计失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/error_logs')
|
||
@login_required
|
||
def api_error_logs():
|
||
try:
|
||
days = request.args.get('days', 7, type=int)
|
||
limit = request.args.get('limit', 100, type=int)
|
||
logs = self.stats_db.get_error_logs(days, limit)
|
||
return jsonify({"success": True, "data": logs})
|
||
except Exception as e:
|
||
self.logger.error(f"获取错误日志失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/system_info')
|
||
@login_required
|
||
def api_system_info():
|
||
try:
|
||
# 获取系统信息
|
||
import platform
|
||
import psutil
|
||
|
||
system_info = {
|
||
"os": platform.system(),
|
||
"os_version": platform.version(),
|
||
"python_version": platform.python_version(),
|
||
"cpu_usage": psutil.cpu_percent(),
|
||
"memory_usage": psutil.virtual_memory().percent,
|
||
"disk_usage": psutil.disk_usage('/').percent,
|
||
"uptime": time.time() - psutil.boot_time(),
|
||
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
}
|
||
|
||
return jsonify({"success": True, "data": system_info})
|
||
except Exception as e:
|
||
self.logger.error(f"获取系统信息失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/dashboard_summary')
|
||
@login_required
|
||
def api_dashboard_summary():
|
||
try:
|
||
days = request.args.get('days', 7, type=int)
|
||
summary = self.stats_db.get_dashboard_summary(days)
|
||
|
||
# 转换用户和群组ID为名称
|
||
if 'top_users' in summary:
|
||
for user in summary['top_users']:
|
||
if 'user_id' in user:
|
||
user['user_name'] = self.contact_manager.get_nickname(user['user_id'])
|
||
|
||
if 'top_groups' in summary:
|
||
for group in summary['top_groups']:
|
||
if 'group_id' in group:
|
||
group['group_name'] = self.contact_manager.get_nickname(group['group_id'])
|
||
|
||
return jsonify({"success": True, "data": summary})
|
||
except Exception as e:
|
||
self.logger.error(f"获取仪表盘摘要数据出错: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/plugin_trend')
|
||
@login_required
|
||
def api_plugin_trend():
|
||
try:
|
||
days = request.args.get('days', 7, type=int)
|
||
plugin_name = request.args.get('plugin_name', '')
|
||
trend = self.stats_db.get_plugin_trend(plugin_name, days)
|
||
return jsonify({"success": True, "data": trend})
|
||
except Exception as e:
|
||
self.logger.error(f"获取插件趋势失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/robot/group/<group_id>/message_trend')
|
||
@login_required
|
||
def api_group_message_trend(group_id):
|
||
try:
|
||
days = request.args.get('days', 7, type=int)
|
||
trend_data = self.message_storage.get_message_trend(group_id, days)
|
||
|
||
# 格式化数据为前端需要的格式
|
||
dates = []
|
||
counts = []
|
||
for item in trend_data:
|
||
# 将日期转换为字符串
|
||
if isinstance(item['date'], datetime):
|
||
date_str = item['date'].strftime('%Y-%m-%d')
|
||
else:
|
||
date_str = str(item['date'])
|
||
|
||
dates.append(date_str)
|
||
counts.append(item['message_count'])
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'data': {
|
||
'dates': dates,
|
||
'counts': counts
|
||
}
|
||
})
|
||
except Exception as e:
|
||
self.logger.error(f"获取群组消息趋势数据出错: {e}")
|
||
return jsonify({'success': False, 'error': str(e)}), 500
|
||
|
||
@app.route('/api/error_detail/<int:error_id>')
|
||
@login_required
|
||
def api_error_detail(error_id):
|
||
try:
|
||
detail = self.stats_db.get_error_detail(error_id)
|
||
return jsonify({"success": True, "data": detail})
|
||
except Exception as e:
|
||
self.logger.error(f"获取错误详情失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/robot/add_group', methods=['POST'])
|
||
@login_required
|
||
def api_add_group():
|
||
try:
|
||
data = request.json
|
||
group_id = data.get('group_id')
|
||
|
||
if not group_id or not group_id.strip():
|
||
return jsonify({"success": False, "error": "群组ID不能为空"}), 400
|
||
|
||
group_id = group_id.strip()
|
||
|
||
# 检查群组是否已存在
|
||
if group_id in GroupBotManager.local_cache["group_list"]:
|
||
return jsonify({"success": False, "error": "该群组已存在"}), 400
|
||
|
||
# 添加群组到列表并启用机器人功能
|
||
GroupBotManager.local_cache["group_list"].add(group_id)
|
||
r = self.db_manager.get_redis_connection()
|
||
r.sadd("group:list", group_id)
|
||
|
||
# 设置ROBOT功能为启用状态
|
||
GroupBotManager.set_group_permission(group_id, Feature.ROBOT, PermissionStatus.ENABLED)
|
||
|
||
# 获取群组名称(如果可能)
|
||
group_name = self.contact_manager.get_nickname(group_id)
|
||
|
||
return jsonify({
|
||
"success": True,
|
||
"message": f"群组 {group_id} 已成功添加",
|
||
"group": {
|
||
"group_id": group_id,
|
||
"group_name": group_name,
|
||
"robot_status": "enabled"
|
||
}
|
||
})
|
||
except Exception as e:
|
||
self.logger.error(f"添加群组失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
@app.route('/api/messages', methods=['GET'])
|
||
@login_required
|
||
def get_messages():
|
||
"""获取消息列表API"""
|
||
try:
|
||
# 获取查询参数
|
||
group_id = request.args.get('group_id')
|
||
start_date = request.args.get('start_date', datetime.now().strftime('%Y-%m-%d'))
|
||
end_date = request.args.get('end_date', datetime.now().strftime('%Y-%m-%d'))
|
||
search_text = request.args.get('search_text')
|
||
page = int(request.args.get('page', 1))
|
||
page_size = int(request.args.get('page_size', 20))
|
||
|
||
# 调用数据库方法获取消息
|
||
result = self.message_storage.get_messages_by_filter(
|
||
group_id=group_id,
|
||
start_date=start_date,
|
||
end_date=end_date,
|
||
search_text=search_text,
|
||
page=page,
|
||
page_size=page_size
|
||
)
|
||
|
||
# 处理消息数据,添加群组名称和发送者昵称,并格式化引用消息
|
||
for msg in result['messages']:
|
||
# 获取群组名称
|
||
msg['group_name'] = self.contact_manager.get_nickname(msg['group_id']) or msg['group_id']
|
||
|
||
# 获取发送者昵称
|
||
msg['sender_name'] = self.contact_manager.get_nickname(msg['sender']) or msg['sender']
|
||
|
||
# 处理消息内容,格式化引用消息
|
||
if msg['message_type'] == "49" and msg['content']: # 应用消息类型
|
||
try:
|
||
# 检查是否为引用消息
|
||
if '<refermsg>' in msg['content']:
|
||
# 使用格式化工具处理引用消息
|
||
msg['content'] = format_quote_message(msg['content'])
|
||
else:
|
||
# 其他类型的应用消息,解析 XML 提取标题
|
||
root = ET.fromstring(msg['content'])
|
||
title_elem = root.find('.//title')
|
||
if title_elem is not None:
|
||
msg['content'] = title_elem.text
|
||
except Exception as e:
|
||
self.logger.error(f"解析消息类型49出错: {e}")
|
||
|
||
return jsonify(result)
|
||
except Exception as e:
|
||
self.logger.error(f"获取消息列表失败: {e}")
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
@app.route('/api/groups', methods=['GET'])
|
||
@login_required
|
||
def get_groups():
|
||
"""获取群组列表API"""
|
||
try:
|
||
# 获取机器人管理的群组列表
|
||
groups = []
|
||
for group_id in self.contact_manager.get_contacts():
|
||
if '@chatroom' in group_id:
|
||
groups.append({
|
||
'group_id': group_id,
|
||
'group_name': self.contact_manager.get_nickname(group_id) or group_id
|
||
})
|
||
|
||
return jsonify({'groups': groups})
|
||
except Exception as e:
|
||
self.logger.error(f"获取群组列表失败: {e}")
|
||
return jsonify({'error': str(e)}), 500
|
||
|
||
|
||
@app.route('/api/wx_logs')
|
||
@login_required
|
||
def api_wx_logs():
|
||
try:
|
||
log_type = request.args.get('type', 'info') # 默认显示info日志
|
||
lines = request.args.get('lines', 100, type=int) # 默认显示最后100行
|
||
|
||
# 确定日志文件路径
|
||
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
if log_type == 'error':
|
||
log_file = os.path.join(base_dir, 'wx_error.log')
|
||
else:
|
||
log_file = os.path.join(base_dir, 'wx_info.log')
|
||
|
||
# 读取日志文件
|
||
log_content = []
|
||
if os.path.exists(log_file):
|
||
from collections import deque # 添加这一行导入deque
|
||
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
|
||
# 使用deque获取最后N行
|
||
log_content = list(deque(f, lines))
|
||
|
||
return jsonify({
|
||
"success": True,
|
||
"data": {
|
||
"log_type": log_type,
|
||
"log_file": log_file,
|
||
"content": log_content,
|
||
"lines": len(log_content)
|
||
}
|
||
})
|
||
except Exception as e:
|
||
self.logger.error(f"获取微信日志失败: {e}")
|
||
return jsonify({"success": False, "error": str(e)}), 500
|
||
|
||
return app
|
||
|
||
def run(self):
|
||
"""运行服务器"""
|
||
from werkzeug.serving import make_server
|
||
|
||
self.logger.info(f"启动服务器: {self.host}:{self.port}")
|
||
try:
|
||
# 使用线程安全的方式运行服务器
|
||
self._server = make_server(self.host, self.port, self.app)
|
||
self._server.serve_forever()
|
||
except Exception as e:
|
||
self.logger.error(f"服务器运行失败: {e}")
|
||
self._stop_event.set()
|
||
|
||
def stop(self):
|
||
"""停止服务器"""
|
||
self.logger.info("正在停止服务器...")
|
||
self._stop_event.set()
|
||
|
||
# 使用werkzeug服务器的关闭方法
|
||
if self._server:
|
||
self._server.shutdown()
|
||
|
||
self.logger.info("服务器已停止")
|