重构斗鱼粉丝日报信息提纯链路

- 新增本地弹幕文件测试入口,支持直接对样本文件生成提纯结果
- 将本地统计、主题证据簇和语义事实提示接入斗鱼日报LLM材料
- 明确降低情绪刷屏权重,改为优先提取赛事、位置、英雄、对局和场外互动信息
This commit is contained in:
liuwei
2026-04-29 14:47:42 +08:00
parent 44cd42d5f7
commit 31848f67f6
5 changed files with 12916 additions and 9 deletions

View File

@@ -24,6 +24,49 @@ class DouyuDanmuSummaryHelper:
"666", "6666", "", "牛逼", "", "", "", "", "", "", "", "", "",
"哈哈", "哈哈哈", "笑死", "卧槽", "wc", "awsl", "nb", "nbl", "c", "6",
}
# 这些词对“直播间气氛”有价值,但对“事实提炼”帮助有限。
# 后面在内容线索排序时会适当降权,避免把真正的赛事/英雄/剧情信息淹掉。
GENERIC_REACTION_TERMS: Set[str] = {
"哈哈", "哈哈哈", "哈哈哈哈", "哈哈哈哈哈", "哈哈哈哈哈哈",
"gg", "g", "888", "1", "", "啊?", "坏了", "好起来了", "翻了",
}
# 高频语义簇配置:
# 1. 不做中文分词,而是直接按“直播圈常见话题簇”收证据;
# 2. 每个簇都会保留计数、时间范围和原声样本;
# 3. 这样模型更容易抓到“今天到底发生了哪些具体事”,而不是只看到大量情绪词。
FACT_CLUSTER_CONFIGS: List[Dict[str, Any]] = [
{
"label": "赛事预告与报名动态",
"keywords": ["老头杯", "选人", "报名", "开赛", "比赛", "30号", "4月30", "4月30日"],
},
{
"label": "比赛位置与身份讨论",
"keywords": ["1号位", "5号位", "打1", "打5", "教练", "carry", "辅助"],
},
{
"label": "镜头与外形调侃",
"keywords": ["摄像头", "开摄像头", "光头", "秃头", "洗头", "面容", "露脸"],
},
{
"label": "团播人物与场外关系",
"keywords": ["糯糯", "瑶瑶", "冬瓜", "冬瓜强", "白队", "团播", "户外"],
},
{
"label": "关键对局与局势转折",
"keywords": ["奶绿", "muerta", "gg", "翻了", "拿下", "上高地", "守高地", "队友", "大炮", "萨尔", "炸弹人"],
},
]
HERO_ALIASES: Dict[str, List[str]] = {
"Muerta/奶绿": ["奶绿", "muerta"],
"德鲁伊/Lone Druid": ["德鲁伊", "lone druid", "熊德"],
"小小/Tiny": ["小小", "tiny"],
"帕克/Puck": ["帕克", "puck"],
"火猫/Ember Spirit": ["火猫", "ember"],
"敌法/Anti-Mage": ["敌法", "am", "anti-mage"],
"兽王/Beastmaster": ["兽王", "beastmaster"],
"萨尔/Disruptor": ["萨尔", "disruptor"],
"炸弹人/Techies": ["炸弹人", "techies"],
}
NOISE_PATTERNS = [
re.compile(r"本条弹幕.*机器人", re.I),
re.compile(r"请不要.*统计机器人数", re.I),
@@ -110,6 +153,26 @@ class DouyuDanmuSummaryHelper:
collected.append(parsed)
return collected
@classmethod
def load_messages_from_file(cls, file_path: str) -> List[Dict[str, Any]]:
"""
从指定文本文件直接读取弹幕。
这个入口主要用于本地调试和样本回归:
1. 不依赖 Redis session
2. 不要求文件落在 temp/douyu_danmu 目录;
3. 便于直接拿用户提供的测试样本跑提纯链路。
"""
path = str(file_path or "").strip()
if not path or not os.path.exists(path):
return []
collected: List[Dict[str, Any]] = []
with open(path, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
parsed = cls.parse_danmu_line(line)
if parsed:
collected.append(parsed)
return collected
@classmethod
def build_summary_material(
cls,
@@ -289,6 +352,8 @@ class DouyuDanmuSummaryHelper:
burst_terms=burst_terms,
limit=cue_limit,
),
# 再补一层“事实型提示”,专门抬高赛事、位置、英雄、镜头梗、关键对局等信息密度高的内容。
"semantic_fact_hints": cls._build_semantic_fact_hints(ordered_messages),
}
@staticmethod
@@ -1199,7 +1264,7 @@ class DouyuDanmuSummaryHelper:
for item in repeated_messages:
push(
"repeat",
"emotion" if cls._normalize_template_text(str(item.get("text") or "")) in cls.GENERIC_REACTION_TERMS else "repeat",
str(item.get("text") or ""),
int(item.get("count", 0) or 0),
int(item.get("user_count", 0) or 0),
@@ -1222,7 +1287,7 @@ class DouyuDanmuSummaryHelper:
if count < 2:
continue
push(
"short_repeat",
"emotion" if normalized in cls.GENERIC_REACTION_TERMS else "short_repeat",
short_message_text_map.get(normalized, normalized),
count,
len([uid for uid in short_message_users.get(normalized, set()) if uid]),
@@ -1230,15 +1295,142 @@ class DouyuDanmuSummaryHelper:
for item in burst_terms:
push(
"burst",
"emotion" if cls._normalize_template_text(str(item.get("text") or "")) in cls.GENERIC_REACTION_TERMS else "burst",
str(item.get("text") or ""),
int(item.get("count", 0) or 0),
int(item.get("user_count", 0) or 0),
)
cues.sort(key=lambda item: (int(item.get("count", 0) or 0), int(item.get("user_count", 0) or 0)), reverse=True)
kind_priority = {
"repeat": 5,
"short_repeat": 4,
"burst": 3,
"emotion": 1,
}
cues.sort(
key=lambda item: (
int(kind_priority.get(str(item.get("kind") or ""), 0)),
int(item.get("count", 0) or 0),
int(item.get("user_count", 0) or 0),
),
reverse=True,
)
return cues[:limit]
@classmethod
def _build_semantic_fact_hints(cls, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
生成“事实型语义提示”。
这层不是做总结,而是把模型容易漏掉的高价值信息提前挂出来:
1. 赛事/选人/位置讨论;
2. 英雄与关键局;
3. 摄像头、团播人物等场外互动梗。
"""
return {
"topic_clusters": cls._build_fact_topic_clusters(messages),
"hero_mentions": cls._build_hero_mentions(messages),
}
@classmethod
def _build_fact_topic_clusters(cls, messages: List[Dict[str, Any]], limit: int = 8) -> List[Dict[str, Any]]:
clusters: List[Dict[str, Any]] = []
for config in cls.FACT_CLUSTER_CONFIGS:
matched_items: List[Dict[str, Any]] = []
keywords = [str(item).lower() for item in (config.get("keywords") or []) if str(item).strip()]
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
lowered = content.lower()
if any(keyword in lowered for keyword in keywords):
matched_items.append(item)
if not matched_items:
continue
sample_messages = []
seen = set()
for item in matched_items:
content = str(item.get("content") or "").strip()
normalized = cls._normalize_template_text(content)
if not normalized or normalized in seen:
continue
seen.add(normalized)
sample_messages.append({
"date": cls._format_date(item.get("timestamp")),
"hm": cls._format_hm(item.get("timestamp")),
"nickname": str(item.get("nickname") or "").strip(),
"content": content[:100],
})
if len(sample_messages) >= 5:
break
if not sample_messages:
continue
clusters.append({
"label": str(config.get("label") or "").strip(),
"match_count": len(matched_items),
"user_count": len({
str(item.get("uid") or "") for item in matched_items if str(item.get("uid") or "").strip()
}),
"first_hm": cls._format_hm(matched_items[0].get("timestamp")),
"last_hm": cls._format_hm(matched_items[-1].get("timestamp")),
"keywords": config.get("keywords", [])[:8],
"samples": sample_messages,
})
clusters.sort(
key=lambda item: (
int(item.get("match_count", 0) or 0),
int(item.get("user_count", 0) or 0),
),
reverse=True,
)
return clusters[:limit]
@classmethod
def _build_hero_mentions(cls, messages: List[Dict[str, Any]], limit: int = 6) -> List[Dict[str, Any]]:
hero_results: List[Dict[str, Any]] = []
for hero_name, aliases in cls.HERO_ALIASES.items():
matched_items: List[Dict[str, Any]] = []
alias_list = [str(alias).lower() for alias in aliases if str(alias).strip()]
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
lowered = content.lower()
if any(alias in lowered for alias in alias_list):
matched_items.append(item)
if not matched_items:
continue
samples = []
seen = set()
for item in matched_items:
content = str(item.get("content") or "").strip()
normalized = cls._normalize_template_text(content)
if not normalized or normalized in seen:
continue
seen.add(normalized)
samples.append({
"hm": cls._format_hm(item.get("timestamp")),
"nickname": str(item.get("nickname") or "").strip(),
"content": content[:100],
})
if len(samples) >= 4:
break
hero_results.append({
"hero": hero_name,
"mention_count": len(matched_items),
"user_count": len({
str(item.get("uid") or "") for item in matched_items if str(item.get("uid") or "").strip()
}),
"samples": samples,
})
hero_results.sort(
key=lambda item: (
int(item.get("mention_count", 0) or 0),
int(item.get("user_count", 0) or 0),
),
reverse=True,
)
return hero_results[:limit]
@staticmethod
def _looks_like_pure_punctuation(content: str) -> bool:
text = str(content or "").strip()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
"""
斗鱼弹幕本地测试脚本。
用途:
1. 直接读取用户提供的本地弹幕文本样本;
2. 跑一遍“本地提纯 + 证据簇提炼”链路;
3. 将结果输出到 temp/douyu_materials方便人工查看
4. 不依赖 Redis、Dify、直播 session。
"""
import importlib.util
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
def _load_helper():
current_dir = Path(__file__).resolve().parent
module_path = current_dir / "danmu_summary.py"
spec = importlib.util.spec_from_file_location("douyu_danmu_summary_local", module_path)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module.DouyuDanmuSummaryHelper
def _build_session(room_id: str, anchor_day: str, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
ordered = sorted(messages, key=lambda item: item.get("timestamp") or datetime.min)
if not ordered:
return {
"session_id": f"{room_id}_{anchor_day.replace('-', '')}_empty",
"room_id": room_id,
"anchor_day": anchor_day,
"nickname": "",
"room_name": "",
"segments": [],
}
return {
"session_id": f"{room_id}_{anchor_day.replace('-', '')}_local_test",
"room_id": room_id,
"anchor_day": anchor_day,
"nickname": "",
"room_name": "",
"segments": [{
"start_time": ordered[0]["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
"end_time": ordered[-1]["timestamp"].strftime("%Y-%m-%d %H:%M:%S"),
}],
}
def run_local_test(file_path: str) -> str:
helper = _load_helper()
resolved_path = str(Path(file_path).resolve())
messages = helper.load_messages_from_file(resolved_path)
file_name = Path(file_path).stem
room_id, date_key = file_name.split("_", 1)
anchor_day = f"{date_key[:4]}-{date_key[4:6]}-{date_key[6:8]}"
session = _build_session(room_id, anchor_day, messages)
payload = helper.build_llm_payload(room_id, session, messages)
compact = payload.get("compact_prompt_assets", {}) or {}
result = {
"file_path": resolved_path,
"message_count": len(messages),
"session_meta": payload.get("session_meta", {}) or {},
"local_stats_preview": {
"top_repeated_messages": (payload.get("repeated_messages", []) or [])[:10],
"top_burst_terms": (payload.get("burst_terms", []) or [])[:10],
"peak_buckets": (payload.get("peak_buckets", []) or [])[:6],
},
"topic_evidence_clusters": ((compact.get("semantic_fact_hints", {}) or {}).get("topic_clusters", []) or [])[:8],
"hero_mentions": ((compact.get("semantic_fact_hints", {}) or {}).get("hero_mentions", []) or [])[:8],
"content_cues": (compact.get("content_cues", []) or [])[:16],
"timeline_digest": (compact.get("timeline_digest", []) or [])[:12],
"representative_messages": (payload.get("representative_messages", []) or [])[:18],
}
output_dir = Path(os.getcwd()) / "temp" / "douyu_materials"
output_dir.mkdir(parents=True, exist_ok=True)
output_path = output_dir / f"{file_name}_local_test_result.json"
output_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
return str(output_path)
if __name__ == "__main__":
sample_files = [
r"plugins\douyu\danmu_test\52876_20260428.txt",
r"plugins\douyu\danmu_test\52876_20260429.txt",
]
for sample in sample_files:
path = run_local_test(sample)
print(path)

View File

@@ -2173,8 +2173,10 @@ class DouyuPlugin(MessagePluginInterface):
os.makedirs(artifact_dir, exist_ok=True)
audience_trend = self._build_audience_trend(sessions)
room_context = self._build_room_semantic_context(room_id, nickname, room_name, sessions)
prepared_all_messages = DouyuDanmuSummaryHelper._prepare_messages(all_messages)
compact_source_messages = prepared_all_messages.get("organized_messages", []) or all_messages
llm_compact = DouyuDanmuSummaryHelper.build_compact_prompt_assets(
all_messages,
compact_source_messages,
bucket_minutes=5,
speaker_limit=80,
timeline_limit=24,
@@ -2268,8 +2270,8 @@ class DouyuPlugin(MessagePluginInterface):
"5. 单独列出 2-3 个热点时段。\n"
"6. 整体控制在 600 字以内。\n\n"
f"{room_context_prompt}"
"下面是已经提纯给 LLM 的材料,其中 `compact_scene_material` 是主阅读区:\n"
"请优先依据其中的用户索引、时间线块、整句复读线索和原声样本来写,不要被大段统计信息带偏。\n"
"下面是已经提纯给 LLM 的材料,其中 `topic_evidence_clusters` 和 `compact_scene_material.semantic_fact_hints` 是主阅读区:\n"
"请优先依据其中的事实证据簇、用户索引、时间线块、整句复读线索和原声样本来写,不要被大段统计信息带偏。\n"
f"材料如下:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}"
)
return system_prompt, user_prompt
@@ -2294,7 +2296,7 @@ class DouyuPlugin(MessagePluginInterface):
f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n"
f"日期:{meta.get('anchor_day', '')}\n"
f"{room_context_prompt}"
"下面是已经提纯给 LLM 的现场材料,请优先阅读 `compact_scene_material` 中的时间线块、整句复读线索和原声样本,"
"下面是已经提纯给 LLM 的现场材料,请优先阅读 `topic_evidence_clusters` 以及 `compact_scene_material` 中的 `semantic_fact_hints`、时间线块、整句复读线索和原声样本,"
"不要写成词频复述。\n"
f"材料:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}"
)
@@ -2327,7 +2329,8 @@ class DouyuPlugin(MessagePluginInterface):
f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n"
f"日期:{meta.get('anchor_day', '')}\n"
f"{room_context_prompt}"
"下面是已经提纯给 LLM 的现场材料,请优先抓 `compact_scene_material` 里的原声弹幕、时间线块和集体起哄片段,"
"下面是已经提纯给 LLM 的现场材料,请优先抓 `topic_evidence_clusters` 和 `compact_scene_material` 里的 `semantic_fact_hints`、原声弹幕、时间线块和集体起哄片段,"
"尤其留意赛事预告、位置讨论、英雄选择、关键对局、镜头调侃和团播人物关系,"
"少写空泛概括。\n"
f"材料:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}"
)
@@ -2362,6 +2365,9 @@ class DouyuPlugin(MessagePluginInterface):
speaker_index = llm_compact.get("speaker_index", []) or []
timeline_digest = llm_compact.get("timeline_digest", []) or []
content_cues = llm_compact.get("content_cues", []) or []
semantic_fact_hints = llm_compact.get("semantic_fact_hints", {}) or {}
fact_topic_clusters = semantic_fact_hints.get("topic_clusters", []) or []
hero_mentions = semantic_fact_hints.get("hero_mentions", []) or []
material: Dict[str, Any] = {
"report_meta": {
@@ -2382,6 +2388,69 @@ class DouyuPlugin(MessagePluginInterface):
"storyline_keywords": self._normalize_text_list(room_context.get("storyline_keywords"))[:10],
"style_hints": self._normalize_text_list(room_context.get("style_hints"))[:6],
},
# 本地统计层:
# 1. 这里只放“本地就能确定”的结果;
# 2. 让 LLM 只把这些统计当作背景,不再浪费能力去数哈哈哈和复读次数。
"local_stats": {
"message_count": int(meta.get("message_count", 0) or 0),
"unique_user_count": int(meta.get("unique_user_count", 0) or 0),
"top_emotion_bursts": [
{
"text": str(item.get("text") or "").strip(),
"count": int(item.get("count", 0) or 0),
}
for item in content_cues[:12]
if str(item.get("kind") or "").strip() == "emotion" and str(item.get("text") or "").strip()
][:8],
"top_repeated_messages": [
{
"text": str(item.get("text") or "").strip()[:90],
"count": int(item.get("count", 0) or 0),
"user_count": int(item.get("user_count", 0) or 0),
}
for item in (repeated_messages[:12] if repeated_messages else content_cues[:12])
if str(item.get("text") or "").strip()
][:8],
"peak_windows": [
{
"start_time": str(item.get("start_time") or "").strip(),
"message_count": int(item.get("message_count", 0) or 0),
"user_count": int(item.get("user_count", 0) or 0),
}
for item in peak_buckets[:6]
],
},
# 这是后续给 LLM 的主工作区:
# 1. 每个簇都代表“今天弹幕里正在讨论的一件事”;
# 2. 本地只做聚类和保留证据,不替模型写结论;
# 3. LLM 负责从这些簇里提炼赛事、位置、英雄、背景和场外互动信息。
"topic_evidence_clusters": [
{
"label": str(item.get("label") or "").strip(),
"count": int(item.get("match_count", 0) or 0),
"user_count": int(item.get("user_count", 0) or 0),
"time_range": (
f"{str(item.get('first_hm') or '').strip()}-{str(item.get('last_hm') or '').strip()}"
).strip("-"),
"keywords": [
str(keyword).strip()
for keyword in (item.get("keywords", []) or [])[:8]
if str(keyword).strip()
],
"samples": [
{
"date": str(sample.get("date") or "").strip(),
"hm": str(sample.get("hm") or "").strip(),
"nickname": str(sample.get("nickname") or "").strip(),
"content": str(sample.get("content") or "").strip()[:100],
}
for sample in (item.get("samples", []) or [])[:5]
if str(sample.get("content") or "").strip()
],
}
for item in fact_topic_clusters[:6]
if str(item.get("label") or "").strip()
],
# 这是新的主材料层,优先级高于传统的 top_terms
# 1. speaker_index 负责承接用户画像,避免在每条样本里重复塞 UUID/牌子/等级;
# 2. timeline_digest 让模型按时间推进理解“哪一段开始起哄、哪一段反复刷屏”;
@@ -2411,6 +2480,52 @@ class DouyuPlugin(MessagePluginInterface):
for item in content_cues[:18]
if str(item.get("text") or "").strip()
],
"semantic_fact_hints": {
"topic_clusters": [
{
"label": str(item.get("label") or "").strip(),
"match_count": int(item.get("match_count", 0) or 0),
"user_count": int(item.get("user_count", 0) or 0),
"first_hm": str(item.get("first_hm") or "").strip(),
"last_hm": str(item.get("last_hm") or "").strip(),
"keywords": [
str(keyword).strip()
for keyword in (item.get("keywords", []) or [])[:8]
if str(keyword).strip()
],
"samples": [
{
"date": str(sample.get("date") or "").strip(),
"hm": str(sample.get("hm") or "").strip(),
"nickname": str(sample.get("nickname") or "").strip(),
"content": str(sample.get("content") or "").strip()[:100],
}
for sample in (item.get("samples", []) or [])[:5]
if str(sample.get("content") or "").strip()
],
}
for item in fact_topic_clusters[:6]
if str(item.get("label") or "").strip()
],
"hero_mentions": [
{
"hero": str(item.get("hero") or "").strip(),
"mention_count": int(item.get("mention_count", 0) or 0),
"user_count": int(item.get("user_count", 0) or 0),
"samples": [
{
"hm": str(sample.get("hm") or "").strip(),
"nickname": str(sample.get("nickname") or "").strip(),
"content": str(sample.get("content") or "").strip()[:100],
}
for sample in (item.get("samples", []) or [])[:4]
if str(sample.get("content") or "").strip()
],
}
for item in hero_mentions[:6]
if str(item.get("hero") or "").strip()
],
},
"timeline_digest": [
{
"date": str(item.get("date") or "").strip(),