支持向LLM上传原始弹幕TXT并补充全文输入

This commit is contained in:
liuwei
2026-04-29 13:58:57 +08:00
parent 53376546c8
commit 86c3dea1d2
2 changed files with 151 additions and 2 deletions

View File

@@ -93,6 +93,28 @@ class DouyuDanmuSummaryHelper:
continue
return collected
@classmethod
def collect_session_source_files(cls, room_id: str, session: Dict[str, Any], base_dir: str = "temp") -> List[str]:
"""
收集某个 session 实际对应到的原始弹幕 txt 文件路径。
这里不读取文件内容,只返回“这场直播跨到了哪些日期文件”,
方便上层在需要时直接把原始 txt 上传给 LLM。
"""
segments = cls._normalize_segments(session.get("segments", []) or [])
if not room_id or not segments:
return []
file_paths: List[str] = []
date_keys = sorted(
{segment["start"].strftime("%Y%m%d") for segment in segments}
| {segment["end"].strftime("%Y%m%d") for segment in segments}
)
for date_key in date_keys:
file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")
if os.path.exists(file_path):
file_paths.append(file_path)
return file_paths
@classmethod
def load_day_messages(cls, room_id: str, date_key: str, base_dir: str = "temp") -> List[Dict[str, Any]]:
file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")
@@ -213,6 +235,10 @@ class DouyuDanmuSummaryHelper:
"peak_buckets": cls._simplify_peak_buckets(peak_buckets),
"representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats),
"raw_window_samples": cls._build_raw_window_samples(peak_buckets, per_bucket_limit=12),
# 把去噪后、且只合并了“完全相同重复弹幕”的原始弹幕全文也保留下来。
# 这样上层如果希望直接把整场弹幕塞给 LLM而不是只喂摘要样本
# 就不需要再重新读文件和重复清洗。
"raw_transcript_lines": cls._build_raw_transcript_lines(organized_messages),
# 给日报类 LLM 再补一层“按时间推进的现场切片”。
# 这样模型除了看热点窗口,还能顺着时间线理解气氛如何起、如何变、最后怎么收,
# 对粉丝日报这类强调“节目效果”和“接梗链路”的文本尤其有帮助。
@@ -938,6 +964,27 @@ class DouyuDanmuSummaryHelper:
})
return windows
@classmethod
def _build_raw_transcript_lines(cls, messages: List[Dict[str, Any]]) -> List[str]:
"""
生成可直接给 LLM 使用的顺时序弹幕全文。
规则:
1. 输入消息已经过“系统噪音过滤 + 完全相同重复合并”;
2. 不再进一步摘要,尽量保留现场原话;
3. 对重复合并过的消息补上次数信息,帮助模型感知刷屏强度。
"""
lines: List[str] = []
for item in messages:
content = str(item.get("content") or "").strip()
if not content:
continue
time_text = str(item.get("timestamp_text") or "").strip()
nickname = str(item.get("nickname") or "").strip() or "观众"
repeat_count = int(item.get("repeat_count", 1) or 1)
repeat_suffix = f" [重复{repeat_count}次]" if repeat_count > 1 else ""
lines.append(f"[{time_text}] {nickname}{content}{repeat_suffix}")
return lines
@classmethod
def _build_chronological_samples(
cls,

View File

@@ -519,7 +519,7 @@ class DouyuPlugin(MessagePluginInterface):
# - 不再对不同表达做强模板压缩,尽量保留现场讨论细节;
# - 同时让新日报结果自动避开旧缓存污染;
# 因此需要刷新旧缓存,确保新版结果真实命中新链路。
_DAILY_REPORT_CACHE_VERSION = 11
_DAILY_REPORT_CACHE_VERSION = 12
FEATURE_KEY = "DOUYU_MONITOR"
FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]"
@@ -2054,8 +2054,10 @@ class DouyuPlugin(MessagePluginInterface):
raw_window_samples: List[Dict[str, Any]] = []
chronological_samples: List[Dict[str, Any]] = []
session_storylines: List[Dict[str, Any]] = []
raw_transcript_lines: List[str] = []
top_terms_counter = Counter()
burst_terms_counter = Counter()
source_danmu_files: List[str] = []
operator_totals = {
"fans_badge_user_count": 0,
"fans_badge_message_count": 0,
@@ -2101,6 +2103,10 @@ class DouyuPlugin(MessagePluginInterface):
raw_window_samples.append(dict(item))
for item in payload.get("chronological_samples", []) or []:
chronological_samples.append(dict(item))
for line in payload.get("raw_transcript_lines", []) or []:
line_text = str(line or "").strip()
if line_text:
raw_transcript_lines.append(line_text)
storyline = payload.get("session_storyline") or {}
if storyline:
session_storylines.append(dict(storyline))
@@ -2155,6 +2161,13 @@ class DouyuPlugin(MessagePluginInterface):
uid = str(session_message.get("uid") or "").strip()
if uid:
total_unique_users.add(uid)
# 顺手把本场真正落盘的原始弹幕 txt 也记录下来。
# 这样后面如果走 Dify 文件上传链路,就可以直接把“当天原文件”交给 LLM
# 而不是再二次拼装一个近似文本。
for file_path in DouyuDanmuSummaryHelper.collect_session_source_files(room_id, session):
normalized_path = os.path.abspath(file_path)
if normalized_path and normalized_path not in source_danmu_files:
source_danmu_files.append(normalized_path)
if not session_payloads:
return None
@@ -2227,6 +2240,13 @@ class DouyuPlugin(MessagePluginInterface):
"chronological_samples": chronological_samples[:40],
# 多场直播时保留每一场的轻量故事线,让粉丝日报更容易写出真正的“回放感”。
"session_storylines": session_storylines[:6],
# 整天范围内的原始弹幕全文。
# 这里保留的是“过滤系统噪音 + 合并完全相同重复内容”后的顺时序文本,
# 目的不是做摘要,而是尽量把现场讨论原貌交给 LLM。
"raw_danmu_transcript": "\n".join(raw_transcript_lines),
"raw_danmu_transcript_lines": raw_transcript_lines,
# 保存实际原始 txt 路径,供 Dify 模式上传文件时直接复用。
"source_danmu_files": source_danmu_files[:8],
}
artifact_path = os.path.join(artifact_dir, f"{room_id}_{anchor_day.replace('-', '')}_daily_report_payload.json")
with open(artifact_path, "w", encoding="utf-8") as f:
@@ -2291,6 +2311,7 @@ class DouyuPlugin(MessagePluginInterface):
meta = payload.get("report_meta", {}) or {}
room_context_prompt = self._build_room_context_prompt_block(payload)
prompt_material = self._build_llm_prompt_material(payload, include_operator=False)
raw_transcript_block = self._build_raw_transcript_prompt_block(payload, max_lines=12000)
system_prompt = (
"你是斗鱼直播间的粉丝向整活日报编辑。"
"请只根据提供的真实弹幕材料,输出一份开心、欢乐、带一点恶搞气质的中文总结。"
@@ -2311,7 +2332,9 @@ class DouyuPlugin(MessagePluginInterface):
f"{room_context_prompt}"
"下面是已经提纯给 LLM 的现场材料。当前清洗策略只合并完全相同的重复弹幕,不同表达会尽量保留。\n"
"请优先抓原声弹幕、热点窗口、顺时序讨论推进,以及其中出现的刀圈比赛话题,不要写成空泛热闹总结。\n"
f"材料:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}"
"如果下面附带了原始弹幕全文,请把原始弹幕全文视为最高优先级证据,前面的摘要材料只是导航和补充。\n"
f"材料:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}\n\n"
f"{raw_transcript_block}"
)
return system_prompt, user_prompt
@@ -2340,6 +2363,7 @@ class DouyuPlugin(MessagePluginInterface):
peak_buckets = payload.get("peak_buckets", []) or []
chronological_samples = payload.get("chronological_samples", []) or []
session_storylines = payload.get("session_storylines", []) or []
raw_transcript_lines = payload.get("raw_danmu_transcript_lines", []) or []
material: Dict[str, Any] = {
"report_meta": {
@@ -2484,6 +2508,13 @@ class DouyuPlugin(MessagePluginInterface):
for item in session_storylines[:4]
],
},
# 保留一小段原始弹幕预览,主要用于调试和工作流里快速核对输入是否足够“像现场”。
# 真正的大全文会单独拼进 prompt避免这里的结构化 JSON 膨胀得过于夸张。
"raw_transcript_preview": [
str(line or "").strip()
for line in raw_transcript_lines[:120]
if str(line or "").strip()
],
}
if include_operator:
@@ -2517,6 +2548,27 @@ class DouyuPlugin(MessagePluginInterface):
return material
def _build_raw_transcript_prompt_block(self, payload: Dict[str, Any], max_lines: int = 12000) -> str:
"""
把原始弹幕全文整理成 prompt 可直接拼接的文本块。
这里优先满足“尽量把原始弹幕整体给 LLM 看”的目标,
同时保留一个最大行数阈值,避免极端超长场次直接把请求撑爆。
"""
raw_lines = [
str(line or "").strip()
for line in (payload.get("raw_danmu_transcript_lines", []) or [])
if str(line or "").strip()
]
if not raw_lines:
return "【按时间顺序整理的原始弹幕全文】\n(暂无可用原始弹幕全文)"
effective_lines = raw_lines[:max_lines]
lines = ["【按时间顺序整理的原始弹幕全文(已过滤系统噪音,仅合并完全相同重复内容)】"]
if len(effective_lines) < len(raw_lines):
lines.append(f"以下仅展开前 {len(effective_lines)} 行,剩余内容因长度限制未继续拼接。")
lines.extend(effective_lines)
return "\n".join(lines)
@staticmethod
def _clean_daily_report_llm_text(text: str) -> str:
"""
@@ -2940,11 +2992,59 @@ class DouyuPlugin(MessagePluginInterface):
# 控制输出长度Dify 该变量在部分工作流中配置为 paragraph(字符串)类型,
# 因此这里统一传字符串,避免出现 “max_length must be a string” 的 400 校验错误。
"max_length": str(int(self._daily_report_max_length or 1800)),
# 即使工作流节点没有消费 sys.files也尽量把整段原始弹幕文本塞进 inputs
# 这样工作流内仍可通过变量直接引用全文,不必只依赖 query 里的摘要部分。
"raw_danmu_transcript": str(payload.get("raw_danmu_transcript") or ""),
}
if self._daily_report_include_structured_inputs:
inputs["report_payload_json"] = json.dumps(payload, ensure_ascii=False)
return inputs
def _build_dify_daily_report_files(self, payload: Dict[str, Any], user_id: str) -> List[Dict[str, Any]]:
"""
组装斗鱼日报要上传给 Dify 的原始文件列表。
当前优先上传当天命中的原始弹幕 txt让工作流里的 sys.files
真正拿到“源文件级材料”,而不是只有摘要 JSON。
"""
if not self._daily_report_llm_client or self._daily_report_llm_client.provider != "dify":
return []
uploaded_files: List[Dict[str, Any]] = []
for file_path in (payload.get("source_danmu_files", []) or [])[:2]:
normalized_path = os.path.abspath(str(file_path or "").strip())
if not normalized_path or not os.path.exists(normalized_path) or not os.path.isfile(normalized_path):
continue
try:
with open(normalized_path, "rb") as file_obj:
file_bytes = file_obj.read()
except Exception as exc:
logger.warning(f"斗鱼日报原始弹幕文件读取失败: path={normalized_path}, error={exc}")
continue
if not file_bytes:
continue
upload_result = self._daily_report_llm_client.upload_dify_file(
user=user_id,
file_bytes=file_bytes,
filename=os.path.basename(normalized_path),
mime_type="text/plain",
)
upload_file_id = str((upload_result or {}).get("id") or "").strip()
if not upload_file_id:
logger.warning(
"斗鱼日报原始弹幕文件上传 Dify 失败: "
f"path={normalized_path}, last_error={self._daily_report_llm_client.last_error}"
)
continue
file_ref = self._daily_report_llm_client.build_dify_file_ref(
file_type="document",
upload_file_id=upload_file_id,
)
if file_ref:
uploaded_files.append(file_ref)
return uploaded_files
def _build_room_background_profile_seed(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
从日报载荷里抽取一份“适合给背景画像模型看的精简材料”。
@@ -3236,6 +3336,7 @@ class DouyuPlugin(MessagePluginInterface):
room_id = str(meta.get("room_id") or "").strip()
user_id = f"douyu_daily_report_{room_id or 'unknown'}"
if self._daily_report_llm_client.provider == "dify":
files = self._build_dify_daily_report_files(payload, user_id)
inputs = self._build_dify_daily_report_inputs(
task_type=task_type,
system_prompt=system_prompt,
@@ -3247,6 +3348,7 @@ class DouyuPlugin(MessagePluginInterface):
user=user_id,
inputs=inputs,
tag=tag,
files=files,
)
return str((result or {}).get("text", "") or "").strip()
return self._daily_report_llm_client.chat(