524 lines
23 KiB
Python
524 lines
23 KiB
Python
import asyncio
|
||
import os
|
||
import re
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Any, Optional, Tuple
|
||
|
||
from loguru import logger as default_logger
|
||
|
||
from base.plugin_common.plugin_manager import PluginManager
|
||
from utils.markdown_to_image import convert_md_str_to_image, html_to_image
|
||
from utils.revoke.message_auto_revoke import MessageAutoRevoke
|
||
from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus
|
||
from utils.html_template_renderer import HtmlTemplateRenderer
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class RobotMenuRenderTool:
|
||
"""机器人菜单渲染工具。
|
||
|
||
设计目标:
|
||
1. 将“菜单排版 + 图片渲染 + 发送策略”从主插件拆出,降低 main.py 维护成本;
|
||
2. 统一菜单文本/图片输出行为,保证菜单与状态页展示一致;
|
||
3. 对外只暴露清晰方法,主插件只负责“取业务数据 + 调用工具”。
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
output_mode: Any,
|
||
image_fallback_to_text: bool,
|
||
image_render_timeout_seconds: int,
|
||
image_render_retries: int,
|
||
image_template_path: str,
|
||
log=default_logger,
|
||
):
|
||
# 输出模式:支持 text / image,非法值自动归一化为 text。
|
||
self.output_mode = self.normalize_output_mode(output_mode)
|
||
# 图片失败时是否回退文本。
|
||
self.image_fallback_to_text = bool(image_fallback_to_text)
|
||
# 渲染超时与重试参数,统一集中在工具层处理。
|
||
self.image_render_timeout_seconds = int(image_render_timeout_seconds)
|
||
self.image_render_retries = int(image_render_retries)
|
||
# 注入日志对象,便于主插件统一控制日志风格与输出目标。
|
||
self.log = log or default_logger
|
||
# 菜单图片模板路径(相对仓库根目录),支持仅改模板文件完成 UI 更新。
|
||
self.image_template_path = str(image_template_path or "").strip() or "plugins/robot_menu/templates/menu_cards.html"
|
||
self.template_renderer = HtmlTemplateRenderer()
|
||
|
||
@staticmethod
|
||
def normalize_output_mode(raw_mode: Any) -> str:
|
||
"""将配置中的输出模式标准化为 `text` 或 `image`。"""
|
||
mode = str(raw_mode or "").strip().lower()
|
||
if mode in {"image", "img", "picture", "pic"}:
|
||
return "image"
|
||
return "text"
|
||
|
||
@staticmethod
|
||
def _split_feature_description(feature_desc: str) -> Tuple[str, str]:
|
||
"""拆分功能描述为“功能说明”和“触发指令提示”。
|
||
|
||
约定:
|
||
1. 描述中 `[]` 内的文本作为“触发说明”;
|
||
2. 无 `[]` 时返回“无”,保证渲染模板字段完整。
|
||
"""
|
||
desc = str(feature_desc or "").strip()
|
||
match = re.match(r"^(.*?)(?:\[(.*?)\])?$", desc)
|
||
if not match:
|
||
return desc or "未命名功能", "无"
|
||
title = (match.group(1) or "").strip() or "未命名功能"
|
||
cmd_hint = (match.group(2) or "").strip() or "无"
|
||
return title, cmd_hint
|
||
|
||
@staticmethod
|
||
def _extract_command_tokens(cmd_hint: str) -> list[str]:
|
||
"""从描述里的指令片段提取命令 token 列表。
|
||
|
||
规则:
|
||
1. 按 `|`、`,`、`,`、`/`、`、` 切分;
|
||
2. 去重并保序;
|
||
3. 清理多余空白,便于后续生成“如何使用”示例。
|
||
"""
|
||
raw = str(cmd_hint or "").strip()
|
||
if not raw or raw == "无":
|
||
return []
|
||
parts = re.split(r"[|,,/、]+", raw)
|
||
tokens: list[str] = []
|
||
for part in parts:
|
||
token = re.sub(r"\s+", " ", str(part or "").strip())
|
||
if token and token not in tokens:
|
||
tokens.append(token)
|
||
return tokens
|
||
|
||
def _build_feature_usage_lines(self, feature: Feature) -> list[str]:
|
||
"""构建单个功能的“怎么用”文案(纯用户视角,不含管理员控制语句)。"""
|
||
title, cmd_hint = self._split_feature_description(feature.description)
|
||
tokens = self._extract_command_tokens(cmd_hint)
|
||
lines = [f"{feature.value}. {title}"]
|
||
if not tokens:
|
||
lines.append(" 用法:自动/定时触发,无需手动指令")
|
||
return lines
|
||
|
||
# 第一行给出最短触发命令,后续补充可选别名,信息更丰富但仍保持紧凑。
|
||
lines.append(f" 指令:{tokens[0]}")
|
||
if len(tokens) > 1:
|
||
lines.append(f" 别名:{' / '.join(tokens[1:4])}")
|
||
# 给一个通用可替换参数提示,帮助用户快速落地命令输入。
|
||
if any(k in tokens[0] for k in ["@", "积分", "关键词", "日期", "功能名"]):
|
||
lines.append(" 提示:按指令中的占位词替换为实际内容后发送")
|
||
return lines
|
||
|
||
def _iter_user_command_features(self) -> list[Feature]:
|
||
"""返回菜单要展示的功能集合。
|
||
|
||
说明:
|
||
1. 按用户最新要求“不要管之前限制”,这里不再做仅指令功能过滤;
|
||
2. 全量展示 Feature,保证用户在一张图里看到完整功能地图;
|
||
3. 没有显式指令的功能,在卡片中标注为“自动/定时触发”。
|
||
"""
|
||
return list(Feature)
|
||
|
||
def build_feature_status_markdown(self, group_id: str) -> str:
|
||
"""构建菜单 Markdown(全量功能 + 状态 + 指令/触发方式)。"""
|
||
user_features = self._iter_user_command_features()
|
||
lines = [
|
||
"# 机器人功能菜单",
|
||
"",
|
||
f"- 目标:`{group_id}`",
|
||
f"- 生成时间:`{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}`",
|
||
"",
|
||
"## 功能与指令",
|
||
]
|
||
for feature in user_features:
|
||
status = GroupBotManager.get_group_permission(group_id, feature)
|
||
status_text = "开启" if status == PermissionStatus.ENABLED else "关闭"
|
||
lines.extend(self._build_feature_usage_lines(feature))
|
||
lines.append(f" 状态:{status_text}")
|
||
lines.append("")
|
||
return "\n".join(lines)
|
||
|
||
def _build_feature_command_payload(self, feature: Feature) -> dict:
|
||
"""构建功能卡片中的命令展示数据。"""
|
||
_, cmd_hint = self._split_feature_description(feature.description)
|
||
tokens = self._extract_command_tokens(cmd_hint)
|
||
if not tokens:
|
||
return {
|
||
"is_auto": True,
|
||
"primary": "",
|
||
"aliases": [],
|
||
"tip": "自动/定时触发,无需发送命令",
|
||
}
|
||
return {
|
||
"is_auto": False,
|
||
"primary": tokens[0],
|
||
"aliases": tokens[1:4],
|
||
"tip": "",
|
||
}
|
||
|
||
def build_feature_status_html(self, group_id: str) -> str:
|
||
"""基于外部 HTML 模板构建菜单页面。"""
|
||
user_features = self._iter_user_command_features()
|
||
feature_cards = []
|
||
enabled_count = 0
|
||
for feature in user_features:
|
||
title, _ = self._split_feature_description(feature.description)
|
||
status = GroupBotManager.get_group_permission(group_id, feature)
|
||
is_enabled = status == PermissionStatus.ENABLED
|
||
if is_enabled:
|
||
enabled_count += 1
|
||
command_payload = self._build_feature_command_payload(feature)
|
||
feature_cards.append(
|
||
{
|
||
"index": int(feature.value),
|
||
"title": title,
|
||
"feature_key": str(feature.name),
|
||
"status_text": "开启" if is_enabled else "关闭",
|
||
"status_class": "status-on" if is_enabled else "status-off",
|
||
"command": command_payload,
|
||
}
|
||
)
|
||
|
||
now_text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
return self.template_renderer.render(
|
||
self.image_template_path,
|
||
{
|
||
"group_id": str(group_id or ""),
|
||
"now_text": now_text,
|
||
"feature_total": len(user_features),
|
||
"enabled_count": enabled_count,
|
||
"feature_cards": feature_cards,
|
||
},
|
||
)
|
||
|
||
@staticmethod
|
||
def _get_plugin_manager() -> PluginManager:
|
||
"""获取当前运行中的插件管理器单例。"""
|
||
return PluginManager.get_instance()
|
||
|
||
@staticmethod
|
||
def _resolve_snapshot_group_status(snapshot: dict, group_id: str) -> dict:
|
||
"""解析插件在当前群里的可用状态。
|
||
|
||
规则说明:
|
||
1. 插件必须先处于 RUNNING,才可能被认为“可用”;
|
||
2. 若插件支持群级开关,则继续读取该群的 feature 权限;
|
||
3. 若插件没有群级开关,则视为“运行即全局可用”。
|
||
"""
|
||
normalized_snapshot = dict(snapshot or {})
|
||
status = str(normalized_snapshot.get("status") or "").strip().upper()
|
||
supports_group_switch = bool(normalized_snapshot.get("supports_group_switch"))
|
||
feature_key = str(normalized_snapshot.get("feature_key") or "").strip()
|
||
|
||
if status != "RUNNING":
|
||
return {
|
||
"available": False,
|
||
"reason": "插件未运行",
|
||
"reason_code": "plugin_not_running",
|
||
}
|
||
|
||
if not group_id or not supports_group_switch or not feature_key:
|
||
return {
|
||
"available": True,
|
||
"reason": "全局可用",
|
||
"reason_code": "global_available",
|
||
}
|
||
|
||
feature = Feature.get_feature(feature_key)
|
||
if feature is None:
|
||
return {
|
||
"available": True,
|
||
"reason": "未绑定群级开关,按运行中处理",
|
||
"reason_code": "feature_not_registered",
|
||
}
|
||
|
||
permission = GroupBotManager.get_group_permission(group_id, feature)
|
||
if permission == PermissionStatus.ENABLED:
|
||
return {
|
||
"available": True,
|
||
"reason": "本群已启用",
|
||
"reason_code": "group_enabled",
|
||
}
|
||
return {
|
||
"available": False,
|
||
"reason": "本群未启用",
|
||
"reason_code": "group_disabled",
|
||
}
|
||
|
||
@staticmethod
|
||
def _format_plugin_command(example_command: str, command_prefix: str) -> str:
|
||
"""把插件命令和前缀拼成最终展示文本。"""
|
||
prefix = str(command_prefix or "").strip()
|
||
command = str(example_command or "").strip()
|
||
if not prefix:
|
||
return command
|
||
return f"{prefix}{command}"
|
||
|
||
def _build_plugin_command_entry(self, snapshot: dict, group_id: str) -> Optional[dict]:
|
||
"""把插件快照转换为菜单可展示的命令项。"""
|
||
normalized_snapshot = dict(snapshot or {})
|
||
commands = list(normalized_snapshot.get("commands", []) or [])
|
||
plugin_types = list(normalized_snapshot.get("plugin_types", []) or [])
|
||
if not commands and "scheduled" not in plugin_types:
|
||
return None
|
||
|
||
availability = self._resolve_snapshot_group_status(normalized_snapshot, group_id)
|
||
command_prefix = str(normalized_snapshot.get("command_prefix") or "").strip()
|
||
primary_command = self._format_plugin_command(commands[0], command_prefix) if commands else ""
|
||
alias_commands = [
|
||
self._format_plugin_command(command_text, command_prefix)
|
||
for command_text in commands[1:4]
|
||
if str(command_text or "").strip()
|
||
]
|
||
|
||
if "message" in plugin_types:
|
||
category = "message"
|
||
category_label = "消息指令"
|
||
elif "scheduled" in plugin_types:
|
||
category = "scheduled"
|
||
category_label = "自动任务"
|
||
else:
|
||
category = "generic"
|
||
category_label = "通用能力"
|
||
|
||
return {
|
||
"name": str(normalized_snapshot.get("name") or "").strip(),
|
||
"module_name": str(normalized_snapshot.get("module_name") or "").strip(),
|
||
"description": str(normalized_snapshot.get("description") or "").strip() or "暂无描述",
|
||
"category": category,
|
||
"category_label": category_label,
|
||
"commands": commands,
|
||
"primary_command": primary_command,
|
||
"alias_commands": alias_commands,
|
||
"supports_group_switch": bool(normalized_snapshot.get("supports_group_switch")),
|
||
"feature_key": str(normalized_snapshot.get("feature_key") or "").strip(),
|
||
"available": bool(availability.get("available")),
|
||
"availability_reason": str(availability.get("reason") or "").strip(),
|
||
"availability_code": str(availability.get("reason_code") or "").strip(),
|
||
"status_label": str(normalized_snapshot.get("status_label") or "").strip(),
|
||
}
|
||
|
||
def _collect_command_catalog(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict:
|
||
"""采集当前群和当前身份视角下的命令清单。
|
||
|
||
输出结构分三层:
|
||
1. 普通用户可直接用的命令;
|
||
2. 自动/定时能力;
|
||
3. 管理员附加能力与未启用项。
|
||
"""
|
||
plugin_manager = self._get_plugin_manager()
|
||
snapshots = plugin_manager.get_plugin_snapshots()
|
||
if force_admin is None:
|
||
is_admin = bool(GroupBotManager.is_admin_for_group(requester_id, group_id)) if group_id else bool(GroupBotManager.is_admin(requester_id))
|
||
else:
|
||
is_admin = bool(force_admin)
|
||
|
||
available_manual = []
|
||
available_auto = []
|
||
unavailable_manual = []
|
||
|
||
for snapshot in snapshots:
|
||
entry = self._build_plugin_command_entry(snapshot, group_id)
|
||
if not entry:
|
||
continue
|
||
if entry["category"] == "scheduled":
|
||
if entry["available"]:
|
||
available_auto.append(entry)
|
||
continue
|
||
if entry["available"]:
|
||
available_manual.append(entry)
|
||
else:
|
||
unavailable_manual.append(entry)
|
||
|
||
available_manual.sort(key=lambda item: (item["category"], item["name"], item["primary_command"]))
|
||
available_auto.sort(key=lambda item: (item["name"], item["primary_command"]))
|
||
unavailable_manual.sort(key=lambda item: (item["availability_code"], item["name"]))
|
||
|
||
admin_commands = []
|
||
if is_admin:
|
||
admin_commands = [
|
||
{"title": "查看功能状态", "example": "菜单 状态", "description": "查看当前群所有功能开关状态"},
|
||
{"title": "启用某个功能", "example": "菜单 启用 功能序号", "description": "按菜单序号启用某项功能"},
|
||
{"title": "关闭某个功能", "example": "菜单 关闭 功能序号", "description": "按菜单序号关闭某项功能"},
|
||
{"title": "查看群管理员", "example": "菜单 管理员 列表", "description": "查看当前群管理员清单"},
|
||
{"title": "添加群管理员", "example": "菜单 管理员 添加 @某人", "description": "把某个群成员加入本群管理员"},
|
||
{"title": "删除群管理员", "example": "菜单 管理员 删除 @某人", "description": "移除某个群管理员"},
|
||
]
|
||
if GroupBotManager.is_admin(requester_id):
|
||
admin_commands.append(
|
||
{"title": "查看托管群列表", "example": "菜单 群列表", "description": "查看所有已启用机器人的群"}
|
||
)
|
||
|
||
return {
|
||
"group_id": str(group_id or "").strip(),
|
||
"requester_id": str(requester_id or "").strip(),
|
||
"is_admin": is_admin,
|
||
"available_manual": available_manual,
|
||
"available_auto": available_auto,
|
||
"unavailable_manual": unavailable_manual,
|
||
"admin_commands": admin_commands,
|
||
"generated_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
|
||
}
|
||
|
||
def build_command_catalog_data(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict:
|
||
"""对外暴露统一的命令目录结构,供机器人菜单和后台页面共同复用。"""
|
||
return self._collect_command_catalog(group_id, requester_id, force_admin=force_admin)
|
||
|
||
def build_command_catalog_text(self, group_id: str, requester_id: str) -> str:
|
||
"""构建适合直接发送给用户的文本版命令清单。"""
|
||
catalog = self.build_command_catalog_data(group_id, requester_id)
|
||
lines = [
|
||
"📚 当前群指令清单",
|
||
f"群ID:{catalog['group_id'] or '私聊'}",
|
||
f"生成时间:{catalog['generated_at']}",
|
||
"",
|
||
"一、当前可直接使用的命令",
|
||
]
|
||
|
||
if catalog["available_manual"]:
|
||
for item in catalog["available_manual"]:
|
||
lines.append(f"【{item['name']}】{item['description']}")
|
||
if item["primary_command"]:
|
||
lines.append(f"主指令:{item['primary_command']}")
|
||
if item["alias_commands"]:
|
||
lines.append(f"别名:{' / '.join(item['alias_commands'])}")
|
||
lines.append("")
|
||
else:
|
||
lines.append("当前没有可直接使用的手动命令")
|
||
lines.append("")
|
||
|
||
lines.append("二、自动/定时能力")
|
||
if catalog["available_auto"]:
|
||
for item in catalog["available_auto"]:
|
||
lines.append(f"【{item['name']}】{item['description']}")
|
||
lines.append("触发方式:自动或定时运行")
|
||
lines.append("")
|
||
else:
|
||
lines.append("当前没有已启用的自动能力")
|
||
lines.append("")
|
||
|
||
if catalog["is_admin"]:
|
||
lines.append("三、管理员额外可见")
|
||
if catalog["unavailable_manual"]:
|
||
lines.append("未启用或暂不可用命令:")
|
||
for item in catalog["unavailable_manual"]:
|
||
primary = item["primary_command"] or "无手动指令"
|
||
lines.append(f"- {item['name']}:{primary}({item['availability_reason']})")
|
||
lines.append("")
|
||
else:
|
||
lines.append("当前没有未启用的命令项")
|
||
lines.append("")
|
||
|
||
lines.append("管理命令:")
|
||
for item in catalog["admin_commands"]:
|
||
lines.append(f"- {item['example']}:{item['description']}")
|
||
lines.append("")
|
||
|
||
lines.append("提示:发送“菜单”查看功能开关;发送“菜单 状态”查看本群功能状态。")
|
||
return "\n".join(lines).strip()
|
||
|
||
def build_command_catalog_markdown(self, group_id: str, requester_id: str) -> str:
|
||
"""构建适合图片渲染的 Markdown 版指令清单。"""
|
||
catalog = self.build_command_catalog_data(group_id, requester_id)
|
||
lines = [
|
||
"# 机器人指令清单",
|
||
"",
|
||
f"- 目标:`{catalog['group_id'] or '私聊'}`",
|
||
f"- 生成时间:`{catalog['generated_at']}`",
|
||
"",
|
||
"## 当前可直接使用的命令",
|
||
]
|
||
|
||
if catalog["available_manual"]:
|
||
for item in catalog["available_manual"]:
|
||
lines.append(f"### {item['name']}")
|
||
lines.append(f"- 说明:{item['description']}")
|
||
if item["primary_command"]:
|
||
lines.append(f"- 主指令:`{item['primary_command']}`")
|
||
if item["alias_commands"]:
|
||
alias_text = " / ".join(f"`{alias}`" for alias in item["alias_commands"])
|
||
lines.append(f"- 别名:{alias_text}")
|
||
lines.append("")
|
||
else:
|
||
lines.append("- 当前没有可直接使用的手动命令")
|
||
lines.append("")
|
||
|
||
lines.append("## 自动/定时能力")
|
||
if catalog["available_auto"]:
|
||
for item in catalog["available_auto"]:
|
||
lines.append(f"- **{item['name']}**:{item['description']}")
|
||
else:
|
||
lines.append("- 当前没有已启用的自动能力")
|
||
lines.append("")
|
||
|
||
if catalog["is_admin"]:
|
||
lines.append("## 管理员额外可见")
|
||
if catalog["unavailable_manual"]:
|
||
lines.append("### 未启用或暂不可用命令")
|
||
for item in catalog["unavailable_manual"]:
|
||
primary = item["primary_command"] or "无手动指令"
|
||
lines.append(f"- **{item['name']}**:`{primary}`({item['availability_reason']})")
|
||
lines.append("")
|
||
|
||
lines.append("### 管理命令")
|
||
for item in catalog["admin_commands"]:
|
||
lines.append(f"- `{item['example']}`:{item['description']}")
|
||
lines.append("")
|
||
|
||
lines.append("> 提示:发送 `菜单` 查看功能开关;发送 `菜单 状态` 查看本群功能状态。")
|
||
return "\n".join(lines)
|
||
|
||
async def send_menu_content(
|
||
self,
|
||
bot: WechatAPIClient,
|
||
target: str,
|
||
sender: str,
|
||
revoke: MessageAutoRevoke,
|
||
text_content: str,
|
||
markdown_content: Optional[str] = None,
|
||
html_content: Optional[str] = None,
|
||
revoke_seconds: int = 90,
|
||
) -> None:
|
||
"""按配置发送菜单内容(文本或图片)。"""
|
||
if self.output_mode != "image":
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds)
|
||
return
|
||
|
||
md_content = (markdown_content or "").strip() or f"```text\n{text_content}\n```"
|
||
output_image = f"robot_menu_{int(time.time() * 1000)}.png"
|
||
try:
|
||
total_timeout = max(30, self.image_render_timeout_seconds * max(1, self.image_render_retries) + 10)
|
||
output_dir = Path(os.getcwd()) / "temp" / "md2image"
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
output_path = output_dir / output_image
|
||
if html_content and html_content.strip():
|
||
await asyncio.wait_for(
|
||
html_to_image(html_content, str(output_path)),
|
||
timeout=total_timeout,
|
||
)
|
||
image_path = str(output_path.resolve())
|
||
else:
|
||
image_path = await asyncio.wait_for(
|
||
convert_md_str_to_image(
|
||
md_content,
|
||
output_image,
|
||
max_retries=max(1, self.image_render_retries),
|
||
render_timeout_seconds=max(10, self.image_render_timeout_seconds),
|
||
html_timeout_seconds=min(30, max(10, self.image_render_timeout_seconds)),
|
||
),
|
||
timeout=total_timeout,
|
||
)
|
||
await bot.send_image_message(target, Path(image_path))
|
||
except Exception as e:
|
||
self.log.error(f"[机器人菜单渲染工具] 菜单图片发送失败: {e}")
|
||
if self.image_fallback_to_text:
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds)
|
||
else:
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(
|
||
target,
|
||
"❌ 菜单图片生成失败,请稍后重试",
|
||
sender
|
||
)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 30)
|