import importlib import inspect import os import sys import threading import time from typing import Dict, List, Any, Optional, Tuple from loguru import logger from base.plugin_common.plugin_interface import PluginInterface, PluginStatus from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.scheduled_plugin_interface import ScheduledPluginInterface from base.plugin_common.plugin_registry import PluginRegistry from utils.decorator.async_job import async_job from wechat_ipad import WechatAPIClient class PluginManager: """插件管理器,负责加载、卸载、启动、停止插件""" # 单例实例 _instance = None @classmethod def get_instance(cls, plugin_dir=None): """获取单例实例 Args: plugin_dir: 插件目录,如果已有实例则忽略此参数 Returns: PluginManager实例 """ if cls._instance is None: cls._instance = cls(plugin_dir=plugin_dir or "plugins") return cls._instance def __new__(cls, *args, **kwargs): """实现单例模式""" if cls._instance is None: cls._instance = super(PluginManager, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, plugin_dir: str = "plugins"): """ 初始化插件管理器 Args: plugin_dir: 插件目录 """ self.plugin_dir = plugin_dir self.plugins: Dict[str, PluginInterface] = {} # 插件实例字典,键为display_name self.plugin_modules = {} # 插件模块字典,键为module_name self.module_to_display = {} # 模块名到显示名的映射 self.system_context = {} # 系统上下文 self.current_bot: Optional[WechatAPIClient] = None # 热加载相关 self._watcher_thread: Optional[threading.Thread] = None self._watcher_stop_event = threading.Event() # 默认每 60 秒扫描一次插件目录,降低线上资源消耗 self._watcher_interval = 60.0 self._module_file_state: Dict[str, Dict[str, float]] = {} self._watcher_lock = threading.RLock() self.LOG = logger # 确保插件目录存在 if not os.path.exists(self.plugin_dir): os.makedirs(self.plugin_dir) # 将插件目录添加到Python路径 if self.plugin_dir not in sys.path: sys.path.insert(0, self.plugin_dir) def set_system_context(self, context: Dict[str, Any]): """ 设置系统上下文 Args: context: 系统上下文 """ self.system_context = context bot = context.get("bot") if bot is not None: self.current_bot = bot def _build_module_file_state(self, module_name: str) -> Optional[Dict[str, float]]: """ 构建模块的文件状态快照,用于检测文件变更 """ plugin_folder = os.path.join(self.plugin_dir, module_name) file_state: Dict[str, float] = {} if os.path.isdir(plugin_folder): for root, _, files in os.walk(plugin_folder): for filename in files: if not (filename.endswith(".py") or filename.endswith(".toml")): continue file_path = os.path.join(root, filename) try: file_state[os.path.abspath(file_path)] = os.path.getmtime(file_path) except OSError: continue return file_state if file_state else None single_file = os.path.join(self.plugin_dir, f"{module_name}.py") if os.path.exists(single_file): try: file_state[os.path.abspath(single_file)] = os.path.getmtime(single_file) except OSError: return None return file_state return None def _refresh_module_file_state(self, module_name: str): state = self._build_module_file_state(module_name) if state is None: self._module_file_state.pop(module_name, None) else: self._module_file_state[module_name] = state def _inject_bot_to_plugin(self, plugin: PluginInterface): bot = self.current_bot or self.system_context.get("bot") if not bot: return if hasattr(plugin, "set_bot"): try: plugin.set_bot(bot) except Exception as e: self.LOG.error(f"自动注入 bot 到插件 {plugin.name} 失败: {e}") def start_hot_reload_watcher(self, interval_seconds: float = 60.0): """ 启动插件目录变更监听线程(轮询) Args: interval_seconds: 轮询间隔秒数,默认 60 秒 """ with self._watcher_lock: if self._watcher_thread and self._watcher_thread.is_alive(): self.LOG.debug("PluginManager:热加载监听线程已运行,跳过重复启动") return self._watcher_interval = max(float(interval_seconds), 0.5) self._watcher_stop_event.clear() # 初始化快照 for module_name in self.discover_plugins(): self._refresh_module_file_state(module_name) self._watcher_thread = threading.Thread( target=self._hot_reload_watch_loop, name="plugin-hot-reload-watcher", daemon=True, ) self._watcher_thread.start() self.LOG.info(f"PluginManager:插件热加载监听已启动,轮询间隔 {self._watcher_interval}s") def stop_hot_reload_watcher(self): """ 停止插件目录变更监听线程 """ with self._watcher_lock: if not self._watcher_thread: return self._watcher_stop_event.set() thread = self._watcher_thread self._watcher_thread = None if thread.is_alive(): thread.join(timeout=2.0) self.LOG.info("PluginManager:插件热加载监听已停止") def _hot_reload_watch_loop(self): while not self._watcher_stop_event.is_set(): try: discovered = set(self.discover_plugins()) loaded_modules = set(self.module_to_display.keys()) # 1. 新增插件 -> 自动加载并启动 new_modules = discovered - loaded_modules for module_name in new_modules: plugin = self.load_plugin(module_name) if plugin: self.start_plugin(plugin.name) self.LOG.info(f"PluginManager:检测到新增插件 {module_name},已自动加载") self._refresh_module_file_state(module_name) # 2. 已删除插件 -> 自动卸载 removed_modules = loaded_modules - discovered for module_name in removed_modules: if self.unload_plugin(module_name): self.LOG.info(f"PluginManager:检测到插件 {module_name} 已删除,已自动卸载") self._module_file_state.pop(module_name, None) # 3. 文件变更 -> 自动重载 for module_name in list(self.module_to_display.keys()): new_state = self._build_module_file_state(module_name) old_state = self._module_file_state.get(module_name) if new_state is None: continue if old_state is None: self._module_file_state[module_name] = new_state continue if new_state != old_state: reloaded = self.reload_plugin(module_name) if reloaded: self.LOG.info(f"PluginManager:检测到插件 {module_name} 文件变更,已自动重载") self._module_file_state[module_name] = new_state else: self.LOG.warning(f"PluginManager:插件 {module_name} 自动重载失败") except Exception as e: self.LOG.error(f"PluginManager:热加载监听异常: {e}", exc_info=True) time.sleep(self._watcher_interval) def discover_plugins(self) -> List[str]: """ 发现可用插件 Returns: 插件模块名称列表(module_name) """ module_names = [] # 遍历插件目录 for item in os.listdir(self.plugin_dir): if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"): # 检查是否有main.py文件 if os.path.exists(os.path.join(self.plugin_dir, item, "main.py")): module_names.append(item) elif item.endswith(".py") and not item.startswith("__"): # 单文件插件 module_names.append(item[:-3]) self.LOG.debug(f"PluginManager:发现插件模块: {module_names}") return module_names def load_all_plugins(self) -> Dict[str, PluginInterface]: """ 加载所有插件 Returns: 插件实例字典,键为display_name """ module_names = self.discover_plugins() loaded_modules = [] failed_modules = [] # 记录开始加载的插件列表 self.LOG.debug(f"PluginManager:开始加载插件列表: {module_names}") for module_name in module_names: try: plugin = self.load_plugin(module_name) if plugin: loaded_modules.append(module_name) # 自动启动插件 self.start_plugin(plugin.name) else: failed_modules.append(module_name) except Exception as e: self.LOG.error(f"PluginManager:加载插件模块 {module_name} 时发生错误: {str(e)}", exc_info=True) failed_modules.append(module_name) # 验证所有已加载插件的模块映射 for display_name, plugin in self.plugins.items(): try: # 尝试从类模块路径获取模块名 module_name = self._get_module_name_from_plugin(plugin) if module_name and module_name not in self.module_to_display: self.module_to_display[module_name] = display_name self.LOG.debug(f"PluginManager:补充缺失的模块映射 {module_name} -> {display_name}") except Exception as e: self.LOG.warning(f"PluginManager:获取插件 {display_name} 的模块名时出错: {e}") # 使用插件显示名称作为备选模块名 folder_name = display_name.lower().replace(' ', '_') if folder_name not in self.module_to_display: self.module_to_display[folder_name] = display_name self.LOG.debug(f"PluginManager:使用目录名作为模块映射 {folder_name} -> {display_name}") # 检查是否有重复或无效的映射 invalid_mappings = [] for module_name, display_name in self.module_to_display.items(): if display_name not in self.plugins: invalid_mappings.append(module_name) self.LOG.warning(f"PluginManager:发现无效的模块映射 {module_name} -> {display_name}") # 清理无效的映射 for module_name in invalid_mappings: del self.module_to_display[module_name] self.LOG.debug(f"PluginManager:清理无效的模块映射 {module_name}") # 记录最终状态 self.LOG.debug(f"PluginManager:加载成功的插件模块: {loaded_modules}") if failed_modules: self.LOG.warning(f"PluginManager:加载失败的插件模块: {failed_modules}") self.LOG.debug(f"PluginManager:当前已加载的插件实例: {list(self.plugins.keys())}") self.LOG.debug(f"PluginManager:最终的模块映射关系: {self.module_to_display}") return self.plugins def _get_module_name_from_plugin(self, plugin: PluginInterface) -> Optional[str]: """ 从插件实例获取模块名 Args: plugin: 插件实例 Returns: 模块名,获取失败返回None """ try: # 获取完整模块路径 full_module = plugin.__class__.__module__ module_parts = full_module.split('.') # 处理不同的模块路径情况 if len(module_parts) >= 2 and module_parts[0] == 'plugins': # 对于目录插件,模块名在第二个位置 return module_parts[1] elif len(module_parts) >= 2: # 其他情况,取倒数第二个 return module_parts[-2] else: # 单文件插件,直接返回 return full_module except (IndexError, AttributeError) as e: self.LOG.warning(f"获取插件 {plugin.name} 的模块名时出错: {e}") return None def load_plugin(self, module_name: str) -> Optional[PluginInterface]: """ 加载插件 Args: module_name: 插件模块名 Returns: 插件实例,加载失败返回None """ try: # 检查是否已有同名模块的插件加载 for display_name, plugin in self.plugins.items(): try: plugin_module_name = self._get_module_name_from_plugin(plugin) if plugin_module_name == module_name: self.LOG.debug(f"PluginManager:插件模块 {module_name} 已加载为 {display_name}") # 确保模块名到显示名的映射存在 if module_name not in self.module_to_display: self.module_to_display[module_name] = display_name self.LOG.debug(f"PluginManager:添加缺失的模块映射 {module_name} -> {display_name}") self._inject_bot_to_plugin(plugin) return plugin except Exception as e: self.LOG.warning(f"获取插件 {display_name} 的模块名时出错: {e}") continue # 确定插件路径和模块路径 plugin_path = os.path.join(self.plugin_dir, module_name) # 加载模块 if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, "main.py")): # 目录插件,从main.py加载 module_path = f"plugins.{module_name}.main" try: module = importlib.import_module(module_path) self.plugin_modules[module_name] = module except ImportError as e: self.LOG.error(f"PluginManager:导入插件模块 {module_path} 失败: {e}") return None else: # 单文件插件 plugin_path = self.plugin_dir try: module = importlib.import_module(module_name) self.plugin_modules[module_name] = module except ImportError as e: self.LOG.error(f"PluginManager:导入单文件插件 {module_name} 失败: {e}") return None # 查找插件类 plugin_class = None for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, PluginInterface) and obj != PluginInterface and obj != MessagePluginInterface and obj != ScheduledPluginInterface): plugin_class = obj break # 如果没有找到插件类,尝试查找get_plugin函数 if plugin_class is None: get_plugin_func = getattr(module, "get_plugin", None) if callable(get_plugin_func): plugin = get_plugin_func() if isinstance(plugin, PluginInterface): # 设置插件路径 plugin.set_plugin_path(plugin_path) # 加载插件配置 if not plugin.load_config(): self.LOG.error(f"PluginManager:插件模块 {module_name} 加载配置失败") async_job.remove_jobs_by_owner(plugin) return None # 初始化插件 if not plugin.initialize(self.system_context): self.LOG.error(f"PluginManager:插件模块 {module_name} 初始化失败") async_job.remove_jobs_by_owner(plugin) return None self._inject_bot_to_plugin(plugin) # 注册插件 PluginRegistry().register(plugin) # 获取显示名称 display_name = plugin.name # 存储插件实例 self.plugins[display_name] = plugin # 添加模块名到显示名的映射 self.module_to_display[module_name] = display_name self._refresh_module_file_state(module_name) # self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}") return plugin else: self.LOG.error(f"PluginManager:插件模块 {module_name} 的 get_plugin() 返回的不是有效的插件实例") else: self.LOG.error(f"PluginManager:插件模块 {module_name} 中未找到有效的插件类或 get_plugin 函数") return None # 实例化插件 plugin = plugin_class() plugin.status = PluginStatus.LOADED # 设置插件路径 plugin.set_plugin_path(plugin_path) # 在load_plugin方法中,找到加载配置后的位置(约在第298行) # 加载插件配置 if not plugin.load_config(): self.LOG.error(f"PluginManager:插件模块 {module_name} 加载配置失败") async_job.remove_jobs_by_owner(plugin) return None # 修改检查enable状态的代码:遍历所有配置节点 for section in plugin._config.values(): if isinstance(section, dict) and not section.get("enable", True): self.LOG.debug(f"PluginManager:插件 {module_name} 已禁用,跳过加载") async_job.remove_jobs_by_owner(plugin) return None # 初始化插件 if not plugin.initialize(self.system_context): self.LOG.error(f"PluginManager:插件模块 {module_name} 初始化失败") async_job.remove_jobs_by_owner(plugin) return None self._inject_bot_to_plugin(plugin) # 注册插件 PluginRegistry().register(plugin) # 获取显示名称 display_name = plugin.name # 存储插件实例 self.plugins[display_name] = plugin # 添加模块名到显示名的映射 self.module_to_display[module_name] = display_name self._refresh_module_file_state(module_name) # self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}") return plugin except Exception as e: plugin_obj = locals().get("plugin") if plugin_obj is not None: async_job.remove_jobs_by_owner(plugin_obj) self.LOG.exception(f"PluginManager:加载插件模块 {module_name} 失败: {e}", exc_info=True) return None def unload_plugin(self, name: str) -> bool: """ 卸载插件 Args: name: 插件名称(可以是模块名或显示名称) Returns: 卸载是否成功 """ # 查找插件 display_name, plugin = self.find_plugin_by_name(name) if not plugin: self.LOG.debug(f"PluginManager:插件 {name} 未加载") return False # 停止插件 if plugin.status == PluginStatus.RUNNING: if not plugin.stop(): self.LOG.debug(f"PluginManager:停止插件 {display_name} 失败") return False plugin.status = PluginStatus.STOPPED # 确保状态更新 removed_jobs = async_job.remove_jobs_by_owner(plugin) if removed_jobs: self.LOG.debug(f"PluginManager:已移除插件 {display_name} 的定时任务 {removed_jobs} 个") # 清理插件资源 if not plugin.cleanup(): self.LOG.debug(f"PluginManager:清理插件 {display_name} 资源失败") return False # 设置状态为未加载 plugin.status = PluginStatus.UNLOADED # 注销插件 PluginRegistry().unregister(display_name) # 获取模块名,用于清理映射 module_name = self._get_module_name_from_plugin(plugin) if module_name and module_name in self.module_to_display: del self.module_to_display[module_name] self.LOG.debug(f"PluginManager:清理模块映射 {module_name} -> {display_name}") self._module_file_state.pop(module_name, None) # 移除插件实例 del self.plugins[display_name] return True def reload_plugin(self, name: str) -> Optional[PluginInterface]: """ 重新加载插件 Args: name: 插件名称(可以是模块名或显示名称) Returns: 插件实例,重新加载失败返回None """ # 查找插件 display_name, plugin = self.find_plugin_by_name(name) if not plugin: self.LOG.debug(f"PluginManager:插件 {name} 未加载,无法重载") return None # 记录原插件状态和模块名 was_running = plugin.status == PluginStatus.RUNNING module_name = self._get_module_name_from_plugin(plugin) if not module_name: self.LOG.error(f"无法获取插件 {display_name} 的模块名,重载失败") return None # 卸载插件 if not self.unload_plugin(display_name): self.LOG.debug(f"卸载插件 {display_name} 失败,无法重载") return None # 重新导入模块 if module_name in self.plugin_modules: try: importlib.reload(self.plugin_modules[module_name]) except Exception as e: self.LOG.debug(f"重新导入插件模块 {module_name} 失败: {e}") return None # 加载插件 plugin = self.load_plugin(module_name) # 如果原来是运行状态,则重新启动 if plugin and was_running: self.start_plugin(plugin.name) return plugin def start_plugin(self, name: str) -> bool: """ 启动插件 Args: name: 插件名称(可以是模块名或显示名称) Returns: 启动是否成功 """ # 查找插件 display_name, plugin = self.find_plugin_by_name(name) if not plugin: self.LOG.debug(f"PluginManager:插件 {name} 未加载") return False if plugin.status == PluginStatus.RUNNING: self.LOG.debug(f"PluginManager:插件 {display_name} 已经在运行") return True if plugin.start(): plugin.status = PluginStatus.RUNNING self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为在运行") return True else: plugin.status = PluginStatus.ERROR self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为异常") return False def stop_plugin(self, name: str) -> bool: """ 停止插件 Args: name: 插件名称(可以是模块名或显示名称) Returns: 停止是否成功 """ # 查找插件 display_name, plugin = self.find_plugin_by_name(name) if not plugin: self.LOG.debug(f"插件 {name} 未加载") return False if plugin.status != PluginStatus.RUNNING: self.LOG.debug(f"插件 {display_name} 未在运行") return True if plugin.stop(): plugin.status = PluginStatus.STOPPED self.LOG.debug(f"插件 {display_name} 状态变更为已停止") return True else: plugin.status = PluginStatus.ERROR self.LOG.debug(f"插件 {display_name} 状态变更为异常") return False def shutdown_plugins(self) -> bool: """ 卸载所有插件 Returns: 是否全部成功卸载 """ success = True self.stop_hot_reload_watcher() # 创建插件名称的副本,因为在卸载过程中会修改self.plugins字典 display_names = list(self.plugins.keys()) for display_name in display_names: if not self.unload_plugin(display_name): self.LOG.error(f"卸载插件 {display_name} 失败") success = False # 清空插件模块字典 self.plugin_modules.clear() self.module_to_display.clear() # 确保插件字典为空 if self.plugins: self.LOG.warning(f"插件卸载后仍有 {len(self.plugins)} 个插件残留") success = False return success def find_plugin_by_name(self, name: str) -> Tuple[Optional[str], Optional[PluginInterface]]: """ 根据插件名称或模块名查找插件 Args: name: 插件名称或模块名 Returns: (插件显示名称, 插件实例) 元组,未找到返回 (None, None) """ # 直接通过显示名称查找 if name in self.plugins: return name, self.plugins[name] # 通过模块名查找 if name in self.module_to_display: display_name = self.module_to_display[name] return display_name, self.plugins.get(display_name) # 遍历所有插件查找匹配的模块名 for display_name, plugin in self.plugins.items(): try: module_name = self._get_module_name_from_plugin(plugin) if module_name and module_name == name: # 顺便更新映射 if module_name not in self.module_to_display: self.module_to_display[module_name] = display_name self.LOG.debug(f"PluginManager:添加缺失的模块映射 {module_name} -> {display_name}") return display_name, plugin # 不区分大小写比较 if module_name and module_name.lower() == name.lower(): if module_name not in self.module_to_display: self.module_to_display[module_name] = display_name return display_name, plugin # 检查名称是否包含在模块名中(不区分大小写) if module_name and name.lower() in module_name.lower(): return display_name, plugin # 检查模块名是否包含在名称中(不区分大小写) if module_name and module_name.lower() in name.lower(): return display_name, plugin # 检查名称是否包含在显示名称中(不区分大小写) if name.lower() in display_name.lower(): return display_name, plugin except Exception: continue # 记录未找到插件的详细信息,帮助调试 self.LOG.warning(f"未找到插件: {name},当前已加载插件: {list(self.plugins.keys())}") self.LOG.warning(f"模块映射: {self.module_to_display}") return None, None def inject_bot(self, bot: WechatAPIClient): self.current_bot = bot self.system_context["bot"] = bot for name, plugin in self.plugins.items(): # self.LOG.debug(f"plugin name{name}, plugin: {plugin}") if hasattr(plugin, "set_bot"): try: plugin.set_bot(bot) self.LOG.debug(f"已成功注入 bot 到插件 {name}") except Exception as e: self.LOG.error(f"注入 bot 到插件 {name} 失败: {e}")