Files
abot/base/plugin_common/plugin_manager.py

1552 lines
67 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import importlib
import inspect
import os
import sys
import threading
import time
from typing import Dict, List, Any, Optional, Tuple
import toml
from loguru import logger
from base.plugin_common.plugin_interface import PluginInterface, PluginStatus
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.scheduled_plugin_interface import ScheduledPluginInterface
from base.plugin_common.plugin_registry import PluginRegistry
from utils.decorator.async_job import async_job
from wechat_ipad import WechatAPIClient
class PluginManager:
"""插件管理器,负责加载、卸载、启动、停止插件"""
# 单例实例
_instance = None
@classmethod
def get_instance(cls, plugin_dir=None):
"""获取单例实例
Args:
plugin_dir: 插件目录,如果已有实例则忽略此参数
Returns:
PluginManager实例
"""
if cls._instance is None:
cls._instance = cls(plugin_dir=plugin_dir or "plugins")
return cls._instance
def __new__(cls, *args, **kwargs):
"""实现单例模式"""
if cls._instance is None:
cls._instance = super(PluginManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, plugin_dir: str = "plugins"):
    """Initialise the plugin manager state exactly once.

    ``__new__`` returns the same object for every construction, and Python
    calls ``__init__`` again each time ``PluginManager(...)`` is evaluated.
    The ``_initialized`` flag is therefore checked first so that a repeated
    construction does not wipe loaded plugins, guard records or the
    hot-reload bookkeeping.

    Args:
        plugin_dir: Directory that holds the plugins. Only the first
            initialisation uses it; later constructions keep the existing
            directory.
    """
    if getattr(self, "_initialized", False):
        return

    self.plugin_dir = plugin_dir
    self.plugins: Dict[str, PluginInterface] = {}  # plugin instances keyed by display_name
    self.plugin_modules = {}  # imported plugin modules keyed by module_name
    self.module_to_display = {}  # module_name -> display_name
    self.system_context = {}  # system context handed to plugins
    self.current_bot: Optional[WechatAPIClient] = None

    # Runtime records give the plugin governance view one unified picture:
    # 1. besides loaded plugins they also keep "load failed / disabled by
    #    config / manually stopped" states;
    # 2. the admin page no longer has to guess from logs why a plugin is
    #    missing from the list;
    # 3. keyed by module_name so it lines up with the plugins directory.
    self.plugin_runtime_records: Dict[str, Dict[str, Any]] = {}

    # Execution guard records:
    # 1. back the timeout protection / consecutive-failure circuit breaker /
    #    half-open recovery probing of message plugins;
    # 2. also keyed by module_name so they join directly with the snapshots;
    # 3. the same structure can be reused for scheduled plugins later.
    self.plugin_guard_records: Dict[str, Dict[str, Any]] = {}
    self._guard_lock = threading.RLock()

    # Hot-reload bookkeeping.
    self._watcher_thread: Optional[threading.Thread] = None
    self._watcher_stop_event = threading.Event()
    # Scan the plugin directory every 60 seconds by default to keep the
    # resource cost low in production.
    self._watcher_interval = 60.0
    self._module_file_state: Dict[str, Dict[str, float]] = {}
    self._watcher_lock = threading.RLock()
    self.LOG = logger

    # Make sure the plugin directory exists; exist_ok closes the window
    # between the existence check and the creation.
    if not os.path.exists(self.plugin_dir):
        os.makedirs(self.plugin_dir, exist_ok=True)

    # Put the plugin directory on the import path.
    if self.plugin_dir not in sys.path:
        sys.path.insert(0, self.plugin_dir)

    self._initialized = True
def _record_module_runtime_state(
self,
module_name: str,
state: str,
message: str = "",
detail: Optional[Dict[str, Any]] = None,
) -> None:
"""记录插件模块的运行态快照。"""
if not module_name:
return
self.plugin_runtime_records[module_name] = {
"state": str(state or "").strip().lower() or "unknown",
"message": str(message or "").strip(),
"detail": dict(detail or {}),
"updated_at": float(time.time()),
}
def _get_module_runtime_state(self, module_name: str) -> Dict[str, Any]:
"""读取插件模块的最近一次运行态记录。"""
return dict(self.plugin_runtime_records.get(module_name, {}) or {})
def _build_default_guard_record(self) -> Dict[str, Any]:
"""构建插件执行保护的默认记录。"""
return {
"circuit_state": "closed",
"consecutive_failures": 0,
"consecutive_timeouts": 0,
"failure_count_total": 0,
"timeout_count_total": 0,
"success_count_total": 0,
"last_failure_at": 0.0,
"last_success_at": 0.0,
"last_error_message": "",
"last_failure_type": "",
"last_process_time_ms": 0.0,
"last_timeout_seconds": 0,
"circuit_opened_at": 0.0,
"circuit_until": 0.0,
"half_open_in_flight": False,
}
def _get_or_create_plugin_guard_record(self, module_name: str) -> Dict[str, Any]:
"""读取或初始化插件执行保护记录。"""
if module_name not in self.plugin_guard_records:
self.plugin_guard_records[module_name] = self._build_default_guard_record()
return self.plugin_guard_records[module_name]
def reset_plugin_guard_state(self, plugin_or_module_name) -> None:
    """Replace a plugin's execution-guard record with a fresh default one.

    Args:
        plugin_or_module_name: Either a plugin instance (its module name is
            resolved) or a module name. Nothing happens when no module name
            can be determined.
    """
    if isinstance(plugin_or_module_name, PluginInterface):
        resolved = self._get_module_name_from_plugin(plugin_or_module_name) or ""
    else:
        resolved = str(plugin_or_module_name or "").strip()
    if resolved:
        with self._guard_lock:
            self.plugin_guard_records[resolved] = self._build_default_guard_record()
def get_plugin_guard_snapshot(self, plugin_or_module_name) -> Dict[str, Any]:
    """Return a display-oriented copy of a plugin's execution-guard record.

    Args:
        plugin_or_module_name: A plugin instance or a module name.

    Returns:
        A copy of the guard record plus ``open_remaining_seconds``. When no
        module name can be resolved, a plain default record is returned
        (without ``open_remaining_seconds``).
    """
    if isinstance(plugin_or_module_name, PluginInterface):
        resolved = self._get_module_name_from_plugin(plugin_or_module_name) or ""
    else:
        resolved = str(plugin_or_module_name or "").strip()
    if not resolved:
        return self._build_default_guard_record()

    with self._guard_lock:
        snapshot = dict(self._get_or_create_plugin_guard_record(resolved))

    deadline = float(snapshot.get("circuit_until") or 0.0)
    if deadline > 0:
        remaining = max(0, int(deadline - time.time()))
    else:
        remaining = 0
    snapshot["open_remaining_seconds"] = remaining
    return snapshot
