refactor ai_auto_response plugin architecture

This commit is contained in:
liuwei
2026-04-09 17:46:30 +08:00
parent cc65378544
commit f580c69736
39 changed files with 4347 additions and 1979 deletions

View File

@@ -0,0 +1,15 @@
from __future__ import annotations
from .conversation_hints import build_conversation_hints
from .context_builder import ContextBuilder
from .image_context import build_image_safety_hints, build_recent_image_context, prepare_quote_image_inputs
from .quote_context import parse_quote_context
__all__ = [
"ContextBuilder",
"build_conversation_hints",
"build_image_safety_hints",
"build_recent_image_context",
"parse_quote_context",
"prepare_quote_image_inputs",
]

View File

@@ -0,0 +1,358 @@
from __future__ import annotations
import re
from typing import Dict, List
class ContextBuilder:
def __init__(self, recent_context_size: int = 30):
self.recent_context_size = max(int(recent_context_size or 30), 1)
def build(
self,
*,
room_id: str,
group_profile: Dict,
sender: str,
sender_name: str,
content: str,
recent_messages: List[Dict],
member_context: Dict,
member_memory_focus: List[str] | None = None,
trigger: Dict,
flow_state: str,
reply_mode: str,
vector_memories: List[Dict],
social_memory: Dict | None = None,
group_facts: Dict | None = None,
quote_context: Dict | None = None,
image_context: Dict | None = None,
) -> Dict:
selected_messages = self._select_recent_messages(recent_messages, sender, content, quote_context or {})
recent_lines = []
for item in selected_messages:
msg_sender = item.get("sender_name") or item.get("sender") or "未知成员"
msg_content = item.get("content") or item.get("message") or ""
if msg_content:
recent_lines.append(f"{msg_sender}: {msg_content}")
return {
"group_profile": group_profile or {"room_id": room_id},
"speaker_profile": {
"wxid": sender,
"display_name": sender_name,
"member_context": member_context or {},
},
"speaker_name_clean": self._clean_display_name(sender_name),
"recent_message_items": self._build_recent_message_items(selected_messages),
"recent_messages": recent_lines,
"recent_summary": "",
"trigger_type": trigger.get("trigger_type", "none"),
"reply_mode": reply_mode,
"flow_state": flow_state,
"memory_prompt": self._build_member_memory_prompt(member_context, member_memory_focus or []),
"vector_memory_prompt": self._build_vector_memory_prompt(vector_memories),
"social_memory_prompt": self._build_social_memory_prompt(social_memory or {}),
"group_facts_prompt": self._build_group_facts_prompt(group_facts or {}),
"group_profile_prompt": self._build_group_profile_prompt(group_profile or {}),
"quote_prompt": self._build_quote_prompt(quote_context or {}),
"image_prompt": self._build_image_prompt(image_context or {}),
"image_safety_prompt": self._build_image_safety_prompt(
(quote_context or {}).get("image_safety") or {}
),
"current_message": f"{sender_name}: {content}",
}
@staticmethod
def _build_recent_message_items(messages: List[Dict]) -> List[Dict]:
items: List[Dict] = []
for idx, item in enumerate(messages, start=1):
content = str(item.get("content") or item.get("message") or "").strip()
if not content:
continue
items.append({
"idx": idx,
"sender": item.get("sender_name") or item.get("sender") or "未知成员",
"content": content[:120],
"is_at": bool(item.get("is_at")),
})
return items
def _select_recent_messages(
self,
recent_messages: List[Dict],
current_sender: str,
current_content: str,
quote_context: Dict,
) -> List[Dict]:
if not recent_messages:
return []
window = recent_messages[-self.recent_context_size:]
if len(window) <= 8:
return window
current_tokens = self._extract_topic_tokens(current_content)
quote_tokens = self._extract_topic_tokens(
f"{quote_context.get('title', '')} {quote_context.get('quote_body', '')}"
)
focus_tokens = current_tokens | quote_tokens
quote_sender_name = str(quote_context.get("quote_sender_name", "") or "").strip().lower()
scored: List[tuple[int, int, Dict]] = []
for idx, item in enumerate(window):
score = self._message_relevance(
item,
current_sender=current_sender,
focus_tokens=focus_tokens,
quote_sender_name=quote_sender_name,
)
if score > 0:
scored.append((score, idx, item))
# 总是保留尾部几条,维持现场感;再拼上与当前话题最相关的消息。
tail_indexes = set(range(max(len(window) - 4, 0), len(window)))
keep_indexes = set(tail_indexes)
for _, idx, _ in sorted(scored, key=lambda x: (-x[0], -x[1]))[:10]:
keep_indexes.add(idx)
selected = [window[idx] for idx in sorted(keep_indexes)]
if len(selected) < 6:
return window[-6:]
return selected[-12:]
@classmethod
def _message_relevance(
cls,
item: Dict,
*,
current_sender: str,
focus_tokens: set[str],
quote_sender_name: str,
) -> int:
content = str(item.get("content") or item.get("message") or "").strip()
if not content:
return 0
sender = str(item.get("sender", "") or "")
sender_name = str(item.get("sender_name", "") or "").strip().lower()
score = 0
if sender == current_sender:
score += 3
if quote_sender_name and quote_sender_name in sender_name:
score += 3
if item.get("is_at"):
score += 1
if focus_tokens:
tokens = cls._extract_topic_tokens(content)
overlap = focus_tokens & tokens
score += min(len(overlap) * 2, 6)
if overlap and cls._looks_like_question_or_answer(content):
score += 2
elif sender == current_sender:
score += 1
if cls._looks_like_question_or_answer(content):
score += 1
return score
@staticmethod
def _looks_like_question_or_answer(content: str) -> bool:
text = str(content or "").strip().lower()
if not text:
return False
patterns = [
r"\?$", r"$", r"怎么", r"如何", r"为啥", r"为什么", r"能不能", r"可以吗",
r"报错", r"试试", r"", r"然后", r"配置", r"日志", r"接口", r"原因",
]
return any(re.search(pattern, text, flags=re.IGNORECASE) for pattern in patterns)
@staticmethod
def _extract_topic_tokens(content: str) -> set[str]:
text = str(content or "").lower()
tokens = set(re.findall(r"[a-z0-9_\\-]{3,}", text))
keywords = [
"openclaw", "qdrant", "ollama", "docker", "python", "api", "插件", "机器人", "模型",
"日志", "配置", "报错", "部署", "联网", "图片", "记忆", "群聊", "dota", "战绩",
]
for keyword in keywords:
if keyword in text:
tokens.add(keyword)
return tokens
@staticmethod
def _clean_display_name(sender_name: str) -> str:
import re
text = str(sender_name or "").strip()
if not text:
return ""
text = re.sub(r"\s+", "", text)
text = re.sub(r"[^\u4e00-\u9fffA-Za-z0-9_]", "", text)
return text[:8]
@staticmethod
def _build_member_memory_prompt(member_context: Dict, focus_lines: List[str] | None = None) -> str:
if not member_context:
return "暂无稳定成员画像。"
meta = member_context.get("meta", {}) or {}
topics = member_context.get("topics_of_interest", []) or []
recent_focus = member_context.get("recent_focus", []) or []
common_scenarios = ContextBuilder._stringify_items(meta.get("common_scenarios", []), 4)
skills = ContextBuilder._stringify_items(meta.get("skill_profile", []), 5)
problem_solving = ContextBuilder._stringify_items(meta.get("problem_solving_profile", []), 4)
stable_traits = ContextBuilder._stringify_items(meta.get("stable_traits", []), 4)
habits = ContextBuilder._stringify_items(meta.get("habit_patterns", []), 4)
expression_profile = ContextBuilder._stringify_items(meta.get("expression_profile", []), 4)
reply_entry = ContextBuilder._stringify_items(meta.get("reply_entry_profile", []), 4)
reply_prefs = ContextBuilder._stringify_items(meta.get("long_term_reply_preferences", []), 4)
recent_state = ContextBuilder._stringify_items(meta.get("recent_state", []), 4)
reply_taboos = ContextBuilder._stringify_items(meta.get("reply_taboos", []), 3)
lines = [
f"成员摘要:{member_context.get('summary_text', '')}".strip(),
f"互动风格:{member_context.get('interaction_style', '')}".strip(),
f"回复偏好:{member_context.get('response_style_hint', '')}".strip(),
f"本次相关记忆:{''.join((focus_lines or [])[:4])}" if focus_lines else "",
f"长期主题:{', '.join(topics[:5])}" if topics else "",
f"近期关注:{', '.join(recent_focus[:4])}" if recent_focus else "",
f"常见发言场景:{common_scenarios}" if common_scenarios else "",
f"技能侧重点:{skills}" if skills else "",
f"处理问题方式:{problem_solving}" if problem_solving else "",
f"稳定特征:{stable_traits}" if stable_traits else "",
f"习惯模式:{habits}" if habits else "",
f"表达标记:{expression_profile}" if expression_profile else "",
f"有效接话点:{reply_entry}" if reply_entry else "",
f"长期回复偏好:{reply_prefs}" if reply_prefs else "",
f"近期状态:{recent_state}" if recent_state else "",
f"气质倾向:{meta.get('temperament_tendency', '')}".strip(),
f"群内角色:{meta.get('group_role', '')}".strip(),
f"回复禁忌:{reply_taboos}" if reply_taboos else "",
]
return "\n".join([line for line in lines if line])
@staticmethod
def _stringify_items(items: List | str, limit: int) -> str:
if isinstance(items, str):
return items.strip()
values: List[str] = []
for item in items[:limit]:
if isinstance(item, dict):
value = str(
item.get("name")
or item.get("label")
or item.get("value")
or item.get("text")
or ""
).strip()
else:
value = str(item or "").strip()
if value and value not in values:
values.append(value)
return ", ".join(values)
@staticmethod
def _build_vector_memory_prompt(vector_memories: List[Dict]) -> str:
if not vector_memories:
return ""
lines = []
for item in vector_memories[:2]:
summary = item.get("content_summary") or item.get("summary_text") or item.get("text") or ""
memory_type = item.get("memory_type", "memory")
if summary:
lines.append(f"[{memory_type}] {summary}")
return "\n".join(lines)
@staticmethod
def _build_social_memory_prompt(social_memory: Dict) -> str:
prompt = str((social_memory or {}).get("prompt", "") or "").strip()
return prompt
@staticmethod
def _build_group_facts_prompt(group_facts: Dict) -> str:
return str((group_facts or {}).get("prompt", "") or "").strip()
@staticmethod
def _build_group_profile_prompt(group_profile: Dict) -> str:
if not group_profile:
return "当前群没有特殊知识域限制。"
focus = ", ".join(group_profile.get("knowledge_focus", [])[:6])
boundaries = ", ".join(group_profile.get("topic_boundaries", [])[:6])
summary = str(group_profile.get("group_memory_summary", "") or "").replace("\n", " ").strip()
if len(summary) > 120:
summary = summary[:117] + "..."
lines = [
f"群模式:{group_profile.get('mode', 'social')}",
f"知识域:{group_profile.get('knowledge_domain', 'general')}",
f"配置知识域:{group_profile.get('configured_domain', 'general')}",
f"历史推断知识域:{group_profile.get('group_memory_domain', 'general')}",
f"回答风格:{group_profile.get('reply_style', '自然短句')}",
f"互动调性:{group_profile.get('interaction_tone', '自然群友感')}",
f"幽默强度:{group_profile.get('humor_style', '轻微')}",
f"嘴硬程度:{group_profile.get('sharpness_style', '轻微嘴硬,不刻薄')}",
f"表达松弛度:{group_profile.get('expressiveness_style', '克制')}",
f"称呼强度:{group_profile.get('address_style', '低频称呼,默认直接接话')}",
f"知识重点:{focus}" if focus else "",
f"群长期摘要:{summary}" if summary else "",
f"历史推断社交风格:{ContextBuilder._build_style_summary(group_profile.get('group_memory_style', {}))}"
if group_profile.get("group_memory_style")
else "",
f"边界提醒:{boundaries}" if boundaries else "",
f"人格叠加:{group_profile.get('persona_overlay', '')}".strip(),
]
return "\n".join([line for line in lines if line])
@staticmethod
def _build_style_summary(style_profile: Dict) -> str:
if not style_profile:
return ""
return " / ".join(
[
str(style_profile.get("interaction_tone", "") or "").strip(),
str(style_profile.get("humor_style", "") or "").strip(),
str(style_profile.get("sharpness_style", "") or "").strip(),
str(style_profile.get("expressiveness_style", "") or "").strip(),
]
).strip(" /")
@staticmethod
def _build_quote_prompt(quote_context: Dict) -> str:
if not quote_context:
return ""
quote_type = quote_context.get("quote_type_label", "引用消息")
quote_sender = quote_context.get("quote_sender_name", "") or "未知成员"
quote_body = quote_context.get("quote_body", "") or ""
title = quote_context.get("title", "") or ""
lines = [
f"用户这次是在引用消息后发言。",
f"引用类型:{quote_type}",
f"被引用发送者:{quote_sender}",
f"图片附件:已附带原图" if quote_context.get("has_image_attachment") else "",
f"引用标题:{title}" if title else "",
f"被引用内容:{quote_body}" if quote_body else "",
]
return "\n".join([line for line in lines if line])
@staticmethod
def _build_image_prompt(image_context: Dict) -> str:
if not image_context:
return ""
lines = [
"已附带最近一张群图片作为上下文。",
f"图片发送者:{image_context.get('sender_name', '未知成员')}",
f"图片说明:{image_context.get('hint', '')}" if image_context.get("hint") else "",
]
return "\n".join([line for line in lines if line])
@staticmethod
def _build_image_safety_prompt(image_safety: Dict) -> str:
if not image_safety or not image_safety.get("suspected"):
return ""
if image_safety.get("has_visual_context"):
return "当前发言疑似是在评论图片,但本次已附带图片上下文,可以基于图片谨慎理解。"
reason = str(image_safety.get("reason", "") or "").strip()
lines = [
"当前发言疑似是在评论图片,但你这次没有看到图片本身。",
f"原因:{reason}" if reason else "",
"不要假装看过图,不要直接评价画面细节、人物状态、构图、文字内容或颜色元素。",
"如果要回,只能轻微承认信息不足,或请对方引用图片/补一句文字说明,再继续。",
]
return "\n".join([line for line in lines if line])

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
import re
from typing import Any, Dict, List
TECH_OVERLAP_KEYWORDS = [
"报错", "日志", "配置", "接口", "插件", "部署", "docker", "python", "openclaw", "机器人", "qdrant", "ollama",
]
ANSWER_KEYWORDS = [
"", "然后", "重启", "配置", "日志", "接口", "看一下", "试试", "排查",
"报错", "原因", "因为", "改成", "", "部署", "重现", "检查", "确认",
]
def build_conversation_hints(
recent_messages: List[Dict],
current_sender: str,
current_content: str,
quote_context: Dict[str, Any],
bot_name: str,
) -> Dict[str, Any]:
previous_messages = list(recent_messages[:-1]) if recent_messages else []
recent_window = previous_messages[-4:]
solver_count = 0
solver_senders = set()
current_tokens = extract_overlap_tokens(current_content)
for item in recent_window:
sender = str(item.get("sender", "") or "")
if not sender or sender == current_sender:
continue
content = str(item.get("content") or item.get("message") or "").strip().lower()
if looks_like_answer(content) and has_topic_overlap(current_tokens, content):
solver_count += 1
solver_senders.add(sender)
previous_same_sender_directed = False
same_sender_recent_count = 0
bot_name_lower = str(bot_name or "").lower()
for item in reversed(previous_messages[-6:]):
sender = str(item.get("sender", "") or "")
if sender != current_sender:
continue
same_sender_recent_count += 1
content = str(item.get("content") or item.get("message") or "").strip().lower()
if bool(item.get("is_at")) or (bot_name_lower and bot_name_lower in content):
previous_same_sender_directed = True
break
quote_targets_bot = False
quote_sender_name = str(quote_context.get("quote_sender_name", "") or "").strip().lower()
if quote_sender_name and bot_name_lower and bot_name_lower in quote_sender_name:
quote_targets_bot = True
return {
"has_recent_human_solver": solver_count >= 2 and len(solver_senders) >= 1,
"solver_count": solver_count,
"previous_same_sender_directed": previous_same_sender_directed,
"same_sender_recent_count": same_sender_recent_count,
"quote_targets_bot": quote_targets_bot,
}
def looks_like_answer(content: str) -> bool:
if not content:
return False
if len(content) >= 18:
return True
return any(keyword in content for keyword in ANSWER_KEYWORDS)
def extract_overlap_tokens(content: str) -> set[str]:
text = str(content or "").lower()
tokens = set(re.findall(r"[a-z0-9_\\-]{3,}", text))
for keyword in TECH_OVERLAP_KEYWORDS:
if keyword in text:
tokens.add(keyword)
return tokens
def has_topic_overlap(current_tokens: set[str], previous_content: str) -> bool:
if not current_tokens:
return False
previous_tokens = extract_overlap_tokens(previous_content)
return bool(current_tokens & previous_tokens)

