Files
abot/plugins/robot_menu/main.py
2026-05-01 12:45:34 +08:00

429 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
import re
import xml.etree.ElementTree as ET
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from .menu_render_tool import RobotMenuRenderTool
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient
class RobotMenuPlugin(MessagePluginInterface):
"""功能菜单插件"""
# 功能权限常量
FEATURE_KEY = "ROBOT_MENU"
FEATURE_DESCRIPTION = "📋 功能菜单 [菜单 | 菜单 状态 | 菜单 指令清单]"
@property
def name(self) -> str:
return "功能菜单"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供功能菜单功能,支持查看和使用机器人功能"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
# 从配置文件加载命令列表
self._commands = self._config.get("RobotMenu", {}).get("command", ["菜单", "功能"])
self.command_format = self._config.get("RobotMenu", {}).get("command-format", "菜单 功能名")
self.enable = self._config.get("RobotMenu", {}).get("enable", True)
# 输出模式配置:支持 text / image 两种模式。
# 这里不在主插件内实现渲染细节,而是交给独立工具模块处理。
output_mode = self._config.get("RobotMenu", {}).get("output_mode", "text")
# 图片生成失败时是否回退到文本:
# - True优先保证消息可达
# - False严格按“只发图片”执行失败时仅提示失败原因
self.image_fallback_to_text = bool(
self._config.get("RobotMenu", {}).get("image_fallback_to_text", False)
)
# 菜单图片渲染超时与重试配置,避免极端环境下长时间阻塞消息处理协程。
self.image_render_timeout_seconds = int(
self._config.get("RobotMenu", {}).get("image_render_timeout_seconds", 45)
)
self.image_render_retries = int(
self._config.get("RobotMenu", {}).get("image_render_retries", 1)
)
# 菜单命令属于强交互型消息:
# 1. 用户输入“菜单”后,不能允许单次渲染长期霸占消息处理协程;
# 2. 因此这里单独定义“主链路同步等待预算”,超出后立即由渲染工具降级;
# 3. 该预算默认比底层图片渲染超时短很多,优先保障机器人整体吞吐稳定。
self.sync_send_timeout_seconds = int(
self._config.get("RobotMenu", {}).get("sync_send_timeout_seconds", 18)
)
# 对外层插件保护显式声明一个更合适的总超时:
# 1. 内层菜单发送会在 sync_send_timeout_seconds 内决定“成功发图 / 回退文本 / 返回失败提示”;
# 2. 外层 wait_for 必须比内层稍长,给降级发送文本留出缓冲;
# 3. 这样可以避免过去“内外层都卡在 55 秒”时,外层先打断,导致降级逻辑来不及执行。
self.plugin_process_timeout_seconds = max(12, self.sync_send_timeout_seconds + 8)
# 菜单图片模板文件路径(相对仓库根目录):
# 调整样式和布局时只改模板,不改 Python 逻辑。
self.image_template_path = str(
self._config.get("RobotMenu", {}).get(
"image_template_path",
"plugins/robot_menu/templates/menu_cards.html",
)
).strip() or "plugins/robot_menu/templates/menu_cards.html"
# 初始化“菜单渲染工具”,后续菜单图片与文本发送统一由该工具负责。
self.menu_renderer = RobotMenuRenderTool(
output_mode=output_mode,
image_fallback_to_text=self.image_fallback_to_text,
image_render_timeout_seconds=self.image_render_timeout_seconds,
image_render_retries=self.image_render_retries,
sync_send_timeout_seconds=self.sync_send_timeout_seconds,
image_template_path=self.image_template_path,
log=self.LOG,
)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
def display_menu_status(self, group_id: str) -> str:
"""显示所有功能列表及其在指定群组中的当前状态带emoji"""
menu = []
for feature in Feature:
status = GroupBotManager.get_group_permission(group_id, feature)
status_emoji = "" if status == PermissionStatus.ENABLED else ""
status_str = "启用" if status == PermissionStatus.ENABLED else "关闭"
menu.append(f"{status_emoji} {status_str}-{feature.value}-{feature.description}")
return "\n".join(menu)
def get_enabled_features(self, group_id: str) -> str:
"""获取某个群已启用的功能列表及其描述,并返回格式化的字符串
只返回描述中包含指令(方括号[])的功能
Args:
group_id: 群ID
Returns:
str: 格式化的已启用功能列表字符串
"""
enabled_features = []
# 检查群是否在列表中
if group_id not in GroupBotManager.local_cache["group_list"]:
return "该群未启用机器人功能"
# 遍历所有功能,检查哪些已启用且包含指令
for feature in Feature:
status = GroupBotManager.get_group_permission(group_id, feature)
# 只添加已启用且描述中包含方括号的功能
if status == PermissionStatus.ENABLED and "[" in feature.description and "]" in feature.description:
enabled_features.append({
"id": feature.value,
"name": feature.name,
"description": feature.description
})
# 如果没有启用任何带指令的功能
if not enabled_features:
return "该群未启用任何带指令的功能"
# 构建格式化的字符串
result = f"不支持@交互,请通过指令触发\n 群功能菜单:\n"
for feature in enabled_features:
result += f"{feature['id']}.{feature['description']}\n"
return result
def get_group_list(self) -> str:
"""返回所有启用了群机器人的群组清单"""
group_list = list(GroupBotManager.local_cache["group_list"])
if not group_list:
return "当前没有启用机器人的群组"
return "\n".join(group_list)
def at_list(self, xml):
try:
root = ET.fromstring(xml)
atuserlist_element = root.find('.//atuserlist')
atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip()
atuserlist_content_no_commas = atuserlist_content.strip(',')
atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas)
atuserlist_set = set(atuserlist_content_no_commas.split(','))
atuserlist_set.discard('')
return atuserlist_set
except ET.ParseError:
return set()
@plugin_stats_decorator(plugin_name="机器人菜单")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
command = content.split(" ")[0]
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
revoke: MessageAutoRevoke = message.get("revoke")
wx_msg = message.get("full_wx_msg")
xml = wx_msg.msg_source if wx_msg else ""
if not bot:
self.LOG.error("WechatAPIClient 未初始化")
return False, "Bot 未初始化"
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
target = roomid if roomid else sender
# 检查命令格式
parts = content.split()
if len(parts) == 1:
# 显示功能菜单
menu_text = self.get_enabled_features(roomid if roomid else sender)
menu_markdown = self.menu_renderer.build_feature_status_markdown(roomid if roomid else sender)
menu_html = self.menu_renderer.build_feature_status_html(roomid if roomid else sender)
await self.menu_renderer.send_menu_content(
bot=bot,
target=target,
sender=sender,
revoke=revoke,
text_content=menu_text,
markdown_content=menu_markdown,
html_content=menu_html,
revoke_seconds=90,
)
return True, "显示功能菜单"
# 提取命令名
cmd_name = content[len(command):].strip()
try:
# 处理特殊命令
if cmd_name.upper() == "状态":
# 显示所有功能状态
status_text = self.display_menu_status(roomid if roomid else sender)
status_markdown = self.menu_renderer.build_feature_status_markdown(roomid if roomid else sender)
status_html = self.menu_renderer.build_feature_status_html(roomid if roomid else sender)
await self.menu_renderer.send_menu_content(
bot=bot,
target=target,
sender=sender,
revoke=revoke,
text_content=status_text,
markdown_content=status_markdown,
html_content=status_html,
revoke_seconds=90,
)
return True, "显示功能状态"
if cmd_name in {"指令清单", "功能清单", "命令清单", "帮助"}:
# 指令清单改为直接从插件快照自动生成:
# 1. 展示当前群“真实可用”的命令,而不是手工维护的固定文案;
# 2. 管理员额外看到未启用项与管理命令,普通用户只看到能直接用的内容;
# 3. 这样后续新增/删除插件后,菜单无需手动同步修改。
command_catalog_text = self.menu_renderer.build_command_catalog_text(
roomid if roomid else sender,
sender,
)
command_catalog_markdown = self.menu_renderer.build_command_catalog_markdown(
roomid if roomid else sender,
sender,
)
await self.menu_renderer.send_menu_content(
bot=bot,
target=target,
sender=sender,
revoke=revoke,
text_content=command_catalog_text,
markdown_content=command_catalog_markdown,
html_content="",
revoke_seconds=120,
)
return True, "显示指令清单"
# 处理群列表命令
if cmd_name.upper() == "群列表":
group_list_text = self.get_group_list()
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, group_list_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "显示群列表"
# 处理功能启用/关闭命令
# 格式:菜单 启用 功能名 或 菜单 关闭 功能名
if len(parts) >= 3 and parts[1] in ["启用", "关闭"]:
# 检查管理员权限
group_id_for_perm = roomid if roomid else sender
if not GroupBotManager.is_admin_for_group(sender, group_id_for_perm):
await bot.send_at_message(target, "❌权限不足,只有管理员可以管理功能", [sender])
return True, "权限不足"
action = parts[1]
feature_identifier = " ".join(parts[2:]) # 支持带空格的功能名
# 构造命令字符串,调用 GroupBotManager.handle_command
# 尝试作为序号
if feature_identifier.isdigit():
command_str = f"{feature_identifier}-{action}"
else:
# 尝试作为功能名
# 查找匹配的功能枚举
matched_feature = None
for feature in Feature:
# 尝试通过枚举名称匹配
if feature.name.lower() == feature_identifier.lower():
matched_feature = feature
break
# 尝试通过序号匹配
if str(feature.value) == feature_identifier:
matched_feature = feature
break
if matched_feature:
command_str = f"{matched_feature.value}-{action}"
else:
# 直接使用功能标识符(可能是序号或功能名)
command_str = f"{feature_identifier}-{action}"
# 调用 GroupBotManager 处理命令
result = gbm.handle_command(roomid if roomid else sender, command_str)
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, result, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "处理功能命令"
if len(parts) >= 2 and parts[1] == "管理员":
if not GroupBotManager.is_admin(sender):
await bot.send_at_message(target, "❌权限不足,只有全局管理员可以管理群管理员", [sender])
return True, "权限不足"
group_id = roomid if roomid else sender
if len(parts) < 3:
help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "管理员命令帮助"
admin_action = parts[2]
if admin_action == "添加" and len(parts) >= 4:
at_users = self.at_list(xml) if xml else set()
if len(at_users) == 1:
new_admin_id = next(iter(at_users))
GroupBotManager.add_group_admin(group_id, new_admin_id)
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "添加群管理员"
candidate = parts[3]
resolved_id = candidate
if roomid:
members = ContactManager.get_instance().get_group_members(roomid) or {}
for wxid, nick in members.items():
if nick == candidate:
resolved_id = wxid
break
GroupBotManager.add_group_admin(group_id, resolved_id)
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "添加群管理员"
if admin_action == "删除" and len(parts) >= 4:
at_users = self.at_list(xml) if xml else set()
if len(at_users) == 1:
del_admin_id = next(iter(at_users))
ok = GroupBotManager.remove_group_admin(group_id, del_admin_id)
msg = "已删除群管理员" if ok else "该管理员不在清单中"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "删除群管理员"
candidate = parts[3]
resolved_id = candidate
if roomid:
members = ContactManager.get_instance().get_group_members(roomid) or {}
for wxid, nick in members.items():
if nick == candidate:
resolved_id = wxid
break
ok = GroupBotManager.remove_group_admin(group_id, resolved_id)
msg = "已删除群管理员" if ok else "该管理员不在清单中"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "删除群管理员"
if admin_action in ["列表", "清单"]:
admins = GroupBotManager.get_group_admins(group_id)
list_text = "群管理员列表为空" if not admins else "\n".join(admins)
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, list_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "列出群管理员"
help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "管理员命令帮助"
# 如果是其他未知命令,显示帮助
help_text = f"❌命令格式错误!\n{self.command_format}"
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90)
return True, "命令格式错误"
except Exception as e:
self.LOG.error(f"处理功能请求出错: {e}")
import traceback
self.LOG.error(traceback.format_exc())
return False, f"处理出错: {e}"