def try_acquire_plugin_execution(
self,
plugin: PluginInterface,
*,
recovery_seconds: int,
) -> Dict[str, Any]:
"""判断插件当前是否允许继续执行。"""
module_name = self._get_module_name_from_plugin(plugin) or plugin.name
now_ts = time.time()
with self._guard_lock:
record = self._get_or_create_plugin_guard_record(module_name)
circuit_state = str(record.get("circuit_state", "closed") or "closed").strip().lower()
circuit_until = float(record.get("circuit_until") or 0.0)
recovery_seconds = max(int(recovery_seconds or 0), 30)
# 熔断打开期间直接拒绝新流量:
# 1. 目标是尽快切断持续失败插件对主链路的影响;
# 2. 若冷却时间未到,则所有新消息都应跳过该插件;
# 3. 返回剩余秒数,便于日志和后台展示更可读。
if circuit_state == "open" and circuit_until > now_ts:
return {
"allowed": False,
"reason": "circuit_open",
"open_remaining_seconds": max(0, int(circuit_until - now_ts)),
"circuit_state": circuit_state,
}
# 熔断窗口已过,放一个半开探测请求:
# 1. 只允许一个探测请求进入,避免刚恢复时被瞬时并发打爆;
# 2. 成功则后续 record_success 会关闭熔断并清零计数;
# 3. 失败则 record_failure 会重新打开熔断窗口。
if circuit_state == "open" and circuit_until <= now_ts:
record["circuit_state"] = "half_open"
record["half_open_in_flight"] = True
record["circuit_until"] = now_ts + recovery_seconds
return {
"allowed": True,
"reason": "half_open_probe",
"open_remaining_seconds": 0,
"circuit_state": "half_open",
}
# 半开状态下,如果已经有探测请求在飞,就继续拒绝后续请求。
if circuit_state == "half_open":
if bool(record.get("half_open_in_flight", False)):
return {
"allowed": False,
"reason": "half_open_probe_in_flight",
"open_remaining_seconds": max(0, int(float(record.get("circuit_until") or 0.0) - now_ts)),
"circuit_state": "half_open",
}
record["half_open_in_flight"] = True
return {
"allowed": True,
"reason": "half_open_probe",
"open_remaining_seconds": 0,
"circuit_state": "half_open",
}
return {
"allowed": True,
"reason": "closed",
"open_remaining_seconds": 0,
"circuit_state": "closed",
}
def record_plugin_execution_success(
self,
plugin: PluginInterface,
*,
process_time_ms: float,
) -> None:
"""记录插件一次成功执行。"""
module_name = self._get_module_name_from_plugin(plugin) or plugin.name
now_ts = time.time()
with self._guard_lock:
record = self._get_or_create_plugin_guard_record(module_name)
record["success_count_total"] = int(record.get("success_count_total", 0) or 0) + 1
record["consecutive_failures"] = 0
record["consecutive_timeouts"] = 0
record["last_success_at"] = now_ts
record["last_process_time_ms"] = float(process_time_ms or 0.0)
record["last_failure_type"] = ""
record["last_error_message"] = ""
record["last_timeout_seconds"] = 0
record["half_open_in_flight"] = False
# 半开探测成功后立即闭合熔断:
# 1. 说明插件已经至少恢复到可以完成一次请求;
# 2. 这里不保留历史 consecutive_failures避免恢复后仍长期背着旧债
# 3. 同时把 open 时间清空,后台能立即看到状态回归 closed。
record["circuit_state"] = "closed"
record["circuit_opened_at"] = 0.0
record["circuit_until"] = 0.0
def record_plugin_execution_failure(
self,
plugin: PluginInterface,
*,
failure_type: str,
error_message: str,
process_time_ms: float,
timeout_seconds: int,
failure_threshold: int,
recovery_seconds: int,
) -> Dict[str, Any]:
"""记录插件一次失败执行,并根据连续失败次数决定是否打开熔断。"""
module_name = self._get_module_name_from_plugin(plugin) or plugin.name
now_ts = time.time()
normalized_failure_type = str(failure_type or "error").strip().lower() or "error"
failure_threshold = max(int(failure_threshold or 0), 2)
recovery_seconds = max(int(recovery_seconds or 0), 30)
with self._guard_lock:
record = self._get_or_create_plugin_guard_record(module_name)
record["failure_count_total"] = int(record.get("failure_count_total", 0) or 0) + 1
record["consecutive_failures"] = int(record.get("consecutive_failures", 0) or 0) + 1
record["last_failure_at"] = now_ts
record["last_failure_type"] = normalized_failure_type
record["last_error_message"] = str(error_message or "").strip()
record["last_process_time_ms"] = float(process_time_ms or 0.0)
record["last_timeout_seconds"] = int(timeout_seconds or 0)
record["half_open_in_flight"] = False
if normalized_failure_type == "timeout":
record["timeout_count_total"] = int(record.get("timeout_count_total", 0) or 0) + 1
record["consecutive_timeouts"] = int(record.get("consecutive_timeouts", 0) or 0) + 1
else:
record["consecutive_timeouts"] = 0
should_open_circuit = (
str(record.get("circuit_state", "closed") or "closed").strip().lower() == "half_open"
or int(record.get("consecutive_failures", 0) or 0) >= failure_threshold
)
if should_open_circuit:
record["circuit_state"] = "open"
record["circuit_opened_at"] = now_ts
record["circuit_until"] = now_ts + recovery_seconds
else:
record["circuit_state"] = "closed"
return dict(record)
@staticmethod
def _is_sensitive_config_key(key: str) -> bool:
"""判断配置键是否属于敏感信息。"""
lowered_key = str(key or "").strip().lower()
return any(
keyword in lowered_key
for keyword in ["password", "secret", "token", "api_key", "apikey", "cookie", "client_secret"]
)
def _build_config_overview_from_mapping(
self,
config_obj: Optional[Dict[str, Any]],
*,
config_path: str,
file_exists: bool,
parse_ok: bool,
parse_error: str = "",
) -> Dict[str, Any]:
"""从配置对象构建统一的配置概览。"""
config_obj = dict(config_obj or {})
top_level_keys = list(config_obj.keys())
dict_section_names = []
enabled_sections = []
disabled_sections = []
sensitive_paths = []
def _walk_sensitive_fields(node, path: str) -> None:
if isinstance(node, dict):
for key, value in node.items():
next_path = f"{path}.{key}"
if isinstance(value, str) and self._is_sensitive_config_key(key) and str(value or "").strip():
sensitive_paths.append(next_path)
_walk_sensitive_fields(value, next_path)
return
if isinstance(node, list):
for index, value in enumerate(node):
_walk_sensitive_fields(value, f"{path}[{index}]")
for section_name, section_value in config_obj.items():
if not isinstance(section_value, dict):
continue
dict_section_names.append(section_name)
if "enable" not in section_value:
continue
if bool(section_value.get("enable", True)):
enabled_sections.append(section_name)
else:
disabled_sections.append(section_name)
_walk_sensitive_fields(config_obj, "config")
return {
"path": config_path,
"exists": bool(file_exists),
"parse_ok": bool(parse_ok),
"parse_error": str(parse_error or ""),
"top_level_keys": top_level_keys,
"top_level_key_count": len(top_level_keys),
"dict_section_names": dict_section_names,
"section_count": len(dict_section_names),
"enabled_sections": enabled_sections,
"enabled_section_count": len(enabled_sections),
"disabled_sections": disabled_sections,
"disabled_section_count": len(disabled_sections),
"sensitive_field_paths": sensitive_paths,
"sensitive_field_count": len(sensitive_paths),
}
def _read_plugin_config_overview(self, config_path: str) -> Dict[str, Any]:
"""读取插件配置文件并返回概览。"""
if not config_path:
return self._build_config_overview_from_mapping(
{},
config_path="",
file_exists=False,
parse_ok=False,
parse_error="配置路径为空",
)
if not os.path.exists(config_path):
return self._build_config_overview_from_mapping(
{},
config_path=config_path,
file_exists=False,
parse_ok=True,
)
try:
with open(config_path, "r", encoding="utf-8") as config_file:
config_obj = toml.load(config_file)
return self._build_config_overview_from_mapping(
config_obj,
config_path=config_path,
file_exists=True,
parse_ok=True,
)
except Exception as e:
return self._build_config_overview_from_mapping(
{},
config_path=config_path,
file_exists=True,
parse_ok=False,
parse_error=str(e),
)
@staticmethod
def _build_diagnostic(level: str, code: str, message: str) -> Dict[str, str]:
"""统一治理诊断项结构。"""
return {
"level": str(level or "").strip().lower() or "info",
"code": str(code or "").strip(),
"message": str(message or "").strip(),
}
def _collect_plugin_types(self, plugin: PluginInterface) -> List[str]:
    """Classify a plugin by the interfaces it implements.

    Returns:
        "message" and/or "scheduled" (in that order) for the matching
        interfaces, or ["generic"] when neither matches.
    """
    capability_by_interface = (
        (MessagePluginInterface, "message"),
        (ScheduledPluginInterface, "scheduled"),
    )
    detected = [label for interface, label in capability_by_interface if isinstance(plugin, interface)]
    return detected if detected else ["generic"]
