Files
abot/plugins/plugin_manager/main.py
2025-03-20 10:05:34 +08:00

437 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import sys
from typing import Dict, Any, List, Optional, Tuple
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugin_common.plugin_registry import PluginRegistry
from plugin_common.plugin_manager import PluginManager
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import GroupBotManager
class PluginManagerPlugin(MessagePluginInterface):
"""插件管理插件"""
@property
def name(self) -> str:
return "插件管理"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供插件的启用、禁用、重载、卸载和加载功能,实现插件热更新"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.plugin_manager = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
# 保存插件注册表引用,用于管理插件
self.plugin_registry: PluginRegistry = context.get("plugin_registry")
if not self.plugin_registry:
self.LOG.error("无法获取插件注册表,插件管理功能将无法正常工作")
return False
# 创建插件管理器实例
self.plugin_manager = PluginManager()
# 设置系统上下文
self.plugin_manager.set_system_context(context)
self._commands = self._config.get("PluginManager", {}).get("command", ["插件", "plugin", "插件管理"])
self.command_format = self._config.get("PluginManager", {}).get("command-format", "插件 列表")
self.enable = self._config.get("PluginManager", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf = message.get("wcf")
gbm = message.get("gbm")
# 检查命令格式
parts = content.split(" ")
if len(parts) == 1:
wcf.send_text(f"❌命令格式错误!\n{self.command_format}",
(roomid if roomid else sender), sender)
return True, "命令格式错误"
# 检查权限 (只允许管理员操作)
if not self._is_admin(sender, gbm):
wcf.send_text(f"❌权限不足,只有管理员可以管理插件",
(roomid if roomid else sender), sender)
return True, "权限不足"
# 解析子命令
sub_command = parts[1]
plugin_name = parts[2] if len(parts) > 2 else ""
try:
# 根据子命令执行相应操作
if sub_command == "列表":
return self._list_plugins(wcf, sender, roomid)
elif sub_command == "启用" and plugin_name:
return self._enable_plugin(plugin_name, wcf, sender, roomid)
elif sub_command == "禁用" and plugin_name:
return self._disable_plugin(plugin_name, wcf, sender, roomid)
elif sub_command == "重载" and plugin_name:
return self._reload_plugin(plugin_name, wcf, sender, roomid)
elif sub_command == "卸载" and plugin_name:
return self._unload_plugin(plugin_name, wcf, sender, roomid)
elif sub_command == "加载" and plugin_name:
return self._load_plugin(plugin_name, wcf, sender, roomid)
elif sub_command == "信息" and plugin_name:
return self._plugin_info(plugin_name, wcf, sender, roomid)
else:
wcf.send_text(f"❌未知命令或缺少参数!\n{self.command_format}",
(roomid if roomid else sender), sender)
return True, "未知命令"
except Exception as e:
self.LOG.error(f"处理插件管理请求出错: {e}")
wcf.send_text(f"❌操作失败: {str(e)}",
(roomid if roomid else sender), sender)
return True, f"处理出错: {e}"
def _is_admin(self, user_id: str, gbm: GroupBotManager) -> bool:
"""检查用户是否为管理员"""
# 从配置中获取管理员列表
admin_list = gbm.get_admin_list() if gbm else []
return user_id in admin_list
def _list_plugins(self, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
"""列出所有插件"""
plugins = self.plugin_registry.get_all_plugins().values()
# 构建插件列表消息
message = "📋 插件列表:\n"
for plugin in plugins:
status = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
# 获取插件模块名
module_name = plugin.__class__.__module__.split('.')[-2]
message += f"{status}-{plugin.name} [模块: {module_name}]\n"
wcf.send_text(message, (roomid if roomid else sender), sender)
return True, "列出插件成功"
def _find_plugin_by_name(self, input_name: str) -> Optional[str]:
"""
根据用户输入的名称查找匹配的插件
支持模糊匹配和不区分大小写
Args:
input_name: 用户输入的插件名称
Returns:
匹配到的插件名称未找到返回None
"""
if not input_name:
return None
# 获取所有插件
all_plugins = self.plugin_registry.get_all_plugins()
# 精确匹配
if input_name in all_plugins:
return input_name
# 不区分大小写的精确匹配
for plugin_name in all_plugins:
if plugin_name.lower() == input_name.lower():
return plugin_name
# 模块名匹配例如输入music匹配音乐点播
for plugin_name, plugin in all_plugins.items():
module_name = plugin.__class__.__module__.split('.')[-2] # 获取模块名
if module_name.lower() == input_name.lower():
return plugin_name
# 部分匹配(包含关系)
matched_plugins = []
for plugin_name in all_plugins:
if input_name.lower() in plugin_name.lower() or plugin_name.lower() in input_name.lower():
matched_plugins.append(plugin_name)
# 如果只有一个匹配项,返回它
if len(matched_plugins) == 1:
return matched_plugins[0]
# 如果有多个匹配项返回None需要用户明确指定
elif len(matched_plugins) > 1:
return None
return None
# 修改现有的插件操作方法使用_find_plugin_by_name函数
def _enable_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
"""启用插件"""
# 查找匹配的插件名称
actual_plugin_name = self._find_plugin_by_name(plugin_name)
if not actual_plugin_name:
wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确",
(roomid if roomid else sender), sender)
return True, f"未找到插件 {plugin_name}"
plugin = self.plugin_registry.get_plugin(actual_plugin_name)
if not plugin:
wcf.send_text(f"❌插件 {actual_plugin_name} 不存在",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 不存在"
if plugin.status == PluginStatus.RUNNING:
wcf.send_text(f"插件 {actual_plugin_name} 已经是启用状态",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 已经是启用状态"
if self.plugin_manager.start_plugin(actual_plugin_name):
wcf.send_text(f"✅插件 {actual_plugin_name} 启用成功",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 启用成功"
else:
wcf.send_text(f"❌插件 {actual_plugin_name} 启用失败",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 启用失败"
# 同样修改其他插件操作方法
def _disable_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
"""禁用插件"""
# 查找匹配的插件名称
actual_plugin_name = self._find_plugin_by_name(plugin_name)
if not actual_plugin_name:
wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确",
(roomid if roomid else sender), sender)
return True, f"未找到插件 {plugin_name}"
# 不允许禁用自身
if plugin_name == self.name:
wcf.send_text(f"⚠️不能禁用插件管理插件自身",
(roomid if roomid else sender), sender)
return True, "不能禁用插件管理插件自身"
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
wcf.send_text(f"❌插件 {plugin_name} 不存在",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 不存在"
if plugin.status == PluginStatus.STOPPED:
wcf.send_text(f"⚠️插件 {plugin_name} 已经是禁用状态",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 已经是禁用状态"
# 使用插件管理器停止插件
success = self.plugin_manager.stop_plugin(plugin_name)
if success:
wcf.send_text(f"✅插件 {plugin_name} 禁用成功",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 禁用成功"
else:
wcf.send_text(f"❌插件 {plugin_name} 禁用失败",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 禁用失败"
def _reload_plugin(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
"""重新加载插件"""
# 查找匹配的插件名称
actual_plugin_name = self._find_plugin_by_name(plugin_name)
if not actual_plugin_name:
wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确",
(roomid if roomid else sender), sender)
return True, f"未找到插件 {plugin_name}"
# 不允许重载自身
if actual_plugin_name == self.name:
wcf.send_text(f"⚠️不能重载插件管理插件自身",
(roomid if roomid else sender), sender)
return True, "不能重载插件管理插件自身"
plugin = self.plugin_registry.get_plugin(actual_plugin_name)
if not plugin:
wcf.send_text(f"❌插件 {actual_plugin_name} 不存在",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 不存在"
# 记录插件状态,以便重新加载后恢复
was_running = plugin.status == PluginStatus.RUNNING
# 先卸载插件
self.LOG.info(f"正在卸载插件 {actual_plugin_name} 以进行重载")
if not self.plugin_manager.unload_plugin(actual_plugin_name):
wcf.send_text(f"❌插件 {actual_plugin_name} 卸载失败,无法重载",
(roomid if roomid else sender), sender)
return False, f"插件 {actual_plugin_name} 卸载失败,无法重载"
# 然后加载插件
self.LOG.info(f"正在加载插件 {actual_plugin_name}")
plugin = self.plugin_manager.load_plugin(actual_plugin_name)
if not plugin:
wcf.send_text(f"❌插件 {actual_plugin_name} 加载失败",
(roomid if roomid else sender), sender)
return False, f"插件 {actual_plugin_name} 加载失败"
# 如果之前是启用状态,则重新启用
if was_running:
self.LOG.info(f"正在启用插件 {actual_plugin_name}")
if not self.plugin_manager.start_plugin(actual_plugin_name):
wcf.send_text(f"⚠️插件 {actual_plugin_name} 重载成功,但启用失败",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 重载成功,但启用失败"
wcf.send_text(f"✅插件 {actual_plugin_name} 重载成功",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 重载成功"
def _unload_plugin(self, plugin_name: str, wcf, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
"""卸载插件"""
# 不允许卸载自身
if plugin_name == self.name:
if not silent:
wcf.send_text(f"⚠️不能卸载插件管理插件自身",
(roomid if roomid else sender), sender)
return True, "不能卸载插件管理插件自身"
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
if not silent:
wcf.send_text(f"❌插件 {plugin_name} 不存在或已卸载",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 不存在或已卸载"
# 使用插件管理器卸载插件
success = self.plugin_manager.unload_plugin(plugin_name)
if success:
if not silent:
wcf.send_text(f"✅插件 {plugin_name} 卸载成功",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 卸载成功"
else:
if not silent:
wcf.send_text(f"❌插件 {plugin_name} 卸载失败",
(roomid if roomid else sender), sender)
return False, f"插件 {plugin_name} 卸载失败"
def _load_plugin(self, plugin_name: str, wcf, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
"""加载插件"""
# 检查插件是否已加载
existing_plugin = self.plugin_registry.get_plugin(plugin_name)
if existing_plugin:
if not silent:
wcf.send_text(f"⚠️插件 {plugin_name} 已经加载",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 已经加载"
try:
# 检查插件目录是否存在
plugin_dir = os.path.join("plugins", plugin_name)
if not os.path.exists(plugin_dir):
if not silent:
wcf.send_text(f"❌插件目录 {plugin_dir} 不存在",
(roomid if roomid else sender), sender)
return False, f"插件目录 {plugin_dir} 不存在"
# 使用插件管理器加载插件
plugin = self.plugin_manager.load_plugin(plugin_name)
if plugin:
if not silent:
wcf.send_text(f"✅插件 {plugin_name} 加载成功",
(roomid if roomid else sender), sender)
return True, f"插件 {plugin_name} 加载成功"
else:
if not silent:
wcf.send_text(f"❌插件 {plugin_name} 加载失败",
(roomid if roomid else sender), sender)
return False, f"插件 {plugin_name} 加载失败"
except Exception as e:
self.LOG.error(f"加载插件 {plugin_name} 出错: {e}")
if not silent:
wcf.send_text(f"❌加载插件出错: {str(e)}",
(roomid if roomid else sender), sender)
return False, f"加载插件出错: {e}"
def _plugin_info(self, plugin_name: str, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
"""查看插件详情"""
# 查找匹配的插件名称
actual_plugin_name = self._find_plugin_by_name(plugin_name)
if not actual_plugin_name:
wcf.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确",
(roomid if roomid else sender), sender)
return True, f"未找到插件 {plugin_name}"
plugin = self.plugin_registry.get_plugin(actual_plugin_name)
if not plugin:
wcf.send_text(f"❌插件 {actual_plugin_name} 不存在",
(roomid if roomid else sender), sender)
return True, f"插件 {actual_plugin_name} 不存在"
# 获取插件模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 构建插件详情消息
status_text = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
message = f"""
📦 插件详情:{plugin.name}
📝 描述:{plugin.description}
🔢 版本:{plugin.version}
👤 作者:{plugin.author}
📂 模块名:{module_name}
⚙️ 状态:{status_text}
🔑 命令:{', '.join(plugin.commands) if hasattr(plugin, 'commands') else ''}
"""
wcf.send_text(message, (roomid if roomid else sender), sender)
return True, "查看插件详情成功"