unify xiaoniu topic selection and reply planning

This commit is contained in:
liuwei
2026-04-08 09:23:10 +08:00
parent 8ead2c43bf
commit 67eec32f7f
2 changed files with 286 additions and 27 deletions

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import re
from typing import Dict, List
@@ -24,8 +25,9 @@ class ContextBuilder:
quote_context: Dict | None = None,
image_context: Dict | None = None,
) -> Dict:
selected_messages = self._select_recent_messages(recent_messages, sender, content, quote_context or {})
recent_lines = []
for item in recent_messages[-self.recent_context_size:]:
for item in selected_messages:
msg_sender = item.get("sender_name") or item.get("sender") or "未知成员"
msg_content = item.get("content") or item.get("message") or ""
if msg_content:
@@ -38,6 +40,7 @@ class ContextBuilder:
"member_context": member_context or {},
},
"speaker_name_clean": self._clean_display_name(sender_name),
"recent_message_items": self._build_recent_message_items(selected_messages),
"recent_messages": recent_lines,
"recent_summary": "",
"trigger_type": trigger.get("trigger_type", "none"),
@@ -51,6 +54,123 @@ class ContextBuilder:
"current_message": f"{sender_name}: {content}",
}
@staticmethod
def _build_recent_message_items(messages: List[Dict]) -> List[Dict]:
    """Convert raw message dicts into compact, numbered entries.

    Each returned entry holds:

    * ``idx`` - the 1-based position of the message within ``messages``;
    * ``sender`` - ``sender_name``, else ``sender``, else a placeholder;
    * ``content`` - the stripped text (``content`` or ``message`` key),
      truncated to 120 characters;
    * ``is_at`` - the truthiness of the message's ``is_at`` value.

    Messages whose text is empty after stripping are omitted. They still
    consume a position, so ``idx`` values can have gaps.
    """
    def text_of(message: Dict) -> str:
        raw = message.get("content") or message.get("message") or ""
        return str(raw).strip()

    numbered = (
        (position, message, text_of(message))
        for position, message in enumerate(messages, start=1)
    )
    return [
        {
            "idx": position,
            "sender": message.get("sender_name") or message.get("sender") or "未知成员",
            "content": text[:120],
            "is_at": bool(message.get("is_at")),
        }
        for position, message, text in numbered
        if text
    ]
def _select_recent_messages(
    self,
    recent_messages: List[Dict],
    current_sender: str,
    current_content: str,
    quote_context: Dict,
) -> List[Dict]:
    """Pick the slice of recent history to use as context for the current message.

    Only the last ``self.recent_context_size`` messages are considered. When
    that window holds 8 messages or fewer it is returned unchanged.
    Otherwise every message is scored with ``_message_relevance`` against
    the topic tokens of the current message plus the quoted title/body, and
    the result is the last 4 messages of the window together with up to 10
    of the best-scoring ones, kept in their original order.

    Returns:
        The selected messages in chronological order: at most 12 entries,
        or the last 6 of the window when fewer than 6 were selected.
    """
    if not recent_messages:
        return []

    window = recent_messages[-self.recent_context_size:]
    size = len(window)
    if size <= 8:
        return window

    current_tokens = self._extract_topic_tokens(current_content)
    quoted_text = f"{quote_context.get('title', '')} {quote_context.get('quote_body', '')}"
    focus_tokens = current_tokens | self._extract_topic_tokens(quoted_text)
    quote_sender_name = str(quote_context.get("quote_sender_name", "") or "").strip().lower()

    # Store negated (score, position) pairs so a plain ascending sort yields
    # "highest score first, most recent first among equal scores".
    ranked = []
    for position, message in enumerate(window):
        relevance = self._message_relevance(
            message,
            current_sender=current_sender,
            focus_tokens=focus_tokens,
            quote_sender_name=quote_sender_name,
        )
        if relevance > 0:
            ranked.append((-relevance, -position))
    ranked.sort()

    # Always keep the last few messages for a sense of the live conversation,
    # then add the messages most relevant to the current topic.
    keep = set(range(max(size - 4, 0), size))
    keep.update(-negated_position for _, negated_position in ranked[:10])

    selected = [window[position] for position in sorted(keep)]
    if len(selected) < 6:
        return window[-6:]
    return selected[-12:]