def _collect_plugin_commands(self, plugin: PluginInterface) -> List[str]:
"""统一读取插件声明的命令列表。"""
commands = getattr(plugin, "commands", []) or getattr(plugin, "_commands", []) or []
if isinstance(commands, (list, tuple, set)):
return [str(item).strip() for item in commands if str(item or "").strip()]
if isinstance(commands, str) and commands.strip():
return [commands.strip()]
return []
def _build_governance_diagnostics(
self,
*,
plugin: Optional[PluginInterface],
module_name: str,
config_overview: Dict[str, Any],
runtime_record: Dict[str, Any],
guard_snapshot: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, str]]:
"""根据插件元信息、配置和运行态生成治理诊断。"""
diagnostics = []
runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
runtime_message = str(runtime_record.get("message", "") or "").strip()
guard_snapshot = dict(guard_snapshot or {})
if runtime_state == "load_failed":
diagnostics.append(
self._build_diagnostic(
"error",
"load_failed",
runtime_message or f"插件模块 `{module_name}` 加载失败,请先排查导入、初始化或依赖问题。",
)
)
elif runtime_state == "disabled_by_config":
diagnostics.append(
self._build_diagnostic(
"info",
"disabled_by_config",
runtime_message or "插件已在配置中禁用,当前未进入运行态。",
)
)
if not config_overview.get("exists"):
diagnostics.append(
self._build_diagnostic(
"info",
"config_missing",
"未发现 config.toml当前插件将完全依赖默认参数或代码内置配置。",
)
)
elif not config_overview.get("parse_ok"):
diagnostics.append(
self._build_diagnostic(
"error",
"config_parse_failed",
f"配置文件解析失败:{config_overview.get('parse_error', '未知错误')}",
)
)
sensitive_count = int(config_overview.get("sensitive_field_count", 0) or 0)
if sensitive_count > 0:
diagnostics.append(
self._build_diagnostic(
"warning",
"config_contains_sensitive_fields",
f"配置文件中检测到 {sensitive_count} 个敏感字段,建议逐步迁移到全局配置或环境变量。",
)
)
circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower()
consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0)
open_remaining_seconds = int(guard_snapshot.get("open_remaining_seconds", 0) or 0)
if circuit_state == "open":
diagnostics.append(
self._build_diagnostic(
"error",
"circuit_open",
f"插件保护熔断已打开,预计 {open_remaining_seconds} 秒后再尝试半开恢复。",
)
)
elif circuit_state == "half_open":
diagnostics.append(
self._build_diagnostic(
"warning",
"circuit_half_open",
"插件当前处于半开探测状态,正在等待恢复结果。",
)
)
elif consecutive_failures > 0:
diagnostics.append(
self._build_diagnostic(
"info",
"recent_failures",
f"插件最近存在连续失败记录,当前连续失败次数为 {consecutive_failures}",
)
)
if plugin is None:
return diagnostics
version = str(getattr(plugin, "version", "") or "").strip()
author = str(getattr(plugin, "author", "") or "").strip()
description = str(getattr(plugin, "description", "") or "").strip()
if not version or version.upper() == "N/A":
diagnostics.append(self._build_diagnostic("warning", "missing_version", "插件未声明版本号,不利于后续升级和兼容治理。"))
if not author or author.upper() == "N/A":
diagnostics.append(self._build_diagnostic("info", "missing_author", "插件未声明作者信息,后续定位维护人会比较困难。"))
if not description or description.upper() == "N/A":
diagnostics.append(self._build_diagnostic("info", "missing_description", "插件未声明描述信息,后台可读性较弱。"))
dependencies = list(getattr(plugin, "dependencies", []) or [])
for dependency_name in dependencies:
if dependency_name not in self.plugins:
diagnostics.append(
self._build_diagnostic(
"warning",
"missing_dependency",
f"声明依赖插件 `{dependency_name}` 当前未加载,存在运行时能力缺失风险。",
)
)
if isinstance(plugin, MessagePluginInterface):
commands = self._collect_plugin_commands(plugin)
if not commands:
diagnostics.append(
self._build_diagnostic(
"info",
"missing_commands",
"消息插件未声明命令列表,后台无法准确展示其触发入口。",
)
)
feature_key = str(getattr(plugin, "feature_key", "") or "").strip()
if not feature_key:
diagnostics.append(
self._build_diagnostic(
"info",
"missing_feature_key",
"消息插件未声明 feature_key将无法纳入统一群级权限治理。",
)
)
return diagnostics
@staticmethod
def _summarize_governance_status(diagnostics: List[Dict[str, str]]) -> Dict[str, Any]:
"""把诊断列表汇总为后台更容易消费的治理状态。"""
level_priority = {"healthy": 0, "info": 1, "warning": 2, "error": 3}
level_counts = {"error": 0, "warning": 0, "info": 0}
governance_status = "healthy"
for item in diagnostics or []:
level = str(item.get("level", "") or "info").strip().lower()
if level in level_counts:
level_counts[level] += 1
if level_priority.get(level, 0) > level_priority.get(governance_status, 0):
governance_status = level
if governance_status == "info" and level_counts["warning"] == 0 and level_counts["error"] == 0:
governance_status = "healthy"
return {
"status": governance_status,
"error_count": level_counts["error"],
"warning_count": level_counts["warning"],
"info_count": level_counts["info"],
}
@staticmethod
def _format_runtime_timestamp(timestamp_value: Any) -> str:
"""把运行态中的 unix 时间戳转成后台可读文本。"""
try:
normalized = float(timestamp_value or 0.0)
except (TypeError, ValueError):
return ""
if normalized <= 0:
return ""
try:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(normalized))
except (OverflowError, OSError, ValueError):
return ""
@staticmethod
def _safe_percent(numerator: Any, denominator: Any) -> float:
"""安全计算百分比,避免分母为空时抛异常。"""
try:
denominator_value = float(denominator or 0.0)
if denominator_value <= 0:
return 0.0
return round((float(numerator or 0.0) / denominator_value) * 100, 2)
except (TypeError, ValueError, ZeroDivisionError):
return 0.0
def _build_execution_summary(self, guard_snapshot: Dict[str, Any]) -> Dict[str, Any]:
"""把执行保护记录转换成更适合后台页面展示的执行摘要。
设计考虑:
1. 原始 execution_guard 更偏底层状态,前端直接消费会充满规则判断;
2. 这里统一补出成功率、总执行次数、最近成功/失败时间、最近错误摘要;
3. 未来如果还要做“高风险插件排行”“慢插件排行”,也能直接复用该摘要。
"""
guard_snapshot = dict(guard_snapshot or {})
success_count_total = int(guard_snapshot.get("success_count_total", 0) or 0)
failure_count_total = int(guard_snapshot.get("failure_count_total", 0) or 0)
timeout_count_total = int(guard_snapshot.get("timeout_count_total", 0) or 0)
consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0)
consecutive_timeouts = int(guard_snapshot.get("consecutive_timeouts", 0) or 0)
last_process_time_ms = round(float(guard_snapshot.get("last_process_time_ms", 0.0) or 0.0), 2)
circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower()
last_error_message = str(guard_snapshot.get("last_error_message") or "").strip()
if len(last_error_message) > 240:
last_error_message = f"{last_error_message[:237]}..."
total_executions = success_count_total + failure_count_total
success_rate = self._safe_percent(success_count_total, total_executions)
timeout_rate = self._safe_percent(timeout_count_total, total_executions)
last_success_at_text = self._format_runtime_timestamp(guard_snapshot.get("last_success_at"))
last_failure_at_text = self._format_runtime_timestamp(guard_snapshot.get("last_failure_at"))
status = "info"
summary = "暂无执行样本"
if total_executions > 0:
status = "healthy"
summary = (
f"累计执行 {total_executions} 次,成功率 {success_rate}%"
f"最近耗时 {last_process_time_ms}ms"
)
# 熔断打开是最明确的高风险信号,应优先标记为 error。
if circuit_state == "open":
status = "error"
summary = (
f"插件当前处于熔断中,连续失败 {consecutive_failures} 次,"
f"恢复剩余 {int(guard_snapshot.get('open_remaining_seconds', 0) or 0)}s"
)
elif failure_count_total > 0 or timeout_count_total > 0 or consecutive_failures > 0 or consecutive_timeouts > 0:
status = "warning"
summary = (
f"累计失败 {failure_count_total} 次,超时 {timeout_count_total} 次,"
f"成功率 {success_rate}%"
)
return {
"status": status,
"summary": summary,
"total_executions": total_executions,
"success_count_total": success_count_total,
"failure_count_total": failure_count_total,
"timeout_count_total": timeout_count_total,
"success_rate": success_rate,
"timeout_rate": timeout_rate,
"consecutive_failures": consecutive_failures,
"consecutive_timeouts": consecutive_timeouts,
"last_process_time_ms": last_process_time_ms,
"last_success_at_text": last_success_at_text,
"last_failure_at_text": last_failure_at_text,
"last_error_message": last_error_message,
"last_failure_type": str(guard_snapshot.get("last_failure_type") or "").strip(),
"last_timeout_seconds": int(guard_snapshot.get("last_timeout_seconds", 0) or 0),
"circuit_state": circuit_state,
"open_remaining_seconds": int(guard_snapshot.get("open_remaining_seconds", 0) or 0),
}
def _build_plugin_snapshot(self, plugin: PluginInterface) -> Dict[str, Any]:
    """Build the standard governance snapshot for a loaded plugin.

    Combines the plugin's declared metadata with its runtime record,
    execution-guard snapshot, config overview and derived diagnostics.
    """
    module_name = self._get_module_name_from_plugin(plugin) or "unknown"
    runtime_record = self._get_module_runtime_state(module_name)
    guard_snapshot = self.get_plugin_guard_snapshot(module_name)
    execution_summary = self._build_execution_summary(guard_snapshot)
    config_path = plugin.get_config_path()
    config_overview = self._read_plugin_config_overview(config_path)
    commands = self._collect_plugin_commands(plugin)

    diagnostics = self._build_governance_diagnostics(
        plugin=plugin,
        module_name=module_name,
        config_overview=config_overview,
        runtime_record=runtime_record,
        guard_snapshot=guard_snapshot,
    )
    governance = self._summarize_governance_status(diagnostics)

    snapshot: Dict[str, Any] = {}
    snapshot["name"] = plugin.name
    snapshot["module_name"] = module_name
    snapshot["version"] = getattr(plugin, "version", "N/A")
    snapshot["author"] = getattr(plugin, "author", "N/A")
    snapshot["description"] = getattr(plugin, "description", "N/A")
    snapshot["status"] = plugin.status.name if hasattr(plugin, "status") else "UNKNOWN"
    snapshot["status_label"] = self._status_to_label(
        plugin.status.name if hasattr(plugin, "status") else "UNKNOWN"
    )
    snapshot["plugin_types"] = self._collect_plugin_types(plugin)
    snapshot["commands"] = commands
    snapshot["command_count"] = len(commands)
    snapshot["command_prefix"] = getattr(plugin, "command_prefix", "")
    snapshot["dependencies"] = list(getattr(plugin, "dependencies", []) or [])
    snapshot["feature_key"] = str(getattr(plugin, "feature_key", "") or "").strip()
    snapshot["feature_description"] = str(getattr(plugin, "feature_description", "") or "").strip()
    snapshot["supports_group_switch"] = bool(getattr(plugin, "feature", None))
    # NOTE(review): this hands out the plugin's raw _config object, which
    # may include the sensitive fields flagged in config_overview — confirm
    # that consumers of the snapshot mask them.
    snapshot["config"] = getattr(plugin, "_config", {})
    snapshot["config_path"] = config_path
    snapshot["config_overview"] = config_overview
    snapshot["governance_diagnostics"] = diagnostics
    snapshot["governance_status"] = governance["status"]
    snapshot["governance_error_count"] = governance["error_count"]
    snapshot["governance_warning_count"] = governance["warning_count"]
    snapshot["governance_info_count"] = governance["info_count"]
    snapshot["runtime_state"] = runtime_record.get("state", "loaded")
    snapshot["runtime_message"] = runtime_record.get("message", "")
    snapshot["execution_guard"] = guard_snapshot
    snapshot["execution_summary"] = execution_summary
    return snapshot
