Files
abot/plugins/plugin_manager/main.py
2026-01-16 13:34:37 +08:00

343 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import os
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from base.plugin_common.plugin_registry import PluginRegistry
from base.plugin_common.plugin_manager import PluginManager
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from wechat_ipad import WechatAPIClient
class PluginManagerPlugin(MessagePluginInterface):
"""插件管理插件"""
# 功能权限常量
FEATURE_KEY = "PLUGIN_MANAGER"
FEATURE_DESCRIPTION = "🔧 插件管理功能 [插件管理]"
@property
def name(self) -> str:
return "插件管理"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供插件的启用、禁用、重载、卸载和加载功能,实现插件热更新"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def __init__(self):
super().__init__()
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self._commands = self._config.get("PluginManager", {}).get("command", ["插件", "plugin", "插件管理"])
self.command_format = self._config.get("PluginManager", {}).get("command-format", "插件 列表")
self.enable = self._config.get("PluginManager", {}).get("enable", True)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm = message.get("gbm")
target = roomid if roomid else sender
self.bot: WechatAPIClient = message.get("bot")
if not self.bot:
self.LOG.error("WechatAPIClient 未初始化")
return False, "Bot 未初始化"
# 检查功能权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 检查命令格式
parts = content.split(" ")
if len(parts) == 1:
await self.bot.send_at_message(target, f"❌命令格式错误!\n{self.command_format}", [sender])
return True, "命令格式错误"
# 只有使用的时候才全局获取对象。防止在预加载的时候跟主线程冲突
self.plugin_registry = PluginRegistry()
self.plugin_manager = PluginManager().get_instance()
# 检查管理员权限
if not self._is_admin(sender, gbm):
await self.bot.send_at_message(target, f"❌权限不足,只有管理员可以管理插件", [sender])
return True, "权限不足"
# 解析子命令
sub_command = parts[1]
plugin_name = parts[2] if len(parts) > 2 else ""
try:
# 根据子命令执行相应操作
command_handlers = {
"列表": self._list_plugins,
"启用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._enable_plugin),
"禁用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._disable_plugin),
"重载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._reload_plugin),
"卸载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._unload_plugin),
"加载": lambda s, r: self._load_plugin(plugin_name, s, r),
"信息": lambda s, r: self._operate_plugin(plugin_name, s, r, self._plugin_info)
}
handler = command_handlers.get(sub_command)
if handler and (sub_command == "列表" or plugin_name):
return await handler(sender, roomid)
else:
await self.bot.send_at_message(target, f"❌未知命令或缺少参数!\n{self.command_format}", [sender])
return True, "未知命令"
except Exception as e:
import traceback
error_trace = traceback.format_exc()
self.LOG.error(f"处理插件管理请求出错: {e}\n{error_trace}")
await self.bot.send_at_message(target, f"❌操作失败: {str(e)}", [sender])
return True, f"处理出错: {e}"
def _is_admin(self, user_id: str, gbm: GroupBotManager) -> bool:
"""检查用户是否为管理员"""
admin_list = gbm.get_admin_list() if gbm else []
return user_id in admin_list
async def _list_plugins(self, sender: str, roomid: str) -> Tuple[bool, str]:
"""列出所有插件"""
plugins = self.plugin_registry.get_all_plugins().values()
target = roomid if roomid else sender
# 构建插件列表消息
message = "📋 插件列表:\n"
for plugin in plugins:
status = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
module_name = plugin.__class__.__module__.split('.')[-2]
message += f"{status}-{plugin.name} [模块: {module_name}]\n"
await self.bot.send_at_message(target, message, [sender])
return True, "列出插件成功"
async def _operate_plugin(self, plugin_name: str, sender: str, roomid: str,
operation_func) -> Tuple[bool, str]:
"""通用插件操作函数"""
target = roomid if roomid else sender
# 查找匹配的插件名称
display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not display_name:
await self.bot.send_at_message(target, f"❌未找到插件 {plugin_name},请检查名称是否正确", [sender])
return True, f"未找到插件 {plugin_name}"
# 不允许操作自身(对于某些操作)
if display_name == self.name and operation_func in [self._unload_plugin, self._disable_plugin]:
await self.bot.send_at_message(target, f"⚠️不能对插件管理插件自身执行此操作", [sender])
return True, "不能对插件管理插件自身执行此操作"
# 执行具体操作
return await operation_func(display_name, sender, roomid)
async def _enable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""启用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
if plugin.status == PluginStatus.RUNNING:
await self.bot.send_at_message(target, f"⚠️插件 {plugin_name} 已经处于启用状态", [sender])
return True, f"插件 {plugin_name} 已经处于启用状态"
module_name = plugin.__class__.__module__.split('.')[-2]
if self.plugin_manager.start_plugin(module_name):
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 启用成功", [sender])
return True, f"插件 {plugin_name} 启用成功"
else:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 启用失败", [sender])
return False, f"插件 {plugin_name} 启用失败"
async def _disable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""禁用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
if plugin.status != PluginStatus.RUNNING:
await self.bot.send_at_message(target, f"⚠️插件 {plugin_name} 已经处于禁用状态", [sender])
return True, f"插件 {plugin_name} 已经处于禁用状态"
module_name = plugin.__class__.__module__.split('.')[-2]
if self.plugin_manager.stop_plugin(module_name):
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 禁用成功", [sender])
return True, f"插件 {plugin_name} 禁用成功"
else:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 禁用失败", [sender])
return False, f"插件 {plugin_name} 禁用失败"
async def _reload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""重载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
module_name = plugin.__class__.__module__.split('.')[-2]
reloaded_plugin = self.plugin_manager.reload_plugin(module_name)
if reloaded_plugin:
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 重载成功", [sender])
return True, f"插件 {plugin_name} 重载成功"
else:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 重载失败", [sender])
return False, f"插件 {plugin_name} 重载失败"
async def _unload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""卸载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
module_name = plugin.__class__.__module__.split('.')[-2]
if self.plugin_manager.unload_plugin(module_name):
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 卸载成功", [sender])
return True, f"插件 {plugin_name} 卸载成功"
else:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 卸载失败", [sender])
return False, f"插件 {plugin_name} 卸载失败"
async def _load_plugin(self, plugin_name: str, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
"""加载插件"""
plugin_dir = os.path.join("plugins", plugin_name)
target = roomid if roomid else sender
if not os.path.exists(plugin_dir):
if not silent:
await self.bot.send_at_message(target, f"❌插件目录 {plugin_dir} 不存在", [sender])
return False, f"插件目录 {plugin_dir} 不存在"
for existing_plugin in self.plugin_registry.get_all_plugins().values():
existing_module_name = existing_plugin.__class__.__module__.split('.')[-2]
if existing_module_name == plugin_name:
if not silent:
await self.bot.send_at_message(target,
f"⚠️插件 {existing_plugin.name} (模块名: {plugin_name}) 已经加载",
[sender])
return True, f"插件 {existing_plugin.name} 已经加载"
try:
plugin = self.plugin_manager.load_plugin(plugin_name)
if plugin:
if not silent:
await self.bot.send_at_message(target, f"✅插件 {plugin.name} 加载成功", [sender])
return True, f"插件 {plugin.name} 加载成功"
else:
if not silent:
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 加载失败", [sender])
return False, f"插件 {plugin_name} 加载失败"
except Exception as e:
self.LOG.error(f"加载插件 {plugin_name} 出错: {e}")
if not silent:
await self.bot.send_at_message(target, f"❌加载插件出错: {str(e)}", [sender])
return False, f"加载插件出错: {e}"
async def _plugin_info(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""查看插件详情"""
target = roomid if roomid else sender
display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not display_name:
await self.bot.send_at_message(target, f"❌未找到插件 {plugin_name},请检查名称是否正确", [sender])
return True, f"未找到插件 {plugin_name}"
plugin = self.plugin_registry.get_plugin(display_name)
if not plugin:
await self.bot.send_at_message(target, f"❌插件 {display_name} 不存在", [sender])
return True, f"插件 {display_name} 不存在"
module_name = plugin.__class__.__module__.split('.')[-2]
status_text = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
message = f"""
📦 插件详情:{plugin.name}
📝 描述:{plugin.description}
🔢 版本:{plugin.version}
👤 作者:{plugin.author}
📂 模块名:{module_name}
⚙️ 状态:{status_text}
🔑 命令:{', '.join(plugin.commands) if hasattr(plugin, 'commands') else ''}
"""
await self.bot.send_at_message(target, message, [sender])
return True, "查看插件详情成功"