@classmethod
def _message_relevance(
    cls,
    item: Dict,
    *,
    current_sender: str,
    focus_tokens: set[str],
    quote_sender_name: str,
) -> int:
    """Score how relevant a history message is to the message being answered.

    Scoring, all additive:

    * +3 when the message's ``sender`` equals ``current_sender``;
    * +3 when ``quote_sender_name`` is non-empty and is a substring of the
      lower-cased ``sender_name``;
    * +1 when ``is_at`` is truthy;
    * with focus tokens: +2 per shared topic token (capped at 6), plus +2
      when there is any overlap and the text looks like a question/answer;
    * without focus tokens: +1 more for messages from ``current_sender``;
    * +1 when the text looks like a question/answer.

    Returns:
        The integer score; 0 for messages with no text.
    """
    text = str(item.get("content") or item.get("message") or "").strip()
    if not text:
        return 0

    sender_id = str(item.get("sender", "") or "")
    display_name = str(item.get("sender_name", "") or "").strip().lower()
    from_current_sender = sender_id == current_sender

    points = 3 if from_current_sender else 0
    if quote_sender_name and quote_sender_name in display_name:
        points += 3
    if item.get("is_at"):
        points += 1

    if focus_tokens:
        shared = focus_tokens & cls._extract_topic_tokens(text)
        points += min(2 * len(shared), 6)
        if shared and cls._looks_like_question_or_answer(text):
            points += 2
    elif from_current_sender:
        points += 1

    if cls._looks_like_question_or_answer(text):
        points += 1
    return points
@staticmethod
def _looks_like_question_or_answer(content: str) -> bool:
    """Return True when ``content`` reads like a question or a troubleshooting reply.

    The text is stripped and lower-cased, then considered a match when it
    ends with an ASCII or full-width question mark, or contains one of a
    fixed list of question/troubleshooting cue words.

    Args:
        content: Message text; ``None`` or other falsy values count as empty.

    Returns:
        ``False`` for empty text or text without any cue, ``True`` otherwise.
    """
    text = str(content or "").strip().lower()
    if not text:
        return False

    # A trailing question mark (half- or full-width) marks a question.
    if text.endswith(("?", "？")):
        return True

    # Literal cues only. A bare "$" or an empty pattern would match every
    # message, making this check return True for any non-empty text.
    cues = (
        "怎么", "如何", "为啥", "为什么", "能不能", "可以吗",
        "报错", "试试", "然后", "配置", "日志", "接口", "原因",
    )
    return any(cue in text for cue in cues)
@staticmethod
def _extract_topic_tokens(content: str) -> set[str]:
    """Extract a set of lower-cased topic tokens from ``content``.

    Two sources are combined:

    * every run of at least three ASCII letters, digits, underscores or
      hyphens;
    * every entry of a fixed keyword list that occurs anywhere in the text
      as a substring (this is how the Chinese keywords are detected).

    Args:
        content: Message text; ``None`` or other falsy values count as empty.

    Returns:
        The set of tokens; empty when nothing matches.
    """
    text = str(content or "").lower()

    # The hyphen sits last in the class so it is literal. A backslash is
    # deliberately NOT part of a token, so paths such as "c:\logs\app" split
    # into separate words instead of fusing into one unmatched blob.
    tokens = set(re.findall(r"[a-z0-9_-]{3,}", text))

    keywords = (
        "openclaw", "qdrant", "ollama", "docker", "python", "api", "插件", "机器人", "模型",
        "日志", "配置", "报错", "部署", "联网", "图片", "记忆", "群聊", "dota", "战绩",
    )
    tokens.update(keyword for keyword in keywords if keyword in text)
    return tokens
@staticmethod
def _clean_display_name(sender_name: str) -> str:
import re