def _build_unloaded_plugin_snapshot(self, module_name: str) -> Dict[str, Any]:
    """Build a governance snapshot for a module that is not loaded.

    The snapshot has the same keys as the one for loaded plugins, with
    placeholder metadata. ``status`` is "ERROR" for a recorded load failure,
    "STOPPED" when disabled by config, and "DISCOVERED" otherwise.
    """
    runtime_record = self._get_module_runtime_state(module_name)
    guard_snapshot = self.get_plugin_guard_snapshot(module_name)
    execution_summary = self._build_execution_summary(guard_snapshot)

    # The config of a directory plugin sits at <plugin_dir>/<module>/config.toml.
    # (The former fallback rebuilt this very same path, so it is dropped.)
    config_path = os.path.join(self.plugin_dir, module_name, "config.toml")
    config_overview = self._read_plugin_config_overview(config_path)

    diagnostics = self._build_governance_diagnostics(
        plugin=None,
        module_name=module_name,
        config_overview=config_overview,
        runtime_record=runtime_record,
        guard_snapshot=guard_snapshot,
    )
    governance = self._summarize_governance_status(diagnostics)

    runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
    status_by_state = {"load_failed": "ERROR", "disabled_by_config": "STOPPED"}
    status = status_by_state.get(runtime_state, "DISCOVERED")
    runtime_message = runtime_record.get("message", "")

    return {
        "name": module_name,
        "module_name": module_name,
        "version": "N/A",
        "author": "N/A",
        "description": runtime_message or "插件模块已发现,但当前未进入加载态。",
        "status": status,
        "status_label": self._status_to_label(status),
        "plugin_types": ["unknown"],
        "commands": [],
        "command_count": 0,
        "command_prefix": "",
        "dependencies": [],
        "feature_key": "",
        "feature_description": "",
        "supports_group_switch": False,
        "config": {},
        "config_path": config_path,
        "config_overview": config_overview,
        "governance_diagnostics": diagnostics,
        "governance_status": governance["status"],
        "governance_error_count": governance["error_count"],
        "governance_warning_count": governance["warning_count"],
        "governance_info_count": governance["info_count"],
        "runtime_state": runtime_state or "discovered",
        "runtime_message": runtime_message,
        "execution_guard": guard_snapshot,
        "execution_summary": execution_summary,
    }