View File

@@ -0,0 +1,200 @@
from __future__ import annotations
import base64
import imghdr
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional
from wechat_ipad import WechatAPIClient
def build_recent_image_context(
*,
message: Dict[str, Any],
room_id: str,
content: str,
quote_context: Dict[str, str],
get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
get_sender_name: Callable[[str, str], str],
image_config: Dict[str, Any],
) -> Dict[str, str]:
if quote_context:
return {}
latest_image = get_latest_image_message(
room_id,
before_timestamp=str(message.get("timestamp") or ""),
)
if not latest_image:
return {}
if not is_recent_image_followup(content, latest_image, image_config):
return {}
sender = str(latest_image.get("sender", "") or "")
sender_name = get_sender_name(room_id, sender) if sender else "未知成员"
return {
"sender_name": sender_name,
"image_path": str(latest_image.get("image_path", "") or ""),
"hint": "用户当前这句大概率是在追问这张最近图片",
"timestamp": str(latest_image.get("timestamp", "") or ""),
}
def is_recent_image_followup(content: str, latest_image: Optional[Dict[str, Any]] = None, image_config: Dict[str, Any] | None = None) -> bool:
text = str(content or "").strip().lower()
if not text:
return False
image_words = ["", "图片", "照片", "截图", "表情包", "这张", "那张", "这图", "这p"]
ask_words = ["看看", "看下", "帮我看", "帮看看", "这个", "咋样", "什么", "识别", "分析", "评价", "点评"]
comment_words = [
"好看", "", "离谱", "抽象", "逆天", "蚌埠住", "绷不住", "", "笑死",
"", "", "", "", "绝了", "一般", "可以", "不行", "", "", "",
]
pronoun_words = ["这个", "", "", "", "", ""]
if any(word in text for word in image_words) and any(word in text for word in ask_words + comment_words):
return True
if latest_image and is_recent_image_close_enough(latest_image, image_config or {}):
short_text = len(text) <= 18
has_pronoun = any(word in text for word in pronoun_words)
has_comment = any(word in text for word in comment_words + ask_words)
if short_text and has_pronoun and has_comment:
return True
return False
def build_image_safety_hints(
*,
message: Dict[str, Any],
content: str,
quote_context: Dict[str, str],
image_context: Dict[str, str],
image_urls: List[str],
get_latest_image_message: Callable[..., Optional[Dict[str, Any]]],
image_config: Dict[str, Any],
) -> Dict[str, Any]:
if quote_context.get("quote_type_label") == "引用图片":
return {
"suspected": True,
"has_visual_context": bool(image_urls),
"reason": "用户当前是在引用图片后发言",
}
if image_context:
has_visual_context = bool(image_urls)
reason = "用户当前大概率在接最近一张群图片"
if not has_visual_context:
reason = "识别到图片跟评,但本地图片未成功附带给模型"
return {
"suspected": True,
"has_visual_context": has_visual_context,
"reason": reason,
}
latest_image = get_latest_image_message(
str(message.get("roomid") or ""),
before_timestamp=str(message.get("timestamp") or ""),
)
if latest_image and is_recent_image_followup(content, latest_image, image_config):
return {
"suspected": True,
"has_visual_context": False,
"reason": "最近刚出现图片,但这次没有拿到图片内容",
}
return {
"suspected": False,
"has_visual_context": bool(image_urls),
"reason": "",
}
def is_recent_image_close_enough(latest_image: Dict[str, Any], image_config: Dict[str, Any]) -> bool:
max_gap_minutes = max(int(image_config.get("recent_followup_window_minutes", 5) or 5), 1)
image_time = parse_message_time(str(latest_image.get("timestamp") or ""))
if not image_time:
return False
return (datetime.now() - image_time).total_seconds() <= max_gap_minutes * 60
def parse_message_time(value: str) -> Optional[datetime]:
if not value:
return None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
return None
async def prepare_quote_image_inputs(
*,
bot: WechatAPIClient,
quote_context: Dict[str, str],
log_event: Callable[..., None],
) -> List[str]:
if not quote_context or quote_context.get("quote_type_label") != "引用图片":
return []
ref_content = quote_context.get("raw_ref_content", "") or ""
image_info = extract_quote_image_info(ref_content)
if not image_info:
return []
try:
base64_str = await bot.download_image(
aeskey=image_info["aeskey"],
cdnmidimgurl=image_info["url"],
)
except Exception as exc:
log_event("quote_image_fail", reason=f"download:{exc}")
return []
data_url = build_image_data_url(base64_str)
if not data_url:
log_event("quote_image_fail", reason="invalid_base64")
return []
return [data_url]
def build_local_image_data_url(image_path: str, main_path: Path) -> str:
if not image_path:
return ""
relative_path = image_path.lstrip("/\\").replace("/", "\\")
full_path = main_path / relative_path
if not full_path.exists():
return ""
try:
image_bytes = full_path.read_bytes()
except Exception:
return ""
image_type = imghdr.what(None, h=image_bytes) or "jpeg"
raw_base64 = base64.b64encode(image_bytes).decode("utf-8")
return f"data:image/{image_type};base64,{raw_base64}"
def extract_quote_image_info(ref_content: str) -> Dict[str, str]:
if not ref_content:
return {}
aeskey_match = re.search(r'aeskey="([^"]+)"', ref_content)
if not aeskey_match:
return {}
url_match = re.search(r'cdnmidimgurl="([^"]+)"', ref_content)
if not url_match:
url_match = re.search(r'cdnbigimgurl="([^"]+)"', ref_content)
if not url_match:
url_match = re.search(r'cdnthumburl="([^"]+)"', ref_content)
if not url_match:
return {}
return {
"aeskey": aeskey_match.group(1),
"url": url_match.group(1),
}
def build_image_data_url(base64_str: str) -> str:
raw_base64 = str(base64_str or "").strip()
if not raw_base64:
return ""
if "," in raw_base64 and raw_base64.startswith("data:"):
raw_base64 = raw_base64.split(",", 1)[1]
try:
image_bytes = base64.b64decode(raw_base64)
except Exception:
return ""
image_type = imghdr.what(None, h=image_bytes) or "jpeg"
return f"data:image/{image_type};base64,{raw_base64}"

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
import html
import xml.etree.ElementTree as ET
from typing import Any, Callable, Dict
from wechat_ipad.models.message import MessageType
def parse_quote_context(full_msg: Any, room_id: str, get_sender_name: Callable[[str, str], str]) -> Dict[str, str]:
if not full_msg or not getattr(full_msg, "content", None):
return {}
xml_content = getattr(full_msg.content, "xml_content", "") or ""
if not xml_content:
return {}
try:
root = ET.fromstring(xml_content)
except ET.ParseError:
return {}
appmsg = root.find(".//appmsg")
if appmsg is None or appmsg.findtext("type", "").strip() != "57":
return {}
refer = appmsg.find("refermsg")
if refer is None:
return {}
title = html.unescape(appmsg.findtext("title", "") or "").strip()
quote_sender_name = html.unescape(refer.findtext("displayname", "") or "").strip()
if not quote_sender_name:
quote_sender = html.unescape(refer.findtext("chatusr", "") or "").strip()
quote_sender_name = get_sender_name(room_id, quote_sender) if quote_sender else "未知成员"
ref_type = int(refer.findtext("type", "0") or 0)
ref_content = html.unescape(refer.findtext("content", "") or "").strip()
quote_type_label = quote_type_label_for(ref_type)
quote_body = build_quote_body(ref_type, ref_content, title)
return {
"title": title,
"quote_sender_name": quote_sender_name,
"quote_type_label": quote_type_label,
"quote_body": quote_body,
"raw_ref_content": ref_content,
}
def quote_type_label_for(ref_type: int) -> str:
mapping = {
MessageType.TEXT.value: "引用文本",
MessageType.IMAGE.value: "引用图片",
MessageType.VIDEO.value: "引用视频",
MessageType.APP.value: "引用应用消息",
MessageType.EMOTICON.value: "引用表情",
}
return mapping.get(ref_type, f"引用消息[{ref_type}]")
def build_quote_body(ref_type: int, ref_content: str, title: str) -> str:
if ref_type == MessageType.TEXT.value:
return ref_content[:220].strip()
if ref_type == MessageType.IMAGE.value:
details = []
if title:
details.append(f"当前追问文案:{title}")
if ref_content:
details.append("被引用的是一张图片")
return "".join(details) or "被引用的是一张图片"
if title:
return title[:220].strip()
return ref_content[:220].strip()