from loguru import logger import os from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from base.plugin_common.plugin_registry import PluginRegistry from base.plugin_common.plugin_manager import PluginManager from utils.robot_cmd.robot_command import GroupBotManager from wechat_ipad import WechatAPIClient class PluginManagerPlugin(MessagePluginInterface): """插件管理插件""" @property def name(self) -> str: return "插件管理" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供插件的启用、禁用、重载、卸载和加载功能,实现插件热更新" @property def author(self) -> str: return "Trae AI" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return "PLUGIN_MANAGER" @property def feature_description(self) -> Optional[str]: return "🔧 插件管理功能 [插件管理]" def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def __init__(self): super().__init__() self.bot: WechatAPIClient = None # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self._commands = self._config.get("PluginManager", {}).get("command", ["插件", "plugin", "插件管理"]) self.command_format = self._config.get("PluginManager", {}).get("command-format", "插件 列表") self.enable = self._config.get("PluginManager", {}).get("enable", True) self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm = message.get("gbm") target = roomid if roomid else sender self.bot: WechatAPIClient = message.get("bot") if not self.bot: self.LOG.error("WechatAPIClient 未初始化") return False, "Bot 未初始化" # 检查功能权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" # 检查命令格式 parts = content.split(" ") if len(parts) == 1: await self.bot.send_at_message(target, f"❌命令格式错误!\n{self.command_format}", [sender]) return True, "命令格式错误" # 只有使用的时候才全局获取对象。防止在预加载的时候跟主线程冲突 self.plugin_registry = PluginRegistry() self.plugin_manager = PluginManager().get_instance() # 检查管理员权限 if not self._is_admin(sender, gbm): await self.bot.send_at_message(target, f"❌权限不足,只有管理员可以管理插件", [sender]) return True, "权限不足" # 解析子命令 sub_command = parts[1] plugin_name = parts[2] if len(parts) > 2 else "" try: # 根据子命令执行相应操作 command_handlers = { "列表": self._list_plugins, "启用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._enable_plugin), "禁用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._disable_plugin), "重载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._reload_plugin), "卸载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._unload_plugin), "加载": lambda s, r: self._load_plugin(plugin_name, s, r), "信息": lambda s, r: self._operate_plugin(plugin_name, s, r, self._plugin_info) } handler = command_handlers.get(sub_command) if handler and (sub_command == "列表" or plugin_name): return await handler(sender, roomid) else: await self.bot.send_at_message(target, f"❌未知命令或缺少参数!\n{self.command_format}", [sender]) return True, "未知命令" except Exception as e: import traceback error_trace = traceback.format_exc() self.LOG.error(f"处理插件管理请求出错: {e}\n{error_trace}") await self.bot.send_at_message(target, f"❌操作失败: {str(e)}", [sender]) return True, f"处理出错: {e}" def _is_admin(self, user_id: str, gbm: GroupBotManager) -> bool: """检查用户是否为管理员""" admin_list = gbm.get_admin_list() if gbm else [] return user_id in admin_list async def _list_plugins(self, sender: str, roomid: str) -> Tuple[bool, str]: """列出所有插件""" plugins = self.plugin_registry.get_all_plugins().values() target = roomid if roomid else sender # 构建插件列表消息 message = "📋 插件列表:\n" for plugin in plugins: status = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用" module_name = plugin.__class__.__module__.split('.')[-2] message += f"{status}-{plugin.name} [模块: {module_name}]\n" await self.bot.send_at_message(target, message, [sender]) return True, "列出插件成功" async def _operate_plugin(self, plugin_name: str, sender: str, roomid: str, operation_func) -> Tuple[bool, str]: """通用插件操作函数""" target = roomid if roomid else sender # 查找匹配的插件名称 display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name) if not display_name: await self.bot.send_at_message(target, f"❌未找到插件 {plugin_name},请检查名称是否正确", [sender]) return True, f"未找到插件 {plugin_name}" # 不允许操作自身(对于某些操作) if display_name == self.name and operation_func in [self._unload_plugin, self._disable_plugin]: await self.bot.send_at_message(target, f"⚠️不能对插件管理插件自身执行此操作", [sender]) return True, "不能对插件管理插件自身执行此操作" # 执行具体操作 return await operation_func(display_name, sender, roomid) async def _enable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]: """启用插件""" target = roomid if roomid else sender plugin = self.plugin_registry.get_plugin(plugin_name) if not plugin: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender]) return True, f"插件 {plugin_name} 不存在" if plugin.status == PluginStatus.RUNNING: await self.bot.send_at_message(target, f"⚠️插件 {plugin_name} 已经处于启用状态", [sender]) return True, f"插件 {plugin_name} 已经处于启用状态" module_name = plugin.__class__.__module__.split('.')[-2] if self.plugin_manager.start_plugin(module_name): await self.bot.send_at_message(target, f"✅插件 {plugin_name} 启用成功", [sender]) return True, f"插件 {plugin_name} 启用成功" else: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 启用失败", [sender]) return False, f"插件 {plugin_name} 启用失败" async def _disable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]: """禁用插件""" target = roomid if roomid else sender plugin = self.plugin_registry.get_plugin(plugin_name) if not plugin: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender]) return True, f"插件 {plugin_name} 不存在" if plugin.status != PluginStatus.RUNNING: await self.bot.send_at_message(target, f"⚠️插件 {plugin_name} 已经处于禁用状态", [sender]) return True, f"插件 {plugin_name} 已经处于禁用状态" module_name = plugin.__class__.__module__.split('.')[-2] if self.plugin_manager.stop_plugin(module_name): await self.bot.send_at_message(target, f"✅插件 {plugin_name} 禁用成功", [sender]) return True, f"插件 {plugin_name} 禁用成功" else: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 禁用失败", [sender]) return False, f"插件 {plugin_name} 禁用失败" async def _reload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]: """重载插件""" target = roomid if roomid else sender plugin = self.plugin_registry.get_plugin(plugin_name) if not plugin: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender]) return True, f"插件 {plugin_name} 不存在" module_name = plugin.__class__.__module__.split('.')[-2] reloaded_plugin = self.plugin_manager.reload_plugin(module_name) if reloaded_plugin: await self.bot.send_at_message(target, f"✅插件 {plugin_name} 重载成功", [sender]) return True, f"插件 {plugin_name} 重载成功" else: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 重载失败", [sender]) return False, f"插件 {plugin_name} 重载失败" async def _unload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]: """卸载插件""" target = roomid if roomid else sender plugin = self.plugin_registry.get_plugin(plugin_name) if not plugin: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender]) return True, f"插件 {plugin_name} 不存在" module_name = plugin.__class__.__module__.split('.')[-2] if self.plugin_manager.unload_plugin(module_name): await self.bot.send_at_message(target, f"✅插件 {plugin_name} 卸载成功", [sender]) return True, f"插件 {plugin_name} 卸载成功" else: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 卸载失败", [sender]) return False, f"插件 {plugin_name} 卸载失败" async def _load_plugin(self, plugin_name: str, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]: """加载插件""" plugin_dir = os.path.join("plugins", plugin_name) target = roomid if roomid else sender if not os.path.exists(plugin_dir): if not silent: await self.bot.send_at_message(target, f"❌插件目录 {plugin_dir} 不存在", [sender]) return False, f"插件目录 {plugin_dir} 不存在" for existing_plugin in self.plugin_registry.get_all_plugins().values(): existing_module_name = existing_plugin.__class__.__module__.split('.')[-2] if existing_module_name == plugin_name: if not silent: await self.bot.send_at_message(target, f"⚠️插件 {existing_plugin.name} (模块名: {plugin_name}) 已经加载", [sender]) return True, f"插件 {existing_plugin.name} 已经加载" try: plugin = self.plugin_manager.load_plugin(plugin_name) if plugin: if not silent: await self.bot.send_at_message(target, f"✅插件 {plugin.name} 加载成功", [sender]) return True, f"插件 {plugin.name} 加载成功" else: if not silent: await self.bot.send_at_message(target, f"❌插件 {plugin_name} 加载失败", [sender]) return False, f"插件 {plugin_name} 加载失败" except Exception as e: self.LOG.error(f"加载插件 {plugin_name} 出错: {e}") if not silent: await self.bot.send_at_message(target, f"❌加载插件出错: {str(e)}", [sender]) return False, f"加载插件出错: {e}" async def _plugin_info(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]: """查看插件详情""" target = roomid if roomid else sender display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name) if not display_name: await self.bot.send_at_message(target, f"❌未找到插件 {plugin_name},请检查名称是否正确", [sender]) return True, f"未找到插件 {plugin_name}" plugin = self.plugin_registry.get_plugin(display_name) if not plugin: await self.bot.send_at_message(target, f"❌插件 {display_name} 不存在", [sender]) return True, f"插件 {display_name} 不存在" module_name = plugin.__class__.__module__.split('.')[-2] status_text = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用" message = f""" 📦 插件详情:{plugin.name} 📝 描述:{plugin.description} 🔢 版本:{plugin.version} 👤 作者:{plugin.author} 📂 模块名:{module_name} ⚙️ 状态:{status_text} 🔑 命令:{', '.join(plugin.commands) if hasattr(plugin, 'commands') else '无'} """ await self.bot.send_at_message(target, message, [sender]) return True, "查看插件详情成功"