From 6d33081e03cc4fd3e7136f99990fd69ebd3a0f3d Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 29 Apr 2026 13:19:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=96=97=E9=B1=BC=E7=B2=89?= =?UTF-8?q?=E4=B8=9D=E6=97=A5=E6=8A=A5=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E4=B8=8E=E5=BC=B9=E5=B9=95=E7=B4=A0=E6=9D=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 将斗鱼粉丝日报接入插件定时任务体系,支持后台独立启停与调度执行\n2. 为粉丝日报新增按群去重发送标记,避免任务补偿或多群订阅时重复推送\n3. 扩充日报传给LLM的弹幕材料,补充顺时序现场样本与场次故事线,提升语境完整度\n4. 提升斗鱼日报缓存版本,确保新链路生成结果不复用旧缓存 --- plugins/douyu/danmu_summary.py | 89 ++++++++++++++++++ plugins/douyu/main.py | 166 +++++++++++++++++++++++++++++---- 2 files changed, 236 insertions(+), 19 deletions(-) diff --git a/plugins/douyu/danmu_summary.py b/plugins/douyu/danmu_summary.py index 54fc129..9a10e32 100644 --- a/plugins/douyu/danmu_summary.py +++ b/plugins/douyu/danmu_summary.py @@ -213,6 +213,18 @@ class DouyuDanmuSummaryHelper: "peak_buckets": cls._simplify_peak_buckets(peak_buckets), "representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats), "raw_window_samples": cls._build_raw_window_samples(peak_buckets, per_bucket_limit=8), + # 给日报类 LLM 再补一层“按时间推进的现场切片”。 + # 这样模型除了看热点窗口,还能顺着时间线理解气氛如何起、如何变、最后怎么收, + # 对粉丝日报这类强调“节目效果”和“接梗链路”的文本尤其有帮助。 + "chronological_samples": cls._build_chronological_samples(organized_messages, limit=20), + # 每个 session 单独给一个轻量摘要,避免多场直播合并后, + # 模型只看到全局热点而丢失“第一场在聊什么、第二场为什么突然转节奏”的信息。 + "session_storyline": cls._build_session_storyline( + organized_messages, + bucket_stats, + top_terms_limit=8, + sample_limit=10, + ), } @staticmethod @@ -834,6 +846,83 @@ class DouyuDanmuSummaryHelper: }) return windows + @classmethod + def _build_chronological_samples( + cls, + messages: List[Dict[str, Any]], + limit: int = 20, + ) -> List[Dict[str, str]]: + """ + 从整场弹幕里按时间均匀抽取样本。 + 设计目的: + 1. 热点窗口只能解释“最炸的几分钟”,但日报还需要理解整体节奏; + 2. 顺时序样本能帮助 LLM 看到开场铺垫、中段起哄、尾段收束; + 3. 对粉丝日报来说,这比单纯词频更容易还原“今天到底经历了什么”。 + """ + if not messages: + return [] + + indexes = { + int(round((len(messages) - 1) * idx / max(limit - 1, 1))) + for idx in range(min(limit, len(messages))) + } + selected: List[Dict[str, str]] = [] + seen = set() + for idx in sorted(indexes): + item = messages[idx] + content = str(item.get("content") or "").strip() + if not content: + continue + normalized = cls._normalize_template_text(content) + if normalized and normalized in seen: + continue + if normalized: + seen.add(normalized) + selected.append({ + "time": str(item.get("timestamp_text") or ""), + "nickname": str(item.get("nickname") or ""), + "content": content[:90], + }) + if len(selected) >= limit: + break + return selected + + @classmethod + def _build_session_storyline( + cls, + messages: List[Dict[str, Any]], + bucket_stats: List[Dict[str, Any]], + *, + top_terms_limit: int = 8, + sample_limit: int = 10, + ) -> Dict[str, Any]: + """ + 组装单场直播的轻量叙事骨架。 + 这里不追求大而全,而是给模型一个“这场直播从头到尾在发生什么”的概览, + 让它在写日报时更容易把梗、情绪和时间顺序串起来。 + """ + first_message = messages[0] if messages else {} + last_message = messages[-1] if messages else {} + hottest_bucket = max( + bucket_stats, + key=lambda item: int(item.get("message_count", 0) or 0), + default={}, + ) + return { + "start_time": str(first_message.get("timestamp_text") or ""), + "end_time": str(last_message.get("timestamp_text") or ""), + "top_terms": cls._extract_top_terms(messages, limit=top_terms_limit), + "burst_terms": cls._build_burst_terms(messages)[:6], + "chronological_samples": cls._build_chronological_samples(messages, limit=sample_limit), + "hottest_moment": { + "start_time": str(hottest_bucket.get("start_time") or ""), + "message_count": int(hottest_bucket.get("message_count", 0) or 0), + "user_count": int(hottest_bucket.get("user_count", 0) or 0), + "top_terms": hottest_bucket.get("top_terms", [])[:6], + "sample_messages": hottest_bucket.get("sample_messages", [])[:6], + }, + } + @staticmethod def _simplify_peak_buckets(buckets: List[Dict[str, Any]]) -> List[Dict[str, Any]]: simplified = [] diff --git a/plugins/douyu/main.py b/plugins/douyu/main.py index 5b54d6d..fbb5f20 100644 --- a/plugins/douyu/main.py +++ b/plugins/douyu/main.py @@ -514,11 +514,12 @@ class DouyuRedisManager: class DouyuPlugin(MessagePluginInterface): # 报告缓存版本号: # 1. 版本升级后会自动让历史缓存失效,避免继续复用旧文本/旧图片; - # 2. 本次将版本提升到 9: - # - LLM 输入材料从“整份大 payload”改成“提纯后的现场材料”; - # - 同时统一清洗 / reasoning 输出; + # 2. 本次将版本提升到 10: + # - 新增粉丝日报定时任务链路; + # - LLM 输入材料再补充顺时序现场切片与场次故事线; + # - 同时让新日报结果自动避开旧缓存污染; # 因此需要刷新旧缓存,确保新版结果真实命中新链路。 - _DAILY_REPORT_CACHE_VERSION = 9 + _DAILY_REPORT_CACHE_VERSION = 10 FEATURE_KEY = "DOUYU_MONITOR" FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]" @@ -1025,7 +1026,8 @@ class DouyuPlugin(MessagePluginInterface): 设计说明: 1. 斗鱼“每日报告”迁移到插件任务配置体系,支持在后台可视化启停/改时; 2. 触发时间直接复用配置项 daily_report_time,避免出现“两套时间配置”; - 3. 作用域默认 all_enabled_groups,让插件调度系统按群权限先过滤目标群。 + 3. 粉丝日报和运营日报都走同一套数据库调度体系,后台可以分别启停; + 4. 作用域默认 all_enabled_groups,让插件调度系统按群权限先过滤目标群。 """ trigger_time = str(self._daily_report_time or "09:30").strip() or "09:30" return [ @@ -1039,12 +1041,24 @@ class DouyuPlugin(MessagePluginInterface): "target_config": {}, "payload": {}, "default_enabled": bool(self._daily_report_enable), + }, + { + "action_key": "douyu_fans_daily_report_push", + "name": "斗鱼粉丝日报推送", + "description": "按配置时间推送前一天斗鱼粉丝日报", + "trigger_type": "at_times", + "trigger_config": {"time_list": [trigger_time]}, + "target_scope": "all_enabled_groups", + "target_config": {}, + # 定时任务默认开启已发送保护,避免重载补偿或手动补跑时重复刷同一天内容。 + "payload": {"respect_sent_flag": True}, + "default_enabled": bool(self._daily_report_enable), } ] async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: """执行插件调度动作。""" - if action_key != "douyu_daily_report_push": + if action_key not in {"douyu_daily_report_push", "douyu_fans_daily_report_push"}: return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}} # 调度器注入 bot,保证定时任务也能发消息。 @@ -1066,29 +1080,41 @@ class DouyuPlugin(MessagePluginInterface): if not target_groups: target_groups = GroupBotManager.get_group_list() + is_fans_report = action_key == "douyu_fans_daily_report_push" delivered_groups: List[str] = [] failed_groups: Dict[str, str] = {} for gid in target_groups: try: # 按群推送:内部会再基于斗鱼订阅与插件权限做二次过滤。 - delivered = await self._send_daily_reports( - anchor_day=anchor_day, - target_group_id=gid, - force=force, - force_regenerate=force_regenerate, - ) + if is_fans_report: + delivered = await self._send_fans_daily_reports( + anchor_day=anchor_day, + target_group_id=gid, + force_regenerate=force_regenerate, + force=force, + respect_sent_flag=bool(payload.get("respect_sent_flag", True)), + ) + else: + delivered = await self._send_daily_reports( + anchor_day=anchor_day, + target_group_id=gid, + force=force, + force_regenerate=force_regenerate, + ) if delivered: delivered_groups.append(gid) except Exception as e: failed_groups[gid] = self._format_exception(e) + report_label = "斗鱼粉丝日报" if is_fans_report else "斗鱼日报" return { "success": len(failed_groups) == 0, "summary": ( - f"斗鱼日报任务完成: 日期{anchor_day}, 目标群{len(target_groups)}个, " + f"{report_label}任务完成: 日期{anchor_day}, 目标群{len(target_groups)}个, " f"成功发送群{len(delivered_groups)}个, 失败群{len(failed_groups)}个" ), "detail": { + "action_key": action_key, "anchor_day": anchor_day, "force": force, "force_regenerate": force_regenerate, @@ -1796,9 +1822,31 @@ class DouyuPlugin(MessagePluginInterface): def _daily_report_job_key(self, day_key: str) -> str: return f"{self.redis_manager.prefix}daily_report_job:{day_key}" - def _daily_report_room_key(self, room_id: str, anchor_day: str) -> str: + def _daily_report_room_key(self, room_id: str, anchor_day: str, group_id: Optional[str] = None) -> str: + """ + 日报发送标记支持按群粒度区分。 + 兼容说明: + 1. 不传 group_id 时,沿用“房间 + 日期”粒度,适合一次性向所有订阅群统一推送; + 2. 传入 group_id 时,改成“房间 + 日期 + 群”粒度,适合插件调度逐群执行; + 3. 这样可以避免同一房间被多个群订阅时,前一个群的发送记录误伤后一个群。 + """ + if str(group_id or "").strip(): + return f"{self.redis_manager.prefix}daily_report:{room_id}:{anchor_day}:{str(group_id).strip()}" return f"{self.redis_manager.prefix}daily_report:{room_id}:{anchor_day}" + def _fans_daily_report_room_key(self, room_id: str, anchor_day: str, group_id: Optional[str] = None) -> str: + """ + 粉丝日报使用独立发送标记。 + 这样做的原因: + 1. 不和运营版日报共用去重状态,避免两种内容互相影响; + 2. 定时任务补偿/重载时可以避免同一天重复发粉丝日报; + 3. 同样支持按群粒度区分,避免多群订阅同房间时互相误伤; + 4. 手工命令仍可选择无视这个标记,保留“重复召回”的灵活性。 + """ + if str(group_id or "").strip(): + return f"{self.redis_manager.prefix}fans_daily_report:{room_id}:{anchor_day}:{str(group_id).strip()}" + return f"{self.redis_manager.prefix}fans_daily_report:{room_id}:{anchor_day}" + @staticmethod def _daily_report_cache_dir() -> str: path = os.path.join("temp", "douyu_materials") @@ -2004,6 +2052,8 @@ class DouyuPlugin(MessagePluginInterface): peak_buckets: List[Dict[str, Any]] = [] representative_messages: List[Dict[str, Any]] = [] raw_window_samples: List[Dict[str, Any]] = [] + chronological_samples: List[Dict[str, Any]] = [] + session_storylines: List[Dict[str, Any]] = [] top_terms_counter = Counter() burst_terms_counter = Counter() operator_totals = { @@ -2049,6 +2099,11 @@ class DouyuPlugin(MessagePluginInterface): representative_messages.append(dict(item)) for item in payload.get("raw_window_samples", []) or []: raw_window_samples.append(dict(item)) + for item in payload.get("chronological_samples", []) or []: + chronological_samples.append(dict(item)) + storyline = payload.get("session_storyline") or {} + if storyline: + session_storylines.append(dict(storyline)) for item in payload.get("top_terms", []) or []: term = str(item.get("term") or "").strip() if term: @@ -2107,6 +2162,8 @@ class DouyuPlugin(MessagePluginInterface): merged_templates.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True) repeated_messages.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True) peak_buckets.sort(key=lambda item: int(item.get("message_count", 0) or 0), reverse=True) + chronological_samples.sort(key=lambda item: str(item.get("time") or "")) + session_storylines.sort(key=lambda item: str(item.get("start_time") or "")) artifact_dir = os.path.join("temp", "douyu_materials") os.makedirs(artifact_dir, exist_ok=True) @@ -2166,6 +2223,10 @@ class DouyuPlugin(MessagePluginInterface): "peak_buckets": peak_buckets[:10], "representative_messages": representative_messages[:24], "raw_window_samples": raw_window_samples[:10], + # 顺时序样本用于补足“从开播到收尾”的完整语境,避免 LLM 只看到零散热点。 + "chronological_samples": chronological_samples[:40], + # 多场直播时保留每一场的轻量故事线,让粉丝日报更容易写出真正的“回放感”。 + "session_storylines": session_storylines[:6], } artifact_path = os.path.join(artifact_dir, f"{room_id}_{anchor_day.replace('-', '')}_daily_report_payload.json") with open(artifact_path, "w", encoding="utf-8") as f: @@ -2275,6 +2336,8 @@ class DouyuPlugin(MessagePluginInterface): top_terms = payload.get("top_terms", []) or [] burst_terms = payload.get("burst_terms", []) or [] peak_buckets = payload.get("peak_buckets", []) or [] + chronological_samples = payload.get("chronological_samples", []) or [] + session_storylines = payload.get("session_storylines", []) or [] material: Dict[str, Any] = { "report_meta": { @@ -2367,6 +2430,57 @@ class DouyuPlugin(MessagePluginInterface): } for index, window in enumerate(raw_window_samples[:8]) ], + "chronological_samples": [ + { + "time": str(item.get("time") or "").strip(), + "nickname": str(item.get("nickname") or "").strip(), + "content": str(item.get("content") or "").strip()[:90], + } + for item in chronological_samples[:24] + if str(item.get("content") or "").strip() + ], + "session_storylines": [ + { + "start_time": str(item.get("start_time") or "").strip(), + "end_time": str(item.get("end_time") or "").strip(), + "top_terms": [ + { + "term": str(term.get("term") or "").strip(), + "count": int(term.get("count", 0) or 0), + } + for term in (item.get("top_terms", []) or [])[:8] + if str(term.get("term") or "").strip() + ], + "burst_terms": [ + { + "text": str(term.get("text") or "").strip(), + "count": int(term.get("count", 0) or 0), + } + for term in (item.get("burst_terms", []) or [])[:6] + if str(term.get("text") or "").strip() + ], + "hottest_moment": { + "start_time": str(((item.get("hottest_moment") or {}).get("start_time")) or "").strip(), + "message_count": int(((item.get("hottest_moment") or {}).get("message_count", 0)) or 0), + "user_count": int(((item.get("hottest_moment") or {}).get("user_count", 0)) or 0), + "top_terms": [ + str(term.get("term") or "").strip() + for term in (((item.get("hottest_moment") or {}).get("top_terms", [])) or [])[:6] + if str(term.get("term") or "").strip() + ], + }, + "chronological_samples": [ + { + "time": str(sample.get("time") or "").strip(), + "nickname": str(sample.get("nickname") or "").strip(), + "content": str(sample.get("content") or "").strip()[:90], + } + for sample in (item.get("chronological_samples", []) or [])[:8] + if str(sample.get("content") or "").strip() + ], + } + for item in session_storylines[:4] + ], }, } @@ -3423,7 +3537,8 @@ class DouyuPlugin(MessagePluginInterface): return False delivered_any = False for room_id in rooms: - if not force and self.redis_manager.get_text_value(self._daily_report_room_key(room_id, anchor_day)): + sent_key = self._daily_report_room_key(room_id, anchor_day, group_id=target_group_id) + if not force and self.redis_manager.get_text_value(sent_key): logger.info(f"斗鱼每日报告已发送过,跳过: room={room_id}, day={anchor_day}") continue sessions = self._load_sessions_for_anchor_day(room_id, anchor_day) @@ -3478,7 +3593,7 @@ class DouyuPlugin(MessagePluginInterface): logger.error(f"发送斗鱼每日报告失败(room={room_id}, group={gid}): {e}") if delivered: self.redis_manager.set_text_value( - self._daily_report_room_key(room_id, anchor_day), + sent_key, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ) return delivered_any @@ -3489,12 +3604,15 @@ class DouyuPlugin(MessagePluginInterface): target_group_id: Optional[str] = None, *, force_regenerate: bool = False, + force: bool = False, + respect_sent_flag: bool = False, ) -> bool: """ 发送粉丝向恶搞日报。 - 当前刻意不复用“已发送标记”: - 1. 它不是原有定时任务的一部分,默认按手动召回理解; - 2. 群里想反复看不同版本文案时,不会被“今天已经发过”拦住。 + 发送策略说明: + 1. 手工命令默认仍允许重复召回,方便群里反复看不同版本; + 2. 接入定时任务后,可通过 respect_sent_flag 开启“同房间同日期”去重; + 3. force 只表示忽略已发送标记,不改变 force_regenerate 的重生成语义。 """ rooms = ( set(self.redis_manager.list_group_rooms(target_group_id)) @@ -3509,6 +3627,11 @@ class DouyuPlugin(MessagePluginInterface): delivered_any = False for room_id in rooms: + sent_key = self._fans_daily_report_room_key(room_id, anchor_day, group_id=target_group_id) + if respect_sent_flag and not force and self.redis_manager.get_text_value( + sent_key + ): + continue sessions = self._load_sessions_for_anchor_day(room_id, anchor_day) if not sessions: logger.info(f"斗鱼粉丝日报无 session: room={room_id}, day={anchor_day}") @@ -3554,6 +3677,11 @@ class DouyuPlugin(MessagePluginInterface): else: await self.bot.send_text_message(gid, report_text) delivered_any = True + if respect_sent_flag: + self.redis_manager.set_text_value( + sent_key, + datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + ) except Exception as e: logger.error(f"发送斗鱼粉丝日报失败(room={room_id}, group={gid}): {e}") return delivered_any