import logging import os import sys from typing import Dict, Any, List, Optional, Tuple from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from plugin_common.plugin_registry import PluginRegistry from plugin_common.plugin_manager import PluginManager from plugins.stats_collector.decorators import plugin_stats_decorator from robot_cmd.robot_command import GroupBotManager class PluginManagerPlugin(MessagePluginInterface): """插件管理插件""" @property def name(self) -> str: return "插件管理" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供插件的启用、禁用、重载、卸载和加载功能,实现插件热更新" @property def author(self) -> str: return "Trae AI" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands def __init__(self): super().__init__() self.plugin_manager = None def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logging.getLogger(f"Plugin.{self.name}") self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.wcf = context.get("wcf") self.event_system = context.get("event_system") self.message_util = context.get("message_util") # 保存插件注册表引用,用于管理插件 self.plugin_registry: PluginRegistry = context.get("plugin_registry") if not self.plugin_registry: self.LOG.error("无法获取插件注册表,插件管理功能将无法正常工作") return False # 创建插件管理器实例 self.plugin_manager = PluginManager() # 设置系统上下文 self.plugin_manager.set_system_context(context) self._commands = self._config.get("PluginManager", {}).get("command", ["插件", "plugin", "插件管理"]) self.command_format = self._config.get("PluginManager", {}).get("command-format", "插件 列表") self.enable = self._config.get("PluginManager", {}).get("enable", True) self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.info(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") wcf = message.get("wcf") gbm = message.get("gbm") # 检查命令格式 parts = content.split(" ") if len(parts) == 1: wcf.send_text(f"❌命令格式错误!\n{self.command_format}", (roomid if roomid else sender), sender) return True, "命令格式错误" # 检查权限 (只允许管理员操作) if not self._is_admin(sender, gbm): wcf.send_text(f"❌权限不足,只有管理员可以管理插件", (roomid if roomid else sender), sender) return True, "权限不足" # 解析子命令 sub_command = parts[1] plugin_name = parts[2] if len(parts) > 2 else "" try: # 根据子命令执行相应操作 if sub_command == "列表": return self._list_plugins(wcf, sender, roomid) elif sub_command == "启用" and plugin_name: return self._enable_plugin(plugin_name, wcf, sender, roomid) elif sub_command == "禁用" and plugin_name: return self._disable_plugin(plugin_name, wcf, sender, roomid) elif sub_command == "重载" and plugin_name: return self._reload_plugin(plugin_name, wcf, sender, roomid) elif sub_command == "卸载" and plugin_name: return self._unload_plugin(plugin_name, wcf, sender, roomid) elif sub_command == "加载" and plugin_name: return self._load_plugin(plugin_name, wcf, sender, roomid) elif sub_command == "信息" and plugin_name: return self._plugin_info(plugin_name, wcf, sender, roomid) else: wcf.send_text(f"❌未知命令或缺少参数!\n{self.command_format}", (roomid if roomid else sender), sender) return True, "未知命令" except Exception as e: import traceback error_trace = traceback.format_exc() self.LOG.error(f"处理插件管理请求出错: {e}\n{error_trace}") wcf.send_text(f"❌操作失败: {str(e)}", (roomid if roomid else sender), sender) return True, f"处理出错: {e}" def _is_admin(self, user_id: str, gbm: GroupBotManager) -> bool: """检查用户是否为管理员""" # 从配置中获取管理员列表 admin_list = gbm.get_admin_list() if gbm else [] return user_id in admin_list def _list_plugins(self, wcf, sender: str, roomid: str) -> Tuple[bool, str]: """列出所有插件""" plugins = self.plugin_registry.get_all_plugins().values() # 构建插件列表消息 message = "📋 插件列表:\n" for plugin in plugins: status = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用" # 获取插件模块名 module_name = plugin.__class__.__module__.split('.')[-2] message += f"{status}-{plugin.name} [模块: {module_name}]\n" wcf.send_text(message, (roomid if roomid else sender), sender) return True, "列出插件成功" def _find_plugin_by_name(self, input_name: str) -> Optional[str]: """ 根据用户输入的名称查找匹配的插件 支持模糊匹配和不区分大小写 Args: input_name: 用户输入的插件名称 Returns: 匹配到的插件名称,未找到返回None """ if not input_name: return None # 获取所有插件 all_plugins = self.plugin_registry.get_all_plugins() # 精确匹配 if input_name in all_plugins: return input_name # 不区分大小写的精确匹配 for plugin_name in all_plugins: if plugin_name.lower() == input_name.lower(): return plugin_name # 模块名匹配(例如输入music匹配音乐点播) for plugin_name, plugin in all_plugins.items(): module_name = plugin.__class__.__module__.split('.')[-2] # 获取模块名 if module_name.lower() == input_name.lower(): return plugin_name # 部分匹配(包含关系) matched_plugins = [] for plugin_name in all_plugins: if input_name.lower() in plugin_name.lower() or plugin_name.lower() in input_name.lower(): matched_plugins.append(plugin_name) # 如果只有一个匹配项,返回它 if len(matched_plugins) == 1: return matched_plugins[0] # 如果有多个匹配项,返回None(需要用户明确指定) elif len(matched_plugins) > 1: return None return None # 修改现有的插件操作方法,使用_find_plugin_by_name函数 def _enable_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]: """启用插件""" # 查找匹配的插件名称 actual_plugin_name = self._find_plugin_by_name(plugin_name) if not actual_plugin_name: wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确", (roomid if roomid else sender), sender) return True, f"未找到插件 {plugin_name}" plugin = self.plugin_registry.get_plugin(actual_plugin_name) if not plugin: wcf.send_text(f"❌插件 {actual_plugin_name} 不存在", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 不存在" if plugin.status == PluginStatus.RUNNING: wcf.send_text(f"插件 {actual_plugin_name} 已经是启用状态", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 已经是启用状态" # 获取插件的模块名,这才是插件管理器需要的名称 module_name = plugin.__class__.__module__.split('.')[-2] # 使用模块名启用插件 if self.plugin_manager.start_plugin(module_name): wcf.send_text(f"✅插件 {actual_plugin_name} 启用成功", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 启用成功" else: wcf.send_text(f"❌插件 {actual_plugin_name} 启用失败", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 启用失败" # 同样修改其他插件操作方法 def _disable_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]: """禁用插件""" # 查找匹配的插件名称 actual_plugin_name = self._find_plugin_by_name(plugin_name) if not actual_plugin_name: wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确", (roomid if roomid else sender), sender) return True, f"未找到插件 {plugin_name}" # 不允许禁用自身 if actual_plugin_name == self.name: wcf.send_text(f"⚠️不能禁用插件管理插件自身", (roomid if roomid else sender), sender) return True, "不能禁用插件管理插件自身" plugin = self.plugin_registry.get_plugin(actual_plugin_name) if not plugin: wcf.send_text(f"❌插件 {actual_plugin_name} 不存在", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 不存在" if plugin.status == PluginStatus.STOPPED: wcf.send_text(f"⚠️插件 {actual_plugin_name} 已经是禁用状态", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 已经是禁用状态" # 获取插件的模块名,这才是插件管理器需要的名称 module_name = plugin.__class__.__module__.split('.')[-2] # 使用插件管理器停止插件 - 使用模块名而不是显示名称 success = self.plugin_manager.stop_plugin(module_name) if success: wcf.send_text(f"✅插件 {actual_plugin_name} 禁用成功", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 禁用成功" else: wcf.send_text(f"❌插件 {actual_plugin_name} 禁用失败", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 禁用失败" def _reload_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]: """重新加载插件""" # 查找匹配的插件名称 actual_plugin_name = self._find_plugin_by_name(plugin_name) if not actual_plugin_name: wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确", (roomid if roomid else sender), sender) return True, f"未找到插件 {plugin_name}" # 不允许重载自身 if actual_plugin_name == self.name: wcf.send_text(f"⚠️不能重载插件管理插件自身", (roomid if roomid else sender), sender) return True, "不能重载插件管理插件自身" plugin = self.plugin_registry.get_plugin(actual_plugin_name) if not plugin: wcf.send_text(f"❌插件 {actual_plugin_name} 不存在", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 不存在" # 获取插件的模块名,这才是插件管理器需要的名称 module_name = plugin.__class__.__module__.split('.')[-2] # 记录插件状态,以便重新加载后恢复 was_running = plugin.status == PluginStatus.RUNNING # 先卸载插件 - 使用模块名而不是显示名称 self.LOG.info(f"正在卸载插件 {actual_plugin_name}(模块名:{module_name}) 以进行重载") if not self.plugin_manager.unload_plugin(module_name): wcf.send_text(f"❌插件 {actual_plugin_name} 卸载失败,无法重载", (roomid if roomid else sender), sender) return False, f"插件 {actual_plugin_name} 卸载失败,无法重载" # 然后加载插件 - 使用模块名而不是显示名称 self.LOG.info(f"正在加载插件 {module_name}") plugin = self.plugin_manager.load_plugin(module_name) if not plugin: wcf.send_text(f"❌插件 {actual_plugin_name} 加载失败", (roomid if roomid else sender), sender) return False, f"插件 {actual_plugin_name} 加载失败" # 如果之前是启用状态,则重新启用 if was_running: self.LOG.info(f"正在启用插件 {actual_plugin_name}") if not self.plugin_manager.start_plugin(module_name): # 使用模块名而不是显示名称 wcf.send_text(f"⚠️插件 {actual_plugin_name} 重载成功,但启用失败", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 重载成功,但启用失败" wcf.send_text(f"✅插件 {actual_plugin_name} 重载成功", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 重载成功" def _unload_plugin(self, plugin_name: str, wcf, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]: """卸载插件""" # 查找匹配的插件名称 actual_plugin_name = self._find_plugin_by_name(plugin_name) if not actual_plugin_name: if not silent: wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确", (roomid if roomid else sender), sender) return True, f"未找到插件 {plugin_name}" # 不允许卸载自身 if actual_plugin_name == self.name: if not silent: wcf.send_text(f"⚠️不能卸载插件管理插件自身", (roomid if roomid else sender), sender) return True, "不能卸载插件管理插件自身" plugin = self.plugin_registry.get_plugin(actual_plugin_name) if not plugin: if not silent: wcf.send_text(f"❌插件 {actual_plugin_name} 不存在或已卸载", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 不存在或已卸载" # 获取插件的模块名,这才是插件管理器需要的名称 module_name = plugin.__class__.__module__.split('.')[-2] # 使用插件管理器卸载插件 - 使用模块名而不是显示名称 self.LOG.info(f"正在卸载插件 {actual_plugin_name}(模块名:{module_name})") success = self.plugin_manager.unload_plugin(module_name) if success: if not silent: wcf.send_text(f"✅插件 {actual_plugin_name} 卸载成功", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 卸载成功" else: if not silent: wcf.send_text(f"❌插件 {actual_plugin_name} 卸载失败", (roomid if roomid else sender), sender) return False, f"插件 {actual_plugin_name} 卸载失败" def _load_plugin(self, plugin_name: str, wcf, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]: """加载插件""" # 对于加载操作,我们直接使用目录名作为模块名 # 检查插件目录是否存在 plugin_dir = os.path.join("plugins", plugin_name) if not os.path.exists(plugin_dir): if not silent: wcf.send_text(f"❌插件目录 {plugin_dir} 不存在", (roomid if roomid else sender), sender) return False, f"插件目录 {plugin_dir} 不存在" # 检查插件是否已加载 - 遍历所有插件查找模块名匹配的 for existing_plugin in self.plugin_registry.get_all_plugins().values(): existing_module_name = existing_plugin.__class__.__module__.split('.')[-2] if existing_module_name == plugin_name: if not silent: wcf.send_text(f"⚠️插件 {existing_plugin.name} (模块名: {plugin_name}) 已经加载", (roomid if roomid else sender), sender) return True, f"插件 {existing_plugin.name} 已经加载" try: # 使用插件管理器加载插件 plugin = self.plugin_manager.load_plugin(plugin_name) if plugin: if not silent: wcf.send_text(f"✅插件 {plugin.name} 加载成功", (roomid if roomid else sender), sender) return True, f"插件 {plugin.name} 加载成功" else: if not silent: wcf.send_text(f"❌插件 {plugin_name} 加载失败", (roomid if roomid else sender), sender) return False, f"插件 {plugin_name} 加载失败" except Exception as e: self.LOG.error(f"加载插件 {plugin_name} 出错: {e}") if not silent: wcf.send_text(f"❌加载插件出错: {str(e)}", (roomid if roomid else sender), sender) return False, f"加载插件出错: {e}" def _plugin_info(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]: """查看插件详情""" # 查找匹配的插件名称 actual_plugin_name = self._find_plugin_by_name(plugin_name) if not actual_plugin_name: wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确", (roomid if roomid else sender), sender) return True, f"未找到插件 {plugin_name}" plugin = self.plugin_registry.get_plugin(actual_plugin_name) if not plugin: wcf.send_text(f"❌插件 {actual_plugin_name} 不存在", (roomid if roomid else sender), sender) return True, f"插件 {actual_plugin_name} 不存在" # 获取插件模块名 module_name = plugin.__class__.__module__.split('.')[-2] # 构建插件详情消息 status_text = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用" message = f""" 📦 插件详情:{plugin.name} 📝 描述:{plugin.description} 🔢 版本:{plugin.version} 👤 作者:{plugin.author} 📂 模块名:{module_name} ⚙️ 状态:{status_text} 🔑 命令:{', '.join(plugin.commands) if hasattr(plugin, 'commands') else '无'} """ wcf.send_text(message, (roomid if roomid else sender), sender) return True, "查看插件详情成功"