import importlib import inspect import logging import os import sys from typing import Dict, List, Any, Optional, Type, Tuple from plugin_common.plugin_interface import PluginInterface, PluginStatus from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.scheduled_plugin_interface import ScheduledPluginInterface from plugin_common.plugin_registry import PluginRegistry from plugin_common.event_system import EventSystem, EventType class PluginManager: """插件管理器,负责加载、卸载、启动、停止插件""" # 单例实例 _instance = None @classmethod def get_instance(cls, plugin_dir=None): """获取单例实例 Args: plugin_dir: 插件目录,如果已有实例则忽略此参数 Returns: PluginManager实例 """ if cls._instance is None: cls._instance = cls(plugin_dir=plugin_dir or "plugins") return cls._instance def __new__(cls, *args, **kwargs): """实现单例模式""" if cls._instance is None: cls._instance = super(PluginManager, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, plugin_dir: str = "plugins"): """ 初始化插件管理器 Args: plugin_dir: 插件目录 """ self.plugin_dir = plugin_dir self.plugins: Dict[str, PluginInterface] = {} # 插件实例字典 self.plugin_modules = {} # 插件模块字典 self.module_to_plugin = {} # 模块名到插件名的映射 self.system_context = {} # 系统上下文 self.LOG = logging.getLogger(__name__) # 确保插件目录存在 if not os.path.exists(self.plugin_dir): os.makedirs(self.plugin_dir) # 将插件目录添加到Python路径 if self.plugin_dir not in sys.path: sys.path.insert(0, self.plugin_dir) def set_system_context(self, context: Dict[str, Any]): """ 设置系统上下文 Args: context: 系统上下文 """ self.system_context = context def discover_plugins(self) -> List[str]: """ 发现可用插件 Returns: 插件模块名称列表 """ plugin_modules = [] # 遍历插件目录 for item in os.listdir(self.plugin_dir): if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"): # 检查是否有main.py文件 if os.path.exists(os.path.join(self.plugin_dir, item, "main.py")): plugin_modules.append(item) elif item.endswith(".py") and not item.startswith("__"): # 单文件插件 plugin_modules.append(item[:-3]) self.LOG.info(f"PluginManager:发现插件模块: {plugin_modules}") return plugin_modules def load_all_plugins(self) -> Dict[str, PluginInterface]: """ 加载所有插件 Returns: 插件实例字典 """ plugin_modules = self.discover_plugins() loaded_plugins = [] for plugin_name in plugin_modules: try: plugin = self.load_plugin(plugin_name) if plugin: loaded_plugins.append(plugin_name) # 自动启动插件 self.start_plugin(plugin.name) except Exception as e: self.LOG.error(f"PluginManager:加载插件 {plugin_name} 时发生错误: {str(e)}", exc_info=True) self.LOG.info(f"PluginManager:成功加载插件: {loaded_plugins}") self.LOG.info(f"PluginManager:self.plugins: {self.plugins}") self.LOG.info(f"PluginManager:self.module_to_plugin: {self.module_to_plugin}") return self.plugins def load_plugin(self, plugin_name: str) -> Optional[PluginInterface]: """ 加载插件 Args: plugin_name: 插件名称(模块名) Returns: 插件实例,加载失败返回None """ try: # 检查是否已有同名模块的插件加载 for name, plugin in self.plugins.items(): module_name = plugin.__class__.__module__.split('.')[-2] if module_name == plugin_name: self.LOG.info(f"PluginManager:插件模块 {plugin_name} 已加载为 {name}") return plugin # 如果插件已加载,直接返回 if plugin_name in self.plugins: return self.plugins[plugin_name] # 确定插件路径和模块路径 plugin_path = os.path.join(self.plugin_dir, plugin_name) # 加载模块 if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, "main.py")): # 目录插件,从main.py加载 module_path = f"plugins.{plugin_name}.main" try: module = importlib.import_module(module_path) self.plugin_modules[plugin_name] = module except ImportError as e: self.LOG.error(f"PluginManager:导入插件模块 {module_path} 失败: {e}") return None else: # 单文件插件 plugin_path = self.plugin_dir try: module = importlib.import_module(plugin_name) self.plugin_modules[plugin_name] = module except ImportError as e: self.LOG.error(f"PluginManager:导入单文件插件 {plugin_name} 失败: {e}") return None # 查找插件类 plugin_class = None for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, PluginInterface) and obj != PluginInterface and obj != MessagePluginInterface and obj != ScheduledPluginInterface): plugin_class = obj break # 如果没有找到插件类,尝试查找get_plugin函数 if plugin_class is None: get_plugin_func = getattr(module, "get_plugin", None) if callable(get_plugin_func): plugin = get_plugin_func() if isinstance(plugin, PluginInterface): # 设置插件路径 plugin.set_plugin_path(plugin_path) # 加载插件配置 if not plugin.load_config(): self.LOG.error(f"PluginManager:插件 {plugin_name} 加载配置失败") return None # 初始化插件 if not plugin.initialize(self.system_context): self.LOG.error(f"PluginManager:插件 {plugin_name} 初始化失败") return None # 注册插件 PluginRegistry().register(plugin) # 存储插件实例 self.plugins[plugin.name] = plugin # 发布插件加载事件 EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin}) return plugin else: self.LOG.error(f"PluginManager:插件 {plugin_name} 的 get_plugin() 返回的不是有效的插件实例") else: self.LOG.error(f"PluginManager:插件 {plugin_name} 中未找到有效的插件类或 get_plugin 函数") return None # 实例化插件 plugin = plugin_class() plugin.status = PluginStatus.LOADED # 设置插件路径 plugin.set_plugin_path(plugin_path) # 加载插件配置 if not plugin.load_config(): self.LOG.error(f"PluginManager:插件 {plugin_name} 加载配置失败") return None # 初始化插件 if not plugin.initialize(self.system_context): self.LOG.error(f"PluginManager:插件 {plugin_name} 初始化失败") return None # 注册插件 PluginRegistry().register(plugin) # 在 load_plugin 方法中,修改存储插件实例的部分 # 存储插件实例 self.plugins[plugin.name] = plugin # 添加模块名到插件名的映射 try: module_name = plugin.__class__.__module__.split('.')[-2] self.module_to_plugin[module_name] = plugin.name except (IndexError, AttributeError): self.LOG.warning(f"无法为插件 {plugin.name} 获取有效的模块名") # 发布插件加载事件 EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin}) return plugin except Exception as e: self.LOG.error(f"PluginManager:加载插件 {plugin_name} 失败: {e}", exc_info=True) return None def unload_plugin(self, plugin_name: str) -> bool: """ 卸载插件 Args: plugin_name: 插件名称(可以是模块名或显示名称) Returns: 卸载是否成功 """ # 查找插件 display_name, plugin = self.find_plugin_by_name(plugin_name) if not plugin: self.LOG.info(f"PluginManager:插件 {plugin_name} 未加载") return False # 停止插件 if plugin.status == PluginStatus.RUNNING: if not plugin.stop(): self.LOG.info(f"PluginManager:停止插件 {display_name} 失败") return False plugin.status = PluginStatus.STOPPED # 确保状态更新 # 清理插件资源 if not plugin.cleanup(): self.LOG.info(f"PluginManager:清理插件 {display_name} 资源失败") return False # 设置状态为未加载 plugin.status = PluginStatus.UNLOADED # 注销插件 PluginRegistry().unregister(display_name) # 获取模块名,用于清理映射 try: module_name = plugin.__class__.__module__.split('.')[-2] # 清理模块名到插件名的映射 if module_name in self.module_to_plugin: del self.module_to_plugin[module_name] except (IndexError, AttributeError): pass # 移除插件实例 del self.plugins[display_name] # 发布插件卸载事件 EventSystem().publish(EventType.PLUGIN_UNLOADED, {"plugin_name": display_name}) return True def reload_plugin(self, plugin_name: str) -> Optional[PluginInterface]: """ 重新加载插件 Args: plugin_name: 插件名称(可以是模块名或显示名称) Returns: 插件实例,重新加载失败返回None """ # 查找插件 display_name, plugin = self.find_plugin_by_name(plugin_name) if not plugin: self.LOG.info(f"PluginManager:插件 {plugin_name} 未加载,无法重载") return None # 记录原插件状态和模块名 was_running = plugin.status == PluginStatus.RUNNING try: module_name = plugin.__class__.__module__.split('.')[-2] except (IndexError, AttributeError): self.LOG.error(f"无法获取插件 {display_name} 的模块名,重载失败") return None # 卸载插件 if not self.unload_plugin(display_name): self.LOG.info(f"卸载插件 {display_name} 失败,无法重载") return None # 重新导入模块 if module_name in self.plugin_modules: try: importlib.reload(self.plugin_modules[module_name]) except Exception as e: self.LOG.info(f"重新导入插件模块 {module_name} 失败: {e}") return None # 加载插件 plugin = self.load_plugin(module_name) # 如果原来是运行状态,则重新启动 if plugin and was_running: self.start_plugin(plugin.name) return plugin def start_plugin(self, plugin_name: str) -> bool: """ 启动插件 Args: plugin_name: 插件名称(可以是模块名或显示名称) Returns: 启动是否成功 """ # 查找插件 display_name, plugin = self.find_plugin_by_name(plugin_name) if not plugin: self.LOG.info(f"PluginManager:插件 {plugin_name} 未加载") return False if plugin.status == PluginStatus.RUNNING: self.LOG.info(f"PluginManager:插件 {display_name} 已经在运行") return True if plugin.start(): plugin.status = PluginStatus.RUNNING self.LOG.info(f"PluginManager:插件 {display_name} 状态变更为在运行") return True else: plugin.status = PluginStatus.ERROR self.LOG.info(f"PluginManager:插件 {display_name} 状态变更为异常") return False def stop_plugin(self, plugin_name: str) -> bool: """ 停止插件 Args: plugin_name: 插件名称(可以是模块名或显示名称) Returns: 停止是否成功 """ # 查找插件 display_name, plugin = self.find_plugin_by_name(plugin_name) if not plugin: self.LOG.info(f"插件 {plugin_name} 未加载") return False if plugin.status != PluginStatus.RUNNING: self.LOG.info(f"插件 {display_name} 未在运行") return True if plugin.stop(): plugin.status = PluginStatus.STOPPED self.LOG.info(f"插件 {display_name} 状态变更为已停止") return True else: plugin.status = PluginStatus.ERROR self.LOG.info(f"插件 {display_name} 状态变更为异常") return False def shutdown_plugins(self) -> bool: """ 卸载所有插件 Returns: 是否全部成功卸载 """ success = True # 创建插件名称的副本,因为在卸载过程中会修改self.plugins字典 plugin_names = list(self.plugins.keys()) for plugin_name in plugin_names: if not self.unload_plugin(plugin_name): self.LOG.error(f"卸载插件 {plugin_name} 失败") success = False # 清空插件模块字典 self.plugin_modules.clear() # 确保插件字典为空 if self.plugins: self.LOG.warning(f"插件卸载后仍有 {len(self.plugins)} 个插件残留") success = False return success def find_plugin_by_name(self, plugin_name: str) -> Tuple[Optional[str], Optional[PluginInterface]]: """ 根据插件名称或模块名查找插件 Args: plugin_name: 插件名称或模块名 Returns: (插件显示名称, 插件实例) 元组,未找到返回 (None, None) """ # 直接通过显示名称查找 if plugin_name in self.plugins: return plugin_name, self.plugins[plugin_name] # 通过模块名查找 if plugin_name in self.module_to_plugin: display_name = self.module_to_plugin[plugin_name] return display_name, self.plugins.get(display_name) # 遍历所有插件查找匹配的模块名 for name, plugin in self.plugins.items(): try: module_name = plugin.__class__.__module__.split('.')[-2] if module_name == plugin_name: return name, plugin except (IndexError, AttributeError): continue return None, None