@staticmethod
def _status_to_label(status: str) -> str:
"""把运行态状态码转换成中文展示文案。"""
status_map = {
"RUNNING": "运行中",
"STOPPED": "已停用",
"LOADED": "已加载",
"UNLOADED": "未加载",
"ERROR": "异常",
"DISCOVERED": "待处理",
"UNKNOWN": "未知",
}
return status_map.get(str(status or "").strip().upper(), "未知")
def get_plugin_snapshots(self) -> List[Dict[str, Any]]:
    """Return governance snapshots for every loaded and discovered plugin.

    Modules that exist in the plugin directory but are not loaded are
    included too, so the governance page also shows the modules that failed
    to load and needs only one data source.

    Returns:
        Snapshots sorted by: RUNNING first, then governance status (error,
        warning, everything else), then module name.
    """
    discovered = set(self.discover_plugins())

    snapshots = [self._build_plugin_snapshot(plugin) for plugin in self.plugins.values()]
    loaded = {snapshot["module_name"] for snapshot in snapshots}

    for module_name in sorted(discovered - loaded):
        snapshots.append(self._build_unloaded_plugin_snapshot(module_name))

    def _ordering(item: Dict[str, Any]):
        running_rank = 0 if item.get("status") == "RUNNING" else 1
        governance = item.get("governance_status")
        if governance == "error":
            governance_rank = 0
        elif governance == "warning":
            governance_rank = 1
        else:
            governance_rank = 2
        return (running_rank, governance_rank, str(item.get("module_name", "")))

    snapshots.sort(key=_ordering)
    return snapshots
def get_plugin_snapshot(self, name: str) -> Optional[Dict[str, Any]]:
"""按模块名或展示名获取单个插件治理快照。"""
target_name = str(name or "").strip()
if not target_name:
return None
display_name, plugin = self.find_plugin_by_name(target_name)
if plugin:
return self._build_plugin_snapshot(plugin)
for snapshot in self.get_plugin_snapshots():
if snapshot.get("module_name") == target_name or snapshot.get("name") == target_name:
return snapshot
return None
def set_system_context(self, context: Dict[str, Any]):
"""
设置系统上下文
Args:
context: 系统上下文
"""
self.system_context = context
bot = context.get("bot")
if bot is not None:
self.current_bot = bot
def _build_module_file_state(self, module_name: str) -> Optional[Dict[str, float]]:
"""
构建模块的文件状态快照,用于检测文件变更
"""
plugin_folder = os.path.join(self.plugin_dir, module_name)
file_state: Dict[str, float] = {}
if os.path.isdir(plugin_folder):
for root, _, files in os.walk(plugin_folder):
for filename in files:
if not (filename.endswith(".py") or filename.endswith(".toml")):
continue
file_path = os.path.join(root, filename)
try:
file_state[os.path.abspath(file_path)] = os.path.getmtime(file_path)
except OSError:
continue
return file_state if file_state else None
single_file = os.path.join(self.plugin_dir, f"{module_name}.py")
if os.path.exists(single_file):
try:
file_state[os.path.abspath(single_file)] = os.path.getmtime(single_file)
except OSError:
return None
return file_state
return None
def _refresh_module_file_state(self, module_name: str):
state = self._build_module_file_state(module_name)
if state is None:
self._module_file_state.pop(module_name, None)
else:
self._module_file_state[module_name] = state
def _inject_bot_to_plugin(self, plugin: PluginInterface):
bot = self.current_bot or self.system_context.get("bot")
if not bot:
return
if hasattr(plugin, "set_bot"):
try:
plugin.set_bot(bot)
except Exception as e:
self.LOG.error(f"自动注入 bot 到插件 {plugin.name} 失败: {e}")
def start_hot_reload_watcher(self, interval_seconds: float = 60.0):
    """Start the polling thread that watches the plugin directory.

    A second call while the watcher thread is alive is a no-op.

    Args:
        interval_seconds: Polling interval in seconds (default 60); values
            below 0.5 are raised to 0.5.
    """
    with self._watcher_lock:
        running = self._watcher_thread
        if running and running.is_alive():
            self.LOG.debug("PluginManager热加载监听线程已运行跳过重复启动")
            return

        self._watcher_interval = max(float(interval_seconds), 0.5)
        self._watcher_stop_event.clear()

        # Take a baseline snapshot so only later edits count as changes.
        for module_name in self.discover_plugins():
            self._refresh_module_file_state(module_name)

        watcher = threading.Thread(
            target=self._hot_reload_watch_loop,
            name="plugin-hot-reload-watcher",
            daemon=True,
        )
        self._watcher_thread = watcher
        watcher.start()
        self.LOG.info(f"PluginManager插件热加载监听已启动轮询间隔 {self._watcher_interval}s")
