Files
abot/plugins/robot_menu/menu_render_tool.py
Liu 1b6da6db1f 修复菜单插件超时拖慢主链路问题
1. 为菜单图片发送增加独立的同步等待预算,避免单次菜单命令长时间占用消息处理协程。

2. 调整菜单插件外层处理超时与文本回退空间,避免内外层超时重合导致降级逻辑来不及执行。

3. 修复 md2img 专用运行时在超时/取消时未显式取消后台任务的问题,减少渲染残留任务堆积。
2026-05-01 11:08:49 +08:00

537 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import re
import time
from pathlib import Path
from typing import Any, Optional, Tuple
from loguru import logger as default_logger
from base.plugin_common.plugin_manager import PluginManager
from utils.markdown_to_image import convert_md_str_to_image, html_to_image
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus
from utils.html_template_renderer import HtmlTemplateRenderer
from wechat_ipad import WechatAPIClient
class RobotMenuRenderTool:
"""机器人菜单渲染工具。
设计目标:
1. 将“菜单排版 + 图片渲染 + 发送策略”从主插件拆出,降低 main.py 维护成本;
2. 统一菜单文本/图片输出行为,保证菜单与状态页展示一致;
3. 对外只暴露清晰方法,主插件只负责“取业务数据 + 调用工具”。
"""
def __init__(
self,
output_mode: Any,
image_fallback_to_text: bool,
image_render_timeout_seconds: int,
image_render_retries: int,
sync_send_timeout_seconds: int,
image_template_path: str,
log=default_logger,
):
# 输出模式:支持 text / image非法值自动归一化为 text。
self.output_mode = self.normalize_output_mode(output_mode)
# 图片失败时是否回退文本。
self.image_fallback_to_text = bool(image_fallback_to_text)
# 渲染超时与重试参数,统一集中在工具层处理。
self.image_render_timeout_seconds = int(image_render_timeout_seconds)
self.image_render_retries = int(image_render_retries)
# 同步发送超时预算:
# 1. “菜单”属于即时交互命令,不能像离线任务一样长时间占住消息处理协程;
# 2. 因此这里单独维护一个“主链路最多等多久”的预算,超时后立即进入降级逻辑;
# 3. 图片渲染器即便本身还能继续尝试,也不允许把主消息链路拖成几十秒假死。
self.sync_send_timeout_seconds = max(8, int(sync_send_timeout_seconds or 18))
# 注入日志对象,便于主插件统一控制日志风格与输出目标。
self.log = log or default_logger
# 菜单图片模板路径(相对仓库根目录),支持仅改模板文件完成 UI 更新。
self.image_template_path = str(image_template_path or "").strip() or "plugins/robot_menu/templates/menu_cards.html"
self.template_renderer = HtmlTemplateRenderer()
@staticmethod
def normalize_output_mode(raw_mode: Any) -> str:
"""将配置中的输出模式标准化为 `text` 或 `image`。"""
mode = str(raw_mode or "").strip().lower()
if mode in {"image", "img", "picture", "pic"}:
return "image"
return "text"
@staticmethod
def _split_feature_description(feature_desc: str) -> Tuple[str, str]:
"""拆分功能描述为“功能说明”和“触发指令提示”。
约定:
1. 描述中 `[]` 内的文本作为“触发说明”;
2. 无 `[]` 时返回“无”,保证渲染模板字段完整。
"""
desc = str(feature_desc or "").strip()
match = re.match(r"^(.*?)(?:\[(.*?)\])?$", desc)
if not match:
return desc or "未命名功能", ""
title = (match.group(1) or "").strip() or "未命名功能"
cmd_hint = (match.group(2) or "").strip() or ""
return title, cmd_hint
@staticmethod
def _extract_command_tokens(cmd_hint: str) -> list[str]:
"""从描述里的指令片段提取命令 token 列表。
规则:
1. 按 `|`、``、`,`、`/`、`、` 切分;
2. 去重并保序;
3. 清理多余空白,便于后续生成“如何使用”示例。
"""
raw = str(cmd_hint or "").strip()
if not raw or raw == "":
return []
parts = re.split(r"[|,/、]+", raw)
tokens: list[str] = []
for part in parts:
token = re.sub(r"\s+", " ", str(part or "").strip())
if token and token not in tokens:
tokens.append(token)
return tokens
def _build_feature_usage_lines(self, feature: Feature) -> list[str]:
"""构建单个功能的“怎么用”文案(纯用户视角,不含管理员控制语句)。"""
title, cmd_hint = self._split_feature_description(feature.description)
tokens = self._extract_command_tokens(cmd_hint)
lines = [f"{feature.value}. {title}"]
if not tokens:
lines.append(" 用法:自动/定时触发,无需手动指令")
return lines
# 第一行给出最短触发命令,后续补充可选别名,信息更丰富但仍保持紧凑。
lines.append(f" 指令:{tokens[0]}")
if len(tokens) > 1:
lines.append(f" 别名:{' / '.join(tokens[1:4])}")
# 给一个通用可替换参数提示,帮助用户快速落地命令输入。
if any(k in tokens[0] for k in ["@", "积分", "关键词", "日期", "功能名"]):
lines.append(" 提示:按指令中的占位词替换为实际内容后发送")
return lines
def _iter_user_command_features(self) -> list[Feature]:
"""返回菜单要展示的功能集合。
说明:
1. 按用户最新要求“不要管之前限制”,这里不再做仅指令功能过滤;
2. 全量展示 Feature保证用户在一张图里看到完整功能地图
3. 没有显式指令的功能,在卡片中标注为“自动/定时触发”。
"""
return list(Feature)
def build_feature_status_markdown(self, group_id: str) -> str:
"""构建菜单 Markdown全量功能 + 状态 + 指令/触发方式)。"""
user_features = self._iter_user_command_features()
lines = [
"# 机器人功能菜单",
"",
f"- 目标:`{group_id}`",
f"- 生成时间:`{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}`",
"",
"## 功能与指令",
]
for feature in user_features:
status = GroupBotManager.get_group_permission(group_id, feature)
status_text = "开启" if status == PermissionStatus.ENABLED else "关闭"
lines.extend(self._build_feature_usage_lines(feature))
lines.append(f" 状态:{status_text}")
lines.append("")
return "\n".join(lines)
def _build_feature_command_payload(self, feature: Feature) -> dict:
"""构建功能卡片中的命令展示数据。"""
_, cmd_hint = self._split_feature_description(feature.description)
tokens = self._extract_command_tokens(cmd_hint)
if not tokens:
return {
"is_auto": True,
"primary": "",
"aliases": [],
"tip": "自动/定时触发,无需发送命令",
}
return {
"is_auto": False,
"primary": tokens[0],
"aliases": tokens[1:4],
"tip": "",
}
def build_feature_status_html(self, group_id: str) -> str:
"""基于外部 HTML 模板构建菜单页面。"""
user_features = self._iter_user_command_features()
feature_cards = []
enabled_count = 0
for feature in user_features:
title, _ = self._split_feature_description(feature.description)
status = GroupBotManager.get_group_permission(group_id, feature)
is_enabled = status == PermissionStatus.ENABLED
if is_enabled:
enabled_count += 1
command_payload = self._build_feature_command_payload(feature)
feature_cards.append(
{
"index": int(feature.value),
"title": title,
"feature_key": str(feature.name),
"status_text": "开启" if is_enabled else "关闭",
"status_class": "status-on" if is_enabled else "status-off",
"command": command_payload,
}
)
now_text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
return self.template_renderer.render(
self.image_template_path,
{
"group_id": str(group_id or ""),
"now_text": now_text,
"feature_total": len(user_features),
"enabled_count": enabled_count,
"feature_cards": feature_cards,
},
)
@staticmethod
def _get_plugin_manager() -> PluginManager:
"""获取当前运行中的插件管理器单例。"""
return PluginManager.get_instance()
@staticmethod
def _resolve_snapshot_group_status(snapshot: dict, group_id: str) -> dict:
"""解析插件在当前群里的可用状态。
规则说明:
1. 插件必须先处于 RUNNING才可能被认为“可用”
2. 若插件支持群级开关,则继续读取该群的 feature 权限;
3. 若插件没有群级开关,则视为“运行即全局可用”。
"""
normalized_snapshot = dict(snapshot or {})
status = str(normalized_snapshot.get("status") or "").strip().upper()
supports_group_switch = bool(normalized_snapshot.get("supports_group_switch"))
feature_key = str(normalized_snapshot.get("feature_key") or "").strip()
if status != "RUNNING":
return {
"available": False,
"reason": "插件未运行",
"reason_code": "plugin_not_running",
}
if not group_id or not supports_group_switch or not feature_key:
return {
"available": True,
"reason": "全局可用",
"reason_code": "global_available",
}
feature = Feature.get_feature(feature_key)
if feature is None:
return {
"available": True,
"reason": "未绑定群级开关,按运行中处理",
"reason_code": "feature_not_registered",
}
permission = GroupBotManager.get_group_permission(group_id, feature)
if permission == PermissionStatus.ENABLED:
return {
"available": True,
"reason": "本群已启用",
"reason_code": "group_enabled",
}
return {
"available": False,
"reason": "本群未启用",
"reason_code": "group_disabled",
}
@staticmethod
def _format_plugin_command(example_command: str, command_prefix: str) -> str:
"""把插件命令和前缀拼成最终展示文本。"""
prefix = str(command_prefix or "").strip()
command = str(example_command or "").strip()
if not prefix:
return command
return f"{prefix}{command}"
def _build_plugin_command_entry(self, snapshot: dict, group_id: str) -> Optional[dict]:
"""把插件快照转换为菜单可展示的命令项。"""
normalized_snapshot = dict(snapshot or {})
commands = list(normalized_snapshot.get("commands", []) or [])
plugin_types = list(normalized_snapshot.get("plugin_types", []) or [])
if not commands and "scheduled" not in plugin_types:
return None
availability = self._resolve_snapshot_group_status(normalized_snapshot, group_id)
command_prefix = str(normalized_snapshot.get("command_prefix") or "").strip()
primary_command = self._format_plugin_command(commands[0], command_prefix) if commands else ""
alias_commands = [
self._format_plugin_command(command_text, command_prefix)
for command_text in commands[1:4]
if str(command_text or "").strip()
]
if "message" in plugin_types:
category = "message"
category_label = "消息指令"
elif "scheduled" in plugin_types:
category = "scheduled"
category_label = "自动任务"
else:
category = "generic"
category_label = "通用能力"
return {
"name": str(normalized_snapshot.get("name") or "").strip(),
"module_name": str(normalized_snapshot.get("module_name") or "").strip(),
"description": str(normalized_snapshot.get("description") or "").strip() or "暂无描述",
"category": category,
"category_label": category_label,
"commands": commands,
"primary_command": primary_command,
"alias_commands": alias_commands,
"supports_group_switch": bool(normalized_snapshot.get("supports_group_switch")),
"feature_key": str(normalized_snapshot.get("feature_key") or "").strip(),
"available": bool(availability.get("available")),
"availability_reason": str(availability.get("reason") or "").strip(),
"availability_code": str(availability.get("reason_code") or "").strip(),
"status_label": str(normalized_snapshot.get("status_label") or "").strip(),
}
def _collect_command_catalog(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict:
"""采集当前群和当前身份视角下的命令清单。
输出结构分三层:
1. 普通用户可直接用的命令;
2. 自动/定时能力;
3. 管理员附加能力与未启用项。
"""
plugin_manager = self._get_plugin_manager()
snapshots = plugin_manager.get_plugin_snapshots()
if force_admin is None:
is_admin = bool(GroupBotManager.is_admin_for_group(requester_id, group_id)) if group_id else bool(GroupBotManager.is_admin(requester_id))
else:
is_admin = bool(force_admin)
available_manual = []
available_auto = []
unavailable_manual = []
for snapshot in snapshots:
entry = self._build_plugin_command_entry(snapshot, group_id)
if not entry:
continue
if entry["category"] == "scheduled":
if entry["available"]:
available_auto.append(entry)
continue
if entry["available"]:
available_manual.append(entry)
else:
unavailable_manual.append(entry)
available_manual.sort(key=lambda item: (item["category"], item["name"], item["primary_command"]))
available_auto.sort(key=lambda item: (item["name"], item["primary_command"]))
unavailable_manual.sort(key=lambda item: (item["availability_code"], item["name"]))
admin_commands = []
if is_admin:
admin_commands = [
{"title": "查看功能状态", "example": "菜单 状态", "description": "查看当前群所有功能开关状态"},
{"title": "启用某个功能", "example": "菜单 启用 功能序号", "description": "按菜单序号启用某项功能"},
{"title": "关闭某个功能", "example": "菜单 关闭 功能序号", "description": "按菜单序号关闭某项功能"},
{"title": "查看群管理员", "example": "菜单 管理员 列表", "description": "查看当前群管理员清单"},
{"title": "添加群管理员", "example": "菜单 管理员 添加 @某人", "description": "把某个群成员加入本群管理员"},
{"title": "删除群管理员", "example": "菜单 管理员 删除 @某人", "description": "移除某个群管理员"},
]
if GroupBotManager.is_admin(requester_id):
admin_commands.append(
{"title": "查看托管群列表", "example": "菜单 群列表", "description": "查看所有已启用机器人的群"}
)
return {
"group_id": str(group_id or "").strip(),
"requester_id": str(requester_id or "").strip(),
"is_admin": is_admin,
"available_manual": available_manual,
"available_auto": available_auto,
"unavailable_manual": unavailable_manual,
"admin_commands": admin_commands,
"generated_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
}
def build_command_catalog_data(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict:
"""对外暴露统一的命令目录结构,供机器人菜单和后台页面共同复用。"""
return self._collect_command_catalog(group_id, requester_id, force_admin=force_admin)
def build_command_catalog_text(self, group_id: str, requester_id: str) -> str:
"""构建适合直接发送给用户的文本版命令清单。"""
catalog = self.build_command_catalog_data(group_id, requester_id)
lines = [
"📚 当前群指令清单",
f"群ID{catalog['group_id'] or '私聊'}",
f"生成时间:{catalog['generated_at']}",
"",
"一、当前可直接使用的命令",
]
if catalog["available_manual"]:
for item in catalog["available_manual"]:
lines.append(f"{item['name']}{item['description']}")
if item["primary_command"]:
lines.append(f"主指令:{item['primary_command']}")
if item["alias_commands"]:
lines.append(f"别名:{' / '.join(item['alias_commands'])}")
lines.append("")
else:
lines.append("当前没有可直接使用的手动命令")
lines.append("")
lines.append("二、自动/定时能力")
if catalog["available_auto"]:
for item in catalog["available_auto"]:
lines.append(f"{item['name']}{item['description']}")
lines.append("触发方式:自动或定时运行")
lines.append("")
else:
lines.append("当前没有已启用的自动能力")
lines.append("")
if catalog["is_admin"]:
lines.append("三、管理员额外可见")
if catalog["unavailable_manual"]:
lines.append("未启用或暂不可用命令:")
for item in catalog["unavailable_manual"]:
primary = item["primary_command"] or "无手动指令"
lines.append(f"- {item['name']}{primary}{item['availability_reason']}")
lines.append("")
else:
lines.append("当前没有未启用的命令项")
lines.append("")
lines.append("管理命令:")
for item in catalog["admin_commands"]:
lines.append(f"- {item['example']}{item['description']}")
lines.append("")
lines.append("提示:发送“菜单”查看功能开关;发送“菜单 状态”查看本群功能状态。")
return "\n".join(lines).strip()
def build_command_catalog_markdown(self, group_id: str, requester_id: str) -> str:
"""构建适合图片渲染的 Markdown 版指令清单。"""
catalog = self.build_command_catalog_data(group_id, requester_id)
lines = [
"# 机器人指令清单",
"",
f"- 目标:`{catalog['group_id'] or '私聊'}`",
f"- 生成时间:`{catalog['generated_at']}`",
"",
"## 当前可直接使用的命令",
]
if catalog["available_manual"]:
for item in catalog["available_manual"]:
lines.append(f"### {item['name']}")
lines.append(f"- 说明:{item['description']}")
if item["primary_command"]:
lines.append(f"- 主指令:`{item['primary_command']}`")
if item["alias_commands"]:
alias_text = " / ".join(f"`{alias}`" for alias in item["alias_commands"])
lines.append(f"- 别名:{alias_text}")
lines.append("")
else:
lines.append("- 当前没有可直接使用的手动命令")
lines.append("")
lines.append("## 自动/定时能力")
if catalog["available_auto"]:
for item in catalog["available_auto"]:
lines.append(f"- **{item['name']}**{item['description']}")
else:
lines.append("- 当前没有已启用的自动能力")
lines.append("")
if catalog["is_admin"]:
lines.append("## 管理员额外可见")
if catalog["unavailable_manual"]:
lines.append("### 未启用或暂不可用命令")
for item in catalog["unavailable_manual"]:
primary = item["primary_command"] or "无手动指令"
lines.append(f"- **{item['name']}**`{primary}`{item['availability_reason']}")
lines.append("")
lines.append("### 管理命令")
for item in catalog["admin_commands"]:
lines.append(f"- `{item['example']}`{item['description']}")
lines.append("")
lines.append("> 提示:发送 `菜单` 查看功能开关;发送 `菜单 状态` 查看本群功能状态。")
return "\n".join(lines)
async def send_menu_content(
self,
bot: WechatAPIClient,
target: str,
sender: str,
revoke: MessageAutoRevoke,
text_content: str,
markdown_content: Optional[str] = None,
html_content: Optional[str] = None,
revoke_seconds: int = 90,
) -> None:
"""按配置发送菜单内容(文本或图片)。"""
if self.output_mode != "image":
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds)
return
md_content = (markdown_content or "").strip() or f"```text\n{text_content}\n```"
output_image = f"robot_menu_{int(time.time() * 1000)}.png"
try:
# 这里故意不再使用“渲染超时 * 重试次数 + 缓冲”的长预算:
# 1. 菜单是即时命令,用户更在意“尽快拿到结果或降级结果”,而不是死等图片;
# 2. 若沿用 45~55 秒预算,多个菜单命令会持续占住机器人并发槽位,放大成“整条插件链路卡住”;
# 3. 因此统一按 sync_send_timeout_seconds 控制主链路等待时间,超时后快速回退。
total_timeout = max(8, int(self.sync_send_timeout_seconds or 18))
# Markdown 转图内部也要用更短预算,避免内外层超时完全重合,导致降级逻辑来不及执行。
html_budget_seconds = max(5, min(10, total_timeout - 4))
render_budget_seconds = max(6, total_timeout - 2)
output_dir = Path(os.getcwd()) / "temp" / "md2image"
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / output_image
if html_content and html_content.strip():
await asyncio.wait_for(
html_to_image(html_content, str(output_path)),
timeout=total_timeout,
)
image_path = str(output_path.resolve())
else:
image_path = await asyncio.wait_for(
convert_md_str_to_image(
md_content,
output_image,
max_retries=max(1, self.image_render_retries),
render_timeout_seconds=render_budget_seconds,
html_timeout_seconds=html_budget_seconds,
),
timeout=total_timeout,
)
await bot.send_image_message(target, Path(image_path))
except Exception as e:
self.log.error(f"[机器人菜单渲染工具] 菜单图片发送失败: {e}")
if self.image_fallback_to_text:
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds)
else:
client_msg_id, create_time, new_msg_id = await bot.send_text_message(
target,
"❌ 菜单图片生成失败,请稍后重试",
sender
)
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 30)