Files
abot/plugins/plugin_manager/main.py
2025-04-23 16:11:06 +08:00

343 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
from typing import Dict, Any, List, Optional, Tuple
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugin_common.plugin_registry import PluginRegistry
from plugin_common.plugin_manager import PluginManager
from utils.robot_cmd.robot_command import GroupBotManager
class PluginManagerPlugin(MessagePluginInterface):
"""插件管理插件"""
@property
def name(self) -> str:
return "插件管理"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供插件的启用、禁用、重载、卸载和加载功能,实现插件热更新"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def __init__(self):
super().__init__()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self._commands = self._config.get("PluginManager", {}).get("command", ["插件", "plugin", "插件管理"])
self.command_format = self._config.get("PluginManager", {}).get("command-format", "插件 列表")
self.enable = self._config.get("PluginManager", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
# ... start 和 stop 方法保持不变 ...
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm = message.get("gbm")
target = roomid if roomid else sender
# 检查命令格式
parts = content.split(" ")
if len(parts) == 1:
self.message_util.send_text(f"❌命令格式错误!\n{self.command_format}", target, sender)
return True, "命令格式错误"
# 只有使用的时候才全局获取对象。防止在预加载的时候跟主线程冲突
self.plugin_registry = PluginRegistry()
self.plugin_manager = PluginManager().get_instance()
# 检查权限 (只允许管理员操作)
if not self._is_admin(sender, gbm):
self.message_util.send_text(f"❌权限不足,只有管理员可以管理插件", target, sender)
return True, "权限不足"
# 解析子命令
sub_command = parts[1]
plugin_name = parts[2] if len(parts) > 2 else ""
try:
# 根据子命令执行相应操作
command_handlers = {
"列表": self._list_plugins,
"启用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._enable_plugin),
"禁用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._disable_plugin),
"重载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._reload_plugin),
"卸载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._unload_plugin),
# 修改这一行,使用 lambda 函数而不是直接调用
"加载": lambda s, r: self._load_plugin(plugin_name, s, r),
"信息": lambda s, r: self._operate_plugin(plugin_name, s, r, self._plugin_info)
}
handler = command_handlers.get(sub_command)
if handler and (sub_command == "列表" or plugin_name):
return handler(sender, roomid)
else:
self.message_util.send_text(f"❌未知命令或缺少参数!\n{self.command_format}", target, sender)
return True, "未知命令"
except Exception as e:
import traceback
error_trace = traceback.format_exc()
self.LOG.error(f"处理插件管理请求出错: {e}\n{error_trace}")
self.message_util.send_text(f"❌操作失败: {str(e)}", target, sender)
return True, f"处理出错: {e}"
def _is_admin(self, user_id: str, gbm: GroupBotManager) -> bool:
"""检查用户是否为管理员"""
admin_list = gbm.get_admin_list() if gbm else []
return user_id in admin_list
def _list_plugins(self, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
"""列出所有插件"""
plugins = self.plugin_registry.get_all_plugins().values()
target = roomid if roomid else sender
# 构建插件列表消息
message = "📋 插件列表:\n"
for plugin in plugins:
status = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
# 获取插件模块名
module_name = plugin.__class__.__module__.split('.')[-2]
message += f"{status}-{plugin.name} [模块: {module_name}]\n"
self.message_util.send_text(message, target, sender)
return True, "列出插件成功"
def _operate_plugin(self, plugin_name: str, sender: str, roomid: str,
operation_func) -> Tuple[bool, str]:
"""通用插件操作函数"""
target = roomid if roomid else sender
# 查找匹配的插件名称
display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not display_name:
self.message_util.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确", target, sender)
return True, f"未找到插件 {plugin_name}"
# 不允许操作自身(对于某些操作)
if display_name == self.name and operation_func in [self._unload_plugin, self._disable_plugin]:
self.message_util.send_text(f"⚠️不能对插件管理插件自身执行此操作", target, sender)
return True, "不能对插件管理插件自身执行此操作"
# 执行具体操作
return operation_func(display_name, sender, roomid)
def _enable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""启用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
return True, f"插件 {plugin_name} 不存在"
if plugin.status == PluginStatus.RUNNING:
self.message_util.send_text(f"⚠️插件 {plugin_name} 已经处于启用状态", target, sender)
return True, f"插件 {plugin_name} 已经处于启用状态"
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 启动插件
if self.plugin_manager.start_plugin(module_name):
self.message_util.send_text(f"✅插件 {plugin_name} 启用成功", target, sender)
return True, f"插件 {plugin_name} 启用成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 启用失败", target, sender)
return False, f"插件 {plugin_name} 启用失败"
def _disable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""禁用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
return True, f"插件 {plugin_name} 不存在"
if plugin.status != PluginStatus.RUNNING:
self.message_util.send_text(f"⚠️插件 {plugin_name} 已经处于禁用状态", target, sender)
return True, f"插件 {plugin_name} 已经处于禁用状态"
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 停止插件
if self.plugin_manager.stop_plugin(module_name):
self.message_util.send_text(f"✅插件 {plugin_name} 禁用成功", target, sender)
return True, f"插件 {plugin_name} 禁用成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 禁用失败", target, sender)
return False, f"插件 {plugin_name} 禁用失败"
def _reload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""重载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
return True, f"插件 {plugin_name} 不存在"
# 记录原插件状态
was_running = plugin.status == PluginStatus.RUNNING
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 重载插件
reloaded_plugin = self.plugin_manager.reload_plugin(module_name)
if reloaded_plugin:
self.message_util.send_text(f"✅插件 {plugin_name} 重载成功", target, sender)
return True, f"插件 {plugin_name} 重载成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 重载失败", target, sender)
return False, f"插件 {plugin_name} 重载失败"
def _unload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""卸载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
return True, f"插件 {plugin_name} 不存在"
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 卸载插件
if self.plugin_manager.unload_plugin(module_name):
self.message_util.send_text(f"✅插件 {plugin_name} 卸载成功", target, sender)
return True, f"插件 {plugin_name} 卸载成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 卸载失败", target, sender)
return False, f"插件 {plugin_name} 卸载失败"
def _load_plugin(self, plugin_name: str, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
"""加载插件"""
# 对于加载操作,我们直接使用目录名作为模块名
# 检查插件目录是否存在
plugin_dir = os.path.join("plugins", plugin_name)
if not os.path.exists(plugin_dir):
if not silent:
self.message_util.send_text(f"❌插件目录 {plugin_dir} 不存在",
(roomid if roomid else sender), sender)
return False, f"插件目录 {plugin_dir} 不存在"
# 检查插件是否已加载 - 遍历所有插件查找模块名匹配的
for existing_plugin in self.plugin_registry.get_all_plugins().values():
existing_module_name = existing_plugin.__class__.__module__.split('.')[-2]
if existing_module_name == plugin_name:
if not silent:
self.message_util.send_text(f"⚠️插件 {existing_plugin.name} (模块名: {plugin_name}) 已经加载",
(roomid if roomid else sender), sender)
return True, f"插件 {existing_plugin.name} 已经加载"
try:
# 使用插件管理器加载插件
plugin = self.plugin_manager.load_plugin(plugin_name)
if plugin:
if not silent:
self.message_util.send_text(f"✅插件 {plugin.name} 加载成功",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin.name} 加载成功"
else:
if not silent:
self.message_util.send_text(f"❌插件 {plugin_name} 加载失败",
(roomid if roomid else sender), sender)
return False, f"插件 {plugin_name} 加载失败"
except Exception as e:
self.LOG.error(f"加载插件 {plugin_name} 出错: {e}")
if not silent:
self.message_util.send_text(f"❌加载插件出错: {str(e)}",
(roomid if roomid else sender), sender)
return False, f"加载插件出错: {e}"
def _plugin_info(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""查看插件详情"""
# 查找匹配的插件名称
display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not display_name:
self.message_util.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确",
(roomid if roomid else sender), sender)
return True, f"未找到插件 {plugin_name}"
plugin = self.plugin_registry.get_plugin(display_name)
if not plugin:
self.message_util.send_text(f"❌插件 {display_name} 不存在",
(roomid if roomid else sender), sender)
return True, f"插件 {display_name} 不存在"
# 获取插件模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 构建插件详情消息
status_text = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
message = f"""
📦 插件详情:{plugin.name}
📝 描述:{plugin.description}
🔢 版本:{plugin.version}
👤 作者:{plugin.author}
📂 模块名:{module_name}
⚙️ 状态:{status_text}
🔑 命令:{', '.join(plugin.commands) if hasattr(plugin, 'commands') else ''}
"""
self.message_util.send_text(message, (roomid if roomid else sender), sender)
return True, "查看插件详情成功"