import asyncio import os import re import time from pathlib import Path from typing import Any, Optional, Tuple from loguru import logger as default_logger from base.plugin_common.plugin_manager import PluginManager from utils.markdown_to_image import convert_md_str_to_image, html_to_image from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus from utils.html_template_renderer import HtmlTemplateRenderer from wechat_ipad import WechatAPIClient class RobotMenuRenderTool: """机器人菜单渲染工具。 设计目标: 1. 将“菜单排版 + 图片渲染 + 发送策略”从主插件拆出,降低 main.py 维护成本; 2. 统一菜单文本/图片输出行为,保证菜单与状态页展示一致; 3. 对外只暴露清晰方法,主插件只负责“取业务数据 + 调用工具”。 """ def __init__( self, output_mode: Any, image_fallback_to_text: bool, image_render_timeout_seconds: int, image_render_retries: int, sync_send_timeout_seconds: int, image_template_path: str, log=default_logger, ): # 输出模式:支持 text / image,非法值自动归一化为 text。 self.output_mode = self.normalize_output_mode(output_mode) # 图片失败时是否回退文本。 self.image_fallback_to_text = bool(image_fallback_to_text) # 渲染超时与重试参数,统一集中在工具层处理。 self.image_render_timeout_seconds = int(image_render_timeout_seconds) self.image_render_retries = int(image_render_retries) # 同步发送超时预算: # 1. “菜单”属于即时交互命令,不能像离线任务一样长时间占住消息处理协程; # 2. 因此这里单独维护一个“主链路最多等多久”的预算,超时后立即进入降级逻辑; # 3. 图片渲染器即便本身还能继续尝试,也不允许把主消息链路拖成几十秒假死。 self.sync_send_timeout_seconds = max(8, int(sync_send_timeout_seconds or 18)) # 注入日志对象,便于主插件统一控制日志风格与输出目标。 self.log = log or default_logger # 菜单图片模板路径(相对仓库根目录),支持仅改模板文件完成 UI 更新。 self.image_template_path = str(image_template_path or "").strip() or "plugins/robot_menu/templates/menu_cards.html" self.template_renderer = HtmlTemplateRenderer() @staticmethod def normalize_output_mode(raw_mode: Any) -> str: """将配置中的输出模式标准化为 `text` 或 `image`。""" mode = str(raw_mode or "").strip().lower() if mode in {"image", "img", "picture", "pic"}: return "image" return "text" @staticmethod def _split_feature_description(feature_desc: str) -> Tuple[str, str]: """拆分功能描述为“功能说明”和“触发指令提示”。 约定: 1. 描述中 `[]` 内的文本作为“触发说明”; 2. 无 `[]` 时返回“无”,保证渲染模板字段完整。 """ desc = str(feature_desc or "").strip() match = re.match(r"^(.*?)(?:\[(.*?)\])?$", desc) if not match: return desc or "未命名功能", "无" title = (match.group(1) or "").strip() or "未命名功能" cmd_hint = (match.group(2) or "").strip() or "无" return title, cmd_hint @staticmethod def _extract_command_tokens(cmd_hint: str) -> list[str]: """从描述里的指令片段提取命令 token 列表。 规则: 1. 按 `|`、`,`、`,`、`/`、`、` 切分; 2. 去重并保序; 3. 清理多余空白,便于后续生成“如何使用”示例。 """ raw = str(cmd_hint or "").strip() if not raw or raw == "无": return [] parts = re.split(r"[|,,/、]+", raw) tokens: list[str] = [] for part in parts: token = re.sub(r"\s+", " ", str(part or "").strip()) if token and token not in tokens: tokens.append(token) return tokens def _build_feature_usage_lines(self, feature: Feature) -> list[str]: """构建单个功能的“怎么用”文案(纯用户视角,不含管理员控制语句)。""" title, cmd_hint = self._split_feature_description(feature.description) tokens = self._extract_command_tokens(cmd_hint) lines = [f"{feature.value}. {title}"] if not tokens: lines.append(" 用法:自动/定时触发,无需手动指令") return lines # 第一行给出最短触发命令,后续补充可选别名,信息更丰富但仍保持紧凑。 lines.append(f" 指令:{tokens[0]}") if len(tokens) > 1: lines.append(f" 别名:{' / '.join(tokens[1:4])}") # 给一个通用可替换参数提示,帮助用户快速落地命令输入。 if any(k in tokens[0] for k in ["@", "积分", "关键词", "日期", "功能名"]): lines.append(" 提示:按指令中的占位词替换为实际内容后发送") return lines def _iter_user_command_features(self) -> list[Feature]: """返回菜单要展示的功能集合。 说明: 1. 按用户最新要求“不要管之前限制”,这里不再做仅指令功能过滤; 2. 全量展示 Feature,保证用户在一张图里看到完整功能地图; 3. 没有显式指令的功能,在卡片中标注为“自动/定时触发”。 """ return list(Feature) def build_feature_status_markdown(self, group_id: str) -> str: """构建菜单 Markdown(全量功能 + 状态 + 指令/触发方式)。""" user_features = self._iter_user_command_features() lines = [ "# 机器人功能菜单", "", f"- 目标:`{group_id}`", f"- 生成时间:`{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}`", "", "## 功能与指令", ] for feature in user_features: status = GroupBotManager.get_group_permission(group_id, feature) status_text = "开启" if status == PermissionStatus.ENABLED else "关闭" lines.extend(self._build_feature_usage_lines(feature)) lines.append(f" 状态:{status_text}") lines.append("") return "\n".join(lines) def _build_feature_command_payload(self, feature: Feature) -> dict: """构建功能卡片中的命令展示数据。""" _, cmd_hint = self._split_feature_description(feature.description) tokens = self._extract_command_tokens(cmd_hint) if not tokens: return { "is_auto": True, "primary": "", "aliases": [], "tip": "自动/定时触发,无需发送命令", } return { "is_auto": False, "primary": tokens[0], "aliases": tokens[1:4], "tip": "", } def build_feature_status_html(self, group_id: str) -> str: """基于外部 HTML 模板构建菜单页面。""" user_features = self._iter_user_command_features() feature_cards = [] enabled_count = 0 for feature in user_features: title, _ = self._split_feature_description(feature.description) status = GroupBotManager.get_group_permission(group_id, feature) is_enabled = status == PermissionStatus.ENABLED if is_enabled: enabled_count += 1 command_payload = self._build_feature_command_payload(feature) feature_cards.append( { "index": int(feature.value), "title": title, "feature_key": str(feature.name), "status_text": "开启" if is_enabled else "关闭", "status_class": "status-on" if is_enabled else "status-off", "command": command_payload, } ) now_text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) return self.template_renderer.render( self.image_template_path, { "group_id": str(group_id or ""), "now_text": now_text, "feature_total": len(user_features), "enabled_count": enabled_count, "feature_cards": feature_cards, }, ) @staticmethod def _get_plugin_manager() -> PluginManager: """获取当前运行中的插件管理器单例。""" return PluginManager.get_instance() @staticmethod def _resolve_snapshot_group_status(snapshot: dict, group_id: str) -> dict: """解析插件在当前群里的可用状态。 规则说明: 1. 插件必须先处于 RUNNING,才可能被认为“可用”; 2. 若插件支持群级开关,则继续读取该群的 feature 权限; 3. 若插件没有群级开关,则视为“运行即全局可用”。 """ normalized_snapshot = dict(snapshot or {}) status = str(normalized_snapshot.get("status") or "").strip().upper() supports_group_switch = bool(normalized_snapshot.get("supports_group_switch")) feature_key = str(normalized_snapshot.get("feature_key") or "").strip() if status != "RUNNING": return { "available": False, "reason": "插件未运行", "reason_code": "plugin_not_running", } if not group_id or not supports_group_switch or not feature_key: return { "available": True, "reason": "全局可用", "reason_code": "global_available", } feature = Feature.get_feature(feature_key) if feature is None: return { "available": True, "reason": "未绑定群级开关,按运行中处理", "reason_code": "feature_not_registered", } permission = GroupBotManager.get_group_permission(group_id, feature) if permission == PermissionStatus.ENABLED: return { "available": True, "reason": "本群已启用", "reason_code": "group_enabled", } return { "available": False, "reason": "本群未启用", "reason_code": "group_disabled", } @staticmethod def _format_plugin_command(example_command: str, command_prefix: str) -> str: """把插件命令和前缀拼成最终展示文本。""" prefix = str(command_prefix or "").strip() command = str(example_command or "").strip() if not prefix: return command return f"{prefix}{command}" def _build_plugin_command_entry(self, snapshot: dict, group_id: str) -> Optional[dict]: """把插件快照转换为菜单可展示的命令项。""" normalized_snapshot = dict(snapshot or {}) commands = list(normalized_snapshot.get("commands", []) or []) plugin_types = list(normalized_snapshot.get("plugin_types", []) or []) if not commands and "scheduled" not in plugin_types: return None availability = self._resolve_snapshot_group_status(normalized_snapshot, group_id) command_prefix = str(normalized_snapshot.get("command_prefix") or "").strip() primary_command = self._format_plugin_command(commands[0], command_prefix) if commands else "" alias_commands = [ self._format_plugin_command(command_text, command_prefix) for command_text in commands[1:4] if str(command_text or "").strip() ] if "message" in plugin_types: category = "message" category_label = "消息指令" elif "scheduled" in plugin_types: category = "scheduled" category_label = "自动任务" else: category = "generic" category_label = "通用能力" return { "name": str(normalized_snapshot.get("name") or "").strip(), "module_name": str(normalized_snapshot.get("module_name") or "").strip(), "description": str(normalized_snapshot.get("description") or "").strip() or "暂无描述", "category": category, "category_label": category_label, "commands": commands, "primary_command": primary_command, "alias_commands": alias_commands, "supports_group_switch": bool(normalized_snapshot.get("supports_group_switch")), "feature_key": str(normalized_snapshot.get("feature_key") or "").strip(), "available": bool(availability.get("available")), "availability_reason": str(availability.get("reason") or "").strip(), "availability_code": str(availability.get("reason_code") or "").strip(), "status_label": str(normalized_snapshot.get("status_label") or "").strip(), } def _collect_command_catalog(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict: """采集当前群和当前身份视角下的命令清单。 输出结构分三层: 1. 普通用户可直接用的命令; 2. 自动/定时能力; 3. 管理员附加能力与未启用项。 """ plugin_manager = self._get_plugin_manager() snapshots = plugin_manager.get_plugin_snapshots() if force_admin is None: is_admin = bool(GroupBotManager.is_admin_for_group(requester_id, group_id)) if group_id else bool(GroupBotManager.is_admin(requester_id)) else: is_admin = bool(force_admin) available_manual = [] available_auto = [] unavailable_manual = [] for snapshot in snapshots: entry = self._build_plugin_command_entry(snapshot, group_id) if not entry: continue if entry["category"] == "scheduled": if entry["available"]: available_auto.append(entry) continue if entry["available"]: available_manual.append(entry) else: unavailable_manual.append(entry) available_manual.sort(key=lambda item: (item["category"], item["name"], item["primary_command"])) available_auto.sort(key=lambda item: (item["name"], item["primary_command"])) unavailable_manual.sort(key=lambda item: (item["availability_code"], item["name"])) admin_commands = [] if is_admin: admin_commands = [ {"title": "查看功能状态", "example": "菜单 状态", "description": "查看当前群所有功能开关状态"}, {"title": "启用某个功能", "example": "菜单 启用 功能序号", "description": "按菜单序号启用某项功能"}, {"title": "关闭某个功能", "example": "菜单 关闭 功能序号", "description": "按菜单序号关闭某项功能"}, {"title": "查看群管理员", "example": "菜单 管理员 列表", "description": "查看当前群管理员清单"}, {"title": "添加群管理员", "example": "菜单 管理员 添加 @某人", "description": "把某个群成员加入本群管理员"}, {"title": "删除群管理员", "example": "菜单 管理员 删除 @某人", "description": "移除某个群管理员"}, ] if GroupBotManager.is_admin(requester_id): admin_commands.append( {"title": "查看托管群列表", "example": "菜单 群列表", "description": "查看所有已启用机器人的群"} ) return { "group_id": str(group_id or "").strip(), "requester_id": str(requester_id or "").strip(), "is_admin": is_admin, "available_manual": available_manual, "available_auto": available_auto, "unavailable_manual": unavailable_manual, "admin_commands": admin_commands, "generated_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), } def build_command_catalog_data(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict: """对外暴露统一的命令目录结构,供机器人菜单和后台页面共同复用。""" return self._collect_command_catalog(group_id, requester_id, force_admin=force_admin) def build_command_catalog_text(self, group_id: str, requester_id: str) -> str: """构建适合直接发送给用户的文本版命令清单。""" catalog = self.build_command_catalog_data(group_id, requester_id) lines = [ "📚 当前群指令清单", f"群ID:{catalog['group_id'] or '私聊'}", f"生成时间:{catalog['generated_at']}", "", "一、当前可直接使用的命令", ] if catalog["available_manual"]: for item in catalog["available_manual"]: lines.append(f"【{item['name']}】{item['description']}") if item["primary_command"]: lines.append(f"主指令:{item['primary_command']}") if item["alias_commands"]: lines.append(f"别名:{' / '.join(item['alias_commands'])}") lines.append("") else: lines.append("当前没有可直接使用的手动命令") lines.append("") lines.append("二、自动/定时能力") if catalog["available_auto"]: for item in catalog["available_auto"]: lines.append(f"【{item['name']}】{item['description']}") lines.append("触发方式:自动或定时运行") lines.append("") else: lines.append("当前没有已启用的自动能力") lines.append("") if catalog["is_admin"]: lines.append("三、管理员额外可见") if catalog["unavailable_manual"]: lines.append("未启用或暂不可用命令:") for item in catalog["unavailable_manual"]: primary = item["primary_command"] or "无手动指令" lines.append(f"- {item['name']}:{primary}({item['availability_reason']})") lines.append("") else: lines.append("当前没有未启用的命令项") lines.append("") lines.append("管理命令:") for item in catalog["admin_commands"]: lines.append(f"- {item['example']}:{item['description']}") lines.append("") lines.append("提示:发送“菜单”查看功能开关;发送“菜单 状态”查看本群功能状态。") return "\n".join(lines).strip() def build_command_catalog_markdown(self, group_id: str, requester_id: str) -> str: """构建适合图片渲染的 Markdown 版指令清单。""" catalog = self.build_command_catalog_data(group_id, requester_id) lines = [ "# 机器人指令清单", "", f"- 目标:`{catalog['group_id'] or '私聊'}`", f"- 生成时间:`{catalog['generated_at']}`", "", "## 当前可直接使用的命令", ] if catalog["available_manual"]: for item in catalog["available_manual"]: lines.append(f"### {item['name']}") lines.append(f"- 说明:{item['description']}") if item["primary_command"]: lines.append(f"- 主指令:`{item['primary_command']}`") if item["alias_commands"]: alias_text = " / ".join(f"`{alias}`" for alias in item["alias_commands"]) lines.append(f"- 别名:{alias_text}") lines.append("") else: lines.append("- 当前没有可直接使用的手动命令") lines.append("") lines.append("## 自动/定时能力") if catalog["available_auto"]: for item in catalog["available_auto"]: lines.append(f"- **{item['name']}**:{item['description']}") else: lines.append("- 当前没有已启用的自动能力") lines.append("") if catalog["is_admin"]: lines.append("## 管理员额外可见") if catalog["unavailable_manual"]: lines.append("### 未启用或暂不可用命令") for item in catalog["unavailable_manual"]: primary = item["primary_command"] or "无手动指令" lines.append(f"- **{item['name']}**:`{primary}`({item['availability_reason']})") lines.append("") lines.append("### 管理命令") for item in catalog["admin_commands"]: lines.append(f"- `{item['example']}`:{item['description']}") lines.append("") lines.append("> 提示:发送 `菜单` 查看功能开关;发送 `菜单 状态` 查看本群功能状态。") return "\n".join(lines) async def send_menu_content( self, bot: WechatAPIClient, target: str, sender: str, revoke: MessageAutoRevoke, text_content: str, markdown_content: Optional[str] = None, html_content: Optional[str] = None, revoke_seconds: int = 90, ) -> None: """按配置发送菜单内容(文本或图片)。""" if self.output_mode != "image": client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds) return md_content = (markdown_content or "").strip() or f"```text\n{text_content}\n```" output_image = f"robot_menu_{int(time.time() * 1000)}.png" try: # 这里故意不再使用“渲染超时 * 重试次数 + 缓冲”的长预算: # 1. 菜单是即时命令,用户更在意“尽快拿到结果或降级结果”,而不是死等图片; # 2. 若沿用 45~55 秒预算,多个菜单命令会持续占住机器人并发槽位,放大成“整条插件链路卡住”; # 3. 因此统一按 sync_send_timeout_seconds 控制主链路等待时间,超时后快速回退。 total_timeout = max(8, int(self.sync_send_timeout_seconds or 18)) # Markdown 转图内部也要用更短预算,避免内外层超时完全重合,导致降级逻辑来不及执行。 html_budget_seconds = max(5, min(10, total_timeout - 4)) render_budget_seconds = max(6, total_timeout - 2) output_dir = Path(os.getcwd()) / "temp" / "md2image" output_dir.mkdir(parents=True, exist_ok=True) output_path = output_dir / output_image if html_content and html_content.strip(): await asyncio.wait_for( html_to_image(html_content, str(output_path)), timeout=total_timeout, ) image_path = str(output_path.resolve()) else: image_path = await asyncio.wait_for( convert_md_str_to_image( md_content, output_image, max_retries=max(1, self.image_render_retries), render_timeout_seconds=render_budget_seconds, html_timeout_seconds=html_budget_seconds, ), timeout=total_timeout, ) await bot.send_image_message(target, Path(image_path)) except Exception as e: self.log.error(f"[机器人菜单渲染工具] 菜单图片发送失败: {e}") if self.image_fallback_to_text: client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds) else: client_msg_id, create_time, new_msg_id = await bot.send_text_message( target, "❌ 菜单图片生成失败,请稍后重试", sender ) revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 30)