356 lines
15 KiB
Python
356 lines
15 KiB
Python
import asyncio
|
||
import html
|
||
import os
|
||
import re
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Any, Optional, Tuple
|
||
|
||
from loguru import logger as default_logger
|
||
|
||
from utils.markdown_to_image import convert_md_str_to_image, html_to_image
|
||
from utils.revoke.message_auto_revoke import MessageAutoRevoke
|
||
from utils.robot_cmd.robot_command import Feature
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class RobotMenuRenderTool:
|
||
"""机器人菜单渲染工具。
|
||
|
||
设计目标:
|
||
1. 将“菜单排版 + 图片渲染 + 发送策略”从主插件拆出,降低 main.py 维护成本;
|
||
2. 统一菜单文本/图片输出行为,保证菜单与状态页展示一致;
|
||
3. 对外只暴露清晰方法,主插件只负责“取业务数据 + 调用工具”。
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
output_mode: Any,
|
||
image_fallback_to_text: bool,
|
||
image_render_timeout_seconds: int,
|
||
image_render_retries: int,
|
||
log=default_logger,
|
||
):
|
||
# 输出模式:支持 text / image,非法值自动归一化为 text。
|
||
self.output_mode = self.normalize_output_mode(output_mode)
|
||
# 图片失败时是否回退文本。
|
||
self.image_fallback_to_text = bool(image_fallback_to_text)
|
||
# 渲染超时与重试参数,统一集中在工具层处理。
|
||
self.image_render_timeout_seconds = int(image_render_timeout_seconds)
|
||
self.image_render_retries = int(image_render_retries)
|
||
# 注入日志对象,便于主插件统一控制日志风格与输出目标。
|
||
self.log = log or default_logger
|
||
|
||
@staticmethod
|
||
def normalize_output_mode(raw_mode: Any) -> str:
|
||
"""将配置中的输出模式标准化为 `text` 或 `image`。"""
|
||
mode = str(raw_mode or "").strip().lower()
|
||
if mode in {"image", "img", "picture", "pic"}:
|
||
return "image"
|
||
return "text"
|
||
|
||
@staticmethod
|
||
def _split_feature_description(feature_desc: str) -> Tuple[str, str]:
|
||
"""拆分功能描述为“功能说明”和“触发指令提示”。
|
||
|
||
约定:
|
||
1. 描述中 `[]` 内的文本作为“触发说明”;
|
||
2. 无 `[]` 时返回“无”,保证渲染模板字段完整。
|
||
"""
|
||
desc = str(feature_desc or "").strip()
|
||
match = re.match(r"^(.*?)(?:\[(.*?)\])?$", desc)
|
||
if not match:
|
||
return desc or "未命名功能", "无"
|
||
title = (match.group(1) or "").strip() or "未命名功能"
|
||
cmd_hint = (match.group(2) or "").strip() or "无"
|
||
return title, cmd_hint
|
||
|
||
@staticmethod
|
||
def _extract_command_tokens(cmd_hint: str) -> list[str]:
|
||
"""从描述里的指令片段提取命令 token 列表。
|
||
|
||
规则:
|
||
1. 按 `|`、`,`、`,`、`/`、`、` 切分;
|
||
2. 去重并保序;
|
||
3. 清理多余空白,便于后续生成“如何使用”示例。
|
||
"""
|
||
raw = str(cmd_hint or "").strip()
|
||
if not raw or raw == "无":
|
||
return []
|
||
parts = re.split(r"[|,,/、]+", raw)
|
||
tokens: list[str] = []
|
||
for part in parts:
|
||
token = re.sub(r"\s+", " ", str(part or "").strip())
|
||
if token and token not in tokens:
|
||
tokens.append(token)
|
||
return tokens
|
||
|
||
def _build_feature_usage_lines(self, feature: Feature) -> list[str]:
|
||
"""构建单个功能的“怎么用”文案(纯用户视角,不含管理员控制语句)。"""
|
||
title, cmd_hint = self._split_feature_description(feature.description)
|
||
tokens = self._extract_command_tokens(cmd_hint)
|
||
lines = [f"{feature.value}. {title}"]
|
||
if not tokens:
|
||
lines.append(" 用法:自动/定时触发,无需手动指令")
|
||
return lines
|
||
|
||
# 第一行给出最短触发命令,后续补充可选别名,信息更丰富但仍保持紧凑。
|
||
lines.append(f" 指令:{tokens[0]}")
|
||
if len(tokens) > 1:
|
||
lines.append(f" 别名:{' / '.join(tokens[1:4])}")
|
||
# 给一个通用可替换参数提示,帮助用户快速落地命令输入。
|
||
if any(k in tokens[0] for k in ["@", "积分", "关键词", "日期", "功能名"]):
|
||
lines.append(" 提示:按指令中的占位词替换为实际内容后发送")
|
||
return lines
|
||
|
||
def build_feature_status_markdown(self, group_id: str) -> str:
|
||
"""构建紧凑菜单 Markdown(强调“怎么用”,不展示启停状态/管理命令)。"""
|
||
lines = [
|
||
"# 机器人功能菜单",
|
||
"",
|
||
f"- 目标:`{group_id}`",
|
||
f"- 生成时间:`{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}`",
|
||
"",
|
||
"## 快速使用",
|
||
"- 发送 `菜单` 可再次查看本图",
|
||
"- 以下仅保留用户可直接使用的触发方式",
|
||
"",
|
||
"## 功能与指令",
|
||
]
|
||
for feature in Feature:
|
||
lines.extend(self._build_feature_usage_lines(feature))
|
||
lines.append("")
|
||
return "\n".join(lines)
|
||
|
||
def _get_feature_command_examples(self, feature: Feature) -> str:
|
||
"""返回单个功能在卡片中的紧凑使用说明(不包含管理员内容)。"""
|
||
_, cmd_hint = self._split_feature_description(feature.description)
|
||
tokens = self._extract_command_tokens(cmd_hint)
|
||
if not tokens:
|
||
return "自动/定时触发,无需发送命令"
|
||
|
||
rows = [f"<span class='line-main'>指令:<code>{html.escape(tokens[0])}</code></span>"]
|
||
if len(tokens) > 1:
|
||
alias_text = " / ".join([f"<code>{html.escape(t)}</code>" for t in tokens[1:4]])
|
||
rows.append(f"<span class='line-sub'>别名:{alias_text}</span>")
|
||
return "<br>".join(rows)
|
||
|
||
def build_feature_status_html(self, group_id: str) -> str:
|
||
"""构建菜单 HTML(自定义样式,不复用 md2image 默认样式)。"""
|
||
feature_cards = []
|
||
for feature in Feature:
|
||
title, _ = self._split_feature_description(feature.description)
|
||
command_examples = self._get_feature_command_examples(feature)
|
||
# 为纵向卡片增加轻量色彩分组,提升视觉节奏感,避免纯白卡片过于单调。
|
||
try:
|
||
tone_idx = int(feature.value) % 4
|
||
except Exception:
|
||
tone_idx = 0
|
||
feature_cards.append(
|
||
f"""
|
||
<section class="feature-card tone-{tone_idx}">
|
||
<div class="feature-top">
|
||
<div class="feature-index">{feature.value}</div>
|
||
<div class="feature-meta">
|
||
<h3>{html.escape(title)}</h3>
|
||
<p class="feature-key">功能键:<code>{html.escape(feature.name)}</code></p>
|
||
</div>
|
||
</div>
|
||
<div class="feature-body">
|
||
<p>{command_examples}</p>
|
||
</div>
|
||
</section>
|
||
"""
|
||
)
|
||
|
||
now_text = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||
return f"""
|
||
<html>
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||
<style>
|
||
:root {{
|
||
--card: #ffffff;
|
||
--line: #d8e4ef;
|
||
--text: #1f2d3d;
|
||
--muted: #5e748a;
|
||
--brand: #0f5fb7;
|
||
--brand-soft: #eaf3ff;
|
||
}}
|
||
* {{ box-sizing: border-box; }}
|
||
body {{
|
||
margin: 0;
|
||
padding: 14px;
|
||
font-family: "Microsoft YaHei", "PingFang SC", "Noto Sans CJK SC", sans-serif;
|
||
background:
|
||
radial-gradient(circle at 10% 10%, rgba(15,95,183,0.08), transparent 32%),
|
||
radial-gradient(circle at 90% 0%, rgba(31,126,255,0.10), transparent 30%),
|
||
linear-gradient(180deg, #eef4fa 0%, #f8fbff 100%);
|
||
color: var(--text);
|
||
}}
|
||
.page {{
|
||
width: 780px;
|
||
margin: 0 auto;
|
||
background: var(--card);
|
||
border: 1px solid var(--line);
|
||
border-radius: 14px;
|
||
overflow: hidden;
|
||
box-shadow: 0 8px 18px rgba(20, 47, 76, 0.10);
|
||
}}
|
||
.hero {{
|
||
padding: 14px 16px 12px 16px;
|
||
background:
|
||
linear-gradient(130deg, rgba(255,255,255,0.12), rgba(255,255,255,0) 45%),
|
||
linear-gradient(135deg, #0f5fb7 0%, #2f8cff 55%, #3ca1ff 100%);
|
||
color: #fff;
|
||
}}
|
||
.hero h1 {{ margin: 0 0 4px 0; font-size: 22px; }}
|
||
.hero p {{ margin: 2px 0; font-size: 12px; opacity: .96; }}
|
||
.content {{ padding: 12px 12px 14px 12px; }}
|
||
.block {{
|
||
border: 1px solid var(--line);
|
||
border-radius: 10px;
|
||
padding: 10px 12px;
|
||
margin-bottom: 10px;
|
||
background: #fff;
|
||
}}
|
||
.block h2 {{ margin: 0 0 6px 0; font-size: 15px; color: #12395f; }}
|
||
.block ul {{
|
||
margin: 0;
|
||
padding-left: 18px;
|
||
color: var(--muted);
|
||
line-height: 1.5;
|
||
font-size: 12px;
|
||
}}
|
||
/* 纵向单列流式布局:避免双列模式下因内容长度差异产生空白洞。 */
|
||
.feature-list {{ display: flex; flex-direction: column; gap: 8px; }}
|
||
.feature-card {{
|
||
border: 1px solid #d6e4f3;
|
||
border-left-width: 4px;
|
||
border-radius: 10px;
|
||
padding: 8px 10px;
|
||
background: linear-gradient(180deg, #ffffff 0%, #fbfdff 100%);
|
||
box-shadow: 0 4px 10px rgba(16, 64, 125, 0.05);
|
||
}}
|
||
.feature-card.tone-0 {{ border-left-color: #2f8cff; }}
|
||
.feature-card.tone-1 {{ border-left-color: #10b981; }}
|
||
.feature-card.tone-2 {{ border-left-color: #f59e0b; }}
|
||
.feature-card.tone-3 {{ border-left-color: #ef4444; }}
|
||
.feature-top {{ display: flex; align-items: center; gap: 8px; }}
|
||
.feature-index {{
|
||
min-width: 28px;
|
||
text-align: center;
|
||
font-weight: 700;
|
||
color: #0f5fb7;
|
||
background: var(--brand-soft);
|
||
border: 1px solid #cfe1fb;
|
||
border-radius: 6px;
|
||
padding: 2px 4px;
|
||
font-size: 11px;
|
||
}}
|
||
.feature-meta {{ flex: 1; }}
|
||
.feature-meta h3 {{ margin: 0 0 1px 0; font-size: 13px; }}
|
||
.feature-key {{ margin: 0; font-size: 11px; color: var(--muted); }}
|
||
.feature-body {{
|
||
margin-top: 6px;
|
||
padding: 6px 8px;
|
||
border-radius: 7px;
|
||
background: #f7fbff;
|
||
border: 1px dashed #d2e4f7;
|
||
}}
|
||
.feature-body p {{ margin: 0; font-size: 11px; color: #3d5870; line-height: 1.5; }}
|
||
.line-main {{ color: #1f3f62; }}
|
||
.line-sub {{ color: #5a738a; }}
|
||
code {{
|
||
background: #eef6ff;
|
||
border: 1px solid #d2e6ff;
|
||
color: #12539a;
|
||
padding: 0 5px;
|
||
border-radius: 5px;
|
||
font-size: 11px;
|
||
}}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="page">
|
||
<header class="hero">
|
||
<h1>机器人功能菜单</h1>
|
||
<p>目标群/会话:{html.escape(group_id)}</p>
|
||
<p>生成时间:{now_text}</p>
|
||
</header>
|
||
<main class="content">
|
||
<section class="block">
|
||
<h2>快速上手</h2>
|
||
<ul>
|
||
<li><code>菜单</code>:查看可用功能和指令</li>
|
||
<li>以下内容仅面向普通用户,聚焦“怎么用”</li>
|
||
</ul>
|
||
</section>
|
||
<section class="block">
|
||
<h2>功能与指令</h2>
|
||
<div class="feature-list">
|
||
{''.join(feature_cards)}
|
||
</div>
|
||
</section>
|
||
</main>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
async def send_menu_content(
|
||
self,
|
||
bot: WechatAPIClient,
|
||
target: str,
|
||
sender: str,
|
||
revoke: MessageAutoRevoke,
|
||
text_content: str,
|
||
markdown_content: Optional[str] = None,
|
||
html_content: Optional[str] = None,
|
||
revoke_seconds: int = 90,
|
||
) -> None:
|
||
"""按配置发送菜单内容(文本或图片)。"""
|
||
if self.output_mode != "image":
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds)
|
||
return
|
||
|
||
md_content = (markdown_content or "").strip() or f"```text\n{text_content}\n```"
|
||
output_image = f"robot_menu_{int(time.time() * 1000)}.png"
|
||
try:
|
||
total_timeout = max(30, self.image_render_timeout_seconds * max(1, self.image_render_retries) + 10)
|
||
output_dir = Path(os.getcwd()) / "temp" / "md2image"
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
output_path = output_dir / output_image
|
||
if html_content and html_content.strip():
|
||
await asyncio.wait_for(
|
||
html_to_image(html_content, str(output_path)),
|
||
timeout=total_timeout,
|
||
)
|
||
image_path = str(output_path.resolve())
|
||
else:
|
||
image_path = await asyncio.wait_for(
|
||
convert_md_str_to_image(
|
||
md_content,
|
||
output_image,
|
||
max_retries=max(1, self.image_render_retries),
|
||
render_timeout_seconds=max(10, self.image_render_timeout_seconds),
|
||
html_timeout_seconds=min(30, max(10, self.image_render_timeout_seconds)),
|
||
),
|
||
timeout=total_timeout,
|
||
)
|
||
await bot.send_image_message(target, Path(image_path))
|
||
except Exception as e:
|
||
self.log.error(f"[机器人菜单渲染工具] 菜单图片发送失败: {e}")
|
||
if self.image_fallback_to_text:
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(target, text_content, sender)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, revoke_seconds)
|
||
else:
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(
|
||
target,
|
||
"❌ 菜单图片生成失败,请稍后重试",
|
||
sender
|
||
)
|
||
revoke.add_message_to_revoke(target, client_msg_id, create_time, new_msg_id, 30)
|