From 625d37018b6589a245441385e2c0efbce0b9b420 Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 29 Apr 2026 14:25:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=96=97=E9=B1=BC=E5=BC=B9?= =?UTF-8?q?=E5=B9=95=E6=97=A5=E6=8A=A5LLM=E5=85=A5=E5=8F=82=E5=8E=8B?= =?UTF-8?q?=E7=BC=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增面向LLM的用户索引、时间线事件块与整句高频线索压缩结构 - 将弹幕时间统一压缩为日期加时分并抽离UID尾号、牌子等级等重复元信息 - 下调中文分词结果在提示材料中的权重,改为优先使用现场原句和时间线材料 --- plugins/douyu/danmu_summary.py | 305 ++++++++++++++++++++++++++++++++- plugins/douyu/main.py | 94 +++++++++- 2 files changed, 393 insertions(+), 6 deletions(-) diff --git a/plugins/douyu/danmu_summary.py b/plugins/douyu/danmu_summary.py index 9a10e32..01a7f8f 100644 --- a/plugins/douyu/danmu_summary.py +++ b/plugins/douyu/danmu_summary.py @@ -46,6 +46,10 @@ class DouyuDanmuSummaryHelper: TEMPLATE_MIN_REPEAT = 4 REPEAT_MIN_COUNT = 3 + # 统一使用分钟级时间,是因为日报场景里秒级时间几乎不会增加理解价值, + # 但会显著拉长 LLM 输入;保留到 `HH:MM` 能兼顾时序感和压缩率。 + COMPACT_TIME_FORMAT = "%H:%M" + @classmethod def parse_danmu_line(cls, line: str) -> Optional[Dict[str, Any]]: text = str(line or "").strip() @@ -225,6 +229,66 @@ class DouyuDanmuSummaryHelper: top_terms_limit=8, sample_limit=10, ), + # 这是专门给 LLM 准备的“压缩但保真”的材料层: + # 1. 用户画像从逐条弹幕里抽出来,避免昵称/牌子/等级在每条消息里重复出现; + # 2. 时间统一压到日期 + 时分,减少无意义的秒级噪音; + # 3. 主体按时间线块组织,更像“现场事件流”,比中文分词更适合日报生成。 + "compact_prompt_assets": cls.build_compact_prompt_assets( + organized_messages, + bucket_minutes=bucket_minutes, + ), + } + + @classmethod + def build_compact_prompt_assets( + cls, + messages: List[Dict[str, Any]], + *, + bucket_minutes: int = 5, + speaker_limit: int = 80, + timeline_limit: int = 24, + samples_per_bucket: int = 6, + cue_limit: int = 18, + ) -> Dict[str, Any]: + """ + 生成专供 LLM 使用的压缩材料。 + 设计目标: + 1. 不做中文分词,尽量保留“整句/整段弹幕”的原始信息密度; + 2. 把重复出现的用户元信息抽到索引表,降低 token 浪费; + 3. 把现场内容组织成时间线块,帮助模型理解节奏推进和集体起哄链路。 + """ + ordered_messages = sorted( + [item for item in messages if item and str(item.get("content") or "").strip()], + key=lambda item: item.get("timestamp") or datetime.min, + ) + if not ordered_messages: + return { + "speaker_index": [], + "timeline_digest": [], + "content_cues": [], + } + + speaker_index, speaker_alias_map = cls._build_speaker_index( + ordered_messages, + limit=speaker_limit, + ) + repeated_messages = cls._build_repeated_messages(ordered_messages, limit=cue_limit) + burst_terms = cls._build_burst_terms(ordered_messages) + return { + "speaker_index": speaker_index, + "timeline_digest": cls._build_timeline_digest( + ordered_messages, + speaker_alias_map, + bucket_minutes=bucket_minutes, + limit=timeline_limit, + samples_per_bucket=samples_per_bucket, + ), + "content_cues": cls._build_content_cues( + ordered_messages, + repeated_messages=repeated_messages, + burst_terms=burst_terms, + limit=cue_limit, + ), } @staticmethod @@ -663,6 +727,10 @@ class DouyuDanmuSummaryHelper: }), "first_time": str(first.get("timestamp_text") or ""), "last_time": str(items[-1].get("timestamp_text") or ""), + "first_date": cls._format_date(first.get("timestamp")), + "first_hm": cls._format_hm(first.get("timestamp")), + "last_date": cls._format_date(items[-1].get("timestamp")), + "last_hm": cls._format_hm(items[-1].get("timestamp")), }) repeated_messages.sort(key=lambda item: item.get("count", 0), reverse=True) return repeated_messages[:limit] @@ -729,20 +797,26 @@ class DouyuDanmuSummaryHelper: @classmethod def _build_time_buckets(cls, messages: List[Dict[str, Any]], minutes: int = 5) -> List[Dict[str, Any]]: buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + bucket_dt_map: Dict[str, datetime] = {} for item in messages: ts = item.get("timestamp") if not isinstance(ts, datetime): continue bucket_minute = (ts.minute // minutes) * minutes bucket_key = ts.replace(minute=bucket_minute, second=0) - buckets[bucket_key.strftime("%Y-%m-%d %H:%M:%S")].append(item) + bucket_text = bucket_key.strftime("%Y-%m-%d %H:%M:%S") + bucket_dt_map[bucket_text] = bucket_key + buckets[bucket_text].append(item) results: List[Dict[str, Any]] = [] for bucket_start, items in sorted(buckets.items()): + bucket_dt = bucket_dt_map.get(bucket_start) top_terms = cls._extract_top_terms(items, limit=8) burst_terms = cls._build_burst_terms(items)[:5] results.append({ "start_time": bucket_start, + "date": cls._format_date(bucket_dt), + "start_hm": cls._format_hm(bucket_dt), "message_count": len(items), "user_count": len({str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip()}), "top_terms": top_terms, @@ -765,6 +839,8 @@ class DouyuDanmuSummaryHelper: continue selected.append({ "time": str(item.get("timestamp_text") or ""), + "date": DouyuDanmuSummaryHelper._format_date(item.get("timestamp")), + "hm": DouyuDanmuSummaryHelper._format_hm(item.get("timestamp")), "nickname": str(item.get("nickname") or ""), "content": content[:80], }) @@ -781,6 +857,8 @@ class DouyuDanmuSummaryHelper: continue selected.append({ "time": str(item.get("timestamp_text") or ""), + "date": DouyuDanmuSummaryHelper._format_date(item.get("timestamp")), + "hm": DouyuDanmuSummaryHelper._format_hm(item.get("timestamp")), "nickname": str(item.get("nickname") or ""), "content": content[:80], }) @@ -810,6 +888,8 @@ class DouyuDanmuSummaryHelper: continue selected.append({ "time": str(item.get("timestamp_text") or ""), + "date": cls._format_date(item.get("timestamp")), + "hm": cls._format_hm(item.get("timestamp")), "nickname": str(item.get("nickname") or ""), "content": content[:80], }) @@ -833,6 +913,8 @@ class DouyuDanmuSummaryHelper: continue samples.append({ "time": str(sample.get("time") or ""), + "date": str(sample.get("date") or ""), + "hm": str(sample.get("hm") or ""), "nickname": str(sample.get("nickname") or ""), "content": content, }) @@ -880,6 +962,8 @@ class DouyuDanmuSummaryHelper: seen.add(normalized) selected.append({ "time": str(item.get("timestamp_text") or ""), + "date": cls._format_date(item.get("timestamp")), + "hm": cls._format_hm(item.get("timestamp")), "nickname": str(item.get("nickname") or ""), "content": content[:90], }) @@ -936,6 +1020,225 @@ class DouyuDanmuSummaryHelper: }) return simplified + @staticmethod + def _format_date(ts: Any) -> str: + if isinstance(ts, datetime): + return ts.strftime("%Y-%m-%d") + return "" + + @classmethod + def _format_hm(cls, ts: Any) -> str: + if isinstance(ts, datetime): + return ts.strftime(cls.COMPACT_TIME_FORMAT) + return "" + + @classmethod + def _build_speaker_index( + cls, + messages: List[Dict[str, Any]], + *, + limit: int = 80, + ) -> (List[Dict[str, Any]], Dict[str, str]): + """ + 构建用户索引表。 + 这样时间线块里只保留 `speaker_id`,把 UID/牌子/等级这些重复元信息折叠到索引里, + 既节省 token,也能保留用户画像的完整性。 + """ + profiles: Dict[str, Dict[str, Any]] = {} + counts = Counter() + for item in messages: + uid = str(item.get("uid") or "").strip() + if not uid: + continue + counts[uid] += 1 + profile = profiles.setdefault(uid, { + "uid": uid, + "nickname": str(item.get("nickname") or "").strip(), + "room_level": 0, + "fans_name": "", + "fans_level": 0, + "noble_name": "", + }) + if not profile["nickname"]: + profile["nickname"] = str(item.get("nickname") or "").strip() + profile["room_level"] = max(int(profile.get("room_level", 0) or 0), int(item.get("room_level", 0) or 0)) + if int(item.get("fans_level", 0) or 0) > int(profile.get("fans_level", 0) or 0): + profile["fans_level"] = int(item.get("fans_level", 0) or 0) + if not str(profile.get("fans_name") or "").strip(): + profile["fans_name"] = str(item.get("fans_name") or "").strip() + if not str(profile.get("noble_name") or "").strip(): + profile["noble_name"] = str(item.get("noble_name") or "").strip() + + speaker_index: List[Dict[str, Any]] = [] + speaker_alias_map: Dict[str, str] = {} + for idx, (uid, count) in enumerate(counts.most_common(limit), start=1): + profile = profiles.get(uid, {}) + speaker_id = f"U{idx:02d}" + speaker_alias_map[uid] = speaker_id + speaker_index.append({ + "speaker_id": speaker_id, + "nickname": str(profile.get("nickname") or "").strip(), + # 只保留 UID 尾号,方便定位老观众/同名用户,又不会把整串 UUID 反复塞给模型。 + "uid_tail": uid[-4:] if len(uid) >= 4 else uid, + "badge_name": str(profile.get("fans_name") or "").strip(), + "badge_level": int(profile.get("fans_level", 0) or 0), + "room_level": int(profile.get("room_level", 0) or 0), + "noble_name": str(profile.get("noble_name") or "").strip(), + "message_count": count, + }) + return speaker_index, speaker_alias_map + + @classmethod + def _build_timeline_digest( + cls, + messages: List[Dict[str, Any]], + speaker_alias_map: Dict[str, str], + *, + bucket_minutes: int = 5, + limit: int = 24, + samples_per_bucket: int = 6, + ) -> List[Dict[str, Any]]: + """ + 把弹幕按时间窗口压成事件块。 + 每个块里同时保留: + 1. 热度数据; + 2. 重复刷屏的整句线索; + 3. 少量原声样本。 + 这比按词切碎更容易让模型理解“这一段到底发生了什么”。 + """ + buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + bucket_dt_map: Dict[str, datetime] = {} + for item in messages: + ts = item.get("timestamp") + if not isinstance(ts, datetime): + continue + bucket_minute = (ts.minute // bucket_minutes) * bucket_minutes + bucket_dt = ts.replace(minute=bucket_minute, second=0) + bucket_key = bucket_dt.strftime("%Y-%m-%d %H:%M:%S") + bucket_dt_map[bucket_key] = bucket_dt + buckets[bucket_key].append(item) + + timeline_blocks: List[Dict[str, Any]] = [] + for bucket_key in sorted(buckets.keys()): + bucket_messages = buckets[bucket_key] + repeated = cls._build_repeated_messages(bucket_messages, limit=3) + samples: List[Dict[str, Any]] = [] + seen_contents = set() + for item in bucket_messages: + content = str(item.get("content") or "").strip() + if not content: + continue + normalized = cls._normalize_template_text(content) + if normalized and normalized in seen_contents: + continue + if normalized: + seen_contents.add(normalized) + uid = str(item.get("uid") or "").strip() + samples.append({ + "speaker_id": speaker_alias_map.get(uid, "U00"), + "hm": cls._format_hm(item.get("timestamp")), + "content": content[:90], + }) + if len(samples) >= samples_per_bucket: + break + + bucket_dt = bucket_dt_map.get(bucket_key) + timeline_blocks.append({ + "date": cls._format_date(bucket_dt), + "start_hm": cls._format_hm(bucket_dt), + "message_count": len(bucket_messages), + "user_count": len({ + str(item.get("uid") or "") for item in bucket_messages if str(item.get("uid") or "").strip() + }), + "repeated_cues": [ + { + "text": str(item.get("text") or "").strip()[:80], + "count": int(item.get("count", 0) or 0), + "user_count": int(item.get("user_count", 0) or 0), + } + for item in repeated + if str(item.get("text") or "").strip() + ], + "samples": samples, + }) + return timeline_blocks[:limit] + + @classmethod + def _build_content_cues( + cls, + messages: List[Dict[str, Any]], + *, + repeated_messages: List[Dict[str, Any]], + burst_terms: List[Dict[str, Any]], + limit: int = 18, + ) -> List[Dict[str, Any]]: + """ + 生成不依赖中文分词的高频内容线索。 + 规则: + 1. 优先保留整句复读内容; + 2. 短促情绪词单独保留为 burst; + 3. 对短句高频原话做补充,不把中文切碎成词。 + """ + cues: List[Dict[str, Any]] = [] + seen = set() + + def push(kind: str, text: str, count: int, user_count: int = 0) -> None: + value = str(text or "").strip() + if not value: + return + normalized = cls._normalize_template_text(value) + if not normalized or normalized in seen: + return + seen.add(normalized) + cues.append({ + "kind": kind, + "text": value[:90], + "count": int(count or 0), + "user_count": int(user_count or 0), + }) + + for item in repeated_messages: + push( + "repeat", + str(item.get("text") or ""), + int(item.get("count", 0) or 0), + int(item.get("user_count", 0) or 0), + ) + + short_message_counter = Counter() + short_message_users: Dict[str, Set[str]] = defaultdict(set) + short_message_text_map: Dict[str, str] = {} + for item in messages: + content = str(item.get("content") or "").strip() + if not content or len(content) > 16 or cls._looks_like_pure_punctuation(content): + continue + normalized = cls._normalize_template_text(content) + if not normalized: + continue + short_message_counter[normalized] += 1 + short_message_users[normalized].add(str(item.get("uid") or "").strip()) + short_message_text_map.setdefault(normalized, content) + for normalized, count in short_message_counter.most_common(limit): + if count < 2: + continue + push( + "short_repeat", + short_message_text_map.get(normalized, normalized), + count, + len([uid for uid in short_message_users.get(normalized, set()) if uid]), + ) + + for item in burst_terms: + push( + "burst", + str(item.get("text") or ""), + int(item.get("count", 0) or 0), + int(item.get("user_count", 0) or 0), + ) + + cues.sort(key=lambda item: (int(item.get("count", 0) or 0), int(item.get("user_count", 0) or 0)), reverse=True) + return cues[:limit] + @staticmethod def _looks_like_pure_punctuation(content: str) -> bool: text = str(content or "").strip() diff --git a/plugins/douyu/main.py b/plugins/douyu/main.py index fbb5f20..401d044 100644 --- a/plugins/douyu/main.py +++ b/plugins/douyu/main.py @@ -2043,6 +2043,7 @@ class DouyuPlugin(MessagePluginInterface): return None session_payloads: List[Dict[str, Any]] = [] + all_messages: List[Dict[str, Any]] = [] total_message_count = 0 total_noise_filtered_count = 0 total_organized_message_count = 0 @@ -2077,6 +2078,9 @@ class DouyuPlugin(MessagePluginInterface): messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session) if len(messages) < self._daily_report_min_messages: continue + # 这里额外保留一份“日报级全量消息集合”, + # 后面统一做面向 LLM 的压缩,避免按 session 先压一次、汇总时再拼回去导致结构发散。 + all_messages.extend(messages) payload = DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages) session_payloads.append(payload) meta = payload.get("session_meta", {}) or {} @@ -2169,6 +2173,14 @@ class DouyuPlugin(MessagePluginInterface): os.makedirs(artifact_dir, exist_ok=True) audience_trend = self._build_audience_trend(sessions) room_context = self._build_room_semantic_context(room_id, nickname, room_name, sessions) + llm_compact = DouyuDanmuSummaryHelper.build_compact_prompt_assets( + all_messages, + bucket_minutes=5, + speaker_limit=80, + timeline_limit=24, + samples_per_bucket=6, + cue_limit=18, + ) payload = { "report_meta": { "room_id": room_id, @@ -2227,6 +2239,11 @@ class DouyuPlugin(MessagePluginInterface): "chronological_samples": chronological_samples[:40], # 多场直播时保留每一场的轻量故事线,让粉丝日报更容易写出真正的“回放感”。 "session_storylines": session_storylines[:6], + # 专供 LLM 的压缩材料: + # 1. speaker_index 把 UID/牌子/等级从逐条消息里抽离; + # 2. timeline_digest 用“时间块 + 原句样本 + 复读线索”还原现场; + # 3. content_cues 不走中文分词,尽量保留整句高频信息。 + "llm_compact": llm_compact, } artifact_path = os.path.join(artifact_dir, f"{room_id}_{anchor_day.replace('-', '')}_daily_report_payload.json") with open(artifact_path, "w", encoding="utf-8") as f: @@ -2251,7 +2268,8 @@ class DouyuPlugin(MessagePluginInterface): "5. 单独列出 2-3 个热点时段。\n" "6. 整体控制在 600 字以内。\n\n" f"{room_context_prompt}" - "下面是已经提纯给 LLM 的材料,优先依据现场弹幕片段、热点窗口和共识梗来写,不要被大段统计信息带偏。\n" + "下面是已经提纯给 LLM 的材料,其中 `compact_scene_material` 是主阅读区:\n" + "请优先依据其中的用户索引、时间线块、整句复读线索和原声样本来写,不要被大段统计信息带偏。\n" f"材料如下:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}" ) return system_prompt, user_prompt @@ -2276,7 +2294,8 @@ class DouyuPlugin(MessagePluginInterface): f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n" f"日期:{meta.get('anchor_day', '')}\n" f"{room_context_prompt}" - "下面是已经提纯给 LLM 的现场材料,请优先使用原声弹幕、热点窗口和复读梗,不要写成词频复述。\n" + "下面是已经提纯给 LLM 的现场材料,请优先阅读 `compact_scene_material` 中的时间线块、整句复读线索和原声样本," + "不要写成词频复述。\n" f"材料:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}" ) return system_prompt, user_prompt @@ -2308,7 +2327,8 @@ class DouyuPlugin(MessagePluginInterface): f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n" f"日期:{meta.get('anchor_day', '')}\n" f"{room_context_prompt}" - "下面是已经提纯给 LLM 的现场材料,请优先抓原声弹幕、热点窗口和集体起哄片段,少写空泛概括。\n" + "下面是已经提纯给 LLM 的现场材料,请优先抓 `compact_scene_material` 里的原声弹幕、时间线块和集体起哄片段," + "少写空泛概括。\n" f"材料:\n{json.dumps(prompt_material, ensure_ascii=False, indent=2)}" ) return system_prompt, user_prompt @@ -2338,6 +2358,10 @@ class DouyuPlugin(MessagePluginInterface): peak_buckets = payload.get("peak_buckets", []) or [] chronological_samples = payload.get("chronological_samples", []) or [] session_storylines = payload.get("session_storylines", []) or [] + llm_compact = payload.get("llm_compact", {}) or {} + speaker_index = llm_compact.get("speaker_index", []) or [] + timeline_digest = llm_compact.get("timeline_digest", []) or [] + content_cues = llm_compact.get("content_cues", []) or [] material: Dict[str, Any] = { "report_meta": { @@ -2358,6 +2382,64 @@ class DouyuPlugin(MessagePluginInterface): "storyline_keywords": self._normalize_text_list(room_context.get("storyline_keywords"))[:10], "style_hints": self._normalize_text_list(room_context.get("style_hints"))[:6], }, + # 这是新的主材料层,优先级高于传统的 top_terms: + # 1. speaker_index 负责承接用户画像,避免在每条样本里重复塞 UUID/牌子/等级; + # 2. timeline_digest 让模型按时间推进理解“哪一段开始起哄、哪一段反复刷屏”; + # 3. content_cues 保留整句/短句级复读内容,不再依赖中文切词。 + "compact_scene_material": { + "speaker_index": [ + { + "speaker_id": str(item.get("speaker_id") or "").strip(), + "nickname": str(item.get("nickname") or "").strip(), + "uid_tail": str(item.get("uid_tail") or "").strip(), + "badge_name": str(item.get("badge_name") or "").strip(), + "badge_level": int(item.get("badge_level", 0) or 0), + "room_level": int(item.get("room_level", 0) or 0), + "noble_name": str(item.get("noble_name") or "").strip(), + "message_count": int(item.get("message_count", 0) or 0), + } + for item in speaker_index[:40] + if str(item.get("speaker_id") or "").strip() + ], + "content_cues": [ + { + "kind": str(item.get("kind") or "").strip(), + "text": str(item.get("text") or "").strip()[:90], + "count": int(item.get("count", 0) or 0), + "user_count": int(item.get("user_count", 0) or 0), + } + for item in content_cues[:18] + if str(item.get("text") or "").strip() + ], + "timeline_digest": [ + { + "date": str(item.get("date") or "").strip(), + "start_hm": str(item.get("start_hm") or "").strip(), + "message_count": int(item.get("message_count", 0) or 0), + "user_count": int(item.get("user_count", 0) or 0), + "repeated_cues": [ + { + "text": str(cue.get("text") or "").strip()[:80], + "count": int(cue.get("count", 0) or 0), + "user_count": int(cue.get("user_count", 0) or 0), + } + for cue in (item.get("repeated_cues", []) or [])[:3] + if str(cue.get("text") or "").strip() + ], + "samples": [ + { + "speaker_id": str(sample.get("speaker_id") or "").strip(), + "hm": str(sample.get("hm") or "").strip(), + "content": str(sample.get("content") or "").strip()[:90], + } + for sample in (item.get("samples", []) or [])[:6] + if str(sample.get("content") or "").strip() + ], + } + for item in timeline_digest[:20] + if (item.get("samples") or item.get("repeated_cues")) + ], + }, "session_overview": [ { "session_id": str(item.get("session_id") or "").strip(), @@ -2368,9 +2450,11 @@ class DouyuPlugin(MessagePluginInterface): for item in sessions[:4] ], "high_frequency_topics": { - "top_terms": [ + # 这里刻意不再把中文分词结果作为主字段喂给 LLM, + # 避免模型把碎词误当成主线;真正的内容理解优先走 compact_scene_material。 + "top_terms_legacy": [ {"term": str(item.get("term") or "").strip(), "count": int(item.get("count", 0) or 0)} - for item in top_terms[:16] + for item in top_terms[:8] if str(item.get("term") or "").strip() ], "burst_terms": [