# Files
# abot/plugins/douyu/danmu_summary.py
#
# 1123 lines
# 47 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import os
import re
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional, Set, Tuple
class DouyuDanmuSummaryHelper:
    """Helper that extracts per-session Douyu danmu (bullet chat) and compresses it."""

    # One stored danmu line: "[timestamp] nickname (UID: uid<profile>)content".
    LINE_PATTERN = re.compile(
        r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s+"
        r"(?P<nickname>.*?)\s+\(UID:\s*(?P<uid>[^,\)]+)(?P<profile>.*?)\)(?P<content>.*)$"
    )

    # Filler words excluded from term-frequency statistics.
    STOPWORDS: Set[str] = {
        "哈哈", "哈哈哈", "hh", "hhh", "啊啊", "啊啊啊",
        "可以", "这个", "那个", "真的", "就是",
        "一下", "主播", "兄弟", "老铁", "大家", "我们",
        "你们", "他们", "不是", "什么", "怎么",
        "为啥", "有点", "感觉", "这里", "那里", "然后",
        "今天", "刚刚", "现在", "一个", "一下子",
    }

    # Short spam-style messages that are counted as "burst" terms.
    # NOTE(review): the repeated "" entries look like characters that were lost
    # when this file was exported — confirm against the canonical source.
    SHORT_BURST_WORDS: Set[str] = {
        "666", "6666", "", "牛逼", "", "", "", "", "", "", "", "", "",
        "哈哈", "哈哈哈", "笑死", "卧槽", "wc", "awsl", "nb", "nbl", "c", "6",
    }

    # Messages matching any of these are treated as system/bot noise.
    NOISE_PATTERNS = [
        re.compile(noise_source, re.I)
        for noise_source in (
            r"本条弹幕.*机器人",
            r"请不要.*统计机器人数",
            r"原来直播间有.*非机器人用户",
            r"如果您的直播内容是以观看他人操作为主",
        )
    ]

    # Fragments that mark a message as a copy-paste template regardless of repeat count.
    TEMPLATE_HINT_PATTERNS = [
        re.compile(hint_source)
        for hint_source in (
            r"闭目不语任由",
            r"你就忍心一辈",
            r"刚刚偷看你直播被老板发现",
            r"还好老板是蝙蝠侠",
            r"原来直播间有这么多姐妹",
            r"之前我一直不敢发弹幕",
            r"强子也就是吃了直播的红利",
            r"你声音好像强子",
            r"怎么回事强子",
            r"你是个人吗强",
        )
    ]

    # Thresholds used by the template / repeat detection helpers.
    TEMPLATE_MIN_LENGTH = 14
    TEMPLATE_MIN_REPEAT = 4
    REPEAT_MIN_COUNT = 3
