完善插件治理中心第一阶段

- 为 PluginManager 增加统一插件治理快照,补充配置概览、治理诊断、运行态记录与未加载模块展示\n- 更新插件管理后台页面,展示治理健康、能力类型、Feature Key、依赖关系与配置概览信息\n- 优化插件配置保存流程,保存前先做格式校验,并支持对未加载插件查看详情与重新尝试加载\n- 更新工程优化文档,记录插件治理中心第一阶段的当前进展
This commit is contained in:
liuwei
2026-04-30 16:07:02 +08:00
parent 97fc6dc2a4
commit b0e11fb9b5
4 changed files with 738 additions and 92 deletions

View File

@@ -6,6 +6,7 @@ import threading
import time
from typing import Dict, List, Any, Optional, Tuple
import toml
from loguru import logger
from base.plugin_common.plugin_interface import PluginInterface, PluginStatus
@@ -56,6 +57,11 @@ class PluginManager:
self.module_to_display = {} # 模块名到显示名的映射
self.system_context = {} # 系统上下文
self.current_bot: Optional[WechatAPIClient] = None
# 运行态记录用于给“插件治理中心”提供统一视图:
# 1. 不仅记录已成功加载的插件,也记录“加载失败 / 配置禁用 / 手动停用”等状态;
# 2. 后台治理页就不必再从日志里猜某个插件为什么没出现在列表里;
# 3. 这里按 module_name 维度存储,便于和 plugins 目录天然对齐。
self.plugin_runtime_records: Dict[str, Dict[str, Any]] = {}
# 热加载相关
self._watcher_thread: Optional[threading.Thread] = None
@@ -74,6 +80,438 @@ class PluginManager:
if self.plugin_dir not in sys.path:
sys.path.insert(0, self.plugin_dir)
def _record_module_runtime_state(
self,
module_name: str,
state: str,
message: str = "",
detail: Optional[Dict[str, Any]] = None,
) -> None:
"""记录插件模块的运行态快照。"""
if not module_name:
return
self.plugin_runtime_records[module_name] = {
"state": str(state or "").strip().lower() or "unknown",
"message": str(message or "").strip(),
"detail": dict(detail or {}),
"updated_at": float(time.time()),
}
def _get_module_runtime_state(self, module_name: str) -> Dict[str, Any]:
"""读取插件模块的最近一次运行态记录。"""
return dict(self.plugin_runtime_records.get(module_name, {}) or {})
@staticmethod
def _is_sensitive_config_key(key: str) -> bool:
"""判断配置键是否属于敏感信息。"""
lowered_key = str(key or "").strip().lower()
return any(
keyword in lowered_key
for keyword in ["password", "secret", "token", "api_key", "apikey", "cookie", "client_secret"]
)
def _build_config_overview_from_mapping(
self,
config_obj: Optional[Dict[str, Any]],
*,
config_path: str,
file_exists: bool,
parse_ok: bool,
parse_error: str = "",
) -> Dict[str, Any]:
"""从配置对象构建统一的配置概览。"""
config_obj = dict(config_obj or {})
top_level_keys = list(config_obj.keys())
dict_section_names = []
enabled_sections = []
disabled_sections = []
sensitive_paths = []
def _walk_sensitive_fields(node, path: str) -> None:
if isinstance(node, dict):
for key, value in node.items():
next_path = f"{path}.{key}"
if isinstance(value, str) and self._is_sensitive_config_key(key) and str(value or "").strip():
sensitive_paths.append(next_path)
_walk_sensitive_fields(value, next_path)
return
if isinstance(node, list):
for index, value in enumerate(node):
_walk_sensitive_fields(value, f"{path}[{index}]")
for section_name, section_value in config_obj.items():
if not isinstance(section_value, dict):
continue
dict_section_names.append(section_name)
if "enable" not in section_value:
continue
if bool(section_value.get("enable", True)):
enabled_sections.append(section_name)
else:
disabled_sections.append(section_name)
_walk_sensitive_fields(config_obj, "config")
return {
"path": config_path,
"exists": bool(file_exists),
"parse_ok": bool(parse_ok),
"parse_error": str(parse_error or ""),
"top_level_keys": top_level_keys,
"top_level_key_count": len(top_level_keys),
"dict_section_names": dict_section_names,
"section_count": len(dict_section_names),
"enabled_sections": enabled_sections,
"enabled_section_count": len(enabled_sections),
"disabled_sections": disabled_sections,
"disabled_section_count": len(disabled_sections),
"sensitive_field_paths": sensitive_paths,
"sensitive_field_count": len(sensitive_paths),
}
def _read_plugin_config_overview(self, config_path: str) -> Dict[str, Any]:
"""读取插件配置文件并返回概览。"""
if not config_path:
return self._build_config_overview_from_mapping(
{},
config_path="",
file_exists=False,
parse_ok=False,
parse_error="配置路径为空",
)
if not os.path.exists(config_path):
return self._build_config_overview_from_mapping(
{},
config_path=config_path,
file_exists=False,
parse_ok=True,
)
try:
with open(config_path, "r", encoding="utf-8") as config_file:
config_obj = toml.load(config_file)
return self._build_config_overview_from_mapping(
config_obj,
config_path=config_path,
file_exists=True,
parse_ok=True,
)
except Exception as e:
return self._build_config_overview_from_mapping(
{},
config_path=config_path,
file_exists=True,
parse_ok=False,
parse_error=str(e),
)
@staticmethod
def _build_diagnostic(level: str, code: str, message: str) -> Dict[str, str]:
"""统一治理诊断项结构。"""
return {
"level": str(level or "").strip().lower() or "info",
"code": str(code or "").strip(),
"message": str(message or "").strip(),
}
def _collect_plugin_types(self, plugin: PluginInterface) -> List[str]:
"""识别插件能力类型。"""
plugin_types = []
if isinstance(plugin, MessagePluginInterface):
plugin_types.append("message")
if isinstance(plugin, ScheduledPluginInterface):
plugin_types.append("scheduled")
if not plugin_types:
plugin_types.append("generic")
return plugin_types
def _collect_plugin_commands(self, plugin: PluginInterface) -> List[str]:
"""统一读取插件声明的命令列表。"""
commands = getattr(plugin, "commands", []) or getattr(plugin, "_commands", []) or []
if isinstance(commands, (list, tuple, set)):
return [str(item).strip() for item in commands if str(item or "").strip()]
if isinstance(commands, str) and commands.strip():
return [commands.strip()]
return []
def _build_governance_diagnostics(
self,
*,
plugin: Optional[PluginInterface],
module_name: str,
config_overview: Dict[str, Any],
runtime_record: Dict[str, Any],
) -> List[Dict[str, str]]:
"""根据插件元信息、配置和运行态生成治理诊断。"""
diagnostics = []
runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
runtime_message = str(runtime_record.get("message", "") or "").strip()
if runtime_state == "load_failed":
diagnostics.append(
self._build_diagnostic(
"error",
"load_failed",
runtime_message or f"插件模块 `{module_name}` 加载失败,请先排查导入、初始化或依赖问题。",
)
)
elif runtime_state == "disabled_by_config":
diagnostics.append(
self._build_diagnostic(
"info",
"disabled_by_config",
runtime_message or "插件已在配置中禁用,当前未进入运行态。",
)
)
if not config_overview.get("exists"):
diagnostics.append(
self._build_diagnostic(
"info",
"config_missing",
"未发现 config.toml当前插件将完全依赖默认参数或代码内置配置。",
)
)
elif not config_overview.get("parse_ok"):
diagnostics.append(
self._build_diagnostic(
"error",
"config_parse_failed",
f"配置文件解析失败:{config_overview.get('parse_error', '未知错误')}",
)
)
sensitive_count = int(config_overview.get("sensitive_field_count", 0) or 0)
if sensitive_count > 0:
diagnostics.append(
self._build_diagnostic(
"warning",
"config_contains_sensitive_fields",
f"配置文件中检测到 {sensitive_count} 个敏感字段,建议逐步迁移到全局配置或环境变量。",
)
)
if plugin is None:
return diagnostics
version = str(getattr(plugin, "version", "") or "").strip()
author = str(getattr(plugin, "author", "") or "").strip()
description = str(getattr(plugin, "description", "") or "").strip()
if not version or version.upper() == "N/A":
diagnostics.append(self._build_diagnostic("warning", "missing_version", "插件未声明版本号,不利于后续升级和兼容治理。"))
if not author or author.upper() == "N/A":
diagnostics.append(self._build_diagnostic("info", "missing_author", "插件未声明作者信息,后续定位维护人会比较困难。"))
if not description or description.upper() == "N/A":
diagnostics.append(self._build_diagnostic("info", "missing_description", "插件未声明描述信息,后台可读性较弱。"))
dependencies = list(getattr(plugin, "dependencies", []) or [])
for dependency_name in dependencies:
if dependency_name not in self.plugins:
diagnostics.append(
self._build_diagnostic(
"warning",
"missing_dependency",
f"声明依赖插件 `{dependency_name}` 当前未加载,存在运行时能力缺失风险。",
)
)
if isinstance(plugin, MessagePluginInterface):
commands = self._collect_plugin_commands(plugin)
if not commands:
diagnostics.append(
self._build_diagnostic(
"info",
"missing_commands",
"消息插件未声明命令列表,后台无法准确展示其触发入口。",
)
)
feature_key = str(getattr(plugin, "feature_key", "") or "").strip()
if not feature_key:
diagnostics.append(
self._build_diagnostic(
"info",
"missing_feature_key",
"消息插件未声明 feature_key将无法纳入统一群级权限治理。",
)
)
return diagnostics
@staticmethod
def _summarize_governance_status(diagnostics: List[Dict[str, str]]) -> Dict[str, Any]:
"""把诊断列表汇总为后台更容易消费的治理状态。"""
level_priority = {"healthy": 0, "info": 1, "warning": 2, "error": 3}
level_counts = {"error": 0, "warning": 0, "info": 0}
governance_status = "healthy"
for item in diagnostics or []:
level = str(item.get("level", "") or "info").strip().lower()
if level in level_counts:
level_counts[level] += 1
if level_priority.get(level, 0) > level_priority.get(governance_status, 0):
governance_status = level
if governance_status == "info" and level_counts["warning"] == 0 and level_counts["error"] == 0:
governance_status = "healthy"
return {
"status": governance_status,
"error_count": level_counts["error"],
"warning_count": level_counts["warning"],
"info_count": level_counts["info"],
}
def _build_plugin_snapshot(self, plugin: PluginInterface) -> Dict[str, Any]:
"""为已加载插件生成标准治理快照。"""
module_name = self._get_module_name_from_plugin(plugin) or "unknown"
runtime_record = self._get_module_runtime_state(module_name)
config_path = plugin.get_config_path()
config_overview = self._read_plugin_config_overview(config_path)
commands = self._collect_plugin_commands(plugin)
feature_key = str(getattr(plugin, "feature_key", "") or "").strip()
feature_description = str(getattr(plugin, "feature_description", "") or "").strip()
governance_diagnostics = self._build_governance_diagnostics(
plugin=plugin,
module_name=module_name,
config_overview=config_overview,
runtime_record=runtime_record,
)
governance_summary = self._summarize_governance_status(governance_diagnostics)
return {
"name": plugin.name,
"module_name": module_name,
"version": getattr(plugin, "version", "N/A"),
"author": getattr(plugin, "author", "N/A"),
"description": getattr(plugin, "description", "N/A"),
"status": plugin.status.name if hasattr(plugin, "status") else "UNKNOWN",
"status_label": self._status_to_label(plugin.status.name if hasattr(plugin, "status") else "UNKNOWN"),
"plugin_types": self._collect_plugin_types(plugin),
"commands": commands,
"command_count": len(commands),
"command_prefix": getattr(plugin, "command_prefix", ""),
"dependencies": list(getattr(plugin, "dependencies", []) or []),
"feature_key": feature_key,
"feature_description": feature_description,
"supports_group_switch": bool(getattr(plugin, "feature", None)),
"config": getattr(plugin, "_config", {}),
"config_path": config_path,
"config_overview": config_overview,
"governance_diagnostics": governance_diagnostics,
"governance_status": governance_summary["status"],
"governance_error_count": governance_summary["error_count"],
"governance_warning_count": governance_summary["warning_count"],
"governance_info_count": governance_summary["info_count"],
"runtime_state": runtime_record.get("state", "loaded"),
"runtime_message": runtime_record.get("message", ""),
}
def _build_unloaded_plugin_snapshot(self, module_name: str) -> Dict[str, Any]:
"""为未成功加载的插件模块生成治理快照。"""
runtime_record = self._get_module_runtime_state(module_name)
config_path = os.path.join(self.plugin_dir, module_name, "config.toml")
if not os.path.exists(config_path):
config_path = os.path.join(self.plugin_dir, f"{module_name}", "config.toml")
config_overview = self._read_plugin_config_overview(config_path)
governance_diagnostics = self._build_governance_diagnostics(
plugin=None,
module_name=module_name,
config_overview=config_overview,
runtime_record=runtime_record,
)
governance_summary = self._summarize_governance_status(governance_diagnostics)
runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
status = "DISCOVERED"
if runtime_state == "load_failed":
status = "ERROR"
elif runtime_state == "disabled_by_config":
status = "STOPPED"
return {
"name": module_name,
"module_name": module_name,
"version": "N/A",
"author": "N/A",
"description": runtime_record.get("message", "") or "插件模块已发现,但当前未进入加载态。",
"status": status,
"status_label": self._status_to_label(status),
"plugin_types": ["unknown"],
"commands": [],
"command_count": 0,
"command_prefix": "",
"dependencies": [],
"feature_key": "",
"feature_description": "",
"supports_group_switch": False,
"config": {},
"config_path": config_path,
"config_overview": config_overview,
"governance_diagnostics": governance_diagnostics,
"governance_status": governance_summary["status"],
"governance_error_count": governance_summary["error_count"],
"governance_warning_count": governance_summary["warning_count"],
"governance_info_count": governance_summary["info_count"],
"runtime_state": runtime_state or "discovered",
"runtime_message": runtime_record.get("message", ""),
}
@staticmethod
def _status_to_label(status: str) -> str:
"""把运行态状态码转换成中文展示文案。"""
status_map = {
"RUNNING": "运行中",
"STOPPED": "已停用",
"LOADED": "已加载",
"UNLOADED": "未加载",
"ERROR": "异常",
"DISCOVERED": "待处理",
"UNKNOWN": "未知",
}
return status_map.get(str(status or "").strip().upper(), "未知")
def get_plugin_snapshots(self) -> List[Dict[str, Any]]:
"""返回插件治理中心使用的统一快照列表。"""
snapshots = []
loaded_module_names = set()
discovered_module_names = set(self.discover_plugins())
for plugin in self.plugins.values():
snapshot = self._build_plugin_snapshot(plugin)
snapshots.append(snapshot)
loaded_module_names.add(snapshot["module_name"])
# 这里把“目录已存在但插件未成功加载”的模块也补进列表:
# 1. 否则后台只能看到成功插件,看不到真正需要排查的失败模块;
# 2. 这类插件往往正是治理中心最该暴露的问题;
# 3. 统一补成快照后,前端无需区分“已加载”与“未加载”两套数据源。
for module_name in sorted(discovered_module_names - loaded_module_names):
snapshots.append(self._build_unloaded_plugin_snapshot(module_name))
snapshots.sort(
key=lambda item: (
0 if item.get("status") == "RUNNING" else 1,
0 if item.get("governance_status") == "error" else 1 if item.get("governance_status") == "warning" else 2,
str(item.get("module_name", "")),
)
)
return snapshots
def get_plugin_snapshot(self, name: str) -> Optional[Dict[str, Any]]:
"""按模块名或展示名获取单个插件治理快照。"""
target_name = str(name or "").strip()
if not target_name:
return None
display_name, plugin = self.find_plugin_by_name(target_name)
if plugin:
return self._build_plugin_snapshot(plugin)
for snapshot in self.get_plugin_snapshots():
if snapshot.get("module_name") == target_name or snapshot.get("name") == target_name:
return snapshot
return None
def set_system_context(self, context: Dict[str, Any]):
"""
设置系统上下文
@@ -354,6 +792,7 @@ class PluginManager:
if module_name not in self.module_to_display:
self.module_to_display[module_name] = display_name
self.LOG.debug(f"PluginManager添加缺失的模块映射 {module_name} -> {display_name}")
self._record_module_runtime_state(module_name, "loaded", "插件已在内存中复用现有实例。")
self._inject_bot_to_plugin(plugin)
return plugin
except Exception as e:
@@ -372,6 +811,7 @@ class PluginManager:
self.plugin_modules[module_name] = module
except ImportError as e:
self.LOG.error(f"PluginManager导入插件模块 {module_path} 失败: {e}")
self._record_module_runtime_state(module_name, "load_failed", f"导入插件模块失败: {e}")
return None
else:
# 单文件插件
@@ -381,6 +821,7 @@ class PluginManager:
self.plugin_modules[module_name] = module
except ImportError as e:
self.LOG.error(f"PluginManager导入单文件插件 {module_name} 失败: {e}")
self._record_module_runtime_state(module_name, "load_failed", f"导入单文件插件失败: {e}")
return None
# 查找插件类
@@ -406,12 +847,14 @@ class PluginManager:
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"PluginManager插件模块 {module_name} 加载配置失败")
self._record_module_runtime_state(module_name, "load_failed", "插件配置加载失败。")
async_job.remove_jobs_by_owner(plugin)
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"PluginManager插件模块 {module_name} 初始化失败")
self._record_module_runtime_state(module_name, "load_failed", "插件初始化失败。")
async_job.remove_jobs_by_owner(plugin)
return None
self._inject_bot_to_plugin(plugin)
@@ -428,13 +871,16 @@ class PluginManager:
# 添加模块名到显示名的映射
self.module_to_display[module_name] = display_name
self._refresh_module_file_state(module_name)
self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
# self.LOG.info(f"PluginManager添加模块映射 {module_name} -> {display_name}")
return plugin
else:
self.LOG.error(f"PluginManager插件模块 {module_name} 的 get_plugin() 返回的不是有效的插件实例")
self._record_module_runtime_state(module_name, "load_failed", "get_plugin() 未返回有效的插件实例。")
else:
self.LOG.error(f"PluginManager插件模块 {module_name} 中未找到有效的插件类或 get_plugin 函数")
self._record_module_runtime_state(module_name, "load_failed", "未找到有效的插件类或 get_plugin 函数。")
return None
# 实例化插件
@@ -448,6 +894,7 @@ class PluginManager:
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"PluginManager插件模块 {module_name} 加载配置失败")
self._record_module_runtime_state(module_name, "load_failed", "插件配置加载失败。")
async_job.remove_jobs_by_owner(plugin)
return None
@@ -455,12 +902,14 @@ class PluginManager:
for section in plugin._config.values():
if isinstance(section, dict) and not section.get("enable", True):
self.LOG.debug(f"PluginManager插件 {module_name} 已禁用,跳过加载")
self._record_module_runtime_state(module_name, "disabled_by_config", "插件在配置中已禁用,启动时已跳过加载。")
async_job.remove_jobs_by_owner(plugin)
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"PluginManager插件模块 {module_name} 初始化失败")
self._record_module_runtime_state(module_name, "load_failed", "插件初始化失败。")
async_job.remove_jobs_by_owner(plugin)
return None
self._inject_bot_to_plugin(plugin)
@@ -477,6 +926,7 @@ class PluginManager:
# 添加模块名到显示名的映射
self.module_to_display[module_name] = display_name
self._refresh_module_file_state(module_name)
self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
# self.LOG.info(f"PluginManager添加模块映射 {module_name} -> {display_name}")
return plugin
@@ -485,6 +935,7 @@ class PluginManager:
plugin_obj = locals().get("plugin")
if plugin_obj is not None:
async_job.remove_jobs_by_owner(plugin_obj)
self._record_module_runtime_state(module_name, "load_failed", f"插件加载异常: {e}")
self.LOG.exception(f"PluginManager加载插件模块 {module_name} 失败: {e}", exc_info=True)
return None
@@ -609,10 +1060,14 @@ class PluginManager:
if plugin.start():
plugin.status = PluginStatus.RUNNING
module_name = self._get_module_name_from_plugin(plugin) or name
self._record_module_runtime_state(module_name, "running", "插件已启动并进入运行态。")
self.LOG.debug(f"PluginManager插件 {display_name} 状态变更为在运行")
return True
else:
plugin.status = PluginStatus.ERROR
module_name = self._get_module_name_from_plugin(plugin) or name
self._record_module_runtime_state(module_name, "load_failed", "插件启动失败,状态已标记为异常。")
self.LOG.debug(f"PluginManager插件 {display_name} 状态变更为异常")
return False
@@ -639,10 +1094,14 @@ class PluginManager:
if plugin.stop():
plugin.status = PluginStatus.STOPPED
module_name = self._get_module_name_from_plugin(plugin) or name
self._record_module_runtime_state(module_name, "stopped", "插件已手动停用。")
self.LOG.debug(f"插件 {display_name} 状态变更为已停止")
return True
else:
plugin.status = PluginStatus.ERROR
module_name = self._get_module_name_from_plugin(plugin) or name
self._record_module_runtime_state(module_name, "load_failed", "插件停用失败,状态已标记为异常。")
self.LOG.debug(f"插件 {display_name} 状态变更为异常")
return False