255 lines
10 KiB
Python
255 lines
10 KiB
Python
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from loguru import logger
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
from utils.revoke.message_auto_revoke import MessageAutoRevoke
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class RobotMenuPlugin(MessagePluginInterface):
|
||
"""功能菜单插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "ROBOT_MENU"
|
||
FEATURE_DESCRIPTION = "📋 功能菜单 [菜单 - 显示功能菜单 | 菜单 状态 - 显示功能状态]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "功能菜单"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供功能菜单功能,支持查看和使用机器人功能"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "liu.wei"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
# 注册功能权限
|
||
self.feature = self.register_feature()
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logger
|
||
self.LOG.debug(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.event_system = context.get("event_system")
|
||
|
||
# 从配置文件加载命令列表
|
||
self._commands = self._config.get("RobotMenu", {}).get("command", ["菜单", "功能"])
|
||
self.command_format = self._config.get("RobotMenu", {}).get("command-format", "菜单 功能名")
|
||
self.enable = self._config.get("RobotMenu", {}).get("enable", True)
|
||
|
||
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.debug(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
command = content.split(" ")[0]
|
||
|
||
return command in self._commands
|
||
|
||
def display_menu_status(self, group_id: str) -> str:
|
||
"""显示所有功能列表及其在指定群组中的当前状态,带emoji"""
|
||
menu = []
|
||
for feature in Feature:
|
||
status = GroupBotManager.get_group_permission(group_id, feature)
|
||
status_emoji = "✅" if status == PermissionStatus.ENABLED else "❌"
|
||
status_str = "启用" if status == PermissionStatus.ENABLED else "关闭"
|
||
menu.append(f"{status_emoji} {status_str}-{feature.value}-{feature.description}")
|
||
return "\n".join(menu)
|
||
|
||
def get_enabled_features(self, group_id: str) -> str:
|
||
"""获取某个群已启用的功能列表及其描述,并返回格式化的字符串
|
||
只返回描述中包含指令(方括号[])的功能
|
||
|
||
Args:
|
||
group_id: 群ID
|
||
|
||
Returns:
|
||
str: 格式化的已启用功能列表字符串
|
||
"""
|
||
enabled_features = []
|
||
|
||
# 检查群是否在列表中
|
||
if group_id not in GroupBotManager.local_cache["group_list"]:
|
||
return "该群未启用机器人功能"
|
||
|
||
# 遍历所有功能,检查哪些已启用且包含指令
|
||
for feature in Feature:
|
||
status = GroupBotManager.get_group_permission(group_id, feature)
|
||
# 只添加已启用且描述中包含方括号的功能
|
||
if status == PermissionStatus.ENABLED and "[" in feature.description and "]" in feature.description:
|
||
enabled_features.append({
|
||
"id": feature.value,
|
||
"name": feature.name,
|
||
"description": feature.description
|
||
})
|
||
|
||
# 如果没有启用任何带指令的功能
|
||
if not enabled_features:
|
||
return "该群未启用任何带指令的功能"
|
||
|
||
# 构建格式化的字符串
|
||
result = f"不支持@交互,请通过指令触发\n 群功能菜单:\n"
|
||
for feature in enabled_features:
|
||
result += f"{feature['id']}.{feature['description']}\n"
|
||
|
||
return result
|
||
|
||
def get_group_list(self) -> str:
|
||
"""返回所有启用了群机器人的群组清单"""
|
||
group_list = list(GroupBotManager.local_cache["group_list"])
|
||
if not group_list:
|
||
return "当前没有启用机器人的群组"
|
||
return "\n".join(group_list)
|
||
|
||
@plugin_stats_decorator(plugin_name="机器人菜单")
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.debug(f"插件执行: {self.name}:{content}")
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
command = content.split(" ")[0]
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
bot: WechatAPIClient = message.get("bot")
|
||
revoke: MessageAutoRevoke = message.get("revoke")
|
||
|
||
if not bot:
|
||
self.LOG.error("WechatAPIClient 未初始化")
|
||
return False, "Bot 未初始化"
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
target = roomid if roomid else sender
|
||
|
||
# 检查命令格式
|
||
parts = content.split()
|
||
if len(parts) == 1:
|
||
# 显示功能菜单
|
||
menu_text = self.get_enabled_features(roomid if roomid else sender)
|
||
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, menu_text, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
|
||
return True, "显示功能菜单"
|
||
|
||
# 提取命令名
|
||
cmd_name = content[len(command):].strip()
|
||
|
||
try:
|
||
# 处理特殊命令
|
||
if cmd_name.upper() == "状态":
|
||
# 显示所有功能状态
|
||
status_text = self.display_menu_status(roomid if roomid else sender)
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, status_text, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
|
||
return True, "显示功能状态"
|
||
|
||
# 处理群列表命令
|
||
if cmd_name.upper() == "群列表":
|
||
group_list_text = self.get_group_list()
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, group_list_text, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
|
||
return True, "显示群列表"
|
||
|
||
# 处理功能启用/关闭命令
|
||
# 格式:菜单 启用 功能名 或 菜单 关闭 功能名
|
||
if len(parts) >= 3 and parts[1] in ["启用", "关闭"]:
|
||
# 检查管理员权限
|
||
if not GroupBotManager.is_admin(sender):
|
||
await bot.send_at_message(target, "❌权限不足,只有管理员可以管理功能", [sender])
|
||
return True, "权限不足"
|
||
|
||
action = parts[1]
|
||
feature_identifier = " ".join(parts[2:]) # 支持带空格的功能名
|
||
|
||
# 构造命令字符串,调用 GroupBotManager.handle_command
|
||
# 尝试作为序号
|
||
if feature_identifier.isdigit():
|
||
command_str = f"{feature_identifier}-{action}"
|
||
else:
|
||
# 尝试作为功能名
|
||
# 查找匹配的功能枚举
|
||
matched_feature = None
|
||
for feature in Feature:
|
||
# 尝试通过枚举名称匹配
|
||
if feature.name.lower() == feature_identifier.lower():
|
||
matched_feature = feature
|
||
break
|
||
# 尝试通过序号匹配
|
||
if str(feature.value) == feature_identifier:
|
||
matched_feature = feature
|
||
break
|
||
|
||
if matched_feature:
|
||
command_str = f"{matched_feature.value}-{action}"
|
||
else:
|
||
# 直接使用功能标识符(可能是序号或功能名)
|
||
command_str = f"{feature_identifier}-{action}"
|
||
|
||
# 调用 GroupBotManager 处理命令
|
||
result = gbm.handle_command(roomid if roomid else sender, command_str)
|
||
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, result, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
|
||
return True, "处理功能命令"
|
||
|
||
# 如果是其他未知命令,显示帮助
|
||
help_text = f"❌命令格式错误!\n{self.command_format}"
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
|
||
return True, "命令格式错误"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理功能请求出错: {e}")
|
||
import traceback
|
||
self.LOG.error(traceback.format_exc())
|
||
return False, f"处理出错: {e}"
|