@classmethod
def parse_danmu_line(cls, line: str) -> Optional[Dict[str, Any]]:
text = str(line or "").strip()
if not text:
return None
match = cls.LINE_PATTERN.match(text)
if not match:
return None
try:
ts = datetime.strptime(match.group("timestamp"), "%Y-%m-%d %H:%M:%S")
except Exception:
return None
return {
"timestamp": ts,
"timestamp_text": match.group("timestamp"),
"nickname": match.group("nickname").strip(),
"uid": str(match.group("uid")).strip(),
**cls._parse_profile_text(str(match.group("profile") or "")),
"content": match.group("content").strip(),
"raw": text,
}
@classmethod
def load_session_messages(cls, room_id: str, session: Dict[str, Any], base_dir: str = "temp") -> List[Dict[str, Any]]:
segments = cls._normalize_segments(session.get("segments", []) or [])
if not room_id or not segments:
return []
date_keys = sorted({segment["start"].strftime("%Y%m%d") for segment in segments} |
{segment["end"].strftime("%Y%m%d") for segment in segments})
collected: List[Dict[str, Any]] = []
for date_key in date_keys:
file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")
if not os.path.exists(file_path):
continue
try:
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
parsed = cls.parse_danmu_line(line)
if not parsed:
continue
if cls._in_any_segment(parsed["timestamp"], segments):
collected.append(parsed)
except Exception:
continue
return collected
@classmethod
def collect_session_source_files(cls, room_id: str, session: Dict[str, Any], base_dir: str = "temp") -> List[str]:
"""
收集某个 session 实际对应到的原始弹幕 txt 文件路径。
这里不读取文件内容,只返回“这场直播跨到了哪些日期文件”,
方便上层在需要时直接把原始 txt 上传给 LLM。
"""
segments = cls._normalize_segments(session.get("segments", []) or [])
if not room_id or not segments:
return []
file_paths: List[str] = []
date_keys = sorted(
{segment["start"].strftime("%Y%m%d") for segment in segments}
| {segment["end"].strftime("%Y%m%d") for segment in segments}
)
for date_key in date_keys:
file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")
if os.path.exists(file_path):
file_paths.append(file_path)
return file_paths
@classmethod
def load_day_messages(cls, room_id: str, date_key: str, base_dir: str = "temp") -> List[Dict[str, Any]]:
file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")
if not os.path.exists(file_path):
return []
collected: List[Dict[str, Any]] = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
parsed = cls.parse_danmu_line(line)
if parsed:
collected.append(parsed)
return collected
@classmethod
def build_summary_material(
cls,
room_id: str,
session: Dict[str, Any],
messages: List[Dict[str, Any]],
bucket_minutes: int = 5,
top_bucket_count: int = 10,
) -> Dict[str, Any]:
normalized = [item for item in messages if item and item.get("content")]
prepared = cls._prepare_messages(normalized)
organized_messages = prepared["organized_messages"]
unique_users = {str(item.get("uid") or "") for item in normalized if str(item.get("uid") or "").strip()}
organized_unique_users = {
str(item.get("uid") or "") for item in organized_messages if str(item.get("uid") or "").strip()
}
deduped_messages = cls._dedupe_consecutive_messages(organized_messages)
burst_terms = cls._build_burst_terms(organized_messages)
top_terms = cls._extract_top_terms(organized_messages, limit=30)
bucket_stats = cls._build_time_buckets(organized_messages, minutes=bucket_minutes)
peak_buckets = sorted(bucket_stats, key=lambda item: item.get("message_count", 0), reverse=True)[:top_bucket_count]
return {
"session_id": session.get("session_id", ""),
"room_id": room_id,
"anchor_day": session.get("anchor_day", ""),
"nickname": session.get("nickname", ""),
"room_name": session.get("room_name", ""),
"segments": cls._serialize_segments(session.get("segments", []) or []),
"message_count": len(normalized),
"noise_filtered_count": len(prepared["noise_messages"]),
"organized_message_count": len(organized_messages),
"deduped_message_count": len(deduped_messages),
"unique_user_count": len(unique_users),
"organized_unique_user_count": len(organized_unique_users),
"merged_templates": prepared["merged_templates"],
"top_terms": top_terms,
"burst_terms": burst_terms,
"time_buckets": bucket_stats,
"peak_buckets": peak_buckets,
"representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats),
"operator_metrics": cls._build_operator_metrics(normalized, organized_messages),
}
@classmethod
def build_llm_payload(
cls,
room_id: str,
session: Dict[str, Any],
messages: List[Dict[str, Any]],
bucket_minutes: int = 5,
top_bucket_count: int = 8,
top_repeat_count: int = 24,
) -> Dict[str, Any]:
"""
面向 LLM 的高保真弹幕载荷。
规则:
1. 仅过滤平台/机器人类系统噪音。
2. 只对“同内容重复弹幕”做合并,不再做偏激进的模板化折叠。
3. 其他不同内容尽量保留,让模型看到更接近当晚现场的讨论全貌。
"""
normalized = [item for item in messages if item and item.get("content")]
prepared = cls._prepare_messages_for_llm(normalized)
source_messages = prepared["llm_source_messages"]
organized_messages = prepared["organized_messages"]
bucket_stats = cls._build_time_buckets(source_messages, minutes=bucket_minutes)
peak_buckets = sorted(
bucket_stats,
key=lambda item: item.get("message_count", 0),
reverse=True,
)[:top_bucket_count]
unique_users = {str(item.get("uid") or "") for item in normalized if str(item.get("uid") or "").strip()}
organized_unique_users = {
str(item.get("uid") or "") for item in organized_messages if str(item.get("uid") or "").strip()
}
return {
"session_meta": {
"room_id": room_id,
"session_id": session.get("session_id", ""),
"anchor_day": session.get("anchor_day", ""),
"nickname": session.get("nickname", ""),
"room_name": session.get("room_name", ""),
"segments": cls._serialize_segments(session.get("segments", []) or []),
"message_count": len(normalized),
"noise_filtered_count": len(prepared["noise_messages"]),
"organized_message_count": len(organized_messages),
"unique_user_count": len(unique_users),
"organized_unique_user_count": len(organized_unique_users),
},
"operator_metrics": cls._build_operator_metrics(normalized, organized_messages),
"cleaning_rules": [
"仅过滤系统噪音、机器人探测、平台提示类弹幕。",
"只合并同内容重复弹幕,保留出现次数、人数、首末时间。",
"不同句式、不同观点、不同刀圈讨论尽量原样保留,不再做模板化压缩。",
"热点时段、顺时序样本和原声片段共同保留,方便 LLM 还原完整语境。",
],
# 字段名继续沿用 merged_templates / repeated_messages
# 目的是兼容下游模板和主流程,实际语义已经切换成“同内容重复弹幕聚合结果”。
"merged_templates": prepared["duplicate_groups"][:top_repeat_count],
"repeated_messages": prepared["duplicate_groups"][:top_repeat_count],
"top_terms": cls._extract_top_terms(source_messages, limit=40),
"burst_terms": cls._build_burst_terms(source_messages),
"peak_buckets": cls._simplify_peak_buckets(peak_buckets),
"representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats),
"raw_window_samples": cls._build_raw_window_samples(peak_buckets, per_bucket_limit=12),
# 把去噪后、且只合并了“完全相同重复弹幕”的原始弹幕全文也保留下来。
# 这样上层如果希望直接把整场弹幕塞给 LLM而不是只喂摘要样本
# 就不需要再重新读文件和重复清洗。
"raw_transcript_lines": cls._build_raw_transcript_lines(organized_messages),
# 给日报类 LLM 再补一层“按时间推进的现场切片”。
# 这样模型除了看热点窗口,还能顺着时间线理解气氛如何起、如何变、最后怎么收,
# 对粉丝日报这类强调“节目效果”和“接梗链路”的文本尤其有帮助。
"chronological_samples": cls._build_chronological_samples(organized_messages, limit=28),
# 每个 session 单独给一个轻量摘要,避免多场直播合并后,
# 模型只看到全局热点而丢失“第一场在聊什么、第二场为什么突然转节奏”的信息。
"session_storyline": cls._build_session_storyline(
source_messages,
bucket_stats,
top_terms_limit=12,
sample_limit=14,
),
}
@staticmethod
def _parse_profile_text(profile_text: str) -> Dict[str, Any]:
text = str(profile_text or "").strip()
room_level = 0
fans_name = ""
fans_level = 0
noble_name = ""
room_match = re.search(r",\s*Lv\s*(\d+)", text, re.I)
if room_match:
try:
room_level = int(room_match.group(1))
except Exception:
room_level = 0
fans_match = re.search(r"/\s*([^/]+?)\s+Lv\s*(\d+)", text, re.I)
if fans_match:
fans_name = str(fans_match.group(1) or "").strip()
try:
fans_level = int(fans_match.group(2))
except Exception:
fans_level = 0
noble_match = re.search(r"/\s*(骑士|子爵|伯爵|公爵|国王|皇帝|游侠|超级皇帝|幻神)\b", text)
if noble_match:
noble_name = str(noble_match.group(1) or "").strip()
return {
"room_level": room_level,
"fans_name": fans_name,
"fans_level": fans_level,
"noble_name": noble_name,
"has_fans_badge": bool(fans_name),
"has_noble": bool(noble_name),
}
@classmethod
def _build_operator_metrics(cls, messages: List[Dict[str, Any]], organized_messages: List[Dict[str, Any]]) -> Dict[str, Any]:
user_profiles: Dict[str, Dict[str, Any]] = {}
user_message_count = Counter()
user_organized_count = Counter()
fans_badge_users: Set[str] = set()
noble_users: Set[str] = set()
high_room_level_users: Set[str] = set()
high_fans_level_users: Set[str] = set()
fans_badge_message_count = 0
noble_message_count = 0
room_level_histogram = Counter()
fans_level_histogram = Counter()
badge_user_counter = Counter()
badge_message_counter = Counter()
for item in messages:
uid = str(item.get("uid") or "").strip()
if not uid:
continue
user_message_count[uid] += 1
profile = user_profiles.setdefault(uid, {
"uid": uid,
"nickname": str(item.get("nickname") or "").strip(),
"room_level": 0,
"fans_name": "",
"fans_level": 0,
"noble_name": "",
})
room_level = int(item.get("room_level") or 0)
fans_level = int(item.get("fans_level") or 0)
fans_name = str(item.get("fans_name") or "").strip()
noble_name = str(item.get("noble_name") or "").strip()
if room_level > int(profile.get("room_level") or 0):
profile["room_level"] = room_level
if fans_level > int(profile.get("fans_level") or 0):
profile["fans_level"] = fans_level
if fans_name and not profile.get("fans_name"):
profile["fans_name"] = fans_name
if noble_name and not profile.get("noble_name"):
profile["noble_name"] = noble_name
if profile.get("nickname") == "" and str(item.get("nickname") or "").strip():
profile["nickname"] = str(item.get("nickname") or "").strip()
if fans_name:
fans_badge_users.add(uid)
fans_badge_message_count += 1
badge_user_counter[fans_name] += 0
badge_message_counter[fans_name] += 1
if noble_name:
noble_users.add(uid)
noble_message_count += 1
for uid, profile in user_profiles.items():
room_level = int(profile.get("room_level") or 0)
fans_level = int(profile.get("fans_level") or 0)
fans_name = str(profile.get("fans_name") or "").strip()
noble_name = str(profile.get("noble_name") or "").strip()
room_level_histogram[cls._level_bucket(room_level)] += 1
if fans_name:
badge_user_counter[fans_name] += 1
fans_level_histogram[cls._fans_level_bucket(fans_level)] += 1
if noble_name:
noble_users.add(uid)
if room_level >= 30:
high_room_level_users.add(uid)
if fans_level >= 10:
high_fans_level_users.add(uid)
for item in organized_messages:
uid = str(item.get("uid") or "").strip()
if uid:
user_organized_count[uid] += 1
active_users_5plus = sum(1 for uid, count in user_message_count.items() if count >= 5)
active_users_10plus = sum(1 for uid, count in user_message_count.items() if count >= 10)
top_active_users = []
for uid, count in user_message_count.most_common(12):
profile = user_profiles.get(uid, {})
top_active_users.append({
"uid": uid,
"nickname": str(profile.get("nickname") or ""),
"message_count": count,
"organized_message_count": int(user_organized_count.get(uid, 0) or 0),
"room_level": int(profile.get("room_level", 0) or 0),
"fans_name": str(profile.get("fans_name") or ""),
"fans_level": int(profile.get("fans_level", 0) or 0),
"noble_name": str(profile.get("noble_name") or ""),
})
top_badges = []
for badge_name, unique_user_count in badge_user_counter.most_common(10):
if not badge_name:
continue
top_badges.append({
"badge_name": badge_name,
"user_count": unique_user_count,
"message_count": int(badge_message_counter.get(badge_name, 0) or 0),
})
total_unique_users = max(len(user_profiles), 1)
return {
"active_unique_users": len(user_profiles),
"active_users_5plus": active_users_5plus,
"active_users_10plus": active_users_10plus,
"fans_badge_user_count": len(fans_badge_users),
"fans_badge_user_ratio": round(len(fans_badge_users) / total_unique_users, 4),
"fans_badge_message_count": fans_badge_message_count,
"high_room_level_user_count": len(high_room_level_users),
"high_room_level_threshold": 30,
"high_fans_level_user_count": len(high_fans_level_users),
"high_fans_level_threshold": 10,
"noble_user_count": len(noble_users),
"noble_message_count": noble_message_count,
"room_level_distribution": [
{"bucket": bucket, "user_count": count}
for bucket, count in sorted(room_level_histogram.items())
],
"fans_level_distribution": [
{"bucket": bucket, "user_count": count}
for bucket, count in sorted(fans_level_histogram.items())
],
"top_badges": top_badges,
"top_active_users": top_active_users,
}
@staticmethod
def _level_bucket(level: int) -> str:
if level >= 40:
return "40+"
if level >= 30:
return "30-39"
if level >= 20:
return "20-29"
if level >= 10:
return "10-19"
return "1-9"
@staticmethod
def _fans_level_bucket(level: int) -> str:
if level >= 20:
return "20+"
if level >= 15:
return "15-19"
if level >= 10:
return "10-14"
if level >= 5:
return "5-9"
return "1-4"
@classmethod
def infer_sessions_from_messages(
cls,
room_id: str,
messages: List[Dict[str, Any]],
*,
session_cutoff_hour: int = 6,
merge_gap_hours: int = 4,
min_session_messages: int = 50,
) -> List[Dict[str, Any]]:
if not messages:
return []
ordered = sorted(messages, key=lambda item: item.get("timestamp") or datetime.min)
sessions: List[Dict[str, Any]] = []
current_messages: List[Dict[str, Any]] = []
def flush_current():
if len(current_messages) < min_session_messages:
return
session = cls._build_inferred_session(
room_id,
current_messages,
session_cutoff_hour=session_cutoff_hour,
)
if session:
sessions.append(session)
prev_dt: Optional[datetime] = None
for item in ordered:
ts = item.get("timestamp")
if not isinstance(ts, datetime):
continue
if prev_dt is not None:
gap_seconds = (ts - prev_dt).total_seconds()
if gap_seconds > merge_gap_hours * 3600:
flush_current()
current_messages = []
current_messages.append(item)
prev_dt = ts
flush_current()
return sessions
@classmethod
def _build_inferred_session(
cls,
room_id: str,
messages: List[Dict[str, Any]],
*,
session_cutoff_hour: int = 6,
) -> Optional[Dict[str, Any]]:
if not messages:
return None
ordered = sorted(messages, key=lambda item: item.get("timestamp") or datetime.min)
start_dt = ordered[0]["timestamp"]
end_dt = ordered[-1]["timestamp"]
anchor_dt = start_dt
if start_dt.hour < session_cutoff_hour:
from datetime import timedelta
anchor_dt = start_dt - timedelta(days=1)
anchor_day = anchor_dt.strftime("%Y-%m-%d")
segments = []
seg_start = ordered[0]["timestamp"]
prev_dt = ordered[0]["timestamp"]
for item in ordered[1:]:
current_dt = item["timestamp"]
if (current_dt - prev_dt).total_seconds() > 30 * 60:
segments.append({
"start_time": seg_start.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": prev_dt.strftime("%Y-%m-%d %H:%M:%S"),
})
seg_start = current_dt
prev_dt = current_dt
segments.append({
"start_time": seg_start.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": end_dt.strftime("%Y-%m-%d %H:%M:%S"),
})
return {
"session_id": f"{room_id}_{anchor_day.replace('-', '')}_{start_dt.strftime('%H%M%S')}",
"room_id": room_id,
"anchor_day": anchor_day,
"nickname": "",
"room_name": "",
"segments": segments,
"is_live": False,
"source": "inferred_from_danmu",
}
@staticmethod
def _normalize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, datetime]]:
normalized = []
for item in segments:
try:
start_dt = datetime.strptime(str(item.get("start_time") or ""), "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(str(item.get("end_time") or ""), "%Y-%m-%d %H:%M:%S")
except Exception:
continue
if end_dt < start_dt:
continue
normalized.append({"start": start_dt, "end": end_dt})
return normalized
@staticmethod
def _serialize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, str]]:
result = []
for item in segments:
start_time = str(item.get("start_time") or "").strip()
end_time = str(item.get("end_time") or "").strip()
if start_time and end_time:
result.append({"start_time": start_time, "end_time": end_time})
return result
@staticmethod
def _in_any_segment(target_dt: datetime, segments: List[Dict[str, datetime]]) -> bool:
for segment in segments:
if segment["start"] <= target_dt <= segment["end"]:
return True
return False
@staticmethod
def _dedupe_consecutive_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
result: List[Dict[str, Any]] = []
prev_key = None
repeat_count = 0
for item in messages:
current_key = (item.get("uid"), item.get("content"))
if current_key == prev_key:
repeat_count += 1
result[-1]["repeat_count"] = repeat_count
continue
copied = dict(item)
copied["repeat_count"] = 1
result.append(copied)
prev_key = current_key
repeat_count = 1
return result
@classmethod
def _filter_noise_messages(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
result = []
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
if cls._is_noise_message(content):
continue
result.append(item)
return result
@classmethod
def _prepare_messages(cls, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
noise_messages: List[Dict[str, Any]] = []
candidate_messages: List[Dict[str, Any]] = []
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
if cls._is_noise_message(content):
noise_messages.append(item)
continue
candidate_messages.append(item)
merged_templates, organized_messages = cls._merge_template_messages(candidate_messages)
return {
"noise_messages": noise_messages,
"merged_templates": merged_templates,
"organized_messages": organized_messages,
}
@classmethod
def _prepare_messages_for_llm(cls, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
为日报/粉丝日报的 LLM 链路做更保守的清洗。
设计原则:
1. 只过滤系统噪音,不再做“像模板就折叠掉”的强压缩;
2. 只对完全同内容的重复弹幕做合并,避免复读海啸把语料挤爆;
3. 其他不同表达、不同观点、不同讨论方向尽量完整保留。
"""
noise_messages: List[Dict[str, Any]] = []
candidate_messages: List[Dict[str, Any]] = []
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
if cls._is_noise_message(content):
noise_messages.append(item)
continue
candidate_messages.append(item)
duplicate_groups, organized_messages = cls._merge_exact_duplicate_messages(candidate_messages)
return {
"noise_messages": noise_messages,
# llm_source_messages 保留所有非噪音原始消息,用于词频、热点时段和讨论热度统计。
"llm_source_messages": candidate_messages,
# organized_messages 只去掉完全重复内容,用于给模型喂更丰富但不至于刷屏的原声样本。
"organized_messages": organized_messages,
"duplicate_groups": duplicate_groups,
}
@classmethod
def _is_noise_message(cls, content: str) -> bool:
text = str(content or "").strip()
if not text:
return True
for pattern in cls.NOISE_PATTERNS:
if pattern.search(text):
return True
if len(text) >= 30 and len(set(text)) <= 6:
return True
return False
@classmethod
def _merge_template_messages(cls, messages: List[Dict[str, Any]]) -> (List[Dict[str, Any]], List[Dict[str, Any]]):
normalized_counter = Counter()
normalized_to_messages: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for item in messages:
normalized = cls._normalize_template_text(str(item.get("content") or ""))
if not normalized:
continue
normalized_counter[normalized] += 1
normalized_to_messages[normalized].append(item)
template_keys = {
key for key, count in normalized_counter.items()
if len(key) >= cls.TEMPLATE_MIN_LENGTH and count >= cls.TEMPLATE_MIN_REPEAT
}
for key in list(normalized_counter.keys()):
if any(pattern.search(key) for pattern in cls.TEMPLATE_HINT_PATTERNS):
template_keys.add(key)
merged_templates: List[Dict[str, Any]] = []
organized_messages: List[Dict[str, Any]] = []
for normalized, items in normalized_to_messages.items():
if normalized in template_keys:
first = items[0]
merged_templates.append({
"text": str(first.get("content") or "").strip()[:120],
"count": len(items),
"user_count": len({str(item.get('uid') or '') for item in items if str(item.get('uid') or '').strip()}),
"first_time": str(first.get("timestamp_text") or ""),
"last_time": str(items[-1].get("timestamp_text") or ""),
})
else:
organized_messages.extend(items)
merged_templates.sort(key=lambda item: item.get("count", 0), reverse=True)
organized_messages.sort(key=lambda item: item.get("timestamp") or datetime.min)
return merged_templates[:20], organized_messages
@classmethod
def _merge_exact_duplicate_messages(cls, messages: List[Dict[str, Any]]) -> (List[Dict[str, Any]], List[Dict[str, Any]]):
"""
只合并“同内容重复弹幕”。
注意这里的归一化非常保守:
1. 只处理大小写、空白和零宽字符;
2. 不再去掉标点,不再做模板化抽象;
3. 目的是尽量保留不同句式、不同观点和不同刀圈细节。
"""
grouped_messages: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
ordered_keys: List[str] = []
for item in messages:
normalized = cls._normalize_duplicate_text(str(item.get("content") or ""))
if not normalized:
continue
if normalized not in grouped_messages:
ordered_keys.append(normalized)
grouped_messages[normalized].append(item)
duplicate_groups: List[Dict[str, Any]] = []
organized_messages: List[Dict[str, Any]] = []
for normalized in ordered_keys:
items = grouped_messages[normalized]
first = items[0]
copied = dict(first)
copied["repeat_count"] = len(items)
copied["repeat_user_count"] = len({
str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip()
})
organized_messages.append(copied)
if len(items) < 2:
continue
duplicate_groups.append({
"text": str(first.get("content") or "").strip()[:120],
"count": len(items),
"user_count": len({
str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip()
}),
"first_time": str(first.get("timestamp_text") or ""),
"last_time": str(items[-1].get("timestamp_text") or ""),
})
duplicate_groups.sort(key=lambda item: item.get("count", 0), reverse=True)
organized_messages.sort(key=lambda item: item.get("timestamp") or datetime.min)
return duplicate_groups[:80], organized_messages
@classmethod
def _build_repeated_messages(cls, messages: List[Dict[str, Any]], limit: int = 24) -> List[Dict[str, Any]]:
grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
normalized = cls._normalize_template_text(content)
if not normalized:
continue
if cls._looks_like_pure_punctuation(content):
continue
grouped[normalized].append(item)
repeated_messages: List[Dict[str, Any]] = []
for items in grouped.values():
if len(items) < cls.REPEAT_MIN_COUNT:
continue
first = items[0]
repeated_messages.append({
"text": str(first.get("content") or "").strip()[:120],
"count": len(items),
"user_count": len({
str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip()
}),
"first_time": str(first.get("timestamp_text") or ""),
"last_time": str(items[-1].get("timestamp_text") or ""),
})
repeated_messages.sort(key=lambda item: item.get("count", 0), reverse=True)
return repeated_messages[:limit]
@staticmethod
def _normalize_duplicate_text(content: str) -> str:
"""
LLM 清洗链路的“同内容”判定保持保守。
只做最小化标准化,避免把本来不同的表达误并成一类:
1. 转小写;
2. 去零宽字符;
3. 折叠空白。
"""
text = str(content or "").strip().lower()
if not text:
return ""
text = text.replace("\u200b", "").replace("\ufeff", "")
text = re.sub(r"\s+", " ", text)
return text
@classmethod
def _build_burst_terms(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
counters: Dict[str, Dict[str, Any]] = {}
for item in messages:
content = str(item.get("content") or "").strip().lower()
if not content:
continue
if content not in cls.SHORT_BURST_WORDS and len(content) > 6:
continue
target = counters.setdefault(content, {"text": content, "count": 0, "users": set()})
target["count"] += 1
target["users"].add(str(item.get("uid") or ""))
result = []
for item in sorted(counters.values(), key=lambda entry: entry["count"], reverse=True)[:15]:
result.append({
"text": item["text"],
"count": item["count"],
"user_count": len([uid for uid in item["users"] if uid]),
})
return result
@classmethod
def _extract_top_terms(cls, messages: List[Dict[str, Any]], limit: int = 30) -> List[Dict[str, Any]]:
counter = Counter()
for item in messages:
for token in cls._tokenize(str(item.get("content") or "")):
counter[token] += 1
result = []
for term, count in counter.most_common(limit):
result.append({"term": term, "count": count})
return result
@classmethod
def _tokenize(cls, content: str) -> List[str]:
text = str(content or "").lower().strip()
if not text:
return []
chinese_terms = re.findall(r"[\u4e00-\u9fff]{2,6}", text)
alpha_terms = re.findall(r"[a-z0-9_\-]{3,20}", text)
tokens = []
for token in chinese_terms + alpha_terms:
normalized = token.strip().lower()
if not normalized or normalized in cls.STOPWORDS:
continue
if normalized.isdigit() and len(normalized) <= 2:
continue
tokens.append(normalized)
return tokens
@staticmethod
def _normalize_template_text(content: str) -> str:
text = str(content or "").strip().lower()
if not text:
return ""
text = re.sub(r"\s+", "", text)
text = re.sub(r"[`~!@#$%^&*()_\-+=\[\]{}\\|;:'\",.<>/?,。!?、…()【】《》“”‘’·]", "", text)
text = re.sub(r"(.)\1{4,}", r"\1\1\1", text)
return text
@classmethod
def _build_time_buckets(cls, messages: List[Dict[str, Any]], minutes: int = 5) -> List[Dict[str, Any]]:
buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
for item in messages:
ts = item.get("timestamp")
if not isinstance(ts, datetime):
continue
bucket_minute = (ts.minute // minutes) * minutes
bucket_key = ts.replace(minute=bucket_minute, second=0)
buckets[bucket_key.strftime("%Y-%m-%d %H:%M:%S")].append(item)
results: List[Dict[str, Any]] = []
for bucket_start, items in sorted(buckets.items()):
top_terms = cls._extract_top_terms(items, limit=8)
burst_terms = cls._build_burst_terms(items)[:5]
results.append({
"start_time": bucket_start,
"message_count": len(items),
"user_count": len({str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip()}),
"top_terms": top_terms,
"burst_terms": burst_terms,
"sample_messages": cls._pick_bucket_samples(items, limit=6),
})
return results
@staticmethod
def _pick_bucket_samples(items: List[Dict[str, Any]], limit: int = 6) -> List[Dict[str, str]]:
if not items:
return []
indexes = sorted({0, len(items) // 3, len(items) // 2, (len(items) * 2) // 3, len(items) - 1})
selected = []
seen = set()
for idx in indexes:
item = items[idx]
content = str(item.get("content") or "").strip()
if not content or content in seen:
continue
selected.append({
"time": str(item.get("timestamp_text") or ""),
"nickname": str(item.get("nickname") or ""),
"content": content[:80],
})
seen.add(content)
if len(selected) >= limit:
break
# 固定位置采样只能快速抓到“窗口骨架”,但在弹幕量大时通常不足以凑满 limit。
# 这里继续顺序补样本,把同一热点窗口里更多真实原声带给 LLM
# 减少模型只看到 4-5 条孤立短句、难以还原现场氛围的问题。
if len(selected) < limit:
for item in items:
content = str(item.get("content") or "").strip()
if not content or content in seen:
continue
selected.append({
"time": str(item.get("timestamp_text") or ""),
"nickname": str(item.get("nickname") or ""),
"content": content[:80],
})
seen.add(content)
if len(selected) >= limit:
break
return selected
@classmethod
def _pick_representative_messages(cls, messages: List[Dict[str, Any]], buckets: List[Dict[str, Any]]) -> List[Dict[str, str]]:
selected: List[Dict[str, str]] = []
seen = set()
for bucket in sorted(buckets, key=lambda item: item.get("message_count", 0), reverse=True)[:6]:
for sample in bucket.get("sample_messages", []):
content = str(sample.get("content") or "").strip()
if not content or content in seen:
continue
selected.append(sample)
seen.add(content)
if len(selected) >= 18:
return selected
for item in messages:
content = str(item.get("content") or "").strip()
if not content or content in seen:
continue
selected.append({
"time": str(item.get("timestamp_text") or ""),
"nickname": str(item.get("nickname") or ""),
"content": content[:80],
})
seen.add(content)
if len(selected) >= 18:
break
return selected
@classmethod
def _build_raw_window_samples(
cls,
peak_buckets: List[Dict[str, Any]],
per_bucket_limit: int = 8,
) -> List[Dict[str, Any]]:
windows: List[Dict[str, Any]] = []
for bucket in peak_buckets:
samples = []
for sample in bucket.get("sample_messages", [])[:per_bucket_limit]:
content = str(sample.get("content") or "").strip()
if not content:
continue
samples.append({
"time": str(sample.get("time") or ""),
"nickname": str(sample.get("nickname") or ""),
"content": content,
})
if not samples:
continue
windows.append({
"start_time": str(bucket.get("start_time") or ""),
"message_count": int(bucket.get("message_count", 0) or 0),
"user_count": int(bucket.get("user_count", 0) or 0),
"samples": samples,
})
return windows
@classmethod
def _build_raw_transcript_lines(cls, messages: List[Dict[str, Any]]) -> List[str]:
"""
生成可直接给 LLM 使用的顺时序弹幕全文。
规则:
1. 输入消息已经过“系统噪音过滤 + 完全相同重复合并”;
2. 不再进一步摘要,尽量保留现场原话;
3. 对重复合并过的消息补上次数信息,帮助模型感知刷屏强度。
"""
lines: List[str] = []
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
time_text = str(item.get("timestamp_text") or "").strip()
nickname = str(item.get("nickname") or "").strip() or "观众"
repeat_count = int(item.get("repeat_count", 1) or 1)
normalized_content = cls._format_llm_transcript_content(content, repeat_count)
# 本地清洗给 LLM 的 txt 时,统一移除 UID。
# UID 对日报写作没有帮助,还会占 token、污染阅读流。
lines.append(f"[{time_text}] {nickname}{normalized_content}")
return lines
@classmethod
def _format_llm_transcript_content(cls, content: str, repeat_count: int) -> str:
"""
规范化给 LLM 的弹幕正文显示形式。
目标:
1. 像“哈哈哈”“666”“”这类典型短刷屏直接压成 `哈哈哈*120`
2. 正常讨论内容仍保留原句,只在后面标一次重复次数;
3. 既减小文本体积,又尽量不牺牲讨论语义。
"""
text = str(content or "").strip()
count = int(repeat_count or 1)
if count <= 1:
return text
if cls._should_compact_burst_text(text):
return f"{text}*{count}"
return f"{text} [重复{count}次]"
@classmethod
def _should_compact_burst_text(cls, content: str) -> bool:
"""
判断某条弹幕是否属于“适合压缩成 xN”的短刷屏文本。
这里故意保持保守,只压缩:
1. 已知短 burst 词;
2. 纯问号/感叹号/句号等情绪符号;
3. 很短、且由同类字符重复组成的刷屏文本。
"""
text = str(content or "").strip().lower()
if not text:
return False
if text in cls.SHORT_BURST_WORDS:
return True
if re.fullmatch(r"[?!!。\.~]+", text):
return True
if len(text) <= 8 and len(set(text)) <= 3:
return True
return False
@classmethod
def _build_chronological_samples(
cls,
messages: List[Dict[str, Any]],
limit: int = 20,
) -> List[Dict[str, str]]:
"""
从整场弹幕里按时间均匀抽取样本。
设计目的:
1. 热点窗口只能解释“最炸的几分钟”,但日报还需要理解整体节奏;
2. 顺时序样本能帮助 LLM 看到开场铺垫、中段起哄、尾段收束;
3. 对粉丝日报来说,这比单纯词频更容易还原“今天到底经历了什么”。
"""
if not messages:
return []
indexes = {
int(round((len(messages) - 1) * idx / max(limit - 1, 1)))
for idx in range(min(limit, len(messages)))
}
selected: List[Dict[str, str]] = []
seen = set()
for idx in sorted(indexes):
item = messages[idx]
content = str(item.get("content") or "").strip()
if not content:
continue
normalized = cls._normalize_template_text(content)
if normalized and normalized in seen:
continue
if normalized:
seen.add(normalized)
selected.append({
"time": str(item.get("timestamp_text") or ""),
"nickname": str(item.get("nickname") or ""),
"content": content[:90],
})
if len(selected) >= limit:
break
return selected
@classmethod
def _build_session_storyline(
cls,
messages: List[Dict[str, Any]],
bucket_stats: List[Dict[str, Any]],
*,
top_terms_limit: int = 8,
sample_limit: int = 10,
) -> Dict[str, Any]:
"""
组装单场直播的轻量叙事骨架。
这里不追求大而全,而是给模型一个“这场直播从头到尾在发生什么”的概览,
让它在写日报时更容易把梗、情绪和时间顺序串起来。
"""
first_message = messages[0] if messages else {}
last_message = messages[-1] if messages else {}
hottest_bucket = max(
bucket_stats,
key=lambda item: int(item.get("message_count", 0) or 0),
default={},
)
return {
"start_time": str(first_message.get("timestamp_text") or ""),
"end_time": str(last_message.get("timestamp_text") or ""),
"top_terms": cls._extract_top_terms(messages, limit=top_terms_limit),
"burst_terms": cls._build_burst_terms(messages)[:6],
"chronological_samples": cls._build_chronological_samples(messages, limit=sample_limit),
"hottest_moment": {
"start_time": str(hottest_bucket.get("start_time") or ""),
"message_count": int(hottest_bucket.get("message_count", 0) or 0),
"user_count": int(hottest_bucket.get("user_count", 0) or 0),
"top_terms": hottest_bucket.get("top_terms", [])[:6],
"sample_messages": hottest_bucket.get("sample_messages", [])[:6],
},
}
@staticmethod
def _simplify_peak_buckets(buckets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
simplified = []
for bucket in buckets:
simplified.append({
"start_time": str(bucket.get("start_time") or ""),
"message_count": int(bucket.get("message_count", 0) or 0),
"user_count": int(bucket.get("user_count", 0) or 0),
"top_terms": bucket.get("top_terms", [])[:6],
"burst_terms": bucket.get("burst_terms", [])[:5],
})
return simplified
@staticmethod
def _looks_like_pure_punctuation(content: str) -> bool:
text = str(content or "").strip()
if not text:
return True
return re.fullmatch(r"[\W_]+", text, re.UNICODE) is not None