Merge remote-tracking branch 'origin/feature-855' into feature-855

# Conflicts:
#	admin/dashboard/blueprints/system.py
This commit is contained in:
liuwei
2026-05-06 08:39:09 +08:00
22 changed files with 156 additions and 3822 deletions

View File

@@ -7,42 +7,11 @@ from flask import Blueprint, request, jsonify, render_template, current_app
from admin.dashboard.blueprints.auth import login_required
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from plugins.robot_menu.menu_render_tool import RobotMenuRenderTool
# 创建蓝图
plugin_routes = Blueprint('plugin_routes', __name__)
LOG = logger
# 后台命令索引页只复用“命令目录生成”能力,不需要图片渲染,
# 因此这里固定使用轻量 text 配置创建一个工具实例即可。
_command_catalog_tool = RobotMenuRenderTool(
output_mode="text",
image_fallback_to_text=True,
image_render_timeout_seconds=30,
image_render_retries=1,
image_template_path="plugins/robot_menu/templates/menu_cards.html",
log=LOG,
)
def _build_group_options(server) -> list:
"""构建后台命令索引页的群组选项列表。"""
group_ids = sorted(set(GroupBotManager.get_group_list() or []))
options = []
for group_id in group_ids:
group_name = ""
try:
group_name = server.contact_manager.get_nickname(group_id) or ""
except Exception:
group_name = ""
options.append(
{
"group_id": group_id,
"group_name": str(group_name or group_id),
}
)
return options
# 机器人管理页面
@plugin_routes.route('/plugins_manage')
@@ -51,61 +20,40 @@ def robot_management():
return render_template('plugins_manage.html')
@plugin_routes.route('/command_catalog')
@login_required
def command_catalog_page():
"""后台命令索引页面。"""
return render_template('command_catalog.html')
@plugin_routes.route('/api/plugins', methods=['GET'])
@login_required
def get_plugins():
"""获取所有插件列表"""
try:
server = current_app.dashboard_server
# 统一改为消费 PluginManager 的标准治理快照:
# 1. 这样既能覆盖“已加载插件”,也能覆盖“发现但加载失败/配置禁用”的模块;
# 2. 后台不必重复拼装版本、命令、依赖、配置健康等字段;
# 3. 后续继续补错误统计、性能排名时,也只需要在快照层扩展。
plugin_list = server.plugin_manager.get_plugin_snapshots()
# 获取插件注册表
plugins = server.plugin_registry.get_all_plugins()
# 转换为前端需要的格式
plugin_list = []
for name, plugin in plugins.items():
# 获取插件模块名
try:
module_name = plugin.__class__.__module__.split('.')[-2]
except (IndexError, AttributeError):
module_name = "unknown"
plugin_info = {
"name": plugin.name,
"module_name": module_name,
"version": getattr(plugin, 'version', 'N/A'),
"author": getattr(plugin, 'author', 'N/A'),
"description": getattr(plugin, 'description', 'N/A'),
"status": plugin.status.name if hasattr(plugin, 'status') else 'UNKNOWN'
}
plugin_list.append(plugin_info)
return jsonify({"success": True, "data": plugin_list})
except Exception as e:
LOG.error(f"获取插件列表失败: {str(e)}", exc_info=True)
return jsonify({"success": False, "message": f"获取插件列表失败: {str(e)}"})
@plugin_routes.route('/api/plugins/command_catalog', methods=['GET'])
@login_required
def get_command_catalog():
"""获取后台命令索引数据。"""
try:
server = current_app.dashboard_server
group_id = str(request.args.get('group_id') or '').strip()
# 后台命令索引默认站在“管理员”视角,
# 这样既能看到当前可用命令,也能看到未启用能力和管理指令。
catalog = _command_catalog_tool.build_command_catalog_data(
group_id=group_id,
requester_id="dashboard_admin",
force_admin=True,
)
data = {
**catalog,
"group_options": _build_group_options(server),
"summary": {
"available_manual_count": len(catalog.get("available_manual", []) or []),
"available_auto_count": len(catalog.get("available_auto", []) or []),
"unavailable_manual_count": len(catalog.get("unavailable_manual", []) or []),
"admin_command_count": len(catalog.get("admin_commands", []) or []),
},
}
return jsonify({"success": True, "data": data})
except Exception as e:
LOG.error(f"获取命令索引失败: {str(e)}", exc_info=True)
return jsonify({"success": False, "message": f"获取命令索引失败: {str(e)}"})
@plugin_routes.route('/api/plugins/group_status', methods=['GET'])
@login_required
def get_plugin_group_status():
@@ -245,10 +193,31 @@ def get_plugin_info():
if not plugin_name:
return jsonify({"success": False, "message": "缺少插件名称参数"})
plugin_info = server.plugin_manager.get_plugin_snapshot(plugin_name)
if not plugin_info:
# 获取插件管理器
display_name, plugin = server.plugin_manager.find_plugin_by_name(plugin_name)
if not plugin:
return jsonify({"success": False, "message": f"未找到插件: {plugin_name}"})
# 获取插件模块名
try:
module_name = plugin.__class__.__module__.split('.')[-2]
except (IndexError, AttributeError):
module_name = "unknown"
# 构建详细信息
plugin_info = {
"name": plugin.name,
"module_name": module_name,
"version": getattr(plugin, 'version', 'N/A'),
"author": getattr(plugin, 'author', 'N/A'),
"description": getattr(plugin, 'description', 'N/A'),
"status": plugin.status.name if hasattr(plugin, 'status') else 'UNKNOWN',
"command_prefix": getattr(plugin, 'command_prefix', ''),
"commands": getattr(plugin, 'commands', []),
"config": getattr(plugin, '_config', {})
}
return jsonify({"success": True, "data": plugin_info})
except Exception as e:
LOG.error(f"获取插件详情失败: {str(e)}", exc_info=True)
@@ -266,14 +235,9 @@ def enable_plugin():
if not plugin_name:
return jsonify({"success": False, "message": "缺少插件名称参数"})
# 已加载插件直接启动;尚未加载的插件则先尝试加载,再进入启动流程。
display_name, plugin = server.plugin_manager.find_plugin_by_name(plugin_name)
if not plugin:
plugin = server.plugin_manager.load_plugin(plugin_name)
if plugin:
display_name = plugin.name
if plugin and server.plugin_manager.start_plugin(display_name or plugin_name):
# 获取插件管理器
# 启用插件
if server.plugin_manager.start_plugin(plugin_name):
return jsonify({"success": True, "message": f"插件 {plugin_name} 启用成功"})
else:
return jsonify({"success": False, "message": f"插件 {plugin_name} 启用失败"})
@@ -314,14 +278,8 @@ def reload_plugin():
if not plugin_name:
return jsonify({"success": False, "message": "缺少插件名称参数"})
# 已加载插件优先走重载;若当前未加载,则退化为“重新尝试加载并启动”。
display_name, plugin = server.plugin_manager.find_plugin_by_name(plugin_name)
if plugin:
reloaded_plugin = server.plugin_manager.reload_plugin(plugin_name)
else:
reloaded_plugin = server.plugin_manager.load_plugin(plugin_name)
if reloaded_plugin:
server.plugin_manager.start_plugin(reloaded_plugin.name)
# 载插件
reloaded_plugin = server.plugin_manager.reload_plugin(plugin_name)
if reloaded_plugin:
return jsonify({"success": True, "message": f"插件 {plugin_name} 重载成功"})
@@ -342,11 +300,16 @@ def get_raw_plugin_config():
if not plugin_name:
return jsonify({"success": False, "message": "缺少插件名称参数"})
plugin_snapshot = server.plugin_manager.get_plugin_snapshot(plugin_name)
if not plugin_snapshot:
# 获取插件管理器
# 查找插件
display_name, plugin = server.plugin_manager.find_plugin_by_name(plugin_name)
if not plugin:
return jsonify({"success": False, "message": f"未找到插件: {plugin_name}"})
config_path = str(plugin_snapshot.get("config_path", "") or "").strip()
# 获取配置文件路径
config_path = plugin.get_config_path()
if not os.path.exists(config_path):
return jsonify({"success": False, "message": f"配置文件不存在: {config_path}"})
@@ -386,29 +349,15 @@ def update_plugin_config():
if not plugin_name or config_text is None:
return jsonify({"success": False, "message": "缺少必要参数"})
# 查找插件
# 获取插件管理器
display_name, plugin = server.plugin_manager.find_plugin_by_name(plugin_name)
plugin_snapshot = server.plugin_manager.get_plugin_snapshot(plugin_name)
if not plugin_snapshot:
if not plugin:
return jsonify({"success": False, "message": f"未找到插件: {plugin_name}"})
config_path = str(plugin_snapshot.get("config_path", "") or "").strip()
if not config_path:
return jsonify({"success": False, "message": "插件未声明配置路径,暂不支持在线编辑"})
# 保存前先做格式校验:
# 1. 避免把坏 TOML 先写回磁盘,再让插件进入“文件已坏但提示成功”的状态;
# 2. 校验通过后再真正落盘,失败则保留线上旧配置;
# 3. 这也是插件治理中心第一阶段的“配置校验底座”。
try:
if format_type == 'toml':
config_obj = toml.loads(config_text)
elif format_type == 'json':
config_obj = json.loads(config_text)
else:
return jsonify({"success": False, "message": f"不支持的配置格式: {format_type}"})
except Exception as parse_error:
LOG.error(f"解析配置失败: {str(parse_error)}", exc_info=True)
return jsonify({"success": False, "message": f"配置格式校验失败: {str(parse_error)}"})
# 获取配置文件路径
config_path = plugin.get_config_path()
# 确保配置目录存在
os.makedirs(os.path.dirname(config_path), exist_ok=True)
@@ -417,11 +366,22 @@ def update_plugin_config():
with open(config_path, 'w', encoding='utf-8') as f:
f.write(config_text)
# 若插件当前已加载,则同步刷新内存中的配置镜像,减少“保存后详情弹窗仍是旧配置”的困惑。
if plugin:
# 解析配置并更新插件内部配置
try:
if format_type == 'toml':
config_obj = toml.loads(config_text)
elif format_type == 'json':
config_obj = json.loads(config_text)
else:
return jsonify({"success": False, "message": f"不支持的配置格式: {format_type}"})
# 更新插件内部配置
plugin._config = config_obj
return jsonify({"success": True, "message": "配置已保存并通过格式校验"})
return jsonify({"success": True, "message": "配置已保存"})
except Exception as e:
LOG.error(f"解析配置失败: {str(e)}", exc_info=True)
return jsonify({"success": False, "message": f"配置已保存,但解析失败: {str(e)}"})
except Exception as e:
LOG.error(f"更新插件配置失败: {str(e)}", exc_info=True)