def stop_hot_reload_watcher(self):
"""
停止插件目录变更监听线程
"""
with self._watcher_lock:
if not self._watcher_thread:
return
self._watcher_stop_event.set()
thread = self._watcher_thread
self._watcher_thread = None
if thread.is_alive():
thread.join(timeout=2.0)
self.LOG.info("PluginManager插件热加载监听已停止")
def _hot_reload_watch_loop(self):
while not self._watcher_stop_event.is_set():
try:
discovered = set(self.discover_plugins())
loaded_modules = set(self.module_to_display.keys())
# 1. 新增插件 -> 自动加载并启动
new_modules = discovered - loaded_modules
for module_name in new_modules:
plugin = self.load_plugin(module_name)
if plugin:
self.start_plugin(plugin.name)
self.LOG.info(f"PluginManager检测到新增插件 {module_name},已自动加载")
self._refresh_module_file_state(module_name)
# 2. 已删除插件 -> 自动卸载
removed_modules = loaded_modules - discovered
for module_name in removed_modules:
if self.unload_plugin(module_name):
self.LOG.info(f"PluginManager检测到插件 {module_name} 已删除,已自动卸载")
self._module_file_state.pop(module_name, None)
# 3. 文件变更 -> 自动重载
for module_name in list(self.module_to_display.keys()):
new_state = self._build_module_file_state(module_name)
old_state = self._module_file_state.get(module_name)
if new_state is None:
continue
if old_state is None:
self._module_file_state[module_name] = new_state
continue
if new_state != old_state:
reloaded = self.reload_plugin(module_name)
if reloaded:
self.LOG.info(f"PluginManager检测到插件 {module_name} 文件变更,已自动重载")
self._module_file_state[module_name] = new_state
else:
self.LOG.warning(f"PluginManager插件 {module_name} 自动重载失败")
except Exception as e:
self.LOG.error(f"PluginManager热加载监听异常: {e}", exc_info=True)
time.sleep(self._watcher_interval)
def discover_plugins(self) -> List[str]:
"""
发现可用插件
Returns:
插件模块名称列表(module_name)
"""
module_names = []
# 遍历插件目录
for item in os.listdir(self.plugin_dir):
if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"):
# 检查是否有main.py文件
if os.path.exists(os.path.join(self.plugin_dir, item, "main.py")):
module_names.append(item)
elif item.endswith(".py") and not item.startswith("__"):
# 单文件插件
module_names.append(item[:-3])
self.LOG.debug(f"PluginManager发现插件模块: {module_names}")
return module_names
def load_all_plugins(self) -> Dict[str, PluginInterface]:
"""
加载所有插件
Returns:
插件实例字典键为display_name
"""
module_names = self.discover_plugins()
loaded_modules = []
failed_modules = []
# 记录开始加载的插件列表
self.LOG.debug(f"PluginManager开始加载插件列表: {module_names}")
for module_name in module_names:
try:
plugin = self.load_plugin(module_name)
if plugin:
loaded_modules.append(module_name)
# 自动启动插件
self.start_plugin(plugin.name)
else:
failed_modules.append(module_name)
except Exception as e:
self.LOG.error(f"PluginManager加载插件模块 {module_name} 时发生错误: {str(e)}", exc_info=True)
failed_modules.append(module_name)
# 验证所有已加载插件的模块映射
for display_name, plugin in self.plugins.items():
try:
# 尝试从类模块路径获取模块名
module_name = self._get_module_name_from_plugin(plugin)
if module_name and module_name not in self.module_to_display:
self.module_to_display[module_name] = display_name
self.LOG.debug(f"PluginManager补充缺失的模块映射 {module_name} -> {display_name}")
except Exception as e:
self.LOG.warning(f"PluginManager获取插件 {display_name} 的模块名时出错: {e}")
# 使用插件显示名称作为备选模块名
folder_name = display_name.lower().replace(' ', '_')
if folder_name not in self.module_to_display:
self.module_to_display[folder_name] = display_name
self.LOG.debug(f"PluginManager使用目录名作为模块映射 {folder_name} -> {display_name}")
# 检查是否有重复或无效的映射
invalid_mappings = []
for module_name, display_name in self.module_to_display.items():
if display_name not in self.plugins:
invalid_mappings.append(module_name)
self.LOG.warning(f"PluginManager发现无效的模块映射 {module_name} -> {display_name}")
# 清理无效的映射
for module_name in invalid_mappings:
del self.module_to_display[module_name]
self.LOG.debug(f"PluginManager清理无效的模块映射 {module_name}")
# 记录最终状态
self.LOG.debug(f"PluginManager加载成功的插件模块: {loaded_modules}")
if failed_modules:
self.LOG.warning(f"PluginManager加载失败的插件模块: {failed_modules}")
self.LOG.debug(f"PluginManager当前已加载的插件实例: {list(self.plugins.keys())}")
self.LOG.debug(f"PluginManager最终的模块映射关系: {self.module_to_display}")
return self.plugins
def _get_module_name_from_plugin(self, plugin: PluginInterface) -> Optional[str]:
    """
    Derive the plugin's module name from the dotted module path of its class.

    Args:
        plugin: Plugin instance.

    Returns:
        The module name, or None if it could not be determined.
    """
    try:
        dotted_path = plugin.__class__.__module__
        segments = dotted_path.split('.')
        if len(segments) < 2:
            # No dots at all: the path itself is the module name.
            return dotted_path
        if segments[0] == 'plugins':
            # "plugins.<name>....": the name is the second segment.
            return segments[1]
        # Any other dotted path: use the second-to-last segment.
        return segments[-2]
    except (IndexError, AttributeError) as e:
        self.LOG.warning(f"获取插件 {plugin.name} 的模块名时出错: {e}")
        return None
def load_plugin(self, module_name: str) -> Optional[PluginInterface]:
    """
    Load the plugin that lives in ``module_name``.

    Steps:
      1. If an already-loaded plugin maps to this module name, reuse it.
      2. Import ``plugins.<module_name>.main`` when ``<plugin_dir>/<module_name>``
         is a directory containing ``main.py``; otherwise import
         ``module_name`` as a single-file plugin.
      3. Instantiate the plugin class found in the module, or fall back to
         the module-level ``get_plugin()`` factory.
      4. Configure, initialise and register the instance.

    Args:
        module_name: Plugin module name.

    Returns:
        The plugin instance, or None when loading failed or the plugin is
        disabled by its configuration.
    """
    # Bound before anything else so the failure handler can only ever see the
    # instance created by THIS call. The reuse loop below deliberately uses a
    # different variable name: a loop variable called ``plugin`` would stay
    # bound to an unrelated, healthy plugin and the handler would then remove
    # that plugin's scheduled jobs.
    plugin: Optional[PluginInterface] = None
    try:
        # Reuse an instance that is already loaded for this module.
        for display_name, loaded_plugin in self.plugins.items():
            try:
                if self._get_module_name_from_plugin(loaded_plugin) != module_name:
                    continue
                self.LOG.debug(f"PluginManager插件模块 {module_name} 已加载为 {display_name}")
                # Make sure the module -> display-name mapping exists.
                if module_name not in self.module_to_display:
                    self.module_to_display[module_name] = display_name
                    self.LOG.debug(f"PluginManager添加缺失的模块映射 {module_name} -> {display_name}")
                self._record_module_runtime_state(module_name, "loaded", "插件已在内存中复用现有实例。")
                self.reset_plugin_guard_state(module_name)
                self._inject_bot_to_plugin(loaded_plugin)
                return loaded_plugin
            except Exception as e:
                self.LOG.warning(f"获取插件 {display_name} 的模块名时出错: {e}")
                continue

        # Import the module.
        plugin_path = os.path.join(self.plugin_dir, module_name)
        if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, "main.py")):
            # Directory plugin: entry point is main.py.
            module_path = f"plugins.{module_name}.main"
            try:
                module = importlib.import_module(module_path)
                self.plugin_modules[module_name] = module
            except ImportError as e:
                self.LOG.error(f"PluginManager导入插件模块 {module_path} 失败: {e}")
                self._record_module_runtime_state(module_name, "load_failed", f"导入插件模块失败: {e}")
                return None
        else:
            # Single-file plugin: its "plugin path" is the plugin directory.
            plugin_path = self.plugin_dir
            try:
                module = importlib.import_module(module_name)
                self.plugin_modules[module_name] = module
            except ImportError as e:
                self.LOG.error(f"PluginManager导入单文件插件 {module_name} 失败: {e}")
                self._record_module_runtime_state(module_name, "load_failed", f"导入单文件插件失败: {e}")
                return None

        plugin_class = self._select_plugin_class(module)

        if plugin_class is None:
            # No plugin class: try the module-level get_plugin() factory.
            get_plugin_func = getattr(module, "get_plugin", None)
            if not callable(get_plugin_func):
                self.LOG.error(f"PluginManager插件模块 {module_name} 中未找到有效的插件类或 get_plugin 函数")
                self._record_module_runtime_state(module_name, "load_failed", "未找到有效的插件类或 get_plugin 函数。")
                return None
            candidate = get_plugin_func()
            if not isinstance(candidate, PluginInterface):
                self.LOG.error(f"PluginManager插件模块 {module_name} 的 get_plugin() 返回的不是有效的插件实例")
                self._record_module_runtime_state(module_name, "load_failed", "get_plugin() 未返回有效的插件实例。")
                return None
            plugin = candidate
            # NOTE(review): factory-built plugins have never been subject to
            # the "enable" config check nor had their status set to LOADED;
            # that behaviour is preserved here. Confirm whether intentional.
            return self._finalize_plugin_load(module_name, plugin, plugin_path, honor_enable_flag=False)

        plugin = plugin_class()
        plugin.status = PluginStatus.LOADED
        return self._finalize_plugin_load(module_name, plugin, plugin_path, honor_enable_flag=True)
    except Exception as e:
        if plugin is not None:
            # Drop any jobs the half-loaded instance may have registered.
            async_job.remove_jobs_by_owner(plugin)
        self._record_module_runtime_state(module_name, "load_failed", f"插件加载异常: {e}")
        self.LOG.exception(f"PluginManager加载插件模块 {module_name} 失败: {e}", exc_info=True)
        return None

