1. 在插件治理快照中新增消息插件分发方式摘要,区分前台同步、后台任务、混合模式与非消息插件。 2. 插件详情接口统一复用完整治理快照,避免列表和详情字段不一致。 3. 插件管理页列表、移动端卡片和详情弹窗新增执行方式展示,并支持命令级分发预览。
1807 lines
79 KiB
Python
1807 lines
79 KiB
Python
import importlib
|
||
import inspect
|
||
import os
|
||
import sys
|
||
import threading
|
||
import time
|
||
from typing import Dict, List, Any, Optional, Tuple
|
||
|
||
import toml
|
||
from loguru import logger
|
||
|
||
from base.plugin_common.plugin_interface import PluginInterface, PluginStatus
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.scheduled_plugin_interface import ScheduledPluginInterface
|
||
from base.plugin_common.plugin_registry import PluginRegistry
|
||
from utils.decorator.async_job import async_job
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class PluginManager:
|
||
"""插件管理器,负责加载、卸载、启动、停止插件"""
|
||
|
||
# 单例实例
|
||
_instance = None
|
||
|
||
@classmethod
|
||
def get_instance(cls, plugin_dir=None):
|
||
"""获取单例实例
|
||
|
||
Args:
|
||
plugin_dir: 插件目录,如果已有实例则忽略此参数
|
||
|
||
Returns:
|
||
PluginManager实例
|
||
"""
|
||
if cls._instance is None:
|
||
cls._instance = cls(plugin_dir=plugin_dir or "plugins")
|
||
return cls._instance
|
||
|
||
def __new__(cls, *args, **kwargs):
|
||
"""实现单例模式"""
|
||
if cls._instance is None:
|
||
cls._instance = super(PluginManager, cls).__new__(cls)
|
||
cls._instance._initialized = False
|
||
return cls._instance
|
||
|
||
def __init__(self, plugin_dir: str = "plugins"):
|
||
"""
|
||
初始化插件管理器
|
||
|
||
Args:
|
||
plugin_dir: 插件目录
|
||
"""
|
||
self.plugin_dir = plugin_dir
|
||
self.plugins: Dict[str, PluginInterface] = {} # 插件实例字典,键为display_name
|
||
self.plugin_modules = {} # 插件模块字典,键为module_name
|
||
self.module_to_display = {} # 模块名到显示名的映射
|
||
self.system_context = {} # 系统上下文
|
||
self.current_bot: Optional[WechatAPIClient] = None
|
||
# 运行态记录用于给“插件治理中心”提供统一视图:
|
||
# 1. 不仅记录已成功加载的插件,也记录“加载失败 / 配置禁用 / 手动停用”等状态;
|
||
# 2. 后台治理页就不必再从日志里猜某个插件为什么没出现在列表里;
|
||
# 3. 这里按 module_name 维度存储,便于和 plugins 目录天然对齐。
|
||
self.plugin_runtime_records: Dict[str, Dict[str, Any]] = {}
|
||
# 插件执行保护记录:
|
||
# 1. 主要服务于消息插件的“超时保护 / 连续失败熔断 / 自动半开恢复探测”;
|
||
# 2. 记录同样按 module_name 存储,便于和治理快照直接关联;
|
||
# 3. 后续若扩展到定时任务插件,也可以沿用同一套结构。
|
||
self.plugin_guard_records: Dict[str, Dict[str, Any]] = {}
|
||
self._guard_lock = threading.RLock()
|
||
|
||
# 热加载相关
|
||
self._watcher_thread: Optional[threading.Thread] = None
|
||
self._watcher_stop_event = threading.Event()
|
||
# 默认每 60 秒扫描一次插件目录,降低线上资源消耗
|
||
self._watcher_interval = 60.0
|
||
self._module_file_state: Dict[str, Dict[str, float]] = {}
|
||
self._watcher_lock = threading.RLock()
|
||
|
||
self.LOG = logger
|
||
# 确保插件目录存在
|
||
if not os.path.exists(self.plugin_dir):
|
||
os.makedirs(self.plugin_dir)
|
||
|
||
# 将插件目录添加到Python路径
|
||
if self.plugin_dir not in sys.path:
|
||
sys.path.insert(0, self.plugin_dir)
|
||
|
||
def _record_module_runtime_state(
|
||
self,
|
||
module_name: str,
|
||
state: str,
|
||
message: str = "",
|
||
detail: Optional[Dict[str, Any]] = None,
|
||
) -> None:
|
||
"""记录插件模块的运行态快照。"""
|
||
if not module_name:
|
||
return
|
||
self.plugin_runtime_records[module_name] = {
|
||
"state": str(state or "").strip().lower() or "unknown",
|
||
"message": str(message or "").strip(),
|
||
"detail": dict(detail or {}),
|
||
"updated_at": float(time.time()),
|
||
}
|
||
|
||
def _get_module_runtime_state(self, module_name: str) -> Dict[str, Any]:
|
||
"""读取插件模块的最近一次运行态记录。"""
|
||
return dict(self.plugin_runtime_records.get(module_name, {}) or {})
|
||
|
||
def _build_default_guard_record(self) -> Dict[str, Any]:
|
||
"""构建插件执行保护的默认记录。"""
|
||
return {
|
||
"circuit_state": "closed",
|
||
"consecutive_failures": 0,
|
||
"consecutive_timeouts": 0,
|
||
"failure_count_total": 0,
|
||
"timeout_count_total": 0,
|
||
"success_count_total": 0,
|
||
"last_failure_at": 0.0,
|
||
"last_success_at": 0.0,
|
||
"last_error_message": "",
|
||
"last_failure_type": "",
|
||
"last_process_time_ms": 0.0,
|
||
"last_timeout_seconds": 0,
|
||
"circuit_opened_at": 0.0,
|
||
"circuit_until": 0.0,
|
||
"half_open_in_flight": False,
|
||
}
|
||
|
||
def _get_or_create_plugin_guard_record(self, module_name: str) -> Dict[str, Any]:
|
||
"""读取或初始化插件执行保护记录。"""
|
||
if module_name not in self.plugin_guard_records:
|
||
self.plugin_guard_records[module_name] = self._build_default_guard_record()
|
||
return self.plugin_guard_records[module_name]
|
||
|
||
def reset_plugin_guard_state(self, plugin_or_module_name) -> None:
|
||
"""重置插件执行保护状态。"""
|
||
module_name = ""
|
||
if isinstance(plugin_or_module_name, PluginInterface):
|
||
module_name = self._get_module_name_from_plugin(plugin_or_module_name) or ""
|
||
else:
|
||
module_name = str(plugin_or_module_name or "").strip()
|
||
if not module_name:
|
||
return
|
||
|
||
with self._guard_lock:
|
||
self.plugin_guard_records[module_name] = self._build_default_guard_record()
|
||
|
||
def get_plugin_guard_snapshot(self, plugin_or_module_name) -> Dict[str, Any]:
|
||
"""读取适合后台展示的插件执行保护快照。"""
|
||
module_name = ""
|
||
if isinstance(plugin_or_module_name, PluginInterface):
|
||
module_name = self._get_module_name_from_plugin(plugin_or_module_name) or ""
|
||
else:
|
||
module_name = str(plugin_or_module_name or "").strip()
|
||
|
||
if not module_name:
|
||
return self._build_default_guard_record()
|
||
|
||
with self._guard_lock:
|
||
record = dict(self._get_or_create_plugin_guard_record(module_name))
|
||
|
||
now_ts = time.time()
|
||
circuit_until = float(record.get("circuit_until") or 0.0)
|
||
open_remaining_seconds = max(0, int(circuit_until - now_ts)) if circuit_until > 0 else 0
|
||
record["open_remaining_seconds"] = open_remaining_seconds
|
||
return record
|
||
|
||
def try_acquire_plugin_execution(
|
||
self,
|
||
plugin: PluginInterface,
|
||
*,
|
||
recovery_seconds: int,
|
||
) -> Dict[str, Any]:
|
||
"""判断插件当前是否允许继续执行。"""
|
||
module_name = self._get_module_name_from_plugin(plugin) or plugin.name
|
||
now_ts = time.time()
|
||
|
||
with self._guard_lock:
|
||
record = self._get_or_create_plugin_guard_record(module_name)
|
||
circuit_state = str(record.get("circuit_state", "closed") or "closed").strip().lower()
|
||
circuit_until = float(record.get("circuit_until") or 0.0)
|
||
recovery_seconds = max(int(recovery_seconds or 0), 30)
|
||
|
||
# 熔断打开期间直接拒绝新流量:
|
||
# 1. 目标是尽快切断持续失败插件对主链路的影响;
|
||
# 2. 若冷却时间未到,则所有新消息都应跳过该插件;
|
||
# 3. 返回剩余秒数,便于日志和后台展示更可读。
|
||
if circuit_state == "open" and circuit_until > now_ts:
|
||
return {
|
||
"allowed": False,
|
||
"reason": "circuit_open",
|
||
"open_remaining_seconds": max(0, int(circuit_until - now_ts)),
|
||
"circuit_state": circuit_state,
|
||
}
|
||
|
||
# 熔断窗口已过,放一个半开探测请求:
|
||
# 1. 只允许一个探测请求进入,避免刚恢复时被瞬时并发打爆;
|
||
# 2. 成功则后续 record_success 会关闭熔断并清零计数;
|
||
# 3. 失败则 record_failure 会重新打开熔断窗口。
|
||
if circuit_state == "open" and circuit_until <= now_ts:
|
||
record["circuit_state"] = "half_open"
|
||
record["half_open_in_flight"] = True
|
||
record["circuit_until"] = now_ts + recovery_seconds
|
||
return {
|
||
"allowed": True,
|
||
"reason": "half_open_probe",
|
||
"open_remaining_seconds": 0,
|
||
"circuit_state": "half_open",
|
||
}
|
||
|
||
# 半开状态下,如果已经有探测请求在飞,就继续拒绝后续请求。
|
||
if circuit_state == "half_open":
|
||
if bool(record.get("half_open_in_flight", False)):
|
||
return {
|
||
"allowed": False,
|
||
"reason": "half_open_probe_in_flight",
|
||
"open_remaining_seconds": max(0, int(float(record.get("circuit_until") or 0.0) - now_ts)),
|
||
"circuit_state": "half_open",
|
||
}
|
||
record["half_open_in_flight"] = True
|
||
return {
|
||
"allowed": True,
|
||
"reason": "half_open_probe",
|
||
"open_remaining_seconds": 0,
|
||
"circuit_state": "half_open",
|
||
}
|
||
|
||
return {
|
||
"allowed": True,
|
||
"reason": "closed",
|
||
"open_remaining_seconds": 0,
|
||
"circuit_state": "closed",
|
||
}
|
||
|
||
def record_plugin_execution_success(
|
||
self,
|
||
plugin: PluginInterface,
|
||
*,
|
||
process_time_ms: float,
|
||
) -> None:
|
||
"""记录插件一次成功执行。"""
|
||
module_name = self._get_module_name_from_plugin(plugin) or plugin.name
|
||
now_ts = time.time()
|
||
|
||
with self._guard_lock:
|
||
record = self._get_or_create_plugin_guard_record(module_name)
|
||
record["success_count_total"] = int(record.get("success_count_total", 0) or 0) + 1
|
||
record["consecutive_failures"] = 0
|
||
record["consecutive_timeouts"] = 0
|
||
record["last_success_at"] = now_ts
|
||
record["last_process_time_ms"] = float(process_time_ms or 0.0)
|
||
record["last_failure_type"] = ""
|
||
record["last_error_message"] = ""
|
||
record["last_timeout_seconds"] = 0
|
||
record["half_open_in_flight"] = False
|
||
# 半开探测成功后立即闭合熔断:
|
||
# 1. 说明插件已经至少恢复到可以完成一次请求;
|
||
# 2. 这里不保留历史 consecutive_failures,避免恢复后仍长期背着旧债;
|
||
# 3. 同时把 open 时间清空,后台能立即看到状态回归 closed。
|
||
record["circuit_state"] = "closed"
|
||
record["circuit_opened_at"] = 0.0
|
||
record["circuit_until"] = 0.0
|
||
|
||
def record_plugin_execution_failure(
|
||
self,
|
||
plugin: PluginInterface,
|
||
*,
|
||
failure_type: str,
|
||
error_message: str,
|
||
process_time_ms: float,
|
||
timeout_seconds: int,
|
||
failure_threshold: int,
|
||
recovery_seconds: int,
|
||
) -> Dict[str, Any]:
|
||
"""记录插件一次失败执行,并根据连续失败次数决定是否打开熔断。"""
|
||
module_name = self._get_module_name_from_plugin(plugin) or plugin.name
|
||
now_ts = time.time()
|
||
normalized_failure_type = str(failure_type or "error").strip().lower() or "error"
|
||
failure_threshold = max(int(failure_threshold or 0), 2)
|
||
recovery_seconds = max(int(recovery_seconds or 0), 30)
|
||
|
||
with self._guard_lock:
|
||
record = self._get_or_create_plugin_guard_record(module_name)
|
||
record["failure_count_total"] = int(record.get("failure_count_total", 0) or 0) + 1
|
||
record["consecutive_failures"] = int(record.get("consecutive_failures", 0) or 0) + 1
|
||
record["last_failure_at"] = now_ts
|
||
record["last_failure_type"] = normalized_failure_type
|
||
record["last_error_message"] = str(error_message or "").strip()
|
||
record["last_process_time_ms"] = float(process_time_ms or 0.0)
|
||
record["last_timeout_seconds"] = int(timeout_seconds or 0)
|
||
record["half_open_in_flight"] = False
|
||
|
||
if normalized_failure_type == "timeout":
|
||
record["timeout_count_total"] = int(record.get("timeout_count_total", 0) or 0) + 1
|
||
record["consecutive_timeouts"] = int(record.get("consecutive_timeouts", 0) or 0) + 1
|
||
else:
|
||
record["consecutive_timeouts"] = 0
|
||
|
||
should_open_circuit = (
|
||
str(record.get("circuit_state", "closed") or "closed").strip().lower() == "half_open"
|
||
or int(record.get("consecutive_failures", 0) or 0) >= failure_threshold
|
||
)
|
||
|
||
if should_open_circuit:
|
||
record["circuit_state"] = "open"
|
||
record["circuit_opened_at"] = now_ts
|
||
record["circuit_until"] = now_ts + recovery_seconds
|
||
else:
|
||
record["circuit_state"] = "closed"
|
||
|
||
return dict(record)
|
||
|
||
@staticmethod
|
||
def _is_sensitive_config_key(key: str) -> bool:
|
||
"""判断配置键是否属于敏感信息。"""
|
||
lowered_key = str(key or "").strip().lower()
|
||
return any(
|
||
keyword in lowered_key
|
||
for keyword in ["password", "secret", "token", "api_key", "apikey", "cookie", "client_secret"]
|
||
)
|
||
|
||
def _build_config_overview_from_mapping(
|
||
self,
|
||
config_obj: Optional[Dict[str, Any]],
|
||
*,
|
||
config_path: str,
|
||
file_exists: bool,
|
||
parse_ok: bool,
|
||
parse_error: str = "",
|
||
) -> Dict[str, Any]:
|
||
"""从配置对象构建统一的配置概览。"""
|
||
config_obj = dict(config_obj or {})
|
||
top_level_keys = list(config_obj.keys())
|
||
dict_section_names = []
|
||
enabled_sections = []
|
||
disabled_sections = []
|
||
sensitive_paths = []
|
||
|
||
def _walk_sensitive_fields(node, path: str) -> None:
|
||
if isinstance(node, dict):
|
||
for key, value in node.items():
|
||
next_path = f"{path}.{key}"
|
||
if isinstance(value, str) and self._is_sensitive_config_key(key) and str(value or "").strip():
|
||
sensitive_paths.append(next_path)
|
||
_walk_sensitive_fields(value, next_path)
|
||
return
|
||
if isinstance(node, list):
|
||
for index, value in enumerate(node):
|
||
_walk_sensitive_fields(value, f"{path}[{index}]")
|
||
|
||
for section_name, section_value in config_obj.items():
|
||
if not isinstance(section_value, dict):
|
||
continue
|
||
dict_section_names.append(section_name)
|
||
if "enable" not in section_value:
|
||
continue
|
||
if bool(section_value.get("enable", True)):
|
||
enabled_sections.append(section_name)
|
||
else:
|
||
disabled_sections.append(section_name)
|
||
|
||
_walk_sensitive_fields(config_obj, "config")
|
||
|
||
return {
|
||
"path": config_path,
|
||
"exists": bool(file_exists),
|
||
"parse_ok": bool(parse_ok),
|
||
"parse_error": str(parse_error or ""),
|
||
"top_level_keys": top_level_keys,
|
||
"top_level_key_count": len(top_level_keys),
|
||
"dict_section_names": dict_section_names,
|
||
"section_count": len(dict_section_names),
|
||
"enabled_sections": enabled_sections,
|
||
"enabled_section_count": len(enabled_sections),
|
||
"disabled_sections": disabled_sections,
|
||
"disabled_section_count": len(disabled_sections),
|
||
"sensitive_field_paths": sensitive_paths,
|
||
"sensitive_field_count": len(sensitive_paths),
|
||
}
|
||
|
||
def _read_plugin_config_overview(self, config_path: str) -> Dict[str, Any]:
|
||
"""读取插件配置文件并返回概览。"""
|
||
if not config_path:
|
||
return self._build_config_overview_from_mapping(
|
||
{},
|
||
config_path="",
|
||
file_exists=False,
|
||
parse_ok=False,
|
||
parse_error="配置路径为空",
|
||
)
|
||
|
||
if not os.path.exists(config_path):
|
||
return self._build_config_overview_from_mapping(
|
||
{},
|
||
config_path=config_path,
|
||
file_exists=False,
|
||
parse_ok=True,
|
||
)
|
||
|
||
try:
|
||
with open(config_path, "r", encoding="utf-8") as config_file:
|
||
config_obj = toml.load(config_file)
|
||
return self._build_config_overview_from_mapping(
|
||
config_obj,
|
||
config_path=config_path,
|
||
file_exists=True,
|
||
parse_ok=True,
|
||
)
|
||
except Exception as e:
|
||
return self._build_config_overview_from_mapping(
|
||
{},
|
||
config_path=config_path,
|
||
file_exists=True,
|
||
parse_ok=False,
|
||
parse_error=str(e),
|
||
)
|
||
|
||
@staticmethod
|
||
def _build_diagnostic(level: str, code: str, message: str) -> Dict[str, str]:
|
||
"""统一治理诊断项结构。"""
|
||
return {
|
||
"level": str(level or "").strip().lower() or "info",
|
||
"code": str(code or "").strip(),
|
||
"message": str(message or "").strip(),
|
||
}
|
||
|
||
def _collect_plugin_types(self, plugin: PluginInterface) -> List[str]:
|
||
"""识别插件能力类型。"""
|
||
plugin_types = []
|
||
if isinstance(plugin, MessagePluginInterface):
|
||
plugin_types.append("message")
|
||
if isinstance(plugin, ScheduledPluginInterface):
|
||
plugin_types.append("scheduled")
|
||
if not plugin_types:
|
||
plugin_types.append("generic")
|
||
return plugin_types
|
||
|
||
def _collect_plugin_commands(self, plugin: PluginInterface) -> List[str]:
|
||
"""统一读取插件声明的命令列表。"""
|
||
commands = getattr(plugin, "commands", []) or getattr(plugin, "_commands", []) or []
|
||
if isinstance(commands, (list, tuple, set)):
|
||
return [str(item).strip() for item in commands if str(item or "").strip()]
|
||
if isinstance(commands, str) and commands.strip():
|
||
return [commands.strip()]
|
||
return []
|
||
|
||
@staticmethod
|
||
def _dispatch_mode_label(mode: str) -> str:
|
||
"""把消息分发模式转换成后台可读中文。"""
|
||
normalized_mode = str(mode or "").strip().lower()
|
||
mapping = {
|
||
"sync": "前台同步",
|
||
"background": "后台任务",
|
||
"mixed": "混合模式",
|
||
"non_message": "非消息插件",
|
||
"unknown": "未知",
|
||
}
|
||
return mapping.get(normalized_mode, "未知")
|
||
|
||
def _build_dispatch_preview_message(self, plugin: PluginInterface, command: str = "") -> Dict[str, Any]:
|
||
"""构造用于后台预览分发模式的假消息。
|
||
|
||
说明:
|
||
1. 后台只需要知道“这个插件命中后大概率走前台还是后台”,不需要真实微信上下文;
|
||
2. 因此这里构造一份最小可用消息,尽量覆盖多数插件 `get_message_dispatch_mode()` 的读取字段;
|
||
3. 如果某些插件依赖更复杂上下文,后续会在调用层捕获异常并回退为 `unknown`,不会影响治理页整体可用性。
|
||
"""
|
||
command_text = str(command or "").strip()
|
||
command_prefix = str(getattr(plugin, "command_prefix", "") or "").strip()
|
||
content = command_text
|
||
if command_text and command_prefix and not command_text.startswith(command_prefix):
|
||
content = f"{command_prefix}{command_text}"
|
||
|
||
return {
|
||
"type": "dashboard_preview",
|
||
"content": content,
|
||
"sender": "dashboard_preview",
|
||
"roomid": "dashboard_preview_room",
|
||
"is_at": False,
|
||
"timestamp": 0,
|
||
"trace_id": "dashboard-preview",
|
||
"all_contacts": {},
|
||
"full_wx_msg": None,
|
||
"gbm": None,
|
||
"bot": None,
|
||
"revoke": None,
|
||
}
|
||
|
||
def _preview_plugin_dispatch_mode(self, plugin: PluginInterface, preview_message: Dict[str, Any]) -> str:
|
||
"""安全预览插件在给定消息下的分发模式。"""
|
||
try:
|
||
raw_mode = plugin.get_message_dispatch_mode(preview_message)
|
||
return MessagePluginInterface.normalize_message_dispatch_mode(raw_mode)
|
||
except Exception as e:
|
||
module_name = self._get_module_name_from_plugin(plugin) or getattr(plugin, "name", "unknown")
|
||
self.LOG.debug(f"插件分发模式预览失败: module={module_name}, error={e}")
|
||
return "unknown"
|
||
|
||
def _build_plugin_dispatch_summary(
|
||
self,
|
||
plugin: PluginInterface,
|
||
commands: Optional[List[str]] = None,
|
||
) -> Dict[str, Any]:
|
||
"""构建插件“前台 / 后台”执行方式摘要。
|
||
|
||
设计目标:
|
||
1. 后台插件治理页需要一眼看出消息插件是“前台同步”“后台任务”还是“按命令混合切换”;
|
||
2. 同一个插件里可能既有轻命令又有长命令,因此不能只看静态配置,还要做命令级预览;
|
||
3. 统一在快照层产出摘要后,列表、详情、移动端卡片都能直接复用,不必各写一套判断逻辑。
|
||
"""
|
||
if not isinstance(plugin, MessagePluginInterface):
|
||
return {
|
||
"mode": "non_message",
|
||
"label": self._dispatch_mode_label("non_message"),
|
||
"description": "该插件不参与消息主链路分发,不区分前台同步或后台任务。",
|
||
"is_message_plugin": False,
|
||
"supports_dynamic_dispatch": False,
|
||
"sync_command_count": 0,
|
||
"background_command_count": 0,
|
||
"unknown_command_count": 0,
|
||
"preview_failed_count": 0,
|
||
"sampled_command_count": 0,
|
||
"command_modes": [],
|
||
}
|
||
|
||
commands = list(commands or self._collect_plugin_commands(plugin))
|
||
preview_items: List[Tuple[str, Dict[str, Any]]] = []
|
||
if commands:
|
||
for command in commands:
|
||
preview_items.append((command, self._build_dispatch_preview_message(plugin, command)))
|
||
else:
|
||
# 没有显式命令声明的消息插件,至少用一条空消息预览默认分发模式:
|
||
# 1. 例如链接解析类插件可能靠正则命中,而不是 commands 数组;
|
||
# 2. 这类插件通常不会做命令级动态切换,空消息预览已经足够判断默认模式;
|
||
# 3. 后台仍会标记 sampled_command_count=0,避免用户误以为这里真的存在命令清单。
|
||
preview_items.append(("", self._build_dispatch_preview_message(plugin, "")))
|
||
|
||
mode_counters = {"sync": 0, "background": 0, "unknown": 0}
|
||
command_modes = []
|
||
for command, preview_message in preview_items:
|
||
preview_mode = self._preview_plugin_dispatch_mode(plugin, preview_message)
|
||
if preview_mode not in mode_counters:
|
||
preview_mode = "unknown"
|
||
mode_counters[preview_mode] += 1
|
||
|
||
if command:
|
||
command_modes.append(
|
||
{
|
||
"command": command,
|
||
"mode": preview_mode,
|
||
"label": self._dispatch_mode_label(preview_mode),
|
||
}
|
||
)
|
||
|
||
known_modes = []
|
||
if mode_counters["sync"] > 0:
|
||
known_modes.append("sync")
|
||
if mode_counters["background"] > 0:
|
||
known_modes.append("background")
|
||
|
||
if len(known_modes) > 1:
|
||
summary_mode = "mixed"
|
||
description = (
|
||
f"插件会按命令动态切换执行链路:前台同步 {mode_counters['sync']} 个,"
|
||
f"后台任务 {mode_counters['background']} 个。"
|
||
)
|
||
elif len(known_modes) == 1:
|
||
summary_mode = known_modes[0]
|
||
if known_modes[0] == "background":
|
||
if commands:
|
||
description = f"当前采样的 {len(commands)} 个命令均会转入后台任务池执行。"
|
||
else:
|
||
description = "该消息插件默认转入后台任务池执行。"
|
||
else:
|
||
if commands:
|
||
description = f"当前采样的 {len(commands)} 个命令均在前台消息链路同步执行。"
|
||
else:
|
||
description = "该消息插件默认在前台消息链路同步执行。"
|
||
else:
|
||
summary_mode = "unknown"
|
||
description = "插件已加载,但当前无法稳定预览其前台 / 后台执行方式。"
|
||
|
||
if mode_counters["unknown"] > 0:
|
||
description = (
|
||
f"{description} 另有 {mode_counters['unknown']} 条预览样本无法识别,"
|
||
"可能依赖更完整的运行时上下文。"
|
||
)
|
||
|
||
return {
|
||
"mode": summary_mode,
|
||
"label": self._dispatch_mode_label(summary_mode),
|
||
"description": description,
|
||
"is_message_plugin": True,
|
||
"supports_dynamic_dispatch": summary_mode == "mixed",
|
||
"sync_command_count": mode_counters["sync"],
|
||
"background_command_count": mode_counters["background"],
|
||
"unknown_command_count": mode_counters["unknown"],
|
||
"preview_failed_count": mode_counters["unknown"],
|
||
"sampled_command_count": len(commands),
|
||
"command_modes": command_modes,
|
||
}
|
||
|
||
def _build_governance_diagnostics(
|
||
self,
|
||
*,
|
||
plugin: Optional[PluginInterface],
|
||
module_name: str,
|
||
config_overview: Dict[str, Any],
|
||
runtime_record: Dict[str, Any],
|
||
guard_snapshot: Optional[Dict[str, Any]] = None,
|
||
) -> List[Dict[str, str]]:
|
||
"""根据插件元信息、配置和运行态生成治理诊断。"""
|
||
diagnostics = []
|
||
runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
|
||
runtime_message = str(runtime_record.get("message", "") or "").strip()
|
||
guard_snapshot = dict(guard_snapshot or {})
|
||
|
||
if runtime_state == "load_failed":
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"error",
|
||
"load_failed",
|
||
runtime_message or f"插件模块 `{module_name}` 加载失败,请先排查导入、初始化或依赖问题。",
|
||
)
|
||
)
|
||
elif runtime_state == "disabled_by_config":
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"info",
|
||
"disabled_by_config",
|
||
runtime_message or "插件已在配置中禁用,当前未进入运行态。",
|
||
)
|
||
)
|
||
|
||
if not config_overview.get("exists"):
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"info",
|
||
"config_missing",
|
||
"未发现 config.toml,当前插件将完全依赖默认参数或代码内置配置。",
|
||
)
|
||
)
|
||
elif not config_overview.get("parse_ok"):
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"error",
|
||
"config_parse_failed",
|
||
f"配置文件解析失败:{config_overview.get('parse_error', '未知错误')}",
|
||
)
|
||
)
|
||
|
||
sensitive_count = int(config_overview.get("sensitive_field_count", 0) or 0)
|
||
if sensitive_count > 0:
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"warning",
|
||
"config_contains_sensitive_fields",
|
||
f"配置文件中检测到 {sensitive_count} 个敏感字段,建议逐步迁移到全局配置或环境变量。",
|
||
)
|
||
)
|
||
|
||
circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower()
|
||
consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0)
|
||
open_remaining_seconds = int(guard_snapshot.get("open_remaining_seconds", 0) or 0)
|
||
if circuit_state == "open":
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"error",
|
||
"circuit_open",
|
||
f"插件保护熔断已打开,预计 {open_remaining_seconds} 秒后再尝试半开恢复。",
|
||
)
|
||
)
|
||
elif circuit_state == "half_open":
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"warning",
|
||
"circuit_half_open",
|
||
"插件当前处于半开探测状态,正在等待恢复结果。",
|
||
)
|
||
)
|
||
elif consecutive_failures > 0:
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"info",
|
||
"recent_failures",
|
||
f"插件最近存在连续失败记录,当前连续失败次数为 {consecutive_failures}。",
|
||
)
|
||
)
|
||
|
||
if plugin is None:
|
||
return diagnostics
|
||
|
||
version = str(getattr(plugin, "version", "") or "").strip()
|
||
author = str(getattr(plugin, "author", "") or "").strip()
|
||
description = str(getattr(plugin, "description", "") or "").strip()
|
||
if not version or version.upper() == "N/A":
|
||
diagnostics.append(self._build_diagnostic("warning", "missing_version", "插件未声明版本号,不利于后续升级和兼容治理。"))
|
||
if not author or author.upper() == "N/A":
|
||
diagnostics.append(self._build_diagnostic("info", "missing_author", "插件未声明作者信息,后续定位维护人会比较困难。"))
|
||
if not description or description.upper() == "N/A":
|
||
diagnostics.append(self._build_diagnostic("info", "missing_description", "插件未声明描述信息,后台可读性较弱。"))
|
||
|
||
dependencies = list(getattr(plugin, "dependencies", []) or [])
|
||
for dependency_name in dependencies:
|
||
if dependency_name not in self.plugins:
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"warning",
|
||
"missing_dependency",
|
||
f"声明依赖插件 `{dependency_name}` 当前未加载,存在运行时能力缺失风险。",
|
||
)
|
||
)
|
||
|
||
if isinstance(plugin, MessagePluginInterface):
|
||
commands = self._collect_plugin_commands(plugin)
|
||
if not commands:
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"info",
|
||
"missing_commands",
|
||
"消息插件未声明命令列表,后台无法准确展示其触发入口。",
|
||
)
|
||
)
|
||
feature_key = str(getattr(plugin, "feature_key", "") or "").strip()
|
||
if not feature_key:
|
||
diagnostics.append(
|
||
self._build_diagnostic(
|
||
"info",
|
||
"missing_feature_key",
|
||
"消息插件未声明 feature_key,将无法纳入统一群级权限治理。",
|
||
)
|
||
)
|
||
|
||
return diagnostics
|
||
|
||
@staticmethod
|
||
def _summarize_governance_status(diagnostics: List[Dict[str, str]]) -> Dict[str, Any]:
|
||
"""把诊断列表汇总为后台更容易消费的治理状态。"""
|
||
level_priority = {"healthy": 0, "info": 1, "warning": 2, "error": 3}
|
||
level_counts = {"error": 0, "warning": 0, "info": 0}
|
||
governance_status = "healthy"
|
||
|
||
for item in diagnostics or []:
|
||
level = str(item.get("level", "") or "info").strip().lower()
|
||
if level in level_counts:
|
||
level_counts[level] += 1
|
||
if level_priority.get(level, 0) > level_priority.get(governance_status, 0):
|
||
governance_status = level
|
||
|
||
if governance_status == "info" and level_counts["warning"] == 0 and level_counts["error"] == 0:
|
||
governance_status = "healthy"
|
||
|
||
return {
|
||
"status": governance_status,
|
||
"error_count": level_counts["error"],
|
||
"warning_count": level_counts["warning"],
|
||
"info_count": level_counts["info"],
|
||
}
|
||
|
||
@staticmethod
|
||
def _format_runtime_timestamp(timestamp_value: Any) -> str:
|
||
"""把运行态中的 unix 时间戳转成后台可读文本。"""
|
||
try:
|
||
normalized = float(timestamp_value or 0.0)
|
||
except (TypeError, ValueError):
|
||
return ""
|
||
if normalized <= 0:
|
||
return ""
|
||
try:
|
||
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(normalized))
|
||
except (OverflowError, OSError, ValueError):
|
||
return ""
|
||
|
||
@staticmethod
|
||
def _safe_percent(numerator: Any, denominator: Any) -> float:
|
||
"""安全计算百分比,避免分母为空时抛异常。"""
|
||
try:
|
||
denominator_value = float(denominator or 0.0)
|
||
if denominator_value <= 0:
|
||
return 0.0
|
||
return round((float(numerator or 0.0) / denominator_value) * 100, 2)
|
||
except (TypeError, ValueError, ZeroDivisionError):
|
||
return 0.0
|
||
|
||
def _build_execution_summary(self, guard_snapshot: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""把执行保护记录转换成更适合后台页面展示的执行摘要。
|
||
|
||
设计考虑:
|
||
1. 原始 execution_guard 更偏底层状态,前端直接消费会充满规则判断;
|
||
2. 这里统一补出成功率、总执行次数、最近成功/失败时间、最近错误摘要;
|
||
3. 未来如果还要做“高风险插件排行”“慢插件排行”,也能直接复用该摘要。
|
||
"""
|
||
guard_snapshot = dict(guard_snapshot or {})
|
||
success_count_total = int(guard_snapshot.get("success_count_total", 0) or 0)
|
||
failure_count_total = int(guard_snapshot.get("failure_count_total", 0) or 0)
|
||
timeout_count_total = int(guard_snapshot.get("timeout_count_total", 0) or 0)
|
||
consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0)
|
||
consecutive_timeouts = int(guard_snapshot.get("consecutive_timeouts", 0) or 0)
|
||
last_process_time_ms = round(float(guard_snapshot.get("last_process_time_ms", 0.0) or 0.0), 2)
|
||
circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower()
|
||
last_error_message = str(guard_snapshot.get("last_error_message") or "").strip()
|
||
if len(last_error_message) > 240:
|
||
last_error_message = f"{last_error_message[:237]}..."
|
||
|
||
total_executions = success_count_total + failure_count_total
|
||
success_rate = self._safe_percent(success_count_total, total_executions)
|
||
timeout_rate = self._safe_percent(timeout_count_total, total_executions)
|
||
last_success_at_text = self._format_runtime_timestamp(guard_snapshot.get("last_success_at"))
|
||
last_failure_at_text = self._format_runtime_timestamp(guard_snapshot.get("last_failure_at"))
|
||
|
||
status = "info"
|
||
summary = "暂无执行样本"
|
||
if total_executions > 0:
|
||
status = "healthy"
|
||
summary = (
|
||
f"累计执行 {total_executions} 次,成功率 {success_rate}%,"
|
||
f"最近耗时 {last_process_time_ms}ms"
|
||
)
|
||
|
||
# 熔断打开是最明确的高风险信号,应优先标记为 error。
|
||
if circuit_state == "open":
|
||
status = "error"
|
||
summary = (
|
||
f"插件当前处于熔断中,连续失败 {consecutive_failures} 次,"
|
||
f"恢复剩余 {int(guard_snapshot.get('open_remaining_seconds', 0) or 0)}s"
|
||
)
|
||
elif failure_count_total > 0 or timeout_count_total > 0 or consecutive_failures > 0 or consecutive_timeouts > 0:
|
||
status = "warning"
|
||
summary = (
|
||
f"累计失败 {failure_count_total} 次,超时 {timeout_count_total} 次,"
|
||
f"成功率 {success_rate}%"
|
||
)
|
||
|
||
return {
|
||
"status": status,
|
||
"summary": summary,
|
||
"total_executions": total_executions,
|
||
"success_count_total": success_count_total,
|
||
"failure_count_total": failure_count_total,
|
||
"timeout_count_total": timeout_count_total,
|
||
"success_rate": success_rate,
|
||
"timeout_rate": timeout_rate,
|
||
"consecutive_failures": consecutive_failures,
|
||
"consecutive_timeouts": consecutive_timeouts,
|
||
"last_process_time_ms": last_process_time_ms,
|
||
"last_success_at_text": last_success_at_text,
|
||
"last_failure_at_text": last_failure_at_text,
|
||
"last_error_message": last_error_message,
|
||
"last_failure_type": str(guard_snapshot.get("last_failure_type") or "").strip(),
|
||
"last_timeout_seconds": int(guard_snapshot.get("last_timeout_seconds", 0) or 0),
|
||
"circuit_state": circuit_state,
|
||
"open_remaining_seconds": int(guard_snapshot.get("open_remaining_seconds", 0) or 0),
|
||
}
|
||
|
||
def _build_plugin_snapshot(self, plugin: PluginInterface) -> Dict[str, Any]:
|
||
"""为已加载插件生成标准治理快照。"""
|
||
module_name = self._get_module_name_from_plugin(plugin) or "unknown"
|
||
runtime_record = self._get_module_runtime_state(module_name)
|
||
guard_snapshot = self.get_plugin_guard_snapshot(module_name)
|
||
execution_summary = self._build_execution_summary(guard_snapshot)
|
||
config_path = plugin.get_config_path()
|
||
config_overview = self._read_plugin_config_overview(config_path)
|
||
commands = self._collect_plugin_commands(plugin)
|
||
dispatch_summary = self._build_plugin_dispatch_summary(plugin, commands)
|
||
feature_key = str(getattr(plugin, "feature_key", "") or "").strip()
|
||
feature_description = str(getattr(plugin, "feature_description", "") or "").strip()
|
||
governance_diagnostics = self._build_governance_diagnostics(
|
||
plugin=plugin,
|
||
module_name=module_name,
|
||
config_overview=config_overview,
|
||
runtime_record=runtime_record,
|
||
guard_snapshot=guard_snapshot,
|
||
)
|
||
governance_summary = self._summarize_governance_status(governance_diagnostics)
|
||
|
||
return {
|
||
"name": plugin.name,
|
||
"module_name": module_name,
|
||
"version": getattr(plugin, "version", "N/A"),
|
||
"author": getattr(plugin, "author", "N/A"),
|
||
"description": getattr(plugin, "description", "N/A"),
|
||
"status": plugin.status.name if hasattr(plugin, "status") else "UNKNOWN",
|
||
"status_label": self._status_to_label(plugin.status.name if hasattr(plugin, "status") else "UNKNOWN"),
|
||
"plugin_types": self._collect_plugin_types(plugin),
|
||
"commands": commands,
|
||
"command_count": len(commands),
|
||
"command_prefix": getattr(plugin, "command_prefix", ""),
|
||
"dispatch_mode": dispatch_summary["mode"],
|
||
"dispatch_summary": dispatch_summary,
|
||
"dependencies": list(getattr(plugin, "dependencies", []) or []),
|
||
"feature_key": feature_key,
|
||
"feature_description": feature_description,
|
||
"supports_group_switch": bool(getattr(plugin, "feature", None)),
|
||
"config": getattr(plugin, "_config", {}),
|
||
"config_path": config_path,
|
||
"config_overview": config_overview,
|
||
"governance_diagnostics": governance_diagnostics,
|
||
"governance_status": governance_summary["status"],
|
||
"governance_error_count": governance_summary["error_count"],
|
||
"governance_warning_count": governance_summary["warning_count"],
|
||
"governance_info_count": governance_summary["info_count"],
|
||
"runtime_state": runtime_record.get("state", "loaded"),
|
||
"runtime_message": runtime_record.get("message", ""),
|
||
"execution_guard": guard_snapshot,
|
||
"execution_summary": execution_summary,
|
||
}
|
||
|
||
def _build_unloaded_plugin_snapshot(self, module_name: str) -> Dict[str, Any]:
|
||
"""为未成功加载的插件模块生成治理快照。"""
|
||
runtime_record = self._get_module_runtime_state(module_name)
|
||
guard_snapshot = self.get_plugin_guard_snapshot(module_name)
|
||
execution_summary = self._build_execution_summary(guard_snapshot)
|
||
config_path = os.path.join(self.plugin_dir, module_name, "config.toml")
|
||
if not os.path.exists(config_path):
|
||
config_path = os.path.join(self.plugin_dir, f"{module_name}", "config.toml")
|
||
config_overview = self._read_plugin_config_overview(config_path)
|
||
governance_diagnostics = self._build_governance_diagnostics(
|
||
plugin=None,
|
||
module_name=module_name,
|
||
config_overview=config_overview,
|
||
runtime_record=runtime_record,
|
||
guard_snapshot=guard_snapshot,
|
||
)
|
||
governance_summary = self._summarize_governance_status(governance_diagnostics)
|
||
runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
|
||
status = "DISCOVERED"
|
||
if runtime_state == "load_failed":
|
||
status = "ERROR"
|
||
elif runtime_state == "disabled_by_config":
|
||
status = "STOPPED"
|
||
dispatch_summary = {
|
||
"mode": "unknown",
|
||
"label": self._dispatch_mode_label("unknown"),
|
||
"description": "插件未成功加载,暂时无法判断其前台 / 后台执行方式。",
|
||
"is_message_plugin": False,
|
||
"supports_dynamic_dispatch": False,
|
||
"sync_command_count": 0,
|
||
"background_command_count": 0,
|
||
"unknown_command_count": 0,
|
||
"preview_failed_count": 0,
|
||
"sampled_command_count": 0,
|
||
"command_modes": [],
|
||
}
|
||
|
||
return {
|
||
"name": module_name,
|
||
"module_name": module_name,
|
||
"version": "N/A",
|
||
"author": "N/A",
|
||
"description": runtime_record.get("message", "") or "插件模块已发现,但当前未进入加载态。",
|
||
"status": status,
|
||
"status_label": self._status_to_label(status),
|
||
"plugin_types": ["unknown"],
|
||
"commands": [],
|
||
"command_count": 0,
|
||
"command_prefix": "",
|
||
"dispatch_mode": dispatch_summary["mode"],
|
||
"dispatch_summary": dispatch_summary,
|
||
"dependencies": [],
|
||
"feature_key": "",
|
||
"feature_description": "",
|
||
"supports_group_switch": False,
|
||
"config": {},
|
||
"config_path": config_path,
|
||
"config_overview": config_overview,
|
||
"governance_diagnostics": governance_diagnostics,
|
||
"governance_status": governance_summary["status"],
|
||
"governance_error_count": governance_summary["error_count"],
|
||
"governance_warning_count": governance_summary["warning_count"],
|
||
"governance_info_count": governance_summary["info_count"],
|
||
"runtime_state": runtime_state or "discovered",
|
||
"runtime_message": runtime_record.get("message", ""),
|
||
"execution_guard": guard_snapshot,
|
||
"execution_summary": execution_summary,
|
||
}
|
||
|
||
@staticmethod
|
||
def _normalize_snapshot_dependency_key(value: Any) -> str:
|
||
"""把依赖引用统一规整成可匹配的 key。"""
|
||
return str(value or "").strip().lower()
|
||
|
||
def _enrich_dependency_relationships(self, snapshots: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||
"""为插件快照补齐依赖关系摘要。
|
||
|
||
设计目标:
|
||
1. 后台既要看“我依赖谁”,也要看“谁依赖我”;
|
||
2. 依赖声明有可能写插件名,也可能写模块名,因此这里做双 key 兼容;
|
||
3. 缺失依赖需要单独产出,便于页面上做风险聚合与高亮。
|
||
"""
|
||
normalized_snapshots = [dict(item or {}) for item in (snapshots or [])]
|
||
lookup_by_key: Dict[str, Dict[str, Any]] = {}
|
||
|
||
for snapshot in normalized_snapshots:
|
||
module_name = str(snapshot.get("module_name") or "").strip()
|
||
display_name = str(snapshot.get("name") or "").strip()
|
||
if module_name:
|
||
lookup_by_key[self._normalize_snapshot_dependency_key(module_name)] = snapshot
|
||
if display_name:
|
||
lookup_by_key[self._normalize_snapshot_dependency_key(display_name)] = snapshot
|
||
|
||
for snapshot in normalized_snapshots:
|
||
snapshot["resolved_dependencies"] = []
|
||
snapshot["missing_dependencies"] = []
|
||
snapshot["dependent_plugins"] = []
|
||
|
||
for snapshot in normalized_snapshots:
|
||
dependencies = list(snapshot.get("dependencies", []) or [])
|
||
resolved_dependencies = []
|
||
missing_dependencies = []
|
||
|
||
for dependency_name in dependencies:
|
||
normalized_key = self._normalize_snapshot_dependency_key(dependency_name)
|
||
target_snapshot = lookup_by_key.get(normalized_key)
|
||
if target_snapshot:
|
||
dependency_row = {
|
||
"name": str(target_snapshot.get("name") or "").strip(),
|
||
"module_name": str(target_snapshot.get("module_name") or "").strip(),
|
||
"status": str(target_snapshot.get("status") or "").strip(),
|
||
"status_label": str(target_snapshot.get("status_label") or "").strip(),
|
||
"governance_status": str(target_snapshot.get("governance_status") or "").strip(),
|
||
}
|
||
resolved_dependencies.append(dependency_row)
|
||
target_snapshot.setdefault("dependent_plugins", []).append(
|
||
{
|
||
"name": str(snapshot.get("name") or "").strip(),
|
||
"module_name": str(snapshot.get("module_name") or "").strip(),
|
||
"status": str(snapshot.get("status") or "").strip(),
|
||
"status_label": str(snapshot.get("status_label") or "").strip(),
|
||
"governance_status": str(snapshot.get("governance_status") or "").strip(),
|
||
}
|
||
)
|
||
else:
|
||
missing_dependencies.append(
|
||
{
|
||
"name": str(dependency_name or "").strip(),
|
||
}
|
||
)
|
||
|
||
snapshot["resolved_dependencies"] = resolved_dependencies
|
||
snapshot["missing_dependencies"] = missing_dependencies
|
||
snapshot["dependency_summary"] = {
|
||
"declared_count": len(dependencies),
|
||
"resolved_count": len(resolved_dependencies),
|
||
"missing_count": len(missing_dependencies),
|
||
"dependent_count": len(snapshot.get("dependent_plugins", []) or []),
|
||
"has_missing": len(missing_dependencies) > 0,
|
||
}
|
||
|
||
for snapshot in normalized_snapshots:
|
||
dependent_plugins = list(snapshot.get("dependent_plugins", []) or [])
|
||
dependent_plugins.sort(key=lambda item: (str(item.get("name") or ""), str(item.get("module_name") or "")))
|
||
snapshot["dependent_plugins"] = dependent_plugins
|
||
|
||
return normalized_snapshots
|
||
|
||
@staticmethod
|
||
def _status_to_label(status: str) -> str:
|
||
"""把运行态状态码转换成中文展示文案。"""
|
||
status_map = {
|
||
"RUNNING": "运行中",
|
||
"STOPPED": "已停用",
|
||
"LOADED": "已加载",
|
||
"UNLOADED": "未加载",
|
||
"ERROR": "异常",
|
||
"DISCOVERED": "待处理",
|
||
"UNKNOWN": "未知",
|
||
}
|
||
return status_map.get(str(status or "").strip().upper(), "未知")
|
||
|
||
def get_plugin_snapshots(self) -> List[Dict[str, Any]]:
|
||
"""返回插件治理中心使用的统一快照列表。"""
|
||
snapshots = []
|
||
loaded_module_names = set()
|
||
discovered_module_names = set(self.discover_plugins())
|
||
|
||
for plugin in self.plugins.values():
|
||
snapshot = self._build_plugin_snapshot(plugin)
|
||
snapshots.append(snapshot)
|
||
loaded_module_names.add(snapshot["module_name"])
|
||
|
||
# 这里把“目录已存在但插件未成功加载”的模块也补进列表:
|
||
# 1. 否则后台只能看到成功插件,看不到真正需要排查的失败模块;
|
||
# 2. 这类插件往往正是治理中心最该暴露的问题;
|
||
# 3. 统一补成快照后,前端无需区分“已加载”与“未加载”两套数据源。
|
||
for module_name in sorted(discovered_module_names - loaded_module_names):
|
||
snapshots.append(self._build_unloaded_plugin_snapshot(module_name))
|
||
|
||
snapshots = self._enrich_dependency_relationships(snapshots)
|
||
|
||
snapshots.sort(
|
||
key=lambda item: (
|
||
0 if item.get("status") == "RUNNING" else 1,
|
||
0 if item.get("governance_status") == "error" else 1 if item.get("governance_status") == "warning" else 2,
|
||
str(item.get("module_name", "")),
|
||
)
|
||
)
|
||
return snapshots
|
||
|
||
def get_plugin_snapshot(self, name: str) -> Optional[Dict[str, Any]]:
|
||
"""按模块名或展示名获取单个插件治理快照。"""
|
||
target_name = str(name or "").strip()
|
||
if not target_name:
|
||
return None
|
||
|
||
# 详情页也统一走“完整治理快照列表”:
|
||
# 1. 列表页已经会补齐依赖关系、执行摘要、分发模式等增强字段;
|
||
# 2. 如果详情页单独重建快照,容易出现“列表里有,详情里没有”的字段不一致;
|
||
# 3. 这里直接复用同一套结果,保证前后端看到的是同一份治理视图。
|
||
for snapshot in self.get_plugin_snapshots():
|
||
if snapshot.get("module_name") == target_name or snapshot.get("name") == target_name:
|
||
return snapshot
|
||
return None
|
||
|
||
def set_system_context(self, context: Dict[str, Any]):
|
||
"""
|
||
设置系统上下文
|
||
|
||
Args:
|
||
context: 系统上下文
|
||
"""
|
||
self.system_context = context
|
||
bot = context.get("bot")
|
||
if bot is not None:
|
||
self.current_bot = bot
|
||
|
||
def _build_module_file_state(self, module_name: str) -> Optional[Dict[str, float]]:
|
||
"""
|
||
构建模块的文件状态快照,用于检测文件变更
|
||
"""
|
||
plugin_folder = os.path.join(self.plugin_dir, module_name)
|
||
file_state: Dict[str, float] = {}
|
||
|
||
if os.path.isdir(plugin_folder):
|
||
for root, _, files in os.walk(plugin_folder):
|
||
for filename in files:
|
||
if not (filename.endswith(".py") or filename.endswith(".toml")):
|
||
continue
|
||
file_path = os.path.join(root, filename)
|
||
try:
|
||
file_state[os.path.abspath(file_path)] = os.path.getmtime(file_path)
|
||
except OSError:
|
||
continue
|
||
return file_state if file_state else None
|
||
|
||
single_file = os.path.join(self.plugin_dir, f"{module_name}.py")
|
||
if os.path.exists(single_file):
|
||
try:
|
||
file_state[os.path.abspath(single_file)] = os.path.getmtime(single_file)
|
||
except OSError:
|
||
return None
|
||
return file_state
|
||
|
||
return None
|
||
|
||
def _refresh_module_file_state(self, module_name: str):
|
||
state = self._build_module_file_state(module_name)
|
||
if state is None:
|
||
self._module_file_state.pop(module_name, None)
|
||
else:
|
||
self._module_file_state[module_name] = state
|
||
|
||
def _inject_bot_to_plugin(self, plugin: PluginInterface):
|
||
bot = self.current_bot or self.system_context.get("bot")
|
||
if not bot:
|
||
return
|
||
if hasattr(plugin, "set_bot"):
|
||
try:
|
||
plugin.set_bot(bot)
|
||
except Exception as e:
|
||
self.LOG.error(f"自动注入 bot 到插件 {plugin.name} 失败: {e}")
|
||
|
||
def start_hot_reload_watcher(self, interval_seconds: float = 60.0):
|
||
"""
|
||
启动插件目录变更监听线程(轮询)
|
||
|
||
Args:
|
||
interval_seconds: 轮询间隔秒数,默认 60 秒
|
||
"""
|
||
with self._watcher_lock:
|
||
if self._watcher_thread and self._watcher_thread.is_alive():
|
||
self.LOG.debug("PluginManager:热加载监听线程已运行,跳过重复启动")
|
||
return
|
||
|
||
self._watcher_interval = max(float(interval_seconds), 0.5)
|
||
self._watcher_stop_event.clear()
|
||
|
||
# 初始化快照
|
||
for module_name in self.discover_plugins():
|
||
self._refresh_module_file_state(module_name)
|
||
|
||
self._watcher_thread = threading.Thread(
|
||
target=self._hot_reload_watch_loop,
|
||
name="plugin-hot-reload-watcher",
|
||
daemon=True,
|
||
)
|
||
self._watcher_thread.start()
|
||
self.LOG.info(f"PluginManager:插件热加载监听已启动,轮询间隔 {self._watcher_interval}s")
|
||
|
||
def stop_hot_reload_watcher(self):
|
||
"""
|
||
停止插件目录变更监听线程
|
||
"""
|
||
with self._watcher_lock:
|
||
if not self._watcher_thread:
|
||
return
|
||
self._watcher_stop_event.set()
|
||
thread = self._watcher_thread
|
||
self._watcher_thread = None
|
||
|
||
if thread.is_alive():
|
||
thread.join(timeout=2.0)
|
||
self.LOG.info("PluginManager:插件热加载监听已停止")
|
||
|
||
def _hot_reload_watch_loop(self):
|
||
while not self._watcher_stop_event.is_set():
|
||
try:
|
||
discovered = set(self.discover_plugins())
|
||
loaded_modules = set(self.module_to_display.keys())
|
||
|
||
# 1. 新增插件 -> 自动加载并启动
|
||
new_modules = discovered - loaded_modules
|
||
for module_name in new_modules:
|
||
plugin = self.load_plugin(module_name)
|
||
if plugin:
|
||
self.start_plugin(plugin.name)
|
||
self.LOG.info(f"PluginManager:检测到新增插件 {module_name},已自动加载")
|
||
self._refresh_module_file_state(module_name)
|
||
|
||
# 2. 已删除插件 -> 自动卸载
|
||
removed_modules = loaded_modules - discovered
|
||
for module_name in removed_modules:
|
||
if self.unload_plugin(module_name):
|
||
self.LOG.info(f"PluginManager:检测到插件 {module_name} 已删除,已自动卸载")
|
||
self._module_file_state.pop(module_name, None)
|
||
|
||
# 3. 文件变更 -> 自动重载
|
||
for module_name in list(self.module_to_display.keys()):
|
||
new_state = self._build_module_file_state(module_name)
|
||
old_state = self._module_file_state.get(module_name)
|
||
if new_state is None:
|
||
continue
|
||
if old_state is None:
|
||
self._module_file_state[module_name] = new_state
|
||
continue
|
||
|
||
if new_state != old_state:
|
||
reloaded = self.reload_plugin(module_name)
|
||
if reloaded:
|
||
self.LOG.info(f"PluginManager:检测到插件 {module_name} 文件变更,已自动重载")
|
||
self._module_file_state[module_name] = new_state
|
||
else:
|
||
self.LOG.warning(f"PluginManager:插件 {module_name} 自动重载失败")
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"PluginManager:热加载监听异常: {e}", exc_info=True)
|
||
|
||
time.sleep(self._watcher_interval)
|
||
|
||
def discover_plugins(self) -> List[str]:
|
||
"""
|
||
发现可用插件
|
||
|
||
Returns:
|
||
插件模块名称列表(module_name)
|
||
"""
|
||
module_names = []
|
||
|
||
# 遍历插件目录
|
||
for item in os.listdir(self.plugin_dir):
|
||
if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"):
|
||
# 检查是否有main.py文件
|
||
if os.path.exists(os.path.join(self.plugin_dir, item, "main.py")):
|
||
module_names.append(item)
|
||
elif item.endswith(".py") and not item.startswith("__"):
|
||
# 单文件插件
|
||
module_names.append(item[:-3])
|
||
self.LOG.debug(f"PluginManager:发现插件模块: {module_names}")
|
||
return module_names
|
||
|
||
def load_all_plugins(self) -> Dict[str, PluginInterface]:
|
||
"""
|
||
加载所有插件
|
||
|
||
Returns:
|
||
插件实例字典,键为display_name
|
||
"""
|
||
module_names = self.discover_plugins()
|
||
loaded_modules = []
|
||
failed_modules = []
|
||
|
||
# 记录开始加载的插件列表
|
||
self.LOG.debug(f"PluginManager:开始加载插件列表: {module_names}")
|
||
|
||
for module_name in module_names:
|
||
try:
|
||
plugin = self.load_plugin(module_name)
|
||
if plugin:
|
||
loaded_modules.append(module_name)
|
||
# 自动启动插件
|
||
self.start_plugin(plugin.name)
|
||
else:
|
||
failed_modules.append(module_name)
|
||
except Exception as e:
|
||
self.LOG.error(f"PluginManager:加载插件模块 {module_name} 时发生错误: {str(e)}", exc_info=True)
|
||
failed_modules.append(module_name)
|
||
|
||
# 验证所有已加载插件的模块映射
|
||
for display_name, plugin in self.plugins.items():
|
||
try:
|
||
# 尝试从类模块路径获取模块名
|
||
module_name = self._get_module_name_from_plugin(plugin)
|
||
if module_name and module_name not in self.module_to_display:
|
||
self.module_to_display[module_name] = display_name
|
||
self.LOG.debug(f"PluginManager:补充缺失的模块映射 {module_name} -> {display_name}")
|
||
except Exception as e:
|
||
self.LOG.warning(f"PluginManager:获取插件 {display_name} 的模块名时出错: {e}")
|
||
# 使用插件显示名称作为备选模块名
|
||
folder_name = display_name.lower().replace(' ', '_')
|
||
if folder_name not in self.module_to_display:
|
||
self.module_to_display[folder_name] = display_name
|
||
self.LOG.debug(f"PluginManager:使用目录名作为模块映射 {folder_name} -> {display_name}")
|
||
|
||
# 检查是否有重复或无效的映射
|
||
invalid_mappings = []
|
||
for module_name, display_name in self.module_to_display.items():
|
||
if display_name not in self.plugins:
|
||
invalid_mappings.append(module_name)
|
||
self.LOG.warning(f"PluginManager:发现无效的模块映射 {module_name} -> {display_name}")
|
||
|
||
# 清理无效的映射
|
||
for module_name in invalid_mappings:
|
||
del self.module_to_display[module_name]
|
||
self.LOG.debug(f"PluginManager:清理无效的模块映射 {module_name}")
|
||
|
||
# 记录最终状态
|
||
self.LOG.debug(f"PluginManager:加载成功的插件模块: {loaded_modules}")
|
||
if failed_modules:
|
||
self.LOG.warning(f"PluginManager:加载失败的插件模块: {failed_modules}")
|
||
self.LOG.debug(f"PluginManager:当前已加载的插件实例: {list(self.plugins.keys())}")
|
||
self.LOG.debug(f"PluginManager:最终的模块映射关系: {self.module_to_display}")
|
||
|
||
return self.plugins
|
||
|
||
def _get_module_name_from_plugin(self, plugin: PluginInterface) -> Optional[str]:
|
||
"""
|
||
从插件实例获取模块名
|
||
|
||
Args:
|
||
plugin: 插件实例
|
||
|
||
Returns:
|
||
模块名,获取失败返回None
|
||
"""
|
||
try:
|
||
# 获取完整模块路径
|
||
full_module = plugin.__class__.__module__
|
||
module_parts = full_module.split('.')
|
||
|
||
# 处理不同的模块路径情况
|
||
if len(module_parts) >= 2 and module_parts[0] == 'plugins':
|
||
# 对于目录插件,模块名在第二个位置
|
||
return module_parts[1]
|
||
elif len(module_parts) >= 2:
|
||
# 其他情况,取倒数第二个
|
||
return module_parts[-2]
|
||
else:
|
||
# 单文件插件,直接返回
|
||
return full_module
|
||
except (IndexError, AttributeError) as e:
|
||
self.LOG.warning(f"获取插件 {plugin.name} 的模块名时出错: {e}")
|
||
return None
|
||
|
||
def load_plugin(self, module_name: str) -> Optional[PluginInterface]:
|
||
"""
|
||
加载插件
|
||
|
||
Args:
|
||
module_name: 插件模块名
|
||
|
||
Returns:
|
||
插件实例,加载失败返回None
|
||
"""
|
||
try:
|
||
# 检查是否已有同名模块的插件加载
|
||
for display_name, plugin in self.plugins.items():
|
||
try:
|
||
plugin_module_name = self._get_module_name_from_plugin(plugin)
|
||
if plugin_module_name == module_name:
|
||
self.LOG.debug(f"PluginManager:插件模块 {module_name} 已加载为 {display_name}")
|
||
# 确保模块名到显示名的映射存在
|
||
if module_name not in self.module_to_display:
|
||
self.module_to_display[module_name] = display_name
|
||
self.LOG.debug(f"PluginManager:添加缺失的模块映射 {module_name} -> {display_name}")
|
||
self._record_module_runtime_state(module_name, "loaded", "插件已在内存中复用现有实例。")
|
||
self.reset_plugin_guard_state(module_name)
|
||
self._inject_bot_to_plugin(plugin)
|
||
return plugin
|
||
except Exception as e:
|
||
self.LOG.warning(f"获取插件 {display_name} 的模块名时出错: {e}")
|
||
continue
|
||
|
||
# 确定插件路径和模块路径
|
||
plugin_path = os.path.join(self.plugin_dir, module_name)
|
||
|
||
# 加载模块
|
||
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, "main.py")):
|
||
# 目录插件,从main.py加载
|
||
module_path = f"plugins.{module_name}.main"
|
||
try:
|
||
module = importlib.import_module(module_path)
|
||
self.plugin_modules[module_name] = module
|
||
except ImportError as e:
|
||
self.LOG.error(f"PluginManager:导入插件模块 {module_path} 失败: {e}")
|
||
self._record_module_runtime_state(module_name, "load_failed", f"导入插件模块失败: {e}")
|
||
return None
|
||
else:
|
||
# 单文件插件
|
||
plugin_path = self.plugin_dir
|
||
try:
|
||
module = importlib.import_module(module_name)
|
||
self.plugin_modules[module_name] = module
|
||
except ImportError as e:
|
||
self.LOG.error(f"PluginManager:导入单文件插件 {module_name} 失败: {e}")
|
||
self._record_module_runtime_state(module_name, "load_failed", f"导入单文件插件失败: {e}")
|
||
return None
|
||
|
||
# 查找插件类
|
||
plugin_class = None
|
||
for name, obj in inspect.getmembers(module):
|
||
if (inspect.isclass(obj) and
|
||
issubclass(obj, PluginInterface) and
|
||
obj != PluginInterface and
|
||
obj != MessagePluginInterface and
|
||
obj != ScheduledPluginInterface):
|
||
plugin_class = obj
|
||
break
|
||
|
||
# 如果没有找到插件类,尝试查找get_plugin函数
|
||
if plugin_class is None:
|
||
get_plugin_func = getattr(module, "get_plugin", None)
|
||
if callable(get_plugin_func):
|
||
plugin = get_plugin_func()
|
||
if isinstance(plugin, PluginInterface):
|
||
# 设置插件路径
|
||
plugin.set_plugin_path(plugin_path)
|
||
|
||
# 加载插件配置
|
||
if not plugin.load_config():
|
||
self.LOG.error(f"PluginManager:插件模块 {module_name} 加载配置失败")
|
||
self._record_module_runtime_state(module_name, "load_failed", "插件配置加载失败。")
|
||
async_job.remove_jobs_by_owner(plugin)
|
||
return None
|
||
|
||
# 初始化插件
|
||
if not plugin.initialize(self.system_context):
|
||
self.LOG.error(f"PluginManager:插件模块 {module_name} 初始化失败")
|
||
self._record_module_runtime_state(module_name, "load_failed", "插件初始化失败。")
|
||
async_job.remove_jobs_by_owner(plugin)
|
||
return None
|
||
self._inject_bot_to_plugin(plugin)
|
||
|
||
# 注册插件
|
||
PluginRegistry().register(plugin)
|
||
|
||
# 获取显示名称
|
||
display_name = plugin.name
|
||
|
||
# 存储插件实例
|
||
self.plugins[display_name] = plugin
|
||
|
||
# 添加模块名到显示名的映射
|
||
self.module_to_display[module_name] = display_name
|
||
self._refresh_module_file_state(module_name)
|
||
self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
|
||
self.reset_plugin_guard_state(module_name)
|
||
# self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}")
|
||
|
||
return plugin
|
||
else:
|
||
self.LOG.error(f"PluginManager:插件模块 {module_name} 的 get_plugin() 返回的不是有效的插件实例")
|
||
self._record_module_runtime_state(module_name, "load_failed", "get_plugin() 未返回有效的插件实例。")
|
||
else:
|
||
self.LOG.error(f"PluginManager:插件模块 {module_name} 中未找到有效的插件类或 get_plugin 函数")
|
||
self._record_module_runtime_state(module_name, "load_failed", "未找到有效的插件类或 get_plugin 函数。")
|
||
return None
|
||
|
||
# 实例化插件
|
||
plugin = plugin_class()
|
||
plugin.status = PluginStatus.LOADED
|
||
|
||
# 设置插件路径
|
||
plugin.set_plugin_path(plugin_path)
|
||
|
||
# 在load_plugin方法中,找到加载配置后的位置(约在第298行)
|
||
# 加载插件配置
|
||
if not plugin.load_config():
|
||
self.LOG.error(f"PluginManager:插件模块 {module_name} 加载配置失败")
|
||
self._record_module_runtime_state(module_name, "load_failed", "插件配置加载失败。")
|
||
async_job.remove_jobs_by_owner(plugin)
|
||
return None
|
||
|
||
# 修改检查enable状态的代码:遍历所有配置节点
|
||
for section in plugin._config.values():
|
||
if isinstance(section, dict) and not section.get("enable", True):
|
||
self.LOG.debug(f"PluginManager:插件 {module_name} 已禁用,跳过加载")
|
||
self._record_module_runtime_state(module_name, "disabled_by_config", "插件在配置中已禁用,启动时已跳过加载。")
|
||
async_job.remove_jobs_by_owner(plugin)
|
||
return None
|
||
|
||
# 初始化插件
|
||
if not plugin.initialize(self.system_context):
|
||
self.LOG.error(f"PluginManager:插件模块 {module_name} 初始化失败")
|
||
self._record_module_runtime_state(module_name, "load_failed", "插件初始化失败。")
|
||
async_job.remove_jobs_by_owner(plugin)
|
||
return None
|
||
self._inject_bot_to_plugin(plugin)
|
||
|
||
# 注册插件
|
||
PluginRegistry().register(plugin)
|
||
|
||
# 获取显示名称
|
||
display_name = plugin.name
|
||
|
||
# 存储插件实例
|
||
self.plugins[display_name] = plugin
|
||
|
||
# 添加模块名到显示名的映射
|
||
self.module_to_display[module_name] = display_name
|
||
self._refresh_module_file_state(module_name)
|
||
self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
|
||
self.reset_plugin_guard_state(module_name)
|
||
# self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}")
|
||
|
||
return plugin
|
||
|
||
except Exception as e:
|
||
plugin_obj = locals().get("plugin")
|
||
if plugin_obj is not None:
|
||
async_job.remove_jobs_by_owner(plugin_obj)
|
||
self._record_module_runtime_state(module_name, "load_failed", f"插件加载异常: {e}")
|
||
self.LOG.exception(f"PluginManager:加载插件模块 {module_name} 失败: {e}", exc_info=True)
|
||
return None
|
||
|
||
def unload_plugin(self, name: str) -> bool:
|
||
"""
|
||
卸载插件
|
||
|
||
Args:
|
||
name: 插件名称(可以是模块名或显示名称)
|
||
|
||
Returns:
|
||
卸载是否成功
|
||
"""
|
||
# 查找插件
|
||
display_name, plugin = self.find_plugin_by_name(name)
|
||
|
||
if not plugin:
|
||
self.LOG.debug(f"PluginManager:插件 {name} 未加载")
|
||
return False
|
||
|
||
# 停止插件
|
||
if plugin.status == PluginStatus.RUNNING:
|
||
if not plugin.stop():
|
||
self.LOG.debug(f"PluginManager:停止插件 {display_name} 失败")
|
||
return False
|
||
plugin.status = PluginStatus.STOPPED # 确保状态更新
|
||
|
||
removed_jobs = async_job.remove_jobs_by_owner(plugin)
|
||
if removed_jobs:
|
||
self.LOG.debug(f"PluginManager:已移除插件 {display_name} 的定时任务 {removed_jobs} 个")
|
||
|
||
# 清理插件资源
|
||
if not plugin.cleanup():
|
||
self.LOG.debug(f"PluginManager:清理插件 {display_name} 资源失败")
|
||
return False
|
||
|
||
# 设置状态为未加载
|
||
plugin.status = PluginStatus.UNLOADED
|
||
|
||
# 注销插件
|
||
PluginRegistry().unregister(display_name)
|
||
|
||
# 获取模块名,用于清理映射
|
||
module_name = self._get_module_name_from_plugin(plugin)
|
||
if module_name and module_name in self.module_to_display:
|
||
del self.module_to_display[module_name]
|
||
self.LOG.debug(f"PluginManager:清理模块映射 {module_name} -> {display_name}")
|
||
self._module_file_state.pop(module_name, None)
|
||
|
||
# 移除插件实例
|
||
del self.plugins[display_name]
|
||
|
||
return True
|
||
|
||
def reload_plugin(self, name: str) -> Optional[PluginInterface]:
|
||
"""
|
||
重新加载插件
|
||
|
||
Args:
|
||
name: 插件名称(可以是模块名或显示名称)
|
||
|
||
Returns:
|
||
插件实例,重新加载失败返回None
|
||
"""
|
||
# 查找插件
|
||
display_name, plugin = self.find_plugin_by_name(name)
|
||
|
||
if not plugin:
|
||
self.LOG.debug(f"PluginManager:插件 {name} 未加载,无法重载")
|
||
return None
|
||
|
||
# 记录原插件状态和模块名
|
||
was_running = plugin.status == PluginStatus.RUNNING
|
||
module_name = self._get_module_name_from_plugin(plugin)
|
||
|
||
if not module_name:
|
||
self.LOG.error(f"无法获取插件 {display_name} 的模块名,重载失败")
|
||
return None
|
||
|
||
# 卸载插件
|
||
if not self.unload_plugin(display_name):
|
||
self.LOG.debug(f"卸载插件 {display_name} 失败,无法重载")
|
||
return None
|
||
|
||
# 重新导入模块
|
||
if module_name in self.plugin_modules:
|
||
try:
|
||
importlib.reload(self.plugin_modules[module_name])
|
||
except Exception as e:
|
||
self.LOG.debug(f"重新导入插件模块 {module_name} 失败: {e}")
|
||
return None
|
||
|
||
# 加载插件
|
||
plugin = self.load_plugin(module_name)
|
||
|
||
# 如果原来是运行状态,则重新启动
|
||
if plugin and was_running:
|
||
self.start_plugin(plugin.name)
|
||
|
||
return plugin
|
||
|
||
def start_plugin(self, name: str) -> bool:
|
||
"""
|
||
启动插件
|
||
|
||
Args:
|
||
name: 插件名称(可以是模块名或显示名称)
|
||
|
||
Returns:
|
||
启动是否成功
|
||
"""
|
||
# 查找插件
|
||
display_name, plugin = self.find_plugin_by_name(name)
|
||
|
||
if not plugin:
|
||
self.LOG.debug(f"PluginManager:插件 {name} 未加载")
|
||
return False
|
||
|
||
if plugin.status == PluginStatus.RUNNING:
|
||
self.LOG.debug(f"PluginManager:插件 {display_name} 已经在运行")
|
||
return True
|
||
|
||
if plugin.start():
|
||
plugin.status = PluginStatus.RUNNING
|
||
module_name = self._get_module_name_from_plugin(plugin) or name
|
||
self.reset_plugin_guard_state(module_name)
|
||
self._record_module_runtime_state(module_name, "running", "插件已启动并进入运行态。")
|
||
self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为在运行")
|
||
return True
|
||
else:
|
||
plugin.status = PluginStatus.ERROR
|
||
module_name = self._get_module_name_from_plugin(plugin) or name
|
||
self._record_module_runtime_state(module_name, "load_failed", "插件启动失败,状态已标记为异常。")
|
||
self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为异常")
|
||
return False
|
||
|
||
def stop_plugin(self, name: str) -> bool:
|
||
"""
|
||
停止插件
|
||
|
||
Args:
|
||
name: 插件名称(可以是模块名或显示名称)
|
||
|
||
Returns:
|
||
停止是否成功
|
||
"""
|
||
# 查找插件
|
||
display_name, plugin = self.find_plugin_by_name(name)
|
||
|
||
if not plugin:
|
||
self.LOG.debug(f"插件 {name} 未加载")
|
||
return False
|
||
|
||
if plugin.status != PluginStatus.RUNNING:
|
||
self.LOG.debug(f"插件 {display_name} 未在运行")
|
||
return True
|
||
|
||
if plugin.stop():
|
||
plugin.status = PluginStatus.STOPPED
|
||
module_name = self._get_module_name_from_plugin(plugin) or name
|
||
self._record_module_runtime_state(module_name, "stopped", "插件已手动停用。")
|
||
self.LOG.debug(f"插件 {display_name} 状态变更为已停止")
|
||
return True
|
||
else:
|
||
plugin.status = PluginStatus.ERROR
|
||
module_name = self._get_module_name_from_plugin(plugin) or name
|
||
self._record_module_runtime_state(module_name, "load_failed", "插件停用失败,状态已标记为异常。")
|
||
self.LOG.debug(f"插件 {display_name} 状态变更为异常")
|
||
return False
|
||
|
||
def shutdown_plugins(self) -> bool:
|
||
"""
|
||
卸载所有插件
|
||
|
||
Returns:
|
||
是否全部成功卸载
|
||
"""
|
||
success = True
|
||
self.stop_hot_reload_watcher()
|
||
# 创建插件名称的副本,因为在卸载过程中会修改self.plugins字典
|
||
display_names = list(self.plugins.keys())
|
||
|
||
for display_name in display_names:
|
||
if not self.unload_plugin(display_name):
|
||
self.LOG.error(f"卸载插件 {display_name} 失败")
|
||
success = False
|
||
|
||
# 清空插件模块字典
|
||
self.plugin_modules.clear()
|
||
self.module_to_display.clear()
|
||
|
||
# 确保插件字典为空
|
||
if self.plugins:
|
||
self.LOG.warning(f"插件卸载后仍有 {len(self.plugins)} 个插件残留")
|
||
success = False
|
||
|
||
return success
|
||
|
||
def find_plugin_by_name(self, name: str) -> Tuple[Optional[str], Optional[PluginInterface]]:
|
||
"""
|
||
根据插件名称或模块名查找插件
|
||
|
||
Args:
|
||
name: 插件名称或模块名
|
||
|
||
Returns:
|
||
(插件显示名称, 插件实例) 元组,未找到返回 (None, None)
|
||
"""
|
||
# 直接通过显示名称查找
|
||
if name in self.plugins:
|
||
return name, self.plugins[name]
|
||
|
||
# 通过模块名查找
|
||
if name in self.module_to_display:
|
||
display_name = self.module_to_display[name]
|
||
return display_name, self.plugins.get(display_name)
|
||
|
||
# 遍历所有插件查找匹配的模块名
|
||
for display_name, plugin in self.plugins.items():
|
||
try:
|
||
module_name = self._get_module_name_from_plugin(plugin)
|
||
if module_name and module_name == name:
|
||
# 顺便更新映射
|
||
if module_name not in self.module_to_display:
|
||
self.module_to_display[module_name] = display_name
|
||
self.LOG.debug(f"PluginManager:添加缺失的模块映射 {module_name} -> {display_name}")
|
||
return display_name, plugin
|
||
|
||
# 不区分大小写比较
|
||
if module_name and module_name.lower() == name.lower():
|
||
if module_name not in self.module_to_display:
|
||
self.module_to_display[module_name] = display_name
|
||
return display_name, plugin
|
||
|
||
# 检查名称是否包含在模块名中(不区分大小写)
|
||
if module_name and name.lower() in module_name.lower():
|
||
return display_name, plugin
|
||
|
||
# 检查模块名是否包含在名称中(不区分大小写)
|
||
if module_name and module_name.lower() in name.lower():
|
||
return display_name, plugin
|
||
|
||
# 检查名称是否包含在显示名称中(不区分大小写)
|
||
if name.lower() in display_name.lower():
|
||
return display_name, plugin
|
||
except Exception:
|
||
continue
|
||
|
||
# 记录未找到插件的详细信息,帮助调试
|
||
self.LOG.warning(f"未找到插件: {name},当前已加载插件: {list(self.plugins.keys())}")
|
||
self.LOG.warning(f"模块映射: {self.module_to_display}")
|
||
|
||
return None, None
|
||
|
||
def inject_bot(self, bot: WechatAPIClient):
|
||
self.current_bot = bot
|
||
self.system_context["bot"] = bot
|
||
for name, plugin in self.plugins.items():
|
||
# self.LOG.debug(f"plugin name{name}, plugin: {plugin}")
|
||
if hasattr(plugin, "set_bot"):
|
||
try:
|
||
plugin.set_bot(bot)
|
||
self.LOG.debug(f"已成功注入 bot 到插件 {name}")
|
||
except Exception as e:
|
||
self.LOG.error(f"注入 bot 到插件 {name} 失败: {e}")
|