View File

@@ -40,7 +40,7 @@ def api_list_schedules():
data = server.plugin_schedule_manager.list_schedules_with_runtime()
# 后端统一格式化时间字段,避免前端出现 Fri, 17 Apr 2026 ... 这类 RFC 时间串。
for row in data:
for key in ("next_run_at", "last_run_at", "latest_success_at", "latest_failed_at", "created_at", "updated_at"):
for key in ("next_run_at", "last_run_at", "created_at", "updated_at"):
if key in row:
row[key] = _normalize_datetime_text(row.get(key))
return jsonify({"success": True, "data": data})

View File

@@ -21,40 +21,6 @@ def _normalize_datetime_text(value):
return text
def _build_job_health_status(*, enabled: bool, running: bool, last_status: str, latest_success_at, latest_failure_summary: str) -> str:
"""根据任务启停、运行态和历史结果输出后台可读的健康状态。"""
# 状态设计尽量贴近运维判断顺序:
# 1. 停用态单独标记,避免和“从未执行”混淆;
# 2. 执行中的任务优先展示 running方便后台快速识别实时动作
# 3. 最近一次执行失败时直接标记 failed让异常任务在列表里一眼可见
# 4. 有成功历史且最近不是失败时视为 healthy否则落到 idle。
if not enabled:
return "disabled"
if running:
return "running"
if str(last_status or "").strip().lower() == "failed":
return "failed"
if latest_success_at or str(last_status or "").strip().lower() == "success":
return "healthy"
if str(latest_failure_summary or "").strip():
return "failed"
return "idle"
def _build_job_health_message(*, health_status: str, latest_success_at, latest_failure_summary: str) -> str:
"""为后台列表生成一句简短的任务健康提示。"""
if health_status == "disabled":
return "任务已停用"
if health_status == "running":
return "任务正在执行中"
if health_status == "failed":
return str(latest_failure_summary or "最近一次执行失败").strip()
if health_status == "healthy":
success_text = _normalize_datetime_text(latest_success_at)
return f"最近成功于 {success_text}" if success_text else "任务近期执行正常"
return "暂无执行记录"
@system_jobs_bp.route("/")
@login_required
def page_system_jobs():
@@ -68,31 +34,11 @@ def api_list_jobs():
db_rows = server.system_job_db.list_jobs()
runtime_rows = async_job.get_jobs_snapshot()
runtime_by_key = {row.get("job_key", ""): row for row in runtime_rows if row.get("job_key")}
job_keys = [str(row.get("job_key") or "").strip() for row in db_rows if str(row.get("job_key") or "").strip()]
latest_log_by_key = server.system_job_db.get_latest_logs_map(job_keys)
history_summary_by_key = server.system_job_db.get_job_history_summary_map(job_keys)
result = []
for row in db_rows:
job_key = row.get("job_key")
runtime = runtime_by_key.get(job_key, {})
latest_log = latest_log_by_key.get(job_key, {})
history_summary = history_summary_by_key.get(job_key, {})
last_status = runtime.get("last_status") or latest_log.get("status") or "never"
last_run_at = runtime.get("last_run_at") or latest_log.get("triggered_at")
last_error = runtime.get("last_error") or ""
if not last_error and str(last_status or "").strip().lower() == "failed":
last_error = (
str(latest_log.get("summary") or "").strip()
or str(history_summary.get("latest_failure_summary") or "").strip()
)
health_status = _build_job_health_status(
enabled=bool(row.get("enabled", 0)),
running=bool(runtime.get("running", False)),
last_status=str(last_status or ""),
latest_success_at=history_summary.get("latest_success_at"),
latest_failure_summary=str(history_summary.get("latest_failure_summary") or ""),
)
result.append(
{
"job_key": job_key,
@@ -105,26 +51,14 @@ def api_list_jobs():
"runtime_enabled": runtime.get("enabled"),
"running": runtime.get("running", False),
"trigger_text": runtime.get("trigger_text", ""),
"last_run_at": _normalize_datetime_text(last_run_at),
"last_status": last_status,
"last_error": last_error,
"last_duration_ms": runtime.get("last_duration_ms") or latest_log.get("duration_ms"),
"last_run_at": _normalize_datetime_text(runtime.get("last_run_at")),
"last_status": runtime.get("last_status"),
"last_error": runtime.get("last_error"),
"last_duration_ms": runtime.get("last_duration_ms"),
"next_run_at": _normalize_datetime_text(runtime.get("next_run_at")),
"run_count": runtime.get("run_count", 0),
"success_count": runtime.get("success_count", 0),
"fail_count": runtime.get("fail_count", 0),
"latest_success_at": _normalize_datetime_text(history_summary.get("latest_success_at")),
"latest_failed_at": _normalize_datetime_text(history_summary.get("latest_failed_at")),
"latest_failure_summary": str(history_summary.get("latest_failure_summary") or "").strip(),
"history_success_count": int(history_summary.get("history_success_count", 0) or 0),
"history_fail_count": int(history_summary.get("history_fail_count", 0) or 0),
"history_total_count": int(history_summary.get("history_total_count", 0) or 0),
"health_status": health_status,
"health_message": _build_job_health_message(
health_status=health_status,
latest_success_at=history_summary.get("latest_success_at"),
latest_failure_summary=str(history_summary.get("latest_failure_summary") or "").strip(),
),
}
)