def _select_plugin_class(self, module):
    """Pick the concrete plugin class exposed by ``module``, or None.

    Classes defined in ``module`` itself win over plugin classes that were
    merely imported into it (e.g. a shared intermediate base class), because
    ``inspect.getmembers`` returns members sorted by name and an imported
    class could otherwise be picked first. If no class is defined locally the
    first candidate is used, which matches the previous behaviour.
    """
    base_types = (PluginInterface, MessagePluginInterface, ScheduledPluginInterface)
    candidates = [
        obj
        for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, PluginInterface) and obj not in base_types
    ]
    for obj in candidates:
        if getattr(obj, "__module__", None) == getattr(module, "__name__", None):
            return obj
    return candidates[0] if candidates else None

def _finalize_plugin_load(self, module_name, plugin, plugin_path, honor_enable_flag):
    """Configure, initialise and register a freshly created plugin instance.

    Args:
        module_name: Module name the plugin was loaded from.
        plugin: The new plugin instance.
        plugin_path: Path handed to ``plugin.set_plugin_path``.
        honor_enable_flag: When True, any dict-valued config section with a
            falsy ``enable`` entry aborts the load as "disabled_by_config".

    Returns:
        ``plugin`` on success, None when a step failed or the plugin is
        disabled. Exceptions propagate to the caller.
    """
    plugin.set_plugin_path(plugin_path)

    if not plugin.load_config():
        self.LOG.error(f"PluginManager插件模块 {module_name} 加载配置失败")
        self._record_module_runtime_state(module_name, "load_failed", "插件配置加载失败。")
        async_job.remove_jobs_by_owner(plugin)
        return None

    if honor_enable_flag:
        # Every config section is inspected; a missing "enable" means enabled.
        for section in plugin._config.values():
            if isinstance(section, dict) and not section.get("enable", True):
                self.LOG.debug(f"PluginManager插件 {module_name} 已禁用,跳过加载")
                self._record_module_runtime_state(module_name, "disabled_by_config", "插件在配置中已禁用,启动时已跳过加载。")
                async_job.remove_jobs_by_owner(plugin)
                return None

    if not plugin.initialize(self.system_context):
        self.LOG.error(f"PluginManager插件模块 {module_name} 初始化失败")
        self._record_module_runtime_state(module_name, "load_failed", "插件初始化失败。")
        async_job.remove_jobs_by_owner(plugin)
        return None

    self._inject_bot_to_plugin(plugin)
    PluginRegistry().register(plugin)

    display_name = plugin.name
    self.plugins[display_name] = plugin
    self.module_to_display[module_name] = display_name
    self._refresh_module_file_state(module_name)
    self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
    self.reset_plugin_guard_state(module_name)
    return plugin
def unload_plugin(self, name: str) -> bool:
    """
    Unload a plugin: stop it if running, remove its scheduled jobs, clean up
    its resources, unregister it and forget every mapping that points at it.

    Args:
        name: Plugin name (module name or display name).

    Returns:
        True when the plugin was unloaded; False when it is not loaded or
        when stopping / cleaning up failed (the plugin then stays registered).
    """
    display_name, plugin = self.find_plugin_by_name(name)
    if not plugin:
        self.LOG.debug(f"PluginManager插件 {name} 未加载")
        return False

    # Stop first when running.
    if plugin.status == PluginStatus.RUNNING:
        if not plugin.stop():
            self.LOG.debug(f"PluginManager停止插件 {display_name} 失败")
            return False
        plugin.status = PluginStatus.STOPPED

    removed_jobs = async_job.remove_jobs_by_owner(plugin)
    if removed_jobs:
        self.LOG.debug(f"PluginManager已移除插件 {display_name} 的定时任务 {removed_jobs}")

    if not plugin.cleanup():
        self.LOG.debug(f"PluginManager清理插件 {display_name} 资源失败")
        return False

    plugin.status = PluginStatus.UNLOADED
    PluginRegistry().unregister(display_name)

    # The key a plugin was registered under is not always the name derived
    # from its class (e.g. instances built by a factory). Drop every key
    # that points at this plugin, plus the derived name, so that no mapping
    # is left referring to a plugin that no longer exists.
    keys_to_drop = [
        key for key, mapped in self.module_to_display.items() if mapped == display_name
    ]
    derived_name = self._get_module_name_from_plugin(plugin)
    if derived_name and derived_name not in keys_to_drop:
        keys_to_drop.append(derived_name)

    for module_name in keys_to_drop:
        if module_name in self.module_to_display:
            del self.module_to_display[module_name]
            self.LOG.debug(f"PluginManager清理模块映射 {module_name} -> {display_name}")
        self._module_file_state.pop(module_name, None)

    self.plugins.pop(display_name, None)
    return True
def reload_plugin(self, name: str) -> Optional[PluginInterface]:
    """
    Reload a plugin: unload it, re-import its module and load it again.
    A plugin that was running before the reload is started again afterwards.

    Only the module object cached in ``self.plugin_modules`` is passed to
    ``importlib.reload``; other modules it imports are not reloaded here.

    Args:
        name: Plugin name (module name or display name).

    Returns:
        The new plugin instance, or None when the reload failed.
    """
    display_name, plugin = self.find_plugin_by_name(name)
    if not plugin:
        self.LOG.debug(f"PluginManager插件 {name} 未加载,无法重载")
        return None

    # Remember what is needed after the old instance is gone.
    was_running = plugin.status == PluginStatus.RUNNING
    module_name = self._get_module_name_from_plugin(plugin)
    if not module_name:
        self.LOG.error(f"无法获取插件 {display_name} 的模块名,重载失败")
        return None

    if not self.unload_plugin(display_name):
        self.LOG.debug(f"卸载插件 {display_name} 失败,无法重载")
        return None

    if module_name in self.plugin_modules:
        try:
            importlib.reload(self.plugin_modules[module_name])
        except Exception as e:
            # The old instance is already unloaded at this point, so this is
            # a real failure: surface it at error level and record it so the
            # runtime view does not keep showing the plugin as loaded.
            self.LOG.error(f"重新导入插件模块 {module_name} 失败: {e}")
            self._record_module_runtime_state(module_name, "load_failed", f"重新导入插件模块失败: {e}")
            return None

    plugin = self.load_plugin(module_name)

    # Restore the running state.
    if plugin and was_running:
        self.start_plugin(plugin.name)
    return plugin
