优化ai_auto_response提示词与短回复策略:场景优先级、防冲突压缩、记忆相关性筛选、可配置长度限制

- 增加场景优先级规则,技术群优先结论与排查点,降低人设冲突\n- Dify 入参新增上下文压缩、画像与记忆去重、低相关记忆过滤\n- 回复后处理支持配置化长度阈值,并增加总字数上限裁剪\n- 新增 prompt_compact/reply 配置项,便于后续按群微调
This commit is contained in:
liuwei
2026-04-16 11:24:41 +08:00
parent a68d6d5e6c
commit 5eb1e3294f
4 changed files with 273 additions and 20 deletions

View File

@@ -53,6 +53,33 @@ long_absent_member_days = 30
memory_lookback_days = 180 memory_lookback_days = 180
active_context_hours = 8 active_context_hours = 8
[reply]
social_short_char_limit = 30
social_short_total_limit = 30
qa_fast_char_limit = 34
qa_fast_total_limit = 34
qa_with_context_sentence_limit = 2
qa_with_context_chunk_limit = 2
qa_with_context_char_limit = 32
qa_with_context_total_limit = 55
default_char_limit = 28
default_total_limit = 28
[prompt_compact]
group_profile_max_chars = 560
group_profile_max_lines = 10
context_max_chars = 900
context_max_lines = 18
recent_message_max_lines = 8
recent_message_line_max_chars = 60
at_member_profile_max_chars = 300
at_member_profile_max_lines = 8
member_memory_max_chars = 520
member_memory_max_lines = 12
memory_max_chars = 900
memory_max_lines = 18
strict_memory_relevance = true
[image] [image]
recent_followup_window_minutes = 5 recent_followup_window_minutes = 5

View File

