优化代码,删除无效代码

This commit is contained in:
liuwei
2025-05-19 17:58:49 +08:00
parent 28f02b58e0
commit fb752c084e
23 changed files with 83 additions and 503 deletions

View File

@@ -7,6 +7,7 @@ from plugin_common.plugin_interface import PluginStatus
from plugin_common.plugin_registry import PluginRegistry
from plugin_common.plugin_manager import PluginManager
from utils.robot_cmd.robot_command import GroupBotManager
from wechat_ipad import WechatAPIClient
class PluginManagerPlugin(MessagePluginInterface):
@@ -36,20 +37,9 @@ class PluginManagerPlugin(MessagePluginInterface):
def commands(self) -> List[str]:
return self._commands
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
@@ -58,7 +48,6 @@ class PluginManagerPlugin(MessagePluginInterface):
# 保存上下文对象
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self._commands = self._config.get("PluginManager", {}).get("command", ["插件", "plugin", "插件管理"])
self.command_format = self._config.get("PluginManager", {}).get("command-format", "插件 列表")
@@ -67,8 +56,6 @@ class PluginManagerPlugin(MessagePluginInterface):
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
# ... start 和 stop 方法保持不变 ...
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
@@ -79,7 +66,7 @@ class PluginManagerPlugin(MessagePluginInterface):
return command in self._commands
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
@@ -88,18 +75,25 @@ class PluginManagerPlugin(MessagePluginInterface):
roomid = message.get("roomid", "")
gbm = message.get("gbm")
target = roomid if roomid else sender
self.bot: WechatAPIClient = message.get("bot")
if not self.bot:
self.LOG.error("WechatAPIClient 未初始化")
return False, "Bot 未初始化"
# 检查命令格式
parts = content.split(" ")
if len(parts) == 1:
self.message_util.send_text(f"❌命令格式错误!\n{self.command_format}", target, sender)
await self.bot.send_at_message(target, f"❌命令格式错误!\n{self.command_format}", [sender])
return True, "命令格式错误"
# 只有使用的时候才全局获取对象。防止在预加载的时候跟主线程冲突
self.plugin_registry = PluginRegistry()
self.plugin_manager = PluginManager().get_instance()
# 检查权限 (只允许管理员操作)
if not self._is_admin(sender, gbm):
self.message_util.send_text(f"❌权限不足,只有管理员可以管理插件", target, sender)
await self.bot.send_at_message(target, f"❌权限不足,只有管理员可以管理插件", [sender])
return True, "权限不足"
# 解析子命令
@@ -114,23 +108,22 @@ class PluginManagerPlugin(MessagePluginInterface):
"禁用": lambda s, r: self._operate_plugin(plugin_name, s, r, self._disable_plugin),
"重载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._reload_plugin),
"卸载": lambda s, r: self._operate_plugin(plugin_name, s, r, self._unload_plugin),
# 修改这一行,使用 lambda 函数而不是直接调用
"加载": lambda s, r: self._load_plugin(plugin_name, s, r),
"信息": lambda s, r: self._operate_plugin(plugin_name, s, r, self._plugin_info)
}
handler = command_handlers.get(sub_command)
if handler and (sub_command == "列表" or plugin_name):
return handler(sender, roomid)
return await handler(sender, roomid)
else:
self.message_util.send_text(f"❌未知命令或缺少参数!\n{self.command_format}", target, sender)
await self.bot.send_at_message(target, f"❌未知命令或缺少参数!\n{self.command_format}", [sender])
return True, "未知命令"
except Exception as e:
import traceback
error_trace = traceback.format_exc()
self.LOG.error(f"处理插件管理请求出错: {e}\n{error_trace}")
self.message_util.send_text(f"❌操作失败: {str(e)}", target, sender)
await self.bot.send_at_message(target, f"❌操作失败: {str(e)}", [sender])
return True, f"处理出错: {e}"
def _is_admin(self, user_id: str, gbm: GroupBotManager) -> bool:
@@ -138,7 +131,7 @@ class PluginManagerPlugin(MessagePluginInterface):
admin_list = gbm.get_admin_list() if gbm else []
return user_id in admin_list
def _list_plugins(self, wcf, sender: str, roomid: str) -> Tuple[bool, str]:
async def _list_plugins(self, sender: str, roomid: str) -> Tuple[bool, str]:
"""列出所有插件"""
plugins = self.plugin_registry.get_all_plugins().values()
target = roomid if roomid else sender
@@ -147,15 +140,14 @@ class PluginManagerPlugin(MessagePluginInterface):
message = "📋 插件列表:\n"
for plugin in plugins:
status = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
# 获取插件模块名
module_name = plugin.__class__.__module__.split('.')[-2]
message += f"{status}-{plugin.name} [模块: {module_name}]\n"
self.message_util.send_text(message, target, sender)
await self.bot.send_at_message(target, message, [sender])
return True, "列出插件成功"
def _operate_plugin(self, plugin_name: str, sender: str, roomid: str,
operation_func) -> Tuple[bool, str]:
async def _operate_plugin(self, plugin_name: str, sender: str, roomid: str,
operation_func) -> Tuple[bool, str]:
"""通用插件操作函数"""
target = roomid if roomid else sender
@@ -163,170 +155,147 @@ class PluginManagerPlugin(MessagePluginInterface):
display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not display_name:
self.message_util.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确", target, sender)
await self.bot.send_at_message(target, f"❌未找到插件 {plugin_name},请检查名称是否正确", [sender])
return True, f"未找到插件 {plugin_name}"
# 不允许操作自身(对于某些操作)
if display_name == self.name and operation_func in [self._unload_plugin, self._disable_plugin]:
self.message_util.send_text(f"⚠️不能对插件管理插件自身执行此操作", target, sender)
await self.bot.send_at_message(target, f"⚠️不能对插件管理插件自身执行此操作", [sender])
return True, "不能对插件管理插件自身执行此操作"
# 执行具体操作
return operation_func(display_name, sender, roomid)
return await operation_func(display_name, sender, roomid)
def _enable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
async def _enable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""启用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
if plugin.status == PluginStatus.RUNNING:
self.message_util.send_text(f"⚠️插件 {plugin_name} 已经处于启用状态", target, sender)
await self.bot.send_at_message(target, f"⚠️插件 {plugin_name} 已经处于启用状态", [sender])
return True, f"插件 {plugin_name} 已经处于启用状态"
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 启动插件
if self.plugin_manager.start_plugin(module_name):
self.message_util.send_text(f"✅插件 {plugin_name} 启用成功", target, sender)
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 启用成功", [sender])
return True, f"插件 {plugin_name} 启用成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 启用失败", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 启用失败", [sender])
return False, f"插件 {plugin_name} 启用失败"
def _disable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
async def _disable_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""禁用插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
if plugin.status != PluginStatus.RUNNING:
self.message_util.send_text(f"⚠️插件 {plugin_name} 已经处于禁用状态", target, sender)
await self.bot.send_at_message(target, f"⚠️插件 {plugin_name} 已经处于禁用状态", [sender])
return True, f"插件 {plugin_name} 已经处于禁用状态"
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 停止插件
if self.plugin_manager.stop_plugin(module_name):
self.message_util.send_text(f"✅插件 {plugin_name} 禁用成功", target, sender)
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 禁用成功", [sender])
return True, f"插件 {plugin_name} 禁用成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 禁用失败", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 禁用失败", [sender])
return False, f"插件 {plugin_name} 禁用失败"
def _reload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
async def _reload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""重载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
# 记录原插件状态
was_running = plugin.status == PluginStatus.RUNNING
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 重载插件
reloaded_plugin = self.plugin_manager.reload_plugin(module_name)
if reloaded_plugin:
self.message_util.send_text(f"✅插件 {plugin_name} 重载成功", target, sender)
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 重载成功", [sender])
return True, f"插件 {plugin_name} 重载成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 重载失败", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 重载失败", [sender])
return False, f"插件 {plugin_name} 重载失败"
def _unload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
async def _unload_plugin(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""卸载插件"""
target = roomid if roomid else sender
plugin = self.plugin_registry.get_plugin(plugin_name)
if not plugin:
self.message_util.send_text(f"❌插件 {plugin_name} 不存在", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 不存在", [sender])
return True, f"插件 {plugin_name} 不存在"
# 获取插件的模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 卸载插件
if self.plugin_manager.unload_plugin(module_name):
self.message_util.send_text(f"✅插件 {plugin_name} 卸载成功", target, sender)
await self.bot.send_at_message(target, f"✅插件 {plugin_name} 卸载成功", [sender])
return True, f"插件 {plugin_name} 卸载成功"
else:
self.message_util.send_text(f"❌插件 {plugin_name} 卸载失败", target, sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 卸载失败", [sender])
return False, f"插件 {plugin_name} 卸载失败"
def _load_plugin(self, plugin_name: str, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
async def _load_plugin(self, plugin_name: str, sender: str, roomid: str, silent: bool = False) -> Tuple[bool, str]:
"""加载插件"""
# 对于加载操作,我们直接使用目录名作为模块名
# 检查插件目录是否存在
plugin_dir = os.path.join("plugins", plugin_name)
target = roomid if roomid else sender
if not os.path.exists(plugin_dir):
if not silent:
self.message_util.send_text(f"❌插件目录 {plugin_dir} 不存在",
(roomid if roomid else sender), sender)
await self.bot.send_at_message(target, f"❌插件目录 {plugin_dir} 不存在", [sender])
return False, f"插件目录 {plugin_dir} 不存在"
# 检查插件是否已加载 - 遍历所有插件查找模块名匹配的
for existing_plugin in self.plugin_registry.get_all_plugins().values():
existing_module_name = existing_plugin.__class__.__module__.split('.')[-2]
if existing_module_name == plugin_name:
if not silent:
self.message_util.send_text(f"⚠️插件 {existing_plugin.name} (模块名: {plugin_name}) 已经加载",
(roomid if roomid else sender), sender)
await self.bot.send_at_message(target, f"⚠️插件 {existing_plugin.name} (模块名: {plugin_name}) 已经加载", [sender])
return True, f"插件 {existing_plugin.name} 已经加载"
try:
# 使用插件管理器加载插件
plugin = self.plugin_manager.load_plugin(plugin_name)
if plugin:
if not silent:
self.message_util.send_text(f"✅插件 {plugin.name} 加载成功",
(roomid if roomid else sender), sender)
await self.bot.send_at_message(target, f"✅插件 {plugin.name} 加载成功", [sender])
return True, f"插件 {plugin.name} 加载成功"
else:
if not silent:
self.message_util.send_text(f"❌插件 {plugin_name} 加载失败",
(roomid if roomid else sender), sender)
await self.bot.send_at_message(target, f"❌插件 {plugin_name} 加载失败", [sender])
return False, f"插件 {plugin_name} 加载失败"
except Exception as e:
self.LOG.error(f"加载插件 {plugin_name} 出错: {e}")
if not silent:
self.message_util.send_text(f"❌加载插件出错: {str(e)}",
(roomid if roomid else sender), sender)
await self.bot.send_at_message(target, f"❌加载插件出错: {str(e)}", [sender])
return False, f"加载插件出错: {e}"
def _plugin_info(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
async def _plugin_info(self, plugin_name: str, sender: str, roomid: str) -> Tuple[bool, str]:
"""查看插件详情"""
# 查找匹配的插件名称
target = roomid if roomid else sender
display_name, plugin = self.plugin_manager.find_plugin_by_name(plugin_name)
if not display_name:
self.message_util.send_text(f"❌未找到插件 {plugin_name},请检查名称是否正确",
(roomid if roomid else sender), sender)
await self.bot.send_at_message(target, f"❌未找到插件 {plugin_name},请检查名称是否正确", [sender])
return True, f"未找到插件 {plugin_name}"
plugin = self.plugin_registry.get_plugin(display_name)
if not plugin:
self.message_util.send_text(f"❌插件 {display_name} 不存在",
(roomid if roomid else sender), sender)
await self.bot.send_at_message(target, f"❌插件 {display_name} 不存在", [sender])
return True, f"插件 {display_name} 不存在"
# 获取插件模块名
module_name = plugin.__class__.__module__.split('.')[-2]
# 构建插件详情消息
status_text = "✅ 已启用" if plugin.status == PluginStatus.RUNNING else "❌ 已禁用"
message = f"""
📦 插件详情:{plugin.name}
@@ -338,5 +307,5 @@ class PluginManagerPlugin(MessagePluginInterface):
🔑 命令:{', '.join(plugin.commands) if hasattr(plugin, 'commands') else ''}
"""
self.message_util.send_text(message, (roomid if roomid else sender), sender)
return True, "查看插件详情成功"
await self.bot.send_at_message(target, message, [sender])
return True, "查看插件详情成功"