优化斗鱼弹幕日报LLM入参压缩

- 新增面向LLM的用户索引、时间线事件块与整句高频线索压缩结构
- 将弹幕时间统一压缩为日期加时分并抽离UID尾号、牌子等级等重复元信息
- 下调中文分词结果在提示材料中的权重,改为优先使用现场原句和时间线材料
This commit is contained in:
liuwei
2026-04-29 14:25:08 +08:00
parent f475d20d17
commit 625d37018b
2 changed files with 393 additions and 6 deletions

View File

@@ -46,6 +46,10 @@ class DouyuDanmuSummaryHelper:
TEMPLATE_MIN_REPEAT = 4
REPEAT_MIN_COUNT = 3
# 统一使用分钟级时间,是因为日报场景里秒级时间几乎不会增加理解价值,
# 但会显著拉长 LLM 输入;保留到 `HH:MM` 能兼顾时序感和压缩率。
COMPACT_TIME_FORMAT = "%H:%M"
@classmethod
def parse_danmu_line(cls, line: str) -> Optional[Dict[str, Any]]:
text = str(line or "").strip()
@@ -225,6 +229,66 @@ class DouyuDanmuSummaryHelper:
top_terms_limit=8,
sample_limit=10,
),
# 这是专门给 LLM 准备的“压缩但保真”的材料层:
# 1. 用户画像从逐条弹幕里抽出来,避免昵称/牌子/等级在每条消息里重复出现;
# 2. 时间统一压到日期 + 时分,减少无意义的秒级噪音;
# 3. 主体按时间线块组织,更像“现场事件流”,比中文分词更适合日报生成。
"compact_prompt_assets": cls.build_compact_prompt_assets(
organized_messages,
bucket_minutes=bucket_minutes,
),
}
@classmethod
def build_compact_prompt_assets(
cls,
messages: List[Dict[str, Any]],
*,
bucket_minutes: int = 5,
speaker_limit: int = 80,
timeline_limit: int = 24,
samples_per_bucket: int = 6,
cue_limit: int = 18,
) -> Dict[str, Any]:
"""
生成专供 LLM 使用的压缩材料。
设计目标:
1. 不做中文分词,尽量保留“整句/整段弹幕”的原始信息密度;
2. 把重复出现的用户元信息抽到索引表,降低 token 浪费;
3. 把现场内容组织成时间线块,帮助模型理解节奏推进和集体起哄链路。
"""
ordered_messages = sorted(
[item for item in messages if item and str(item.get("content") or "").strip()],
key=lambda item: item.get("timestamp") or datetime.min,
)
if not ordered_messages:
return {
"speaker_index": [],
"timeline_digest": [],
"content_cues": [],
}
speaker_index, speaker_alias_map = cls._build_speaker_index(
ordered_messages,
limit=speaker_limit,
)
repeated_messages = cls._build_repeated_messages(ordered_messages, limit=cue_limit)
burst_terms = cls._build_burst_terms(ordered_messages)
return {
"speaker_index": speaker_index,
"timeline_digest": cls._build_timeline_digest(
ordered_messages,
speaker_alias_map,
bucket_minutes=bucket_minutes,
limit=timeline_limit,
samples_per_bucket=samples_per_bucket,
),
"content_cues": cls._build_content_cues(
ordered_messages,
repeated_messages=repeated_messages,
burst_terms=burst_terms,
limit=cue_limit,
),
}
@staticmethod
@@ -663,6 +727,10 @@ class DouyuDanmuSummaryHelper:
}),
"first_time": str(first.get("timestamp_text") or ""),
"last_time": str(items[-1].get("timestamp_text") or ""),
"first_date": cls._format_date(first.get("timestamp")),
"first_hm": cls._format_hm(first.get("timestamp")),
"last_date": cls._format_date(items[-1].get("timestamp")),
"last_hm": cls._format_hm(items[-1].get("timestamp")),
})
repeated_messages.sort(key=lambda item: item.get("count", 0), reverse=True)
return repeated_messages[:limit]
@@ -729,20 +797,26 @@ class DouyuDanmuSummaryHelper:
@classmethod
def _build_time_buckets(cls, messages: List[Dict[str, Any]], minutes: int = 5) -> List[Dict[str, Any]]:
buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
bucket_dt_map: Dict[str, datetime] = {}
for item in messages:
ts = item.get("timestamp")
if not isinstance(ts, datetime):
continue
bucket_minute = (ts.minute // minutes) * minutes
bucket_key = ts.replace(minute=bucket_minute, second=0)
buckets[bucket_key.strftime("%Y-%m-%d %H:%M:%S")].append(item)
bucket_text = bucket_key.strftime("%Y-%m-%d %H:%M:%S")
bucket_dt_map[bucket_text] = bucket_key
buckets[bucket_text].append(item)
results: List[Dict[str, Any]] = []
for bucket_start, items in sorted(buckets.items()):
bucket_dt = bucket_dt_map.get(bucket_start)
top_terms = cls._extract_top_terms(items, limit=8)
burst_terms = cls._build_burst_terms(items)[:5]
results.append({
"start_time": bucket_start,
"date": cls._format_date(bucket_dt),
"start_hm": cls._format_hm(bucket_dt),
"message_count": len(items),
"user_count": len({str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip()}),
"top_terms": top_terms,
@@ -765,6 +839,8 @@ class DouyuDanmuSummaryHelper:
continue
selected.append({
"time": str(item.get("timestamp_text") or ""),
"date": DouyuDanmuSummaryHelper._format_date(item.get("timestamp")),
"hm": DouyuDanmuSummaryHelper._format_hm(item.get("timestamp")),
"nickname": str(item.get("nickname") or ""),
"content": content[:80],
})
@@ -781,6 +857,8 @@ class DouyuDanmuSummaryHelper:
continue
selected.append({
"time": str(item.get("timestamp_text") or ""),
"date": DouyuDanmuSummaryHelper._format_date(item.get("timestamp")),
"hm": DouyuDanmuSummaryHelper._format_hm(item.get("timestamp")),
"nickname": str(item.get("nickname") or ""),
"content": content[:80],
})
@@ -810,6 +888,8 @@ class DouyuDanmuSummaryHelper:
continue
selected.append({
"time": str(item.get("timestamp_text") or ""),
"date": cls._format_date(item.get("timestamp")),
"hm": cls._format_hm(item.get("timestamp")),
"nickname": str(item.get("nickname") or ""),
"content": content[:80],
})
@@ -833,6 +913,8 @@ class DouyuDanmuSummaryHelper:
continue
samples.append({
"time": str(sample.get("time") or ""),
"date": str(sample.get("date") or ""),
"hm": str(sample.get("hm") or ""),
"nickname": str(sample.get("nickname") or ""),
"content": content,
})
@@ -880,6 +962,8 @@ class DouyuDanmuSummaryHelper:
seen.add(normalized)
selected.append({
"time": str(item.get("timestamp_text") or ""),
"date": cls._format_date(item.get("timestamp")),
"hm": cls._format_hm(item.get("timestamp")),
"nickname": str(item.get("nickname") or ""),
"content": content[:90],
})
@@ -936,6 +1020,225 @@ class DouyuDanmuSummaryHelper:
})
return simplified
@staticmethod
def _format_date(ts: Any) -> str:
if isinstance(ts, datetime):
return ts.strftime("%Y-%m-%d")
return ""
@classmethod
def _format_hm(cls, ts: Any) -> str:
if isinstance(ts, datetime):
return ts.strftime(cls.COMPACT_TIME_FORMAT)
return ""
@classmethod
def _build_speaker_index(
cls,
messages: List[Dict[str, Any]],
*,
limit: int = 80,
) -> (List[Dict[str, Any]], Dict[str, str]):
"""
构建用户索引表。
这样时间线块里只保留 `speaker_id`,把 UID/牌子/等级这些重复元信息折叠到索引里,
既节省 token也能保留用户画像的完整性。
"""
profiles: Dict[str, Dict[str, Any]] = {}
counts = Counter()
for item in messages:
uid = str(item.get("uid") or "").strip()
if not uid:
continue
counts[uid] += 1
profile = profiles.setdefault(uid, {
"uid": uid,
"nickname": str(item.get("nickname") or "").strip(),
"room_level": 0,
"fans_name": "",
"fans_level": 0,
"noble_name": "",
})
if not profile["nickname"]:
profile["nickname"] = str(item.get("nickname") or "").strip()
profile["room_level"] = max(int(profile.get("room_level", 0) or 0), int(item.get("room_level", 0) or 0))
if int(item.get("fans_level", 0) or 0) > int(profile.get("fans_level", 0) or 0):
profile["fans_level"] = int(item.get("fans_level", 0) or 0)
if not str(profile.get("fans_name") or "").strip():
profile["fans_name"] = str(item.get("fans_name") or "").strip()
if not str(profile.get("noble_name") or "").strip():
profile["noble_name"] = str(item.get("noble_name") or "").strip()
speaker_index: List[Dict[str, Any]] = []
speaker_alias_map: Dict[str, str] = {}
for idx, (uid, count) in enumerate(counts.most_common(limit), start=1):
profile = profiles.get(uid, {})
speaker_id = f"U{idx:02d}"
speaker_alias_map[uid] = speaker_id
speaker_index.append({
"speaker_id": speaker_id,
"nickname": str(profile.get("nickname") or "").strip(),
# 只保留 UID 尾号,方便定位老观众/同名用户,又不会把整串 UUID 反复塞给模型。
"uid_tail": uid[-4:] if len(uid) >= 4 else uid,
"badge_name": str(profile.get("fans_name") or "").strip(),
"badge_level": int(profile.get("fans_level", 0) or 0),
"room_level": int(profile.get("room_level", 0) or 0),
"noble_name": str(profile.get("noble_name") or "").strip(),
"message_count": count,
})
return speaker_index, speaker_alias_map
@classmethod
def _build_timeline_digest(
cls,
messages: List[Dict[str, Any]],
speaker_alias_map: Dict[str, str],
*,
bucket_minutes: int = 5,
limit: int = 24,
samples_per_bucket: int = 6,
) -> List[Dict[str, Any]]:
"""
把弹幕按时间窗口压成事件块。
每个块里同时保留:
1. 热度数据;
2. 重复刷屏的整句线索;
3. 少量原声样本。
这比按词切碎更容易让模型理解“这一段到底发生了什么”。
"""
buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
bucket_dt_map: Dict[str, datetime] = {}
for item in messages:
ts = item.get("timestamp")
if not isinstance(ts, datetime):
continue
bucket_minute = (ts.minute // bucket_minutes) * bucket_minutes
bucket_dt = ts.replace(minute=bucket_minute, second=0)
bucket_key = bucket_dt.strftime("%Y-%m-%d %H:%M:%S")
bucket_dt_map[bucket_key] = bucket_dt
buckets[bucket_key].append(item)
timeline_blocks: List[Dict[str, Any]] = []
for bucket_key in sorted(buckets.keys()):
bucket_messages = buckets[bucket_key]
repeated = cls._build_repeated_messages(bucket_messages, limit=3)
samples: List[Dict[str, Any]] = []
seen_contents = set()
for item in bucket_messages:
content = str(item.get("content") or "").strip()
if not content:
continue
normalized = cls._normalize_template_text(content)
if normalized and normalized in seen_contents:
continue
if normalized:
seen_contents.add(normalized)
uid = str(item.get("uid") or "").strip()
samples.append({
"speaker_id": speaker_alias_map.get(uid, "U00"),
"hm": cls._format_hm(item.get("timestamp")),
"content": content[:90],
})
if len(samples) >= samples_per_bucket:
break
bucket_dt = bucket_dt_map.get(bucket_key)
timeline_blocks.append({
"date": cls._format_date(bucket_dt),
"start_hm": cls._format_hm(bucket_dt),
"message_count": len(bucket_messages),
"user_count": len({
str(item.get("uid") or "") for item in bucket_messages if str(item.get("uid") or "").strip()
}),
"repeated_cues": [
{
"text": str(item.get("text") or "").strip()[:80],
"count": int(item.get("count", 0) or 0),
"user_count": int(item.get("user_count", 0) or 0),
}
for item in repeated
if str(item.get("text") or "").strip()
],
"samples": samples,
})
return timeline_blocks[:limit]
@classmethod
def _build_content_cues(
cls,
messages: List[Dict[str, Any]],
*,
repeated_messages: List[Dict[str, Any]],
burst_terms: List[Dict[str, Any]],
limit: int = 18,
) -> List[Dict[str, Any]]:
"""
生成不依赖中文分词的高频内容线索。
规则:
1. 优先保留整句复读内容;
2. 短促情绪词单独保留为 burst
3. 对短句高频原话做补充,不把中文切碎成词。
"""
cues: List[Dict[str, Any]] = []
seen = set()
def push(kind: str, text: str, count: int, user_count: int = 0) -> None:
value = str(text or "").strip()
if not value:
return
normalized = cls._normalize_template_text(value)
if not normalized or normalized in seen:
return
seen.add(normalized)
cues.append({
"kind": kind,
"text": value[:90],
"count": int(count or 0),
"user_count": int(user_count or 0),
})
for item in repeated_messages:
push(
"repeat",
str(item.get("text") or ""),
int(item.get("count", 0) or 0),
int(item.get("user_count", 0) or 0),
)
short_message_counter = Counter()
short_message_users: Dict[str, Set[str]] = defaultdict(set)
short_message_text_map: Dict[str, str] = {}
for item in messages:
content = str(item.get("content") or "").strip()
if not content or len(content) > 16 or cls._looks_like_pure_punctuation(content):
continue
normalized = cls._normalize_template_text(content)
if not normalized:
continue
short_message_counter[normalized] += 1
short_message_users[normalized].add(str(item.get("uid") or "").strip())
short_message_text_map.setdefault(normalized, content)
for normalized, count in short_message_counter.most_common(limit):
if count < 2:
continue
push(
"short_repeat",
short_message_text_map.get(normalized, normalized),
count,
len([uid for uid in short_message_users.get(normalized, set()) if uid]),
)
for item in burst_terms:
push(
"burst",
str(item.get("text") or ""),
int(item.get("count", 0) or 0),
int(item.get("user_count", 0) or 0),
)
cues.sort(key=lambda item: (int(item.get("count", 0) or 0), int(item.get("user_count", 0) or 0)), reverse=True)
return cues[:limit]
@staticmethod
def _looks_like_pure_punctuation(content: str) -> bool:
text = str(content or "").strip()