@@ -25,6 +25,7 @@ def build_user_prompt(context: Dict, memory_hints: Dict) -> str:
rules = [ rules = [
"只处理当前发言对应的一个话题,优先直接回答当前发言。", "只处理当前发言对应的一个话题,优先直接回答当前发言。",
"规则优先级:当前发言可验证信息 > 群场景约束 > 人设措辞润色。",
"如果是明确问题,先给结论;只给第一层答案,不主动展开第二层解释。", "如果是明确问题,先给结论;只给第一层答案,不主动展开第二层解释。",
length_rule, length_rule,
"能少说就少说,优先像群友随口接一句,不要写成说明文。", "能少说就少说,优先像群友随口接一句,不要写成说明文。",
@@ -48,6 +49,8 @@ def build_user_prompt(context: Dict, memory_hints: Dict) -> str:
rules.append("这次是对方点名互动,优先参考“本次@发起者画像”,语气贴近对方,但不要过度装熟。") rules.append("这次是对方点名互动,优先参考“本次@发起者画像”,语气贴近对方,但不要过度装熟。")
if group_profile.get("knowledge_domain") == "dota": if group_profile.get("knowledge_domain") == "dota":
rules.append("如果对方问的是 Dota2 最近战绩、实时战绩、最新对局数据,要委婉说明现在没法提取,不要硬编。") rules.append("如果对方问的是 Dota2 最近战绩、实时战绩、最新对局数据,要委婉说明现在没法提取,不要硬编。")
if str(group_profile.get("mode", "") or "") in {"robotics", "openclaw"}:
rules.append("当前是技术群场景,优先给结论+一个关键排查点,少情绪铺垫,不用夸张亲昵称呼。")
sections = [ sections = [
_section( _section(

View File

@@ -1,23 +1,46 @@
from __future__ import annotations from __future__ import annotations
import re import re
from typing import List from typing import Dict, List
def finalize_reply(response: str, reply_mode: str) -> List[str]: def finalize_reply(response: str, reply_mode: str, limits: Dict | None = None) -> List[str]:
text = str(response or "").strip() text = str(response or "").strip()
if not text: if not text:
return [] return []
text = re.sub(r"\s+", " ", text) text = re.sub(r"\s+", " ", text)
text = text.replace("\n", " ").strip() text = text.replace("\n", " ").strip()
options = _resolve_limits(reply_mode, limits or {})
if reply_mode == "social_short": if reply_mode == "social_short":
return split_reply_chunks(text, sentence_limit=1, char_limit=30, chunk_limit=1, allow_clip_split=False) chunks = split_reply_chunks(
text,
sentence_limit=1,
char_limit=options["char_limit"],
chunk_limit=1,
allow_clip_split=False,
)
return _clip_total_chars(chunks, options["total_limit"])
if reply_mode == "qa_fast": if reply_mode == "qa_fast":
return split_reply_chunks(text, sentence_limit=1, char_limit=34, chunk_limit=1, allow_clip_split=False) chunks = split_reply_chunks(
text,
sentence_limit=1,
char_limit=options["char_limit"],
chunk_limit=1,
allow_clip_split=False,
)
return _clip_total_chars(chunks, options["total_limit"])
if reply_mode == "qa_with_context": if reply_mode == "qa_with_context":
return split_reply_chunks(text, sentence_limit=2, char_limit=36, chunk_limit=2, allow_clip_split=False) chunks = split_reply_chunks(
return [take_first_sentence(text, 28).strip()] text,
sentence_limit=options["sentence_limit"],
char_limit=options["char_limit"],
chunk_limit=options["chunk_limit"],
allow_clip_split=False,
)
return _clip_total_chars(chunks, options["total_limit"])
chunks = [take_first_sentence(text, options["default_char_limit"]).strip()]
return _clip_total_chars(chunks, options["total_limit"])
def preview_text(text: str, limit: int = 80) -> str: def preview_text(text: str, limit: int = 80) -> str:
@@ -33,7 +56,7 @@ def build_length_rule(reply_mode: str) -> str:
if reply_mode == "qa_fast": if reply_mode == "qa_fast":
return "优先1句话尽量控制在34字内先给结论不要展开。" return "优先1句话尽量控制在34字内先给结论不要展开。"
if reply_mode == "qa_with_context": if reply_mode == "qa_with_context":
return "优先1句必要时最多2句每句尽量控制在36字内,只给第一层答案。" return "优先1句必要时最多2句每句尽量控制在32字内,只给第一层答案。"
return "尽量短,像群友临时接一句,不要长篇大论。" return "尽量短,像群友临时接一句,不要长篇大论。"
@@ -101,3 +124,67 @@ def _find_split_at(window: str, punctuation: str, lookback: int = 10) -> int:
if window[idx] in punctuation: if window[idx] in punctuation:
return idx return idx
return -1 return -1
def _resolve_limits(reply_mode: str, limits: Dict) -> Dict[str, int]:
mode_defaults = {
"social_short": {"sentence_limit": 1, "char_limit": 30, "chunk_limit": 1, "total_limit": 30},
"qa_fast": {"sentence_limit": 1, "char_limit": 34, "chunk_limit": 1, "total_limit": 34},
"qa_with_context": {"sentence_limit": 2, "char_limit": 32, "chunk_limit": 2, "total_limit": 55},
}
defaults = mode_defaults.get(reply_mode, {"sentence_limit": 1, "char_limit": 28, "chunk_limit": 1, "total_limit": 28})
if reply_mode == "social_short":
return {
"sentence_limit": 1,
"chunk_limit": 1,
"char_limit": max(int(limits.get("social_short_char_limit", defaults["char_limit"]) or defaults["char_limit"]), 8),
"total_limit": max(int(limits.get("social_short_total_limit", defaults["total_limit"]) or defaults["total_limit"]), 8),
"default_char_limit": max(int(limits.get("default_char_limit", 28) or 28), 8),
}
if reply_mode == "qa_fast":
return {
"sentence_limit": 1,
"chunk_limit": 1,
"char_limit": max(int(limits.get("qa_fast_char_limit", defaults["char_limit"]) or defaults["char_limit"]), 8),
"total_limit": max(int(limits.get("qa_fast_total_limit", defaults["total_limit"]) or defaults["total_limit"]), 8),
"default_char_limit": max(int(limits.get("default_char_limit", 28) or 28), 8),
}
if reply_mode == "qa_with_context":
return {
"sentence_limit": max(int(limits.get("qa_with_context_sentence_limit", defaults["sentence_limit"]) or defaults["sentence_limit"]), 1),
"chunk_limit": max(int(limits.get("qa_with_context_chunk_limit", defaults["chunk_limit"]) or defaults["chunk_limit"]), 1),
"char_limit": max(int(limits.get("qa_with_context_char_limit", defaults["char_limit"]) or defaults["char_limit"]), 8),
"total_limit": max(int(limits.get("qa_with_context_total_limit", defaults["total_limit"]) or defaults["total_limit"]), 8),
"default_char_limit": max(int(limits.get("default_char_limit", 28) or 28), 8),
}
return {
"sentence_limit": 1,
"chunk_limit": 1,
"char_limit": max(int(limits.get("default_char_limit", defaults["char_limit"]) or defaults["char_limit"]), 8),
"total_limit": max(int(limits.get("default_total_limit", defaults["total_limit"]) or defaults["total_limit"]), 8),
"default_char_limit": max(int(limits.get("default_char_limit", 28) or 28), 8),
}
def _clip_total_chars(chunks: List[str], total_limit: int) -> List[str]:
if not chunks:
return []
normalized_limit = max(int(total_limit or 0), 8)
result: List[str] = []
used = 0
for chunk in chunks:
current = str(chunk or "").strip()
if not current:
continue
remain = normalized_limit - used
if remain <= 0:
break
if len(current) <= remain:
result.append(current)
used += len(current)
continue
clipped = smart_clip(current, remain)
if clipped:
result.append(clipped)
break
return result

View File

@@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import re
import time import time
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
@@ -98,6 +99,8 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.queue_worker_count = 1 self.queue_worker_count = 1
self.queue_maxsize = 200 self.queue_maxsize = 200
self.queue_workers: List[asyncio.Task] = [] self.queue_workers: List[asyncio.Task] = []
self.reply_limits: Dict[str, Any] = {}
self.prompt_compact_config: Dict[str, Any] = {}
def initialize(self, context: Dict[str, Any]) -> bool: def initialize(self, context: Dict[str, Any]) -> bool:
self.LOG = logger self.LOG = logger
@@ -134,6 +137,8 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.filters = self._config.get("filters", {}) or {} self.filters = self._config.get("filters", {}) or {}
self.mode_config = self._config.get("mode", {}) or {} self.mode_config = self._config.get("mode", {}) or {}
self.cooldown_config = self._config.get("cooldown", {}) or {} self.cooldown_config = self._config.get("cooldown", {}) or {}
self.reply_limits = self._config.get("reply", {}) or {}
self.prompt_compact_config = self._config.get("prompt_compact", {}) or {}
self.cooldown = CooldownManager(self.cooldown_config) self.cooldown = CooldownManager(self.cooldown_config)
self.image_config = self._config.get("image", {}) or {} self.image_config = self._config.get("image", {}) or {}
self.spam_config = self._config.get("spam_guard", {}) or {} self.spam_config = self._config.get("spam_guard", {}) or {}
@@ -573,7 +578,7 @@ class AIAutoResponsePlugin(MessagePluginInterface):
) )
return False, "llm_empty_reply" return False, "llm_empty_reply"
reply_chunks = finalize_reply(reply_text, reply_mode) reply_chunks = finalize_reply(reply_text, reply_mode, self.reply_limits)
final_response_text = "\n".join(reply_chunks) final_response_text = "\n".join(reply_chunks)
reply_dedup_expiry = int(self.cooldown_config.get("reply_dedup_window_sec", 90)) reply_dedup_expiry = int(self.cooldown_config.get("reply_dedup_window_sec", 90))
if not reply_chunks or self.dedup.should_skip_duplicate_reply( if not reply_chunks or self.dedup.should_skip_duplicate_reply(
@@ -753,28 +758,68 @@ class AIAutoResponsePlugin(MessagePluginInterface):
files: List[Dict[str, Any]], files: List[Dict[str, Any]],
) -> Dict[str, Any]: ) -> Dict[str, Any]:
persona = self._compose_dify_persona_text(group_profile, context) persona = self._compose_dify_persona_text(group_profile, context)
group_profile_text = str(context.get("group_profile_prompt", "") or "").strip() or "当前群没有特殊画像。" group_profile_text = self._compact_text(
str(context.get("group_profile_prompt", "") or "").strip() or "当前群没有特殊画像。",
max_chars=int(self.prompt_compact_config.get("group_profile_max_chars", 560) or 560),
max_lines=int(self.prompt_compact_config.get("group_profile_max_lines", 10) or 10),
)
context_parts = [ context_parts = [
self._string_block("最近上下文", self._join_recent_messages(context)), self._string_block(
"最近上下文",
self._join_recent_messages(
context,
max_lines=int(self.prompt_compact_config.get("recent_message_max_lines", 8) or 8),
max_line_chars=int(self.prompt_compact_config.get("recent_message_line_max_chars", 60) or 60),
),
),
self._string_block("引用补充", context.get("quote_prompt", "")), self._string_block("引用补充", context.get("quote_prompt", "")),
self._string_block("图片补充", context.get("image_prompt", "")), self._string_block("图片补充", context.get("image_prompt", "")),
self._string_block("图片谨慎提示", context.get("image_safety_prompt", "")), self._string_block("图片谨慎提示", context.get("image_safety_prompt", "")),
] ]
context_text = "\n\n".join([part for part in context_parts if part]).strip() or "无额外上下文。" context_text = self._compact_text(
"\n\n".join([part for part in context_parts if part]).strip() or "无额外上下文。",
max_chars=int(self.prompt_compact_config.get("context_max_chars", 900) or 900),
max_lines=int(self.prompt_compact_config.get("context_max_lines", 18) or 18),
)
at_member_profile_text = self._compact_text(
str(context.get("at_member_profile_prompt", "") or ""),
max_chars=int(self.prompt_compact_config.get("at_member_profile_max_chars", 300) or 300),
max_lines=int(self.prompt_compact_config.get("at_member_profile_max_lines", 8) or 8),
)
member_memory_text = self._compact_text(
str(context.get("memory_prompt", "") or ""),
max_chars=int(self.prompt_compact_config.get("member_memory_max_chars", 520) or 520),
max_lines=int(self.prompt_compact_config.get("member_memory_max_lines", 12) or 12),
)
member_memory_text = self._remove_overlap_lines(member_memory_text, at_member_profile_text)
memory_parts = [ memory_parts = [
self._string_block("本次@发起者画像(优先)", context.get("at_member_profile_prompt", "")), self._string_block("本次@发起者画像(优先)", at_member_profile_text),
self._string_block("成员记忆", context.get("memory_prompt", "")), self._string_block("成员记忆", member_memory_text),
self._string_block("群关系记忆", context.get("social_memory_prompt", "")), self._string_block(
self._string_block("事实记忆", context.get("group_facts_prompt", "")), "关系记忆",
self._string_block("向量召回记忆", context.get("vector_memory_prompt", "")), self._memory_if_relevant(content, str(context.get("social_memory_prompt", "") or ""), "social"),
),
self._string_block(
"群事实记忆",
self._memory_if_relevant(content, str(context.get("group_facts_prompt", "") or ""), "facts"),
),
self._string_block(
"向量召回记忆",
self._memory_if_relevant(content, str(context.get("vector_memory_prompt", "") or ""), "vector"),
),
self._string_block( self._string_block(
"回归状态", "回归状态",
str(memory_hints.get("returning_member_state", "") or "").strip() or "none", str(memory_hints.get("returning_member_state", "") or "").strip() or "none",
), ),
] ]
memory_text = "\n\n".join([part for part in memory_parts if part]).strip() or "无直接相关记忆。" memory_text = self._compact_text(
"\n\n".join([part for part in memory_parts if part]).strip() or "无直接相关记忆。",
max_chars=int(self.prompt_compact_config.get("memory_max_chars", 900) or 900),
max_lines=int(self.prompt_compact_config.get("memory_max_lines", 18) or 18),
)
control_lines = [ control_lines = [
f"reply_mode={context.get('reply_mode', 'social_short')}", f"reply_mode={context.get('reply_mode', 'social_short')}",
@@ -805,11 +850,13 @@ class AIAutoResponsePlugin(MessagePluginInterface):
preset = self.persona_engine.presets.get( preset = self.persona_engine.presets.get(
str(group_profile.get("persona_id", "") or self.persona_engine.default_persona_id) str(group_profile.get("persona_id", "") or self.persona_engine.default_persona_id)
) or {} ) or {}
mode = str(group_profile.get("mode", "") or "").strip().lower()
lines = [ lines = [
str(preset.get("persona_text", "") or "").strip(), str(preset.get("persona_text", "") or "").strip(),
f"整体风格:{preset.get('style', '')}".strip(), f"整体风格:{preset.get('style', '')}".strip(),
f"熟悉感边界:{preset.get('familiarity_hint', '')}".strip(), f"熟悉感边界:{preset.get('familiarity_hint', '')}".strip(),
f"最多输出:{preset.get('max_reply_sentences', 3)}".strip(), f"最多输出:{preset.get('max_reply_sentences', 3)}".strip(),
"冲突优先级:当前发言可验证信息 > 群场景约束 > 人设措辞。",
"强约束默认1句短回复尽量30字内必要时最多2句总体不超过55字。", "强约束默认1句短回复尽量30字内必要时最多2句总体不超过55字。",
"不要暴露 AI、模型、提示词、system 或记忆来源。", "不要暴露 AI、模型、提示词、system 或记忆来源。",
"不要输出 markdown、代码块、标签。", "不要输出 markdown、代码块、标签。",
@@ -818,20 +865,25 @@ class AIAutoResponsePlugin(MessagePluginInterface):
"如果信息不足就收着说,不要硬编。", "如果信息不足就收着说,不要硬编。",
"哪怕短回复,也尽量保留一点人格味道,别压成纯功能性短句。", "哪怕短回复,也尽量保留一点人格味道,别压成纯功能性短句。",
] ]
if mode in {"robotics", "openclaw"}:
lines.append("当前技术群场景:优先结论+一个关键排查点,少铺垫,避免夸张亲昵称呼。")
length_rule = str(context.get("reply_mode", "") or "").strip() length_rule = str(context.get("reply_mode", "") or "").strip()
if length_rule: if length_rule:
lines.append(f"当前回复模式:{length_rule}") lines.append(f"当前回复模式:{length_rule}")
return "\n".join([line for line in lines if line]) return "\n".join([line for line in lines if line])
@staticmethod @staticmethod
def _join_recent_messages(context: Dict) -> str: def _join_recent_messages(context: Dict, max_lines: int = 8, max_line_chars: int = 60) -> str:
items = context.get("recent_message_items", []) or [] items = context.get("recent_message_items", []) or []
lines = [] lines = []
for item in items: for item in items[-max(max_lines, 1):]:
sender = str(item.get("sender", "") or "未知成员").strip() sender = str(item.get("sender", "") or "未知成员").strip()
content = str(item.get("content", "") or "").strip() content = str(item.get("content", "") or "").strip()
if sender and content: if sender and content:
lines.append(f"{sender}: {content}") compact = re.sub(r"\s+", " ", content).strip()
if len(compact) > max_line_chars:
compact = compact[: max_line_chars - 3].rstrip() + "..."
lines.append(f"{sender}: {compact}")
return "\n".join(lines) return "\n".join(lines)
@staticmethod @staticmethod
@@ -841,6 +893,90 @@ class AIAutoResponsePlugin(MessagePluginInterface):
return "" return ""
return f"{title}\n{text}" return f"{title}\n{text}"
def _memory_if_relevant(self, content: str, memory_text: str, memory_type: str) -> str:
text = str(memory_text or "").strip()
if not text:
return ""
strict = bool(self.prompt_compact_config.get("strict_memory_relevance", True))
if not strict:
return self._compact_text(text, max_chars=360, max_lines=8)
if self._is_text_relevant(content, text):
return self._compact_text(text, max_chars=360, max_lines=8)
self._log_event(
"memory_skip",
memory_type=memory_type,
reason="not_relevant",
content_preview=preview_text(content, 36),
)
return ""
@staticmethod
def _compact_text(text: str, max_chars: int, max_lines: int) -> str:
raw = str(text or "").strip()
if not raw:
return ""
lines = [re.sub(r"\s+", " ", line).strip() for line in raw.splitlines() if line and line.strip()]
if max_lines > 0 and len(lines) > max_lines:
lines = lines[:max_lines]
merged = "\n".join(lines).strip()
if len(merged) <= max_chars:
return merged
return merged[: max_chars - 3].rstrip(" ,;。.!?:") + "..."
@staticmethod
def _remove_overlap_lines(base_text: str, reference_text: str) -> str:
base_lines = [line.strip() for line in str(base_text or "").splitlines() if line.strip()]
if not base_lines:
return ""
refs = [line.strip() for line in str(reference_text or "").splitlines() if line.strip()]
if not refs:
return "\n".join(base_lines)
ref_norm = [AIAutoResponsePlugin._normalize_overlap_token(line) for line in refs]
kept: List[str] = []
for line in base_lines:
norm = AIAutoResponsePlugin._normalize_overlap_token(line)
if not norm:
continue
overlapped = False
for item in ref_norm:
if not item:
continue
if norm == item or norm in item or item in norm:
overlapped = True
break
if not overlapped:
kept.append(line)
return "\n".join(kept)
@staticmethod
def _normalize_overlap_token(text: str) -> str:
value = str(text or "").strip().lower()
value = re.sub(r"[:,;。.!?\-\s]", "", value)
return value
@staticmethod
def _is_text_relevant(content: str, memory_text: str) -> bool:
content_tokens = AIAutoResponsePlugin._extract_relevance_tokens(content)
memory_tokens = AIAutoResponsePlugin._extract_relevance_tokens(memory_text)
if not content_tokens or not memory_tokens:
return False
overlap = content_tokens & memory_tokens
return len(overlap) >= 1
@staticmethod
def _extract_relevance_tokens(text: str) -> set[str]:
raw = str(text or "").lower()
tokens = set(re.findall(r"[a-z0-9_\\-]{2,}", raw))
zh_keywords = [
"机器人", "插件", "部署", "报错", "配置", "接口", "脚本", "微信", "", "记忆", "成本",
"价格", "api", "模型", "功能", "菜单", "指令", "回复", "引用", "上下文",
]
for keyword in zh_keywords:
if keyword in raw:
tokens.add(keyword)
return tokens
def _build_dify_image_files(self, *, user_id: str, image_urls: List[str]) -> List[Dict[str, Any]]: def _build_dify_image_files(self, *, user_id: str, image_urls: List[str]) -> List[Dict[str, Any]]:
files: List[Dict[str, Any]] = [] files: List[Dict[str, Any]] = []
for index, image_url in enumerate(image_urls or [], start=1): for index, image_url in enumerate(image_urls or [], start=1):