import asyncio import time from pathlib import Path from typing import Dict, Any, List, Optional, Tuple from loguru import logger import re import xml.etree.ElementTree as ET from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.markdown_to_image import convert_md_str_to_image from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient class RobotMenuPlugin(MessagePluginInterface): """功能菜单插件""" # 功能权限常量 FEATURE_KEY = "ROBOT_MENU" FEATURE_DESCRIPTION = "📋 功能菜单 [菜单 - 显示功能菜单 | 菜单 状态 - 显示功能状态]" @property def name(self) -> str: return "功能菜单" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供功能菜单功能,支持查看和使用机器人功能" @property def author(self) -> str: return "liu.wei" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") # 从配置文件加载命令列表 self._commands = self._config.get("RobotMenu", {}).get("command", ["菜单", "功能"]) self.command_format = self._config.get("RobotMenu", {}).get("command-format", "菜单 功能名") self.enable = self._config.get("RobotMenu", {}).get("enable", True) # 输出模式配置:支持 text / image 两种模式。 # 业务诉求是“菜单默认发图片”,因此配置文件默认会写 image; # 为兼容历史配置,这里做兜底归一化,非法值统一降级到 text。 self.menu_output_mode = self._normalize_output_mode( self._config.get("RobotMenu", {}).get("output_mode", "text") ) # 图片生成失败时是否回退到文本: # - True:优先保证消息可达; # - False:严格按“只发图片”执行(失败时仅提示失败原因)。 self.image_fallback_to_text = bool( self._config.get("RobotMenu", {}).get("image_fallback_to_text", False) ) # 菜单图片渲染超时与重试配置,避免极端环境下长时间阻塞消息处理协程。 self.image_render_timeout_seconds = int( self._config.get("RobotMenu", {}).get("image_render_timeout_seconds", 45) ) self.image_render_retries = int( self._config.get("RobotMenu", {}).get("image_render_retries", 1) ) self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands def display_menu_status(self, group_id: str) -> str: """显示所有功能列表及其在指定群组中的当前状态,带emoji""" menu = [] for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) status_emoji = "✅" if status == PermissionStatus.ENABLED else "❌" status_str = "启用" if status == PermissionStatus.ENABLED else "关闭" menu.append(f"{status_emoji} {status_str}-{feature.value}-{feature.description}") return "\n".join(menu) def get_enabled_features(self, group_id: str) -> str: """获取某个群已启用的功能列表及其描述,并返回格式化的字符串 只返回描述中包含指令(方括号[])的功能 Args: group_id: 群ID Returns: str: 格式化的已启用功能列表字符串 """ enabled_features = [] # 检查群是否在列表中 if group_id not in GroupBotManager.local_cache["group_list"]: return "该群未启用机器人功能" # 遍历所有功能,检查哪些已启用且包含指令 for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) # 只添加已启用且描述中包含方括号的功能 if status == PermissionStatus.ENABLED and "[" in feature.description and "]" in feature.description: enabled_features.append({ "id": feature.value, "name": feature.name, "description": feature.description }) # 如果没有启用任何带指令的功能 if not enabled_features: return "该群未启用任何带指令的功能" # 构建格式化的字符串 result = f"不支持@交互,请通过指令触发\n 群功能菜单:\n" for feature in enabled_features: result += f"{feature['id']}.{feature['description']}\n" return result def get_group_list(self) -> str: """返回所有启用了群机器人的群组清单""" group_list = list(GroupBotManager.local_cache["group_list"]) if not group_list: return "当前没有启用机器人的群组" return "\n".join(group_list) @staticmethod def _normalize_output_mode(raw_mode: Any) -> str: """将配置中的输出模式标准化为 text 或 image。""" mode = str(raw_mode or "").strip().lower() if mode in {"image", "img", "picture", "pic"}: return "image" return "text" @staticmethod def _split_feature_description(feature_desc: str) -> Tuple[str, str]: """拆分功能描述为“功能说明”和“指令说明”。 约定: 1. `Feature.description` 中使用 `[...]` 包裹指令或触发方式; 2. 若未配置方括号,则指令说明使用“无”占位,保证菜单结构稳定。 """ desc = str(feature_desc or "").strip() match = re.match(r"^(.*?)(?:\[(.*?)\])?$", desc) if not match: return desc or "未命名功能", "无" title = (match.group(1) or "").strip() or "未命名功能" cmd_hint = (match.group(2) or "").strip() or "无" return title, cmd_hint def _build_feature_status_markdown(self, group_id: str) -> str: """构建菜单图片使用的 Markdown 文本。 输出目标: 1. 逐项列出每个插件(Feature)当前状态; 2. 明确展示每个插件的功能描述与指令/触发方式; 3. 让用户在一张图片里即可完成“查看状态 + 查指令”。 """ lines = [ "# 机器人功能菜单", "", f"- 目标:`{group_id}`", f"- 生成时间:`{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}`", "", "| 序号 | 功能键 | 状态 | 功能说明 | 指令/触发方式 |", "| --- | --- | --- | --- | --- |", ] for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) status_text = "启用 ✅" if status == PermissionStatus.ENABLED else "关闭 ❌" title, cmd_hint = self._split_feature_description(feature.description) lines.append( f"| {feature.value} | `{feature.name}` | {status_text} | {title} | `{cmd_hint}` |" ) return "\n".join(lines) async def _send_menu_content( self, bot: WechatAPIClient, target: str, sender: str, revoke: MessageAutoRevoke, text_content: str, markdown_content: Optional[str] = None, revoke_seconds: int = 90, ) -> None: """按配置发送菜单内容(文本或图片)。 发送策略: 1. `output_mode=text`:走历史文本发送逻辑,并登记自动撤回; 2. `output_mode=image`:先用 md2image 渲染图片后发送; 3. 图片失败时根据 `image_fallback_to_text` 决定是否回退文本。 """ # 文本模式:保持原有行为,避免影响已有群聊使用习惯。 if self.menu_output_mode != "image": client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds) return # 图片模式:将菜单 Markdown 转图后发送。 md_content = (markdown_content or "").strip() or f"```text\n{text_content}\n```" output_image = f"robot_menu_{int(time.time() * 1000)}.png" try: # 增加总超时保护,防止图片渲染阻塞主流程过久。 total_timeout = max(30, self.image_render_timeout_seconds * max(1, self.image_render_retries) + 10) image_path = await asyncio.wait_for( convert_md_str_to_image( md_content, output_image, max_retries=max(1, self.image_render_retries), render_timeout_seconds=max(10, self.image_render_timeout_seconds), html_timeout_seconds=min(30, max(10, self.image_render_timeout_seconds)), ), timeout=total_timeout, ) await bot.send_image_message(target, Path(image_path)) except Exception as e: self.LOG.error(f"[{self.name}] 菜单图片发送失败: {e}") if self.image_fallback_to_text: client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds) else: # 严格图片模式下不回退完整菜单文本,仅发送简短失败提示。 client_msg_id, create_time, new_msg_id = await bot.send_text_message( target, "❌ 菜单图片生成失败,请稍后重试", sender ) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 30) def at_list(self, xml): try: root = ET.fromstring(xml) atuserlist_element = root.find('.//atuserlist') atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() atuserlist_content_no_commas = atuserlist_content.strip(',') atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas) atuserlist_set = set(atuserlist_content_no_commas.split(',')) atuserlist_set.discard('') return atuserlist_set except ET.ParseError: return set() @plugin_stats_decorator(plugin_name="机器人菜单") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") command = content.split(" ")[0] gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") revoke: MessageAutoRevoke = message.get("revoke") wx_msg = message.get("full_wx_msg") xml = wx_msg.msg_source if wx_msg else "" if not bot: self.LOG.error("WechatAPIClient 未初始化") return False, "Bot 未初始化" # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" target = roomid if roomid else sender # 检查命令格式 parts = content.split() if len(parts) == 1: # 显示功能菜单 menu_text = self.get_enabled_features(roomid if roomid else sender) menu_markdown = self._build_feature_status_markdown(roomid if roomid else sender) await self._send_menu_content( bot=bot, target=target, sender=sender, revoke=revoke, text_content=menu_text, markdown_content=menu_markdown, revoke_seconds=90, ) return True, "显示功能菜单" # 提取命令名 cmd_name = content[len(command):].strip() try: # 处理特殊命令 if cmd_name.upper() == "状态": # 显示所有功能状态 status_text = self.display_menu_status(roomid if roomid else sender) status_markdown = self._build_feature_status_markdown(roomid if roomid else sender) await self._send_menu_content( bot=bot, target=target, sender=sender, revoke=revoke, text_content=status_text, markdown_content=status_markdown, revoke_seconds=90, ) return True, "显示功能状态" # 处理群列表命令 if cmd_name.upper() == "群列表": group_list_text = self.get_group_list() client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, group_list_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "显示群列表" # 处理功能启用/关闭命令 # 格式:菜单 启用 功能名 或 菜单 关闭 功能名 if len(parts) >= 3 and parts[1] in ["启用", "关闭"]: # 检查管理员权限 group_id_for_perm = roomid if roomid else sender if not GroupBotManager.is_admin_for_group(sender, group_id_for_perm): await bot.send_at_message(target, "❌权限不足,只有管理员可以管理功能", [sender]) return True, "权限不足" action = parts[1] feature_identifier = " ".join(parts[2:]) # 支持带空格的功能名 # 构造命令字符串,调用 GroupBotManager.handle_command # 尝试作为序号 if feature_identifier.isdigit(): command_str = f"{feature_identifier}-{action}" else: # 尝试作为功能名 # 查找匹配的功能枚举 matched_feature = None for feature in Feature: # 尝试通过枚举名称匹配 if feature.name.lower() == feature_identifier.lower(): matched_feature = feature break # 尝试通过序号匹配 if str(feature.value) == feature_identifier: matched_feature = feature break if matched_feature: command_str = f"{matched_feature.value}-{action}" else: # 直接使用功能标识符(可能是序号或功能名) command_str = f"{feature_identifier}-{action}" # 调用 GroupBotManager 处理命令 result = gbm.handle_command(roomid if roomid else sender, command_str) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, result, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "处理功能命令" if len(parts) >= 2 and parts[1] == "管理员": if not GroupBotManager.is_admin(sender): await bot.send_at_message(target, "❌权限不足,只有全局管理员可以管理群管理员", [sender]) return True, "权限不足" group_id = roomid if roomid else sender if len(parts) < 3: help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "管理员命令帮助" admin_action = parts[2] if admin_action == "添加" and len(parts) >= 4: at_users = self.at_list(xml) if xml else set() if len(at_users) == 1: new_admin_id = next(iter(at_users)) GroupBotManager.add_group_admin(group_id, new_admin_id) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "添加群管理员" candidate = parts[3] resolved_id = candidate if roomid: members = ContactManager.get_instance().get_group_members(roomid) or {} for wxid, nick in members.items(): if nick == candidate: resolved_id = wxid break GroupBotManager.add_group_admin(group_id, resolved_id) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "添加群管理员" if admin_action == "删除" and len(parts) >= 4: at_users = self.at_list(xml) if xml else set() if len(at_users) == 1: del_admin_id = next(iter(at_users)) ok = GroupBotManager.remove_group_admin(group_id, del_admin_id) msg = "已删除群管理员" if ok else "该管理员不在清单中" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "删除群管理员" candidate = parts[3] resolved_id = candidate if roomid: members = ContactManager.get_instance().get_group_members(roomid) or {} for wxid, nick in members.items(): if nick == candidate: resolved_id = wxid break ok = GroupBotManager.remove_group_admin(group_id, resolved_id) msg = "已删除群管理员" if ok else "该管理员不在清单中" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "删除群管理员" if admin_action in ["列表", "清单"]: admins = GroupBotManager.get_group_admins(group_id) list_text = "群管理员列表为空" if not admins else "\n".join(admins) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, list_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "列出群管理员" help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "管理员命令帮助" # 如果是其他未知命令,显示帮助 help_text = f"❌命令格式错误!\n{self.command_format}" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "命令格式错误" except Exception as e: self.LOG.error(f"处理功能请求出错: {e}") import traceback self.LOG.error(traceback.format_exc()) return False, f"处理出错: {e}"