import asyncio import html import os import time from pathlib import Path from typing import Dict, Any, List, Optional, Tuple from loguru import logger import re import xml.etree.ElementTree as ET from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.markdown_to_image import convert_md_str_to_image, html_to_image from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.wechat.contact_manager import ContactManager from wechat_ipad import WechatAPIClient class RobotMenuPlugin(MessagePluginInterface): """功能菜单插件""" # 功能权限常量 FEATURE_KEY = "ROBOT_MENU" FEATURE_DESCRIPTION = "📋 功能菜单 [菜单 - 显示功能菜单 | 菜单 状态 - 显示功能状态]" @property def name(self) -> str: return "功能菜单" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供功能菜单功能,支持查看和使用机器人功能" @property def author(self) -> str: return "liu.wei" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") # 从配置文件加载命令列表 self._commands = self._config.get("RobotMenu", {}).get("command", ["菜单", "功能"]) self.command_format = self._config.get("RobotMenu", {}).get("command-format", "菜单 功能名") self.enable = self._config.get("RobotMenu", {}).get("enable", True) # 输出模式配置:支持 text / image 两种模式。 # 业务诉求是“菜单默认发图片”,因此配置文件默认会写 image; # 为兼容历史配置,这里做兜底归一化,非法值统一降级到 text。 self.menu_output_mode = self._normalize_output_mode( self._config.get("RobotMenu", {}).get("output_mode", "text") ) # 图片生成失败时是否回退到文本: # - True:优先保证消息可达; # - False:严格按“只发图片”执行(失败时仅提示失败原因)。 self.image_fallback_to_text = bool( self._config.get("RobotMenu", {}).get("image_fallback_to_text", False) ) # 菜单图片渲染超时与重试配置,避免极端环境下长时间阻塞消息处理协程。 self.image_render_timeout_seconds = int( self._config.get("RobotMenu", {}).get("image_render_timeout_seconds", 45) ) self.image_render_retries = int( self._config.get("RobotMenu", {}).get("image_render_retries", 1) ) self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands def display_menu_status(self, group_id: str) -> str: """显示所有功能列表及其在指定群组中的当前状态,带emoji""" menu = [] for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) status_emoji = "✅" if status == PermissionStatus.ENABLED else "❌" status_str = "启用" if status == PermissionStatus.ENABLED else "关闭" menu.append(f"{status_emoji} {status_str}-{feature.value}-{feature.description}") return "\n".join(menu) def get_enabled_features(self, group_id: str) -> str: """获取某个群已启用的功能列表及其描述,并返回格式化的字符串 只返回描述中包含指令(方括号[])的功能 Args: group_id: 群ID Returns: str: 格式化的已启用功能列表字符串 """ enabled_features = [] # 检查群是否在列表中 if group_id not in GroupBotManager.local_cache["group_list"]: return "该群未启用机器人功能" # 遍历所有功能,检查哪些已启用且包含指令 for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) # 只添加已启用且描述中包含方括号的功能 if status == PermissionStatus.ENABLED and "[" in feature.description and "]" in feature.description: enabled_features.append({ "id": feature.value, "name": feature.name, "description": feature.description }) # 如果没有启用任何带指令的功能 if not enabled_features: return "该群未启用任何带指令的功能" # 构建格式化的字符串 result = f"不支持@交互,请通过指令触发\n 群功能菜单:\n" for feature in enabled_features: result += f"{feature['id']}.{feature['description']}\n" return result def get_group_list(self) -> str: """返回所有启用了群机器人的群组清单""" group_list = list(GroupBotManager.local_cache["group_list"]) if not group_list: return "当前没有启用机器人的群组" return "\n".join(group_list) @staticmethod def _normalize_output_mode(raw_mode: Any) -> str: """将配置中的输出模式标准化为 text 或 image。""" mode = str(raw_mode or "").strip().lower() if mode in {"image", "img", "picture", "pic"}: return "image" return "text" @staticmethod def _split_feature_description(feature_desc: str) -> Tuple[str, str]: """拆分功能描述为“功能说明”和“指令说明”。 约定: 1. `Feature.description` 中使用 `[...]` 包裹指令或触发方式; 2. 若未配置方括号,则指令说明使用“无”占位,保证菜单结构稳定。 """ desc = str(feature_desc or "").strip() match = re.match(r"^(.*?)(?:\[(.*?)\])?$", desc) if not match: return desc or "未命名功能", "无" title = (match.group(1) or "").strip() or "未命名功能" cmd_hint = (match.group(2) or "").strip() or "无" return title, cmd_hint def _build_feature_status_markdown(self, group_id: str) -> str: """构建菜单图片使用的 Markdown 文本。 输出目标: 1. 逐项列出每个插件(Feature)当前状态; 2. 明确展示每个插件的功能描述与指令/触发方式; 3. 让用户在一张图片里即可完成“查看状态 + 查指令”。 """ lines = [ "# 机器人功能菜单", "", f"- 目标:`{group_id}`", f"- 生成时间:`{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}`", "", "| 序号 | 功能键 | 状态 | 功能说明 | 指令/触发方式 |", "| --- | --- | --- | --- | --- |", ] for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) status_text = "启用 ✅" if status == PermissionStatus.ENABLED else "关闭 ❌" title, cmd_hint = self._split_feature_description(feature.description) lines.append( f"| {feature.value} | `{feature.name}` | {status_text} | {title} | `{cmd_hint}` |" ) return "\n".join(lines) def _get_feature_command_examples(self, feature: Feature) -> str: """返回功能的详细指令示例文本。 设计说明: 1. `Feature.description` 中的 `[]` 内容通常是“触发指令”或“触发方式”; 2. 当某些功能没有显式指令时,给出“自动触发/定时触发”提示,避免用户误以为缺失; 3. 所有功能统一补充“启用/关闭管理命令”,让用户知道如何控制开关。 """ _, cmd_hint = self._split_feature_description(feature.description) # 统一的功能开关命令模板:管理员可通过序号或功能键控制开关。 manage_hint = ( f"管理:菜单 启用 {feature.value} | 菜单 关闭 {feature.value} | " f"菜单 启用 {feature.name} | 菜单 关闭 {feature.name}" ) if cmd_hint == "无": return f"触发:自动/定时触发(无直接聊天指令)
{manage_hint}" return f"触发:{html.escape(cmd_hint)}
{manage_hint}" def _build_feature_status_html(self, group_id: str) -> str: """构建菜单图片使用的 HTML 文本(自定义视觉,不复用 md2image 默认样式)。 目标: 1. 清晰分区:基础命令、管理员命令、功能明细; 2. 每个功能独立卡片,展示状态、用途、触发指令与管理命令; 3. 页面在移动端宽度下也可完整展示,减少截图后拥挤感。 """ feature_cards: List[str] = [] for feature in Feature: status = GroupBotManager.get_group_permission(group_id, feature) enabled = status == PermissionStatus.ENABLED status_class = "badge-on" if enabled else "badge-off" status_text = "已启用" if enabled else "已关闭" title, _ = self._split_feature_description(feature.description) command_examples = self._get_feature_command_examples(feature) feature_cards.append( f"""
#{feature.value}