def start_plugin(self, name: str) -> bool:
    """
    Start a loaded plugin and record the resulting runtime state.

    Args:
        name: Plugin name (module name or display name).

    Returns:
        True if the plugin is running after the call (including when it was
        already running); False if it is not loaded or failed to start.
    """
    display_name, plugin = self.find_plugin_by_name(name)
    if not plugin:
        self.LOG.debug(f"PluginManager插件 {name} 未加载")
        return False

    if plugin.status == PluginStatus.RUNNING:
        self.LOG.debug(f"PluginManager插件 {display_name} 已经在运行")
        return True

    started = plugin.start()
    plugin.status = PluginStatus.RUNNING if started else PluginStatus.ERROR
    # Runtime records are keyed by module name; fall back to the given name.
    module_key = self._get_module_name_from_plugin(plugin) or name

    if not started:
        self._record_module_runtime_state(module_key, "load_failed", "插件启动失败,状态已标记为异常。")
        self.LOG.debug(f"PluginManager插件 {display_name} 状态变更为异常")
        return False

    self.reset_plugin_guard_state(module_key)
    self._record_module_runtime_state(module_key, "running", "插件已启动并进入运行态。")
    self.LOG.debug(f"PluginManager插件 {display_name} 状态变更为在运行")
    return True
def stop_plugin(self, name: str) -> bool:
    """
    Stop a running plugin and record the resulting runtime state.

    Args:
        name: Plugin name (module name or display name).

    Returns:
        True if the plugin is not running after the call (including when it
        was not running to begin with); False if it is not loaded or failed
        to stop.
    """
    display_name, plugin = self.find_plugin_by_name(name)
    if not plugin:
        self.LOG.debug(f"插件 {name} 未加载")
        return False

    if plugin.status != PluginStatus.RUNNING:
        self.LOG.debug(f"插件 {display_name} 未在运行")
        return True

    stopped = plugin.stop()
    plugin.status = PluginStatus.STOPPED if stopped else PluginStatus.ERROR
    # Runtime records are keyed by module name; fall back to the given name.
    module_key = self._get_module_name_from_plugin(plugin) or name

    if not stopped:
        self._record_module_runtime_state(module_key, "load_failed", "插件停用失败,状态已标记为异常。")
        self.LOG.debug(f"插件 {display_name} 状态变更为异常")
        return False

    self._record_module_runtime_state(module_key, "stopped", "插件已手动停用。")
    self.LOG.debug(f"插件 {display_name} 状态变更为已停止")
    return True
def shutdown_plugins(self) -> bool:
    """
    Stop the hot-reload watcher and unload every plugin.

    Returns:
        True only if every plugin was unloaded and none is left registered.
    """
    all_unloaded = True
    self.stop_hot_reload_watcher()

    # Iterate over a snapshot: unloading mutates self.plugins.
    for display_name in list(self.plugins):
        if self.unload_plugin(display_name):
            continue
        self.LOG.error(f"卸载插件 {display_name} 失败")
        all_unloaded = False

    # Forget cached modules and name mappings regardless of the outcome.
    self.plugin_modules.clear()
    self.module_to_display.clear()

    # Anything still registered counts as a failure.
    if self.plugins:
        self.LOG.warning(f"插件卸载后仍有 {len(self.plugins)} 个插件残留")
        all_unloaded = False
    return all_unloaded
def find_plugin_by_name(self, name: str) -> Tuple[Optional[str], Optional[PluginInterface]]:
    """
    Look a plugin up by display name or module name.

    Resolution order (first hit wins):
      1. exact display name;
      2. registered module-name mapping (a mapping whose target is no longer
         loaded is dropped and the search continues);
      3. exact module name derived from any loaded plugin;
      4. case-insensitive module name of any loaded plugin;
      5. substring match between ``name`` and a module / display name.

    Exact matches are tried across ALL plugins before any substring match,
    so "weather" never resolves to "weather_pro" while a plugin named
    "weather" is loaded. An empty name never matches by substring.

    Args:
        name: Plugin display name or module name.

    Returns:
        ``(display_name, plugin)``; ``(None, None)`` when nothing matches.
    """
    # 1. Exact display name.
    if name in self.plugins:
        return name, self.plugins[name]

    # 2. Registered module name.
    if name in self.module_to_display:
        mapped_display = self.module_to_display[name]
        mapped_plugin = self.plugins.get(mapped_display)
        if mapped_plugin is not None:
            return mapped_display, mapped_plugin
        # The mapping points at a plugin that is gone; returning
        # (display_name, None) would contradict the documented contract.
        del self.module_to_display[name]
        self.LOG.debug(f"PluginManager清理无效的模块映射 {name}")

    # Resolve each plugin's module name once; plugins whose module name
    # cannot be determined are skipped entirely.
    candidates = []
    for display_name, plugin in self.plugins.items():
        try:
            module_name = self._get_module_name_from_plugin(plugin)
        except Exception:
            continue
        candidates.append((display_name, plugin, module_name))

    def remember(module_name, display_name):
        # Cache the discovered mapping for the next lookup.
        if module_name not in self.module_to_display:
            self.module_to_display[module_name] = display_name
            self.LOG.debug(f"PluginManager添加缺失的模块映射 {module_name} -> {display_name}")

    # 3. Exact module name.
    for display_name, plugin, module_name in candidates:
        if module_name and module_name == name:
            remember(module_name, display_name)
            return display_name, plugin

    needle = name.lower() if isinstance(name, str) else ""
    if needle:
        # 4. Case-insensitive module name.
        for display_name, plugin, module_name in candidates:
            if module_name and module_name.lower() == needle:
                remember(module_name, display_name)
                return display_name, plugin

        # 5. Substring matches, last resort.
        for display_name, plugin, module_name in candidates:
            haystack = module_name.lower() if module_name else ""
            if haystack and (needle in haystack or haystack in needle):
                return display_name, plugin
            if needle in str(display_name).lower():
                return display_name, plugin

    # Log what is available to help diagnose the failed lookup.
    self.LOG.warning(f"未找到插件: {name},当前已加载插件: {list(self.plugins.keys())}")
    self.LOG.warning(f"模块映射: {self.module_to_display}")
    return None, None
def inject_bot(self, bot: WechatAPIClient):
    """
    Remember ``bot`` as the current bot, publish it in the system context
    under the "bot" key, and hand it to every loaded plugin that exposes a
    ``set_bot`` method. A failure in one plugin is logged and does not stop
    the others.

    Args:
        bot: The client instance to inject.
    """
    self.current_bot = bot
    self.system_context["bot"] = bot

    for plugin_name, plugin_obj in self.plugins.items():
        # Plugins without set_bot simply do not receive the bot.
        if not hasattr(plugin_obj, "set_bot"):
            continue
        try:
            plugin_obj.set_bot(bot)
            self.LOG.debug(f"已成功注入 bot 到插件 {plugin_name}")
        except Exception as e:
            self.LOG.error(f"注入 bot 到插件 {plugin_name} 失败: {e}")