{html.escape(title)}

功能键:{html.escape(feature.name)}

{status_text}

{command_examples}

""" ) now_text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) return f"""

机器人功能菜单

目标群/会话:{html.escape(group_id)}

生成时间:{now_text}

基础命令

  • 菜单:查看完整功能菜单(状态 + 详细指令)
  • 菜单 状态:查看所有功能当前启用状态
  • 菜单 群列表:查看已启用群机器人的群组清单

管理员命令

  • 菜单 启用 序号 / 菜单 关闭 序号
  • 菜单 启用 功能键 / 菜单 关闭 功能键
  • 菜单 管理员 添加 wxid/昵称菜单 管理员 删除 wxid/昵称菜单 管理员 列表

功能明细

{''.join(feature_cards)}
""" async def _send_menu_content( self, bot: WechatAPIClient, target: str, sender: str, revoke: MessageAutoRevoke, text_content: str, markdown_content: Optional[str] = None, html_content: Optional[str] = None, revoke_seconds: int = 90, ) -> None: """按配置发送菜单内容(文本或图片)。 发送策略: 1. `output_mode=text`:走历史文本发送逻辑,并登记自动撤回; 2. `output_mode=image`:先用 md2image 渲染图片后发送; 3. 图片失败时根据 `image_fallback_to_text` 决定是否回退文本。 """ # 文本模式:保持原有行为,避免影响已有群聊使用习惯。 if self.menu_output_mode != "image": client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds) return # 图片模式:优先使用“自定义 HTML 模板”渲染图片,不使用 md2image 默认 Markdown 样式。 # 兼容策略: # 1. 若调用方传了 html_content,则按该模板直接截图; # 2. 若未传 html_content,才回退到 Markdown 转图(作为兜底,不影响稳定性)。 md_content = (markdown_content or "").strip() or f"```text\n{text_content}\n```" output_image = f"robot_menu_{int(time.time() * 1000)}.png" try: # 增加总超时保护,防止图片渲染阻塞主流程过久。 total_timeout = max(30, self.image_render_timeout_seconds * max(1, self.image_render_retries) + 10) output_dir = Path(os.getcwd()) / "temp" / "md2image" output_dir.mkdir(parents=True, exist_ok=True) output_path = output_dir / output_image if html_content and html_content.strip(): await asyncio.wait_for( html_to_image(html_content, str(output_path)), timeout=total_timeout, ) image_path = str(output_path.resolve()) else: image_path = await asyncio.wait_for( convert_md_str_to_image( md_content, output_image, max_retries=max(1, self.image_render_retries), render_timeout_seconds=max(10, self.image_render_timeout_seconds), html_timeout_seconds=min(30, max(10, self.image_render_timeout_seconds)), ), timeout=total_timeout, ) await bot.send_image_message(target, Path(image_path)) except Exception as e: self.LOG.error(f"[{self.name}] 菜单图片发送失败: {e}") if self.image_fallback_to_text: client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds) else: # 严格图片模式下不回退完整菜单文本,仅发送简短失败提示。 client_msg_id, create_time, new_msg_id = await bot.send_text_message( target, "❌ 菜单图片生成失败,请稍后重试", sender ) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 30) def at_list(self, xml): try: root = ET.fromstring(xml) atuserlist_element = root.find('.//atuserlist') atuserlist_content = (atuserlist_element.text if atuserlist_element is not None else '').strip() atuserlist_content_no_commas = atuserlist_content.strip(',') atuserlist_content_no_commas = re.sub(r'\s+', '', atuserlist_content_no_commas) atuserlist_set = set(atuserlist_content_no_commas.split(',')) atuserlist_set.discard('') return atuserlist_set except ET.ParseError: return set() @plugin_stats_decorator(plugin_name="机器人菜单") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") command = content.split(" ")[0] gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") revoke: MessageAutoRevoke = message.get("revoke") wx_msg = message.get("full_wx_msg") xml = wx_msg.msg_source if wx_msg else "" if not bot: self.LOG.error("WechatAPIClient 未初始化") return False, "Bot 未初始化" # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" target = roomid if roomid else sender # 检查命令格式 parts = content.split() if len(parts) == 1: # 显示功能菜单 menu_text = self.get_enabled_features(roomid if roomid else sender) menu_markdown = self._build_feature_status_markdown(roomid if roomid else sender) menu_html = self._build_feature_status_html(roomid if roomid else sender) await self._send_menu_content( bot=bot, target=target, sender=sender, revoke=revoke, text_content=menu_text, markdown_content=menu_markdown, html_content=menu_html, revoke_seconds=90, ) return True, "显示功能菜单" # 提取命令名 cmd_name = content[len(command):].strip() try: # 处理特殊命令 if cmd_name.upper() == "状态": # 显示所有功能状态 status_text = self.display_menu_status(roomid if roomid else sender) status_markdown = self._build_feature_status_markdown(roomid if roomid else sender) status_html = self._build_feature_status_html(roomid if roomid else sender) await self._send_menu_content( bot=bot, target=target, sender=sender, revoke=revoke, text_content=status_text, markdown_content=status_markdown, html_content=status_html, revoke_seconds=90, ) return True, "显示功能状态" # 处理群列表命令 if cmd_name.upper() == "群列表": group_list_text = self.get_group_list() client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, group_list_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "显示群列表" # 处理功能启用/关闭命令 # 格式:菜单 启用 功能名 或 菜单 关闭 功能名 if len(parts) >= 3 and parts[1] in ["启用", "关闭"]: # 检查管理员权限 group_id_for_perm = roomid if roomid else sender if not GroupBotManager.is_admin_for_group(sender, group_id_for_perm): await bot.send_at_message(target, "❌权限不足,只有管理员可以管理功能", [sender]) return True, "权限不足" action = parts[1] feature_identifier = " ".join(parts[2:]) # 支持带空格的功能名 # 构造命令字符串,调用 GroupBotManager.handle_command # 尝试作为序号 if feature_identifier.isdigit(): command_str = f"{feature_identifier}-{action}" else: # 尝试作为功能名 # 查找匹配的功能枚举 matched_feature = None for feature in Feature: # 尝试通过枚举名称匹配 if feature.name.lower() == feature_identifier.lower(): matched_feature = feature break # 尝试通过序号匹配 if str(feature.value) == feature_identifier: matched_feature = feature break if matched_feature: command_str = f"{matched_feature.value}-{action}" else: # 直接使用功能标识符(可能是序号或功能名) command_str = f"{feature_identifier}-{action}" # 调用 GroupBotManager 处理命令 result = gbm.handle_command(roomid if roomid else sender, command_str) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, result, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "处理功能命令" if len(parts) >= 2 and parts[1] == "管理员": if not GroupBotManager.is_admin(sender): await bot.send_at_message(target, "❌权限不足,只有全局管理员可以管理群管理员", [sender]) return True, "权限不足" group_id = roomid if roomid else sender if len(parts) < 3: help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "管理员命令帮助" admin_action = parts[2] if admin_action == "添加" and len(parts) >= 4: at_users = self.at_list(xml) if xml else set() if len(at_users) == 1: new_admin_id = next(iter(at_users)) GroupBotManager.add_group_admin(group_id, new_admin_id) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "添加群管理员" candidate = parts[3] resolved_id = candidate if roomid: members = ContactManager.get_instance().get_group_members(roomid) or {} for wxid, nick in members.items(): if nick == candidate: resolved_id = wxid break GroupBotManager.add_group_admin(group_id, resolved_id) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, "已添加群管理员", sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "添加群管理员" if admin_action == "删除" and len(parts) >= 4: at_users = self.at_list(xml) if xml else set() if len(at_users) == 1: del_admin_id = next(iter(at_users)) ok = GroupBotManager.remove_group_admin(group_id, del_admin_id) msg = "已删除群管理员" if ok else "该管理员不在清单中" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "删除群管理员" candidate = parts[3] resolved_id = candidate if roomid: members = ContactManager.get_instance().get_group_members(roomid) or {} for wxid, nick in members.items(): if nick == candidate: resolved_id = wxid break ok = GroupBotManager.remove_group_admin(group_id, resolved_id) msg = "已删除群管理员" if ok else "该管理员不在清单中" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, msg, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "删除群管理员" if admin_action in ["列表", "清单"]: admins = GroupBotManager.get_group_admins(group_id) list_text = "群管理员列表为空" if not admins else "\n".join(admins) client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, list_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "列出群管理员" help_text = "管理员命令:菜单 管理员 添加 wxid | 菜单 管理员 删除 wxid | 菜单 管理员 列表" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "管理员命令帮助" # 如果是其他未知命令,显示帮助 help_text = f"❌命令格式错误!\n{self.command_format}" client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, help_text, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 90) return True, "命令格式错误" except Exception as e: self.LOG.error(f"处理功能请求出错: {e}") import traceback self.LOG.error(traceback.format_exc()) return False, f"处理出错: {e}"