import asyncio import json from collections import Counter from datetime import datetime, timedelta import os from pathlib import Path import threading import time from typing import Dict, Any, List, Optional, Tuple, Set import aiohttp from loguru import logger import ssl import zlib try: import websocket except ImportError: websocket = None from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager from utils.ai.unified_llm import UnifiedLLMClient from plugins.douyu.danmu_summary import DouyuDanmuSummaryHelper from plugins.douyu.report_template import render_daily_report_html, render_fans_daily_report_html from utils.decorator.async_job import async_job from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.markdown_to_image import convert_md_str_to_image, html_to_image from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML class DouyuDanmuRecorder: def __init__(self, room_id: str, user_agent: str, stats_callback=None, stats_sample_interval_seconds: int = 60): self.room_id = room_id self.user_agent = user_agent self.stats_callback = stats_callback self.stats_sample_interval_seconds = max(0, int(stats_sample_interval_seconds or 0)) self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._ws: Optional[websocket.WebSocketApp] = None self._buffer: List[str] = [] self._buffer_limit = 10 self._buffer_date: Optional[str] = None self._lock = threading.Lock() self._websocket_available = websocket is not None self._latest_vip_count: Optional[int] = None self._latest_diamond_count: Optional[int] = None self._last_stats_signature: Tuple[Optional[int], Optional[int]] = (None, None) self._connect_retry_count = 3 self._connect_retry_delay_seconds = 1 def _encode(self, msg: str) -> bytes: content = msg.encode("utf-8") + b"\x00" length = len(content) + 8 head = length.to_bytes(4, "little") * 2 head += (689).to_bytes(2, "little") head += b"\x00\x00" return head + content @staticmethod def _parse_parts(line: str) -> Dict[str, Any]: parts: Dict[str, Any] = {} for pair in line.split("/"): if "@=" in pair: key, value = pair.split("@=", 1) parts[key] = value return parts @staticmethod def _safe_int(value: Any, default: Optional[int] = None) -> Optional[int]: try: return int(str(value)) except Exception: return default def _maybe_emit_stats(self, force: bool = False) -> None: if not self.stats_callback: return if self._latest_vip_count is None and self._latest_diamond_count is None: return signature = (self._latest_vip_count, self._latest_diamond_count) if not force: if signature == self._last_stats_signature: return point = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "vip_count": self._latest_vip_count, "diamond_count": self._latest_diamond_count, } try: self.stats_callback(self.room_id, point) self._last_stats_signature = signature except Exception as e: logger.warning(f"斗鱼人数采样回调失败({self.room_id}): {e}") def _on_message(self, ws, message): try: decompressed = zlib.decompress(message, -zlib.MAX_WBITS) data = decompressed.decode("utf-8", errors="ignore") except Exception: data = message.decode("utf-8", errors="ignore") for line in data.split("\x00"): line = line.strip() if not line: continue parts = self._parse_parts(line) msg_type = str(parts.get("type") or "").strip() if msg_type == "oni": vip_count = self._safe_int(parts.get("vn")) if vip_count is not None: self._latest_vip_count = vip_count self._maybe_emit_stats() continue if msg_type == "dfnum": diamond_count = self._safe_int(parts.get("dfc")) if diamond_count is not None: self._latest_diamond_count = diamond_count self._maybe_emit_stats() continue if msg_type != "chatmsg": continue nick = parts.get("nn", "未知") txt = parts.get("txt", "") uid = parts.get("uid", "未知") level = parts.get("level", "0") fan_group = parts.get("bnn", "") fan_level = parts.get("bl", "0") time_stamp = parts.get("cst", "") if time_stamp: try: if time_stamp.isdigit(): ts = int(time_stamp) if ts > 10 ** 12: ts = ts / 1000 dt = datetime.fromtimestamp(ts) time_str = dt.strftime("%Y-%m-%d %H:%M:%S") else: dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S") time_str = dt.strftime("%Y-%m-%d %H:%M:%S") except Exception: time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") else: time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}" if fan_group: output += f" / {fan_group} Lv{fan_level}" output += f"):{txt}" self._append_and_maybe_flush(output) def _flush_locked(self): if not self._buffer or self._buffer_date is None: return dir_path = os.path.join("temp", "douyu_danmu", self._buffer_date) os.makedirs(dir_path, exist_ok=True) file_name = os.path.join(dir_path, f"{self.room_id}_{self._buffer_date}.txt") data = "\n".join(self._buffer) + "\n" with open(file_name, "a", encoding="utf-8") as f: f.write(data) self._buffer.clear() def _append_and_maybe_flush(self, line: str): now = datetime.now() date_str = now.strftime("%Y%m%d") with self._lock: if self._buffer_date is None: self._buffer_date = date_str elif date_str != self._buffer_date: self._flush_locked() self._buffer_date = date_str self._buffer.append(line) if len(self._buffer) >= self._buffer_limit: self._flush_locked() def _flush(self): with self._lock: self._flush_locked() def _on_open(self, ws): ws.send(self._encode(f"type@=loginreq/roomid@={self.room_id}/dmbt@=chrome/dmbv@=0/")) ws.send(self._encode(f"type@=joingroup/rid@={self.room_id}/gid@=-9999/")) def heartbeat(): while ws.sock and ws.sock.connected and not self._stop_event.is_set(): try: ws.send(self._encode("type@=mrkl/")) except Exception: break time.sleep(38) threading.Thread(target=heartbeat, daemon=True).start() def _on_error(self, ws, error): logger.error(f"斗鱼弹幕错误({self.room_id}): {error}") def _on_close(self, ws, code, msg): logger.info(f"斗鱼弹幕连接关闭({self.room_id}): {code} {msg}") def _run(self): if not self._websocket_available: logger.error(f"websocket-client 未安装,无法记录弹幕({self.room_id})") return try: websocket.enableTrace(False) ws_urls = [ "wss://danmuproxy.douyu.com:8501/", "wss://danmuproxy.douyu.com:8502/", "wss://danmuproxy.douyu.com:8503/", "wss://danmuproxy.douyu.com:8504/", "wss://danmuproxy.douyu.com:8505/", "wss://danmuproxy.douyu.com:8506/", ] sslopt = { "cert_reqs": ssl.CERT_NONE, "ssl_version": ssl.PROTOCOL_TLS_CLIENT, "ciphers": "DEFAULT@SECLEVEL=1", } headers = {"User-Agent": self.user_agent} for url in ws_urls: if self._stop_event.is_set(): break for attempt in range(1, self._connect_retry_count + 1): if self._stop_event.is_set(): break reconnect_needed = False try: self._ws = websocket.WebSocketApp( url, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close, header=headers, ) self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10) if self._stop_event.is_set(): break reconnect_needed = True except Exception as e: if attempt < self._connect_retry_count: logger.warning( f"斗鱼弹幕连接失败({self.room_id}),第{attempt}/{self._connect_retry_count}次重试: " f"url={url} err={e}" ) time.sleep(self._connect_retry_delay_seconds) continue logger.error( f"斗鱼弹幕连接失败({self.room_id}),已重试{self._connect_retry_count}次: " f"url={url} err={e}" ) finally: self._ws = None if reconnect_needed and attempt < self._connect_retry_count: logger.warning( f"斗鱼弹幕连接中断({self.room_id}),第{attempt}/{self._connect_retry_count}次重试: url={url}" ) time.sleep(self._connect_retry_delay_seconds) continue if reconnect_needed and attempt >= self._connect_retry_count: logger.error( f"斗鱼弹幕连接中断({self.room_id}),已重试{self._connect_retry_count}次: url={url}" ) break if self._stop_event.is_set(): break time.sleep(self._connect_retry_delay_seconds) finally: self._ws = None def start(self): if self._thread and self._thread.is_alive(): return self._stop_event.clear() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def stop(self): self._maybe_emit_stats(force=True) self._flush() self._stop_event.set() if self._ws: try: self._ws.close() except Exception: pass class DouyuRedisManager: def __init__(self, db_manager: DBConnectionManager): self.redis = db_manager.get_redis_connection() self.prefix = "bot:douyu:" def add_group_room(self, group_id: str, room_id: str) -> bool: key = f"{self.prefix}group:{group_id}:rooms" return self.redis.sadd(key, room_id) >= 0 def remove_group_room(self, group_id: str, room_id: str) -> bool: key = f"{self.prefix}group:{group_id}:rooms" return self.redis.srem(key, room_id) >= 0 def list_group_rooms(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:rooms" rooms = self.redis.smembers(key) or set() result = [] for r in rooms: result.append(r.decode("utf-8") if isinstance(r, bytes) else r) return sorted(result) def all_subscribed_rooms(self) -> Set[str]: groups = GroupBotManager.get_group_list() rooms: Set[str] = set() for gid in groups: for r in self.list_group_rooms(gid): rooms.add(r) return rooms def groups_for_room(self, room_id: str) -> List[str]: groups = GroupBotManager.get_group_list() res = [] for gid in groups: if room_id in set(self.list_group_rooms(gid)): res.append(gid) return res # --- 鱼吧相关方法 --- def add_group_yuba(self, group_id: str, hash_id: str) -> bool: key = f"{self.prefix}group:{group_id}:yubas" return self.redis.sadd(key, hash_id) >= 0 def remove_group_yuba(self, group_id: str, hash_id: str) -> bool: key = f"{self.prefix}group:{group_id}:yubas" return self.redis.srem(key, hash_id) >= 0 def list_group_yubas(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:yubas" yubas = self.redis.smembers(key) or set() result = [] for y in yubas: result.append(y.decode("utf-8") if isinstance(y, bytes) else y) return sorted(result) def all_subscribed_yubas(self) -> Set[str]: groups = GroupBotManager.get_group_list() yubas: Set[str] = set() for gid in groups: for y in self.list_group_yubas(gid): yubas.add(y) return yubas def groups_for_yuba(self, hash_id: str) -> List[str]: groups = GroupBotManager.get_group_list() res = [] for gid in groups: if hash_id in set(self.list_group_yubas(gid)): res.append(gid) return res def get_yuba_last_id(self, hash_id: str) -> Optional[str]: key = f"{self.prefix}yuba_last_id:{hash_id}" data = self.redis.get(key) if not data: return None return data.decode("utf-8") if isinstance(data, bytes) else data def set_yuba_last_id(self, hash_id: str, feed_id: str) -> bool: key = f"{self.prefix}yuba_last_id:{hash_id}" return self.redis.set(key, feed_id) # --- 提醒名单方法 --- def add_group_subscriber(self, group_id: str, user_id: str) -> bool: key = f"{self.prefix}group:{group_id}:subscribers" return self.redis.sadd(key, user_id) >= 0 def remove_group_subscriber(self, group_id: str, user_id: str) -> bool: key = f"{self.prefix}group:{group_id}:subscribers" return self.redis.srem(key, user_id) >= 0 def list_group_subscribers(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:subscribers" subs = self.redis.smembers(key) or set() result = [] for s in subs: result.append(s.decode("utf-8") if isinstance(s, bytes) else s) return sorted(result) def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]: key = f"{self.prefix}room_status:{room_id}" data = self.redis.get(key) if not data: return None if isinstance(data, bytes): data = data.decode("utf-8") try: return json.loads(data) except Exception: return None def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool: key = f"{self.prefix}room_status:{room_id}" return self.redis.set(key, json.dumps(status, ensure_ascii=False)) def get_room_session(self, room_id: str, session_id: str) -> Optional[Dict[str, Any]]: key = f"{self.prefix}room:{room_id}:session:{session_id}" data = self.redis.get(key) if not data: return None if isinstance(data, bytes): data = data.decode("utf-8") try: return json.loads(data) except Exception: return None def save_room_session(self, room_id: str, session: Dict[str, Any]) -> bool: session_id = str(session.get("session_id") or "").strip() if not session_id: return False payload = json.dumps(session, ensure_ascii=False) session_key = f"{self.prefix}room:{room_id}:session:{session_id}" latest_key = f"{self.prefix}room:{room_id}:latest_session" index_key = f"{self.prefix}room:{room_id}:session_ids" pipe = self.redis.pipeline() pipe.set(session_key, payload) pipe.set(latest_key, session_id) pipe.lrem(index_key, 0, session_id) pipe.lpush(index_key, session_id) pipe.ltrim(index_key, 0, 29) result = pipe.execute() return bool(result) def get_latest_room_session(self, room_id: str) -> Optional[Dict[str, Any]]: latest_key = f"{self.prefix}room:{room_id}:latest_session" session_id = self.redis.get(latest_key) if not session_id: return None if isinstance(session_id, bytes): session_id = session_id.decode("utf-8") return self.get_room_session(room_id, str(session_id)) def list_room_session_ids(self, room_id: str, limit: int = 10) -> List[str]: key = f"{self.prefix}room:{room_id}:session_ids" rows = self.redis.lrange(key, 0, max(limit - 1, 0)) or [] result = [] for row in rows: result.append(row.decode("utf-8") if isinstance(row, bytes) else str(row)) return result def get_text_value(self, key: str) -> Optional[str]: data = self.redis.get(key) if not data: return None return data.decode("utf-8") if isinstance(data, bytes) else str(data) def set_text_value(self, key: str, value: str) -> bool: return bool(self.redis.set(key, value)) class DouyuPlugin(MessagePluginInterface): # 报告缓存版本号: # 1. 版本升级后会自动让历史缓存失效,避免继续复用旧文本/旧图片; # 2. 本次将版本提升到 6,新增“粉丝向恶搞日报”的独立结果类型,并同步刷新旧缓存, # 确保上线后不会误复用旧版图片结构或旧版摘要文案。 _DAILY_REPORT_CACHE_VERSION = 6 FEATURE_KEY = "DOUYU_MONITOR" FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]" @property def name(self) -> str: return "斗鱼直播" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "斗鱼主播开播下播提醒与群订阅管理" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.bot: WechatAPIClient = None self.feature = self.register_feature() self.redis_manager: Optional[DouyuRedisManager] = None self._commands = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表", "斗鱼订阅提醒", "取消斗鱼订阅提醒", "订阅鱼吧", "取消订阅鱼吧", "鱼吧订阅列表", "#斗鱼弹幕日报", "斗鱼弹幕日报", "#强制斗鱼弹幕日报", "强制斗鱼弹幕日报", "#斗鱼粉丝日报", "斗鱼粉丝日报", "#强制斗鱼粉丝日报", "强制斗鱼粉丝日报"] self._api_template = "https://www.douyu.com/betard/{room_id}" self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList" self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" self._check_interval = 5 self._session_cutoff_hour = 6 self._merge_gap_hours = 4 self._daily_report_enable = True self._daily_report_time = "10:05" self._daily_report_min_messages = 120 self._daily_report_use_llm = False self._daily_report_max_sessions = 4 self._daily_report_max_length = 1800 self._daily_report_send_image = True # Dify 入参策略: # 默认发送精简字段,避免某些 Workflow 对复杂对象输入校验严格导致 400。 # 如需在工作流中使用完整结构化 payload,可在 report_api 显式开启。 self._daily_report_include_structured_inputs = False self._audience_stats_sample_interval_seconds = 60 self._status_check_retry_count = 3 self._status_check_retry_delay_seconds = 1 self._daily_report_llm_client: Optional[UnifiedLLMClient] = None # 直播间语义画像: # 1. 允许按房间号补充“主播职业生涯、圈内关系、常见梗来源”等背景; # 2. 这些信息不会直接替代真实弹幕,只用于帮助 LLM 更准确理解圈内黑话; # 3. 当前主要用于 Dota2 这类强语境直播间,但结构保持通用。 self._room_context_profiles: Dict[str, Dict[str, Any]] = {} self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {} # 直播状态/鱼吧轮询继续保留在轻量 async_job 中,保障现网行为稳定。 async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job) @staticmethod def _format_exception(exc: Exception) -> str: message = str(exc).strip() if message: return f"{type(exc).__name__}: {message}" return type(exc).__name__ @staticmethod def _parse_anchor_day_from_command(parts: List[str]) -> Tuple[bool, str]: """ 统一解析日报命令里的日期参数。 返回值说明: 1. 第一个布尔值表示日期是否合法; 2. 第二个字符串在合法时是最终日期,不合法时保留原始输入,方便上层提示用户。 """ anchor_day = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") if len(parts) < 2: return True, anchor_day day_text = parts[1].strip() try: return True, datetime.strptime(day_text, "%Y-%m-%d").strftime("%Y-%m-%d") except Exception: return False, day_text @staticmethod def _normalize_text_list(values: Any) -> List[str]: """ 将配置或接口返回的“字符串列表”统一规整成干净的 list[str]。 这样做的原因: 1. TOML 里有些字段可能写成单字符串,有些写成字符串数组; 2. 后续 prompt 拼装只关心“有序文本集合”,不希望每处都重复判空和类型判断。 """ if values is None: return [] if isinstance(values, str): value = values.strip() return [value] if value else [] if not isinstance(values, (list, tuple, set)): return [] result: List[str] = [] for item in values: value = str(item or "").strip() if value: result.append(value) return result def _extract_room_runtime_context(self, room_info: Dict[str, Any]) -> Dict[str, Any]: """ 从斗鱼房间接口里尽量多抽取“语义上下文”。 注意: 1. 斗鱼字段在不同房间或接口版本里可能并不完全一致,所以这里做多 key 兜底; 2. 就算某些字段拿不到,也保留空结构,避免后续 prompt 拼装分支过多。 """ if not isinstance(room_info, dict): room_info = {} def pick(*keys: str) -> str: for key in keys: value = str(room_info.get(key) or "").strip() if value: return value return "" tags = self._normalize_text_list( room_info.get("tag") or room_info.get("tags") or room_info.get("room_tags") or room_info.get("show_details") ) return { "primary_category": pick("cate1Name", "cate_name", "game_name", "gameCateName"), "secondary_category": pick("cate2Name", "second_lvl_name", "secondCateName", "sub_cate_name"), "game_name": pick("game_name", "gameCateName", "cate2Name", "second_lvl_name"), "tags": tags, } def _match_room_context_profile(self, room_id: str) -> Dict[str, Any]: """ 从配置中读取指定房间号的人设/圈内背景。 配置优先按 room_id 精确匹配,避免不同主播之间串用职业生涯信息。 """ if not isinstance(self._room_context_profiles, dict): return {} profile = self._room_context_profiles.get(str(room_id)) or {} return dict(profile) if isinstance(profile, dict) else {} def _build_room_semantic_context( self, room_id: str, nickname: str, room_name: str, sessions: List[Dict[str, Any]], ) -> Dict[str, Any]: """ 构建直播间语义上下文。 核心思想: 1. 先用实时房间信息判断“这是不是 Dota2/电竞强语境房间”; 2. 再叠加人工配置的主播职业生涯、圈内人物、常见梗来源; 3. 最终给 LLM 一份“理解背景”,但不替代真实弹幕证据。 """ latest_session = sessions[-1] if sessions else {} latest_runtime_context = dict(latest_session.get("room_context") or {}) latest_status_context = {} if self.redis_manager: latest_status = self.redis_manager.get_room_status(room_id) or {} latest_status_context = dict(latest_status.get("room_context") or {}) merged_runtime_context = { "primary_category": str( latest_runtime_context.get("primary_category") or latest_status_context.get("primary_category") or "" ).strip(), "secondary_category": str( latest_runtime_context.get("secondary_category") or latest_status_context.get("secondary_category") or "" ).strip(), "game_name": str( latest_runtime_context.get("game_name") or latest_status_context.get("game_name") or "" ).strip(), "tags": self._normalize_text_list( latest_runtime_context.get("tags") or latest_status_context.get("tags") or [] ), } profile = self._match_room_context_profile(room_id) category_text = " ".join([ merged_runtime_context.get("primary_category", ""), merged_runtime_context.get("secondary_category", ""), merged_runtime_context.get("game_name", ""), room_name, nickname, " ".join(merged_runtime_context.get("tags", [])), " ".join(self._normalize_text_list(profile.get("domain_keywords"))), ]).lower() inferred_domains: List[str] = [] if any(keyword in category_text for keyword in ["dota", "dota2", "刀塔", "ti", "major"]): inferred_domains.append("Dota2") if any(keyword in category_text for keyword in ["电竞", "esports", "职业", "选手"]): inferred_domains.append("电竞直播") # 如果配置明确写了 domain,则放在最前面,作为最强语义锚点。 configured_domain = str(profile.get("domain") or "").strip() if configured_domain: inferred_domains = [configured_domain] + [item for item in inferred_domains if item != configured_domain] return { "domain": configured_domain, "inferred_domains": inferred_domains, "runtime_context": merged_runtime_context, "career_background": str(profile.get("career_background") or "").strip(), "identity_summary": str(profile.get("identity_summary") or "").strip(), "related_people": self._normalize_text_list(profile.get("related_people")), "storyline_keywords": self._normalize_text_list(profile.get("storyline_keywords")), "meme_explanations": self._normalize_text_list(profile.get("meme_explanations")), "style_hints": self._normalize_text_list(profile.get("style_hints")), } def _build_room_context_prompt_block(self, payload: Dict[str, Any]) -> str: """ 将直播间语义上下文整理成一段可以直接喂给 LLM 的提示块。 目标不是要求模型“背设定”,而是提醒它: 1. 先按 Dota2 / 电竞圈语境理解黑话和人物; 2. 看到选手、主播、职业生涯梗时,优先往房间背景上靠; 3. 仍然必须以当天真实弹幕和统计材料为主,不得凭空补剧情。 """ room_context = payload.get("room_context", {}) or {} runtime_context = room_context.get("runtime_context", {}) or {} parts: List[str] = [] domains = [str(item or "").strip() for item in room_context.get("inferred_domains", []) or [] if str(item or "").strip()] if domains: parts.append(f"- 直播间领域语境:{', '.join(domains)}。若出现圈内黑话、人物简称、老梗,优先按这个语境理解。") if runtime_context.get("game_name") or runtime_context.get("secondary_category") or runtime_context.get("primary_category"): parts.append( "- 房间分区信息:" f"{runtime_context.get('primary_category') or '未知大类'} / " f"{runtime_context.get('secondary_category') or runtime_context.get('game_name') or '未知小类'}。" ) if runtime_context.get("tags"): parts.append(f"- 房间标签:{'、'.join(self._normalize_text_list(runtime_context.get('tags'))[:8])}。") if room_context.get("identity_summary"): parts.append(f"- 主播身份提示:{room_context.get('identity_summary')}。") if room_context.get("career_background"): parts.append(f"- 职业生涯背景:{room_context.get('career_background')}。") related_people = self._normalize_text_list(room_context.get("related_people")) if related_people: parts.append(f"- 重点相关人物:{'、'.join(related_people[:12])}。弹幕提到这些人时,优先考虑圈内关联。") storyline_keywords = self._normalize_text_list(room_context.get("storyline_keywords")) if storyline_keywords: parts.append(f"- 常见剧情关键词:{'、'.join(storyline_keywords[:12])}。") meme_explanations = self._normalize_text_list(room_context.get("meme_explanations")) if meme_explanations: parts.append("- 常见梗解释:") for item in meme_explanations[:6]: parts.append(f" * {item}") style_hints = self._normalize_text_list(room_context.get("style_hints")) if style_hints: parts.append(f"- 风格提示:{';'.join(style_hints[:6])}。") if not parts: return "" return "【直播间语义上下文】\n" + "\n".join(parts) + "\n\n" async def _fetch_json_with_retries(self, session: aiohttp.ClientSession, url: str, headers: Dict[str, str], context: str, params: Optional[Dict[str, Any]] = None) -> Any: last_error: Optional[Exception] = None for attempt in range(1, self._status_check_retry_count + 1): try: async with session.get( url, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=10) ) as resp: resp.raise_for_status() return await resp.json(content_type=None) except Exception as e: last_error = e if attempt < self._status_check_retry_count: logger.warning( f"{context}失败,第{attempt}/{self._status_check_retry_count}次重试: " f"{self._format_exception(e)}" ) await asyncio.sleep(self._status_check_retry_delay_seconds) continue raise if last_error: raise last_error raise RuntimeError(f"{context}失败,未获取到有效响应") async def _scheduled_unified_check_job(self): """统一检查直播和鱼吧动态""" await self._scheduled_check_job() await self._scheduled_yuba_check_job() async def _scheduled_daily_report_tick(self): """每 5 分钟检查一次,命中配置时间后发送前一天日报。""" if not self._daily_report_enable or not self.redis_manager or not self.bot: return now_dt = datetime.now() if not self._should_run_daily_report(now_dt): return anchor_day = (now_dt - timedelta(days=1)).strftime("%Y-%m-%d") try: await self._send_daily_reports(anchor_day) self.redis_manager.set_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d")), now_dt.strftime("%Y-%m-%d %H:%M:%S")) except Exception as e: logger.error(f"斗鱼每日报告任务失败(anchor_day={anchor_day}): {e}") def get_schedule_actions(self) -> List[Dict[str, Any]]: """声明插件可调度动作。 设计说明: 1. 斗鱼“每日报告”迁移到插件任务配置体系,支持在后台可视化启停/改时; 2. 触发时间直接复用配置项 daily_report_time,避免出现“两套时间配置”; 3. 作用域默认 all_enabled_groups,让插件调度系统按群权限先过滤目标群。 """ trigger_time = str(self._daily_report_time or "09:30").strip() or "09:30" return [ { "action_key": "douyu_daily_report_push", "name": "斗鱼弹幕日报推送", "description": "按配置时间推送前一天斗鱼弹幕日报", "trigger_type": "at_times", "trigger_config": {"time_list": [trigger_time]}, "target_scope": "all_enabled_groups", "target_config": {}, "payload": {}, "default_enabled": bool(self._daily_report_enable), } ] async def run_scheduled_action(self, action_key: str, context: Dict[str, Any]) -> Dict[str, Any]: """执行插件调度动作。""" if action_key != "douyu_daily_report_push": return {"success": False, "summary": f"不支持动作: {action_key}", "detail": {}} # 调度器注入 bot,保证定时任务也能发消息。 self.bot = context.get("bot") or self.bot if not self._daily_report_enable: return {"success": True, "summary": "斗鱼每日报告已关闭,跳过执行", "detail": {"enabled": False}} if not self.redis_manager or not self.bot: return {"success": False, "summary": "斗鱼每日报告执行失败:依赖未就绪(redis/bot)", "detail": {}} payload = context.get("payload") or {} # 支持后台手动触发时覆盖 anchor_day,便于补发历史某天日报。 anchor_day = str(payload.get("anchor_day") or "").strip() if not anchor_day: anchor_day = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") force = bool(payload.get("force", False)) force_regenerate = bool(payload.get("force_regenerate", False)) target_groups = [str(g).strip() for g in (context.get("target_groups") or []) if str(g).strip()] if not target_groups: target_groups = GroupBotManager.get_group_list() delivered_groups: List[str] = [] failed_groups: Dict[str, str] = {} for gid in target_groups: try: # 按群推送:内部会再基于斗鱼订阅与插件权限做二次过滤。 delivered = await self._send_daily_reports( anchor_day=anchor_day, target_group_id=gid, force=force, force_regenerate=force_regenerate, ) if delivered: delivered_groups.append(gid) except Exception as e: failed_groups[gid] = self._format_exception(e) return { "success": len(failed_groups) == 0, "summary": ( f"斗鱼日报任务完成: 日期{anchor_day}, 目标群{len(target_groups)}个, " f"成功发送群{len(delivered_groups)}个, 失败群{len(failed_groups)}个" ), "detail": { "anchor_day": anchor_day, "force": force, "force_regenerate": force_regenerate, "target_groups": target_groups, "delivered_groups": delivered_groups, "failed_groups": failed_groups, }, } def initialize(self, context: Dict[str, Any]) -> bool: try: dbm = DBConnectionManager.get_instance() self.redis_manager = DouyuRedisManager(dbm) self.bot = context.get("bot", self.bot) cfg = self._config.get("Douyu", {}) cfg_cmds = cfg.get("command", []) if isinstance(cfg_cmds, list) and cfg_cmds: self._commands = list(dict.fromkeys(cfg_cmds + self._commands)) self._api_template = cfg.get("api_url_template", self._api_template) self._user_agent = cfg.get("user_agent", self._user_agent) self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval)) self._session_cutoff_hour = int(cfg.get("session_cutoff_hour", self._session_cutoff_hour)) self._merge_gap_hours = int(cfg.get("merge_gap_hours", self._merge_gap_hours)) self._daily_report_enable = bool(cfg.get("daily_report_enable", self._daily_report_enable)) self._daily_report_time = str(cfg.get("daily_report_time", self._daily_report_time) or self._daily_report_time) self._daily_report_min_messages = int( cfg.get("daily_report_min_messages", self._daily_report_min_messages) ) self._daily_report_use_llm = bool(cfg.get("daily_report_use_llm", self._daily_report_use_llm)) self._daily_report_max_sessions = int(cfg.get("daily_report_max_sessions", self._daily_report_max_sessions)) self._daily_report_max_length = int(cfg.get("daily_report_max_length", self._daily_report_max_length)) self._daily_report_send_image = bool(cfg.get("daily_report_send_image", self._daily_report_send_image)) self._audience_stats_sample_interval_seconds = int( cfg.get("audience_stats_sample_interval_seconds", self._audience_stats_sample_interval_seconds) ) raw_room_context_profiles = cfg.get("room_context_profiles", {}) or {} self._room_context_profiles = ( raw_room_context_profiles if isinstance(raw_room_context_profiles, dict) else {} ) report_api_cfg = cfg.get("report_api", {}) or {} self._daily_report_include_structured_inputs = bool( report_api_cfg.get( "include_structured_inputs", self._daily_report_include_structured_inputs, ) ) if report_api_cfg: self._daily_report_llm_client = UnifiedLLMClient(report_api_cfg) return True except Exception as e: logger.error(f"{self.name} 初始化失败: {e}") return False def start(self) -> bool: self.status = PluginStatus.RUNNING return True def stop(self) -> bool: for recorder in getattr(self, "_danmu_recorders", {}).values(): recorder.stop() self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: content = str(message.get("content", "")).strip() if not content: return False first_token = content.split()[0] return first_token in self._commands @plugin_stats_decorator(plugin_name="斗鱼直播") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: content = str(message.get("content", "")).strip() sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") self.bot: WechatAPIClient = message.get("bot") if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" first_token = content.split()[0] if content == "斗鱼订阅列表": rooms = self.redis_manager.list_group_rooms(roomid or sender) if not rooms: await self.bot.send_text_message(roomid or sender, "暂无订阅", sender) return True, "暂无订阅" text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms) await self.bot.send_text_message(roomid or sender, text, sender) return True, "列表已发送" if first_token == "斗鱼订阅提醒": if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" ok = self.redis_manager.add_group_subscriber(roomid, sender) await self.bot.send_at_message(roomid, "已加入斗鱼订阅提醒名单", [sender]) return True, "加入提醒名单成功" if ok else "加入提醒名单失败" if first_token == "取消斗鱼订阅提醒": if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" ok = self.redis_manager.remove_group_subscriber(roomid, sender) await self.bot.send_at_message(roomid, "已取消斗鱼订阅提醒", [sender]) return True, "取消提醒成功" if ok else "取消提醒失败" if first_token == "斗鱼订阅": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:订阅斗鱼 7718843", sender) return True, "命令格式错误" room_id = parts[1].strip() if not room_id.isdigit(): await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:斗鱼订阅 52876", sender) return True, "命令格式错误" ok = self.redis_manager.add_group_room(roomid or sender, room_id) await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼房间 {room_id}", sender) return True, "订阅成功" if ok else "订阅失败" if first_token == "取消斗鱼订阅": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:取消订阅斗鱼 7718843", sender) return True, "命令格式错误" room_id = parts[1].strip() if not room_id.isdigit(): await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:取消斗鱼订阅 52876", sender) return True, "命令格式错误" ok = self.redis_manager.remove_group_room(roomid or sender, room_id) await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼房间 {room_id}", sender) return True, "取消成功" if ok else "取消失败" if content == "鱼吧订阅列表": yubas = self.redis_manager.list_group_yubas(roomid or sender) if not yubas: await self.bot.send_text_message(roomid or sender, "暂无鱼吧订阅", sender) return True, "暂无鱼吧订阅" text = "当前订阅的斗鱼鱼吧:\n" + "\n".join(yubas) await self.bot.send_text_message(roomid or sender, text, sender) return True, "列表已发送" if first_token == "订阅鱼吧": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id,例如:订阅鱼吧 PDAP2zEk3nwx", sender) return True, "命令格式错误" hash_id = parts[1].strip() ok = self.redis_manager.add_group_yuba(roomid or sender, hash_id) await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼鱼吧 {hash_id}", sender) return True, "订阅成功" if ok else "订阅失败" if first_token == "取消订阅鱼吧": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id,例如:取消订阅鱼吧 PDAP2zEk3nwx", sender) return True, "命令格式错误" hash_id = parts[1].strip() ok = self.redis_manager.remove_group_yuba(roomid or sender, hash_id) await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼鱼吧 {hash_id}", sender) return True, "取消成功" if ok else "取消失败" if first_token in {"#斗鱼弹幕日报", "斗鱼弹幕日报"}: if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" parts = content.split() ok, anchor_day = self._parse_anchor_day_from_command(parts) if not ok: await self.bot.send_text_message(roomid, "日期格式错误,请使用:#斗鱼弹幕日报 2026-04-07", sender) return True, "日期格式错误" await self.bot.send_text_message(roomid, f"⏳ 正在生成斗鱼弹幕日报:{anchor_day}", sender) # 普通手动命令也默认重生成,避免命中缓存后看起来“没有走 Dify”。 # 定时任务仍保留缓存策略,这里只影响人工触发路径。 delivered = await self._send_daily_reports( anchor_day, target_group_id=roomid, force=True, force_regenerate=True, ) if delivered: return True, f"斗鱼弹幕日报已发送:{anchor_day}" await self.bot.send_text_message(roomid, f"暂无可发送的斗鱼弹幕日报:{anchor_day}", sender) return True, "暂无日报" if first_token in {"#强制斗鱼弹幕日报", "强制斗鱼弹幕日报"}: if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" parts = content.split() ok, anchor_day = self._parse_anchor_day_from_command(parts) if not ok: await self.bot.send_text_message(roomid, "日期格式错误,请使用:#强制斗鱼弹幕日报 2026-04-07", sender) return True, "日期格式错误" # 这里明确提示“强制重生成”,便于群内区分普通日报和回归测试操作。 await self.bot.send_text_message(roomid, f"⏳ 正在强制重生成斗鱼弹幕日报:{anchor_day}", sender) delivered = await self._send_daily_reports( anchor_day, target_group_id=roomid, force=True, force_regenerate=True, ) if delivered: return True, f"斗鱼弹幕日报已强制重生成并发送:{anchor_day}" await self.bot.send_text_message(roomid, f"暂无可发送的斗鱼弹幕日报:{anchor_day}", sender) return True, "暂无日报" if first_token in {"#斗鱼粉丝日报", "斗鱼粉丝日报"}: if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" parts = content.split() ok, anchor_day = self._parse_anchor_day_from_command(parts) if not ok: await self.bot.send_text_message(roomid, "日期格式错误,请使用:#斗鱼粉丝日报 2026-04-07", sender) return True, "日期格式错误" await self.bot.send_text_message(roomid, f"🎉 正在生成斗鱼粉丝日报:{anchor_day}", sender) # 粉丝版定位为“开心图文”,手动触发时默认直接重生成, # 这样群里测试文案、模板时可以马上看到最新乐子版本。 delivered = await self._send_fans_daily_reports( anchor_day, target_group_id=roomid, force_regenerate=True, ) if delivered: return True, f"斗鱼粉丝日报已发送:{anchor_day}" await self.bot.send_text_message(roomid, f"暂无可发送的斗鱼粉丝日报:{anchor_day}", sender) return True, "暂无日报" if first_token in {"#强制斗鱼粉丝日报", "强制斗鱼粉丝日报"}: if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" parts = content.split() ok, anchor_day = self._parse_anchor_day_from_command(parts) if not ok: await self.bot.send_text_message(roomid, "日期格式错误,请使用:#强制斗鱼粉丝日报 2026-04-07", sender) return True, "日期格式错误" await self.bot.send_text_message(roomid, f"🎉 正在强制重生成斗鱼粉丝日报:{anchor_day}", sender) delivered = await self._send_fans_daily_reports( anchor_day, target_group_id=roomid, force_regenerate=True, ) if delivered: return True, f"斗鱼粉丝日报已强制重生成并发送:{anchor_day}" await self.bot.send_text_message(roomid, f"暂无可发送的斗鱼粉丝日报:{anchor_day}", sender) return True, "暂无日报" return False, None async def _scheduled_check_job(self): try: rooms = self.redis_manager.all_subscribed_rooms() if not rooms: return async with aiohttp.ClientSession() as session: for room_id in rooms: try: url = self._api_template.format(room_id=room_id) headers = { "User-Agent": self._user_agent, "Referer": f"https://www.douyu.com/{room_id}" } data = await self._fetch_json_with_retries( session, url, headers, context=f"斗鱼在线检查(room_id={room_id})" ) room_info = data.get("room", {}) if isinstance(data, dict) else {} show_status = room_info.get("show_status") nickname = room_info.get("nickname", "") room_name = room_info.get("room_name", "") room_context = self._extract_room_runtime_context(room_info) avatar = room_info.get("avatar", {}) or {} thumb_url = str(avatar.get("small", "") or "").strip().strip("`").strip() video_loop_raw = room_info.get("videoLoop", 0) try: video_loop = int(str(video_loop_raw)) except Exception: video_loop = 0 prev = self.redis_manager.get_room_status(room_id) or {} prev_live = prev.get("is_live") curr_live = True if show_status == 1 and video_loop == 0 else False status_obj = { "is_live": curr_live, "nickname": nickname, "room_name": room_name, "is_loop": True if video_loop == 1 else False, # 保存最近一次探测到的房间上下文,供日报生成阶段辅助理解圈内梗。 "room_context": room_context, } self.redis_manager.set_room_status(room_id, status_obj) if prev_live is None and curr_live is False: continue if prev_live is None and curr_live is True: await self._notify_groups_live(room_id, nickname, room_name, thumb_url, room_context) continue if prev_live is False and curr_live is True: await self._notify_groups_live(room_id, nickname, room_name, thumb_url, room_context) continue if prev_live is True and curr_live is False: await self._notify_groups_offline(room_id, nickname, room_name, video_loop == 1, room_context) continue if prev_live is True and curr_live is True and room_id not in self._danmu_recorders: try: room_session = self._open_or_resume_session(room_id, nickname, room_name, room_context) if room_session: logger.info( f"检测到持续直播状态,续接斗鱼直播会话({room_id}): " f"session={room_session.get('session_id')}" ) logger.info(f"检测到持续直播状态,补偿启动斗鱼弹幕记录({room_id})") self._start_danmu_record(room_id) except Exception as e: logger.exception( f"补偿启动斗鱼弹幕记录失败({room_id}): {self._format_exception(e)}" ) continue await asyncio.sleep(0.1) except Exception as e: logger.exception( f"斗鱼检查失败(room_id={room_id}): {self._format_exception(e)}" ) continue except Exception as e: logger.exception(f"斗鱼定时任务异常: {self._format_exception(e)}") async def _notify_groups_live( self, room_id: str, nickname: str, room_name: str, thumb_url: str, room_context: Optional[Dict[str, Any]] = None, ): groups = self.redis_manager.groups_for_room(room_id) text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看:https://www.douyu.com/{room_id}" xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url) for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: subs = self.redis_manager.list_group_subscribers(gid) if subs: await self.bot.send_at_message(gid, text, subs) else: await self.bot.send_text_message(gid, text) await self.bot.send_link_xml_message(xml_content, gid) except Exception as e: logger.error(f"发送斗鱼开播提醒失败: {e}") continue try: session = self._open_or_resume_session(room_id, nickname, room_name, room_context or {}) if session: logger.info( f"斗鱼直播会话开启/续接: room={room_id}, session={session.get('session_id')}, " f"segments={len(session.get('segments', []))}, anchor_day={session.get('anchor_day')}" ) logger.info(f"启动斗鱼弹幕记录({room_id})") self._start_danmu_record(room_id) except Exception as e: logger.error(f"启动斗鱼弹幕记录失败({room_id}): {e}") async def _notify_groups_offline( self, room_id: str, nickname: str, room_name: str, is_loop: bool = False, room_context: Optional[Dict[str, Any]] = None, ): groups = self.redis_manager.groups_for_room(room_id) text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}" if is_loop: text += "(当前为轮播)" for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: await self.bot.send_text_message(gid, text) except Exception as e: logger.error(f"发送斗鱼下播提醒失败: {e}") continue try: session = self._close_active_session(room_id, nickname, room_name, room_context or {}) if session: logger.info( f"斗鱼直播会话关闭片段: room={room_id}, session={session.get('session_id')}, " f"segments={len(session.get('segments', []))}, is_live={session.get('is_live')}" ) logger.info(f"停止斗鱼弹幕记录({room_id})") self._stop_danmu_record(room_id) except Exception as e: logger.error(f"停止斗鱼弹幕记录失败({room_id}): {e}") async def _scheduled_yuba_check_job(self): try: yubas = self.redis_manager.all_subscribed_yubas() if not yubas: return async with aiohttp.ClientSession() as session: for hash_id in yubas: try: params = { "filter_type": 1, "hash_id": hash_id, "limit": 10, "offset": 0 } headers = { "User-Agent": self._user_agent, "Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news", } data = await self._fetch_json_with_retries( session, self._yuba_api, headers, context=f"斗鱼鱼吧检查(hash_id={hash_id})", params=params ) if data.get("error") != 0: logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {data.get('msg')}") continue feed_list = data.get("data", {}).get("feed_list", []) # 查找第一条【非置顶】动态 target_feed = None for feed in feed_list: if feed.get("home_feed_top") == 1: continue target_feed = feed break if not target_feed: continue feed_id = str(target_feed.get("feed_id")) last_id = self.redis_manager.get_yuba_last_id(hash_id) if last_id and feed_id == last_id: continue # 发现新动态 nickname = target_feed.get("publisher", {}).get("nickname", "未知主播") content = target_feed.get("text", "") ctime = target_feed.get("ctime") from datetime import datetime publish_time = datetime.fromtimestamp(int(ctime)).strftime( '%Y-%m-%d %H:%M:%S') if ctime else "未知时间" # 限制内容长度 if len(content) > 200: content = content[:200] + "..." full_url = f"https://yuba.douyu.com/feed/{feed_id}" await self._notify_groups_yuba(hash_id, nickname, content, full_url, publish_time) # 保存标记 self.redis_manager.set_yuba_last_id(hash_id, feed_id) await asyncio.sleep(0.5) except Exception as e: logger.exception( f"检查斗鱼鱼吧({hash_id})失败: {self._format_exception(e)}" ) continue except Exception as e: logger.exception(f"斗鱼鱼吧定时任务异常: {self._format_exception(e)}") async def _notify_groups_yuba(self, hash_id: str, nickname: str, content: str, url: str, publish_time: str = "未知时间"): groups = self.redis_manager.groups_for_yuba(hash_id) text = f"🌟 斗鱼鱼吧动态提醒 \n👤 主播:{nickname}\n⏰ 时间:{publish_time}\n📝 内容:{content}\n🔗 链接:{url}" for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: await self.bot.send_text_message(gid, text) except Exception as e: logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}") continue def _get_danmu_recorder(self, room_id: str) -> DouyuDanmuRecorder: recorder = self._danmu_recorders.get(room_id) if not recorder: recorder = DouyuDanmuRecorder( room_id, self._user_agent, stats_callback=self._record_room_audience_point, stats_sample_interval_seconds=self._audience_stats_sample_interval_seconds, ) self._danmu_recorders[room_id] = recorder return recorder @classmethod def _normalize_audience_points(cls, points: List[Dict[str, Any]], limit: int = 720) -> List[Dict[str, Any]]: minute_map: Dict[str, Dict[str, Any]] = {} for item in points or []: timestamp = str(item.get("timestamp") or "").strip() point_dt = cls._parse_session_time(timestamp) if not point_dt: continue minute_key = point_dt.strftime("%Y-%m-%d %H:%M") minute_map[minute_key] = { "timestamp": timestamp, "vip_count": int(item.get("vip_count", 0) or 0), "diamond_count": int(item.get("diamond_count", 0) or 0), } normalized = list(minute_map.values()) normalized.sort(key=lambda row: row.get("timestamp", "")) if len(normalized) <= limit: return normalized bucket_size_minutes = max((len(normalized) + limit - 1) // limit, 1) bucket_map: Dict[str, Dict[str, Any]] = {} for item in normalized: point_dt = cls._parse_session_time(str(item.get("timestamp") or "")) if not point_dt: continue total_minutes = int(point_dt.timestamp() // 60) bucket_start_minutes = total_minutes - (total_minutes % bucket_size_minutes) bucket_key = str(bucket_start_minutes) bucket_map[bucket_key] = item compressed = list(bucket_map.values()) compressed.sort(key=lambda row: row.get("timestamp", "")) if len(compressed) > limit: compressed = compressed[-limit:] return compressed def _record_room_audience_point(self, room_id: str, point: Dict[str, Any]) -> None: if not self.redis_manager or not room_id: return session = self.redis_manager.get_latest_room_session(room_id) if not session or not bool(session.get("is_live")): return current_points = self._normalize_audience_points(list(session.get("audience_points", []) or [])) merged_points = self._normalize_audience_points(current_points + [point]) if merged_points == current_points: return session["audience_points"] = merged_points session["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.redis_manager.save_room_session(room_id, session) def _resolve_anchor_day(self, target_dt: datetime) -> str: if target_dt.hour < self._session_cutoff_hour: target_dt = target_dt - timedelta(days=1) return target_dt.strftime("%Y-%m-%d") @staticmethod def _parse_session_time(value: str) -> Optional[datetime]: if not value: return None try: return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S") except Exception: return None @staticmethod def _find_open_segment(session: Dict[str, Any]) -> Optional[Dict[str, Any]]: for segment in reversed(session.get("segments", []) or []): if not str(segment.get("end_time") or "").strip(): return segment return None def _should_merge_with_latest_session(self, latest_session: Optional[Dict[str, Any]], now_dt: datetime) -> bool: if not latest_session: return False if latest_session.get("is_live"): return True segments = latest_session.get("segments", []) or [] if not segments: return False last_segment = segments[-1] end_dt = self._parse_session_time(last_segment.get("end_time", "")) if not end_dt: return False gap_seconds = (now_dt - end_dt).total_seconds() return 0 <= gap_seconds <= self._merge_gap_hours * 3600 def _open_or_resume_session( self, room_id: str, nickname: str, room_name: str, room_context: Optional[Dict[str, Any]] = None, ) -> Optional[Dict[str, Any]]: if not self.redis_manager: return None now_dt = datetime.now() now_str = now_dt.strftime("%Y-%m-%d %H:%M:%S") latest_session = self.redis_manager.get_latest_room_session(room_id) or {} if self._should_merge_with_latest_session(latest_session, now_dt): session = dict(latest_session) open_segment = self._find_open_segment(session) if not open_segment: segments = list(session.get("segments", []) or []) segments.append({"start_time": now_str, "end_time": ""}) session["segments"] = segments else: anchor_day = self._resolve_anchor_day(now_dt) session = { "session_id": f"{room_id}_{anchor_day.replace('-', '')}_{now_dt.strftime('%H%M%S')}", "room_id": room_id, "anchor_day": anchor_day, "nickname": nickname, "room_name": room_name, "room_context": dict(room_context or {}), "segments": [{"start_time": now_str, "end_time": ""}], "audience_points": [], "is_live": True, "summary_status": "pending", "summary_generated_at": "", "created_at": now_str, } session["nickname"] = nickname or session.get("nickname", "") session["room_name"] = room_name or session.get("room_name", "") if room_context: session["room_context"] = dict(room_context) session["audience_points"] = self._normalize_audience_points(list(session.get("audience_points", []) or [])) session["is_live"] = True session["updated_at"] = now_str session["last_live_at"] = now_str self.redis_manager.save_room_session(room_id, session) return session def _close_active_session( self, room_id: str, nickname: str, room_name: str, room_context: Optional[Dict[str, Any]] = None, ) -> Optional[Dict[str, Any]]: if not self.redis_manager: return None session = self.redis_manager.get_latest_room_session(room_id) if not session: return None now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") open_segment = self._find_open_segment(session) if open_segment: open_segment["end_time"] = now_str session["nickname"] = nickname or session.get("nickname", "") session["room_name"] = room_name or session.get("room_name", "") if room_context: session["room_context"] = dict(room_context) session["is_live"] = False session["updated_at"] = now_str session["last_offline_at"] = now_str self.redis_manager.save_room_session(room_id, session) return session def get_room_session(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]: if not self.redis_manager or not room_id: return None if session_id: return self.redis_manager.get_room_session(room_id, session_id) return self.redis_manager.get_latest_room_session(room_id) def build_session_danmu_material(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]: """ 旁路能力:从已有日文件中按直播 session 抽取有效弹幕,并压缩成可供后续总结使用的材料。 当前不影响提醒、采集、群消息发送主流程。 """ session = self.get_room_session(room_id, session_id=session_id) if not session: return None messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session) material = DouyuDanmuSummaryHelper.build_summary_material(room_id, session, messages) material["session"] = { "session_id": session.get("session_id", ""), "anchor_day": session.get("anchor_day", ""), "nickname": session.get("nickname", ""), "room_name": session.get("room_name", ""), "is_live": bool(session.get("is_live")), } return material def build_session_llm_payload(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]: """ 旁路能力:构造可直接发送给 LLM 的弹幕总结载荷。 不改变现有弹幕采集和通知主流程。 """ session = self.get_room_session(room_id, session_id=session_id) if not session: return None messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session) return DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages) def _daily_report_job_key(self, day_key: str) -> str: return f"{self.redis_manager.prefix}daily_report_job:{day_key}" def _daily_report_room_key(self, room_id: str, anchor_day: str) -> str: return f"{self.redis_manager.prefix}daily_report:{room_id}:{anchor_day}" @staticmethod def _daily_report_cache_dir() -> str: path = os.path.join("temp", "douyu_materials") os.makedirs(path, exist_ok=True) return path def _daily_report_cache_path(self, room_id: str, anchor_day: str, report_kind: str = "operator") -> str: # 把不同风格的日报结果拆到独立缓存文件中: # 1. 运营版继续使用 operator; # 2. 粉丝向恶搞版使用 fans; # 3. 这样两套模板和文本互不覆盖,便于分别调试和回归。 safe_kind = str(report_kind or "operator").strip().lower() or "operator" return os.path.join( self._daily_report_cache_dir(), f"{room_id}_{anchor_day.replace('-', '')}_{safe_kind}_daily_report_result.json", ) def _load_daily_report_cache(self, room_id: str, anchor_day: str, report_kind: str = "operator") -> Optional[Dict[str, Any]]: cache_path = self._daily_report_cache_path(room_id, anchor_day, report_kind=report_kind) if not os.path.exists(cache_path): return None try: with open(cache_path, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): return data except Exception as e: logger.warning( f"读取斗鱼每日报告缓存失败(room={room_id}, day={anchor_day}, kind={report_kind}): {e}" ) return None def _save_daily_report_cache( self, room_id: str, anchor_day: str, data: Dict[str, Any], report_kind: str = "operator", ) -> None: cache_path = self._daily_report_cache_path(room_id, anchor_day, report_kind=report_kind) try: with open(cache_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) except Exception as e: logger.warning( f"保存斗鱼每日报告缓存失败(room={room_id}, day={anchor_day}, kind={report_kind}): {e}" ) @staticmethod def _resolve_existing_report_image(image_path: Optional[str]) -> Optional[str]: path = str(image_path or "").strip() if not path: return None return path if os.path.exists(path) else None def _should_run_daily_report(self, now_dt: datetime) -> bool: time_text = str(self._daily_report_time or "").strip() try: target_hour, target_minute = [int(part) for part in time_text.split(":", 1)] except Exception: return False target_dt = now_dt.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) if now_dt < target_dt or now_dt > target_dt + timedelta(minutes=4, seconds=59): return False last_run = self.redis_manager.get_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d"))) return not last_run def _load_sessions_for_anchor_day(self, room_id: str, anchor_day: str) -> List[Dict[str, Any]]: if not self.redis_manager: return [] sessions = [] for session_id in self.redis_manager.list_room_session_ids(room_id, limit=30): session = self.redis_manager.get_room_session(room_id, session_id) if not session: continue if str(session.get("anchor_day") or "") != anchor_day: continue sessions.append(session) sessions.sort( key=lambda item: str(((item.get("segments") or [{}])[0]).get("start_time", "")), ) if sessions: return sessions[:self._daily_report_max_sessions] inferred_sessions = self._infer_sessions_for_anchor_day(room_id, anchor_day) if inferred_sessions: logger.info( f"斗鱼每日报告使用弹幕文件回推 session: room={room_id}, day={anchor_day}, " f"count={len(inferred_sessions)}" ) return inferred_sessions[:self._daily_report_max_sessions] return [] def _infer_sessions_for_anchor_day(self, room_id: str, anchor_day: str) -> List[Dict[str, Any]]: date_key = anchor_day.replace("-", "") day_messages = DouyuDanmuSummaryHelper.load_day_messages(room_id, date_key) if not day_messages: return [] inferred_sessions = DouyuDanmuSummaryHelper.infer_sessions_from_messages( room_id, day_messages, session_cutoff_hour=self._session_cutoff_hour, merge_gap_hours=self._merge_gap_hours, min_session_messages=min(50, self._daily_report_min_messages), ) inferred_sessions = [ item for item in inferred_sessions if str(item.get("anchor_day") or "") == anchor_day ] if inferred_sessions: return inferred_sessions if len(day_messages) < self._daily_report_min_messages: return [] ordered = sorted(day_messages, key=lambda item: item.get("timestamp") or datetime.min) start_dt = ordered[0].get("timestamp") end_dt = ordered[-1].get("timestamp") if not isinstance(start_dt, datetime) or not isinstance(end_dt, datetime): return [] return [{ "session_id": f"{room_id}_{date_key}_fallback", "room_id": room_id, "anchor_day": anchor_day, "nickname": "", "room_name": "", "segments": [{ "start_time": start_dt.strftime("%Y-%m-%d %H:%M:%S"), "end_time": end_dt.strftime("%Y-%m-%d %H:%M:%S"), }], "is_live": False, "source": "fallback_full_day", }] def _build_audience_trend(self, sessions: List[Dict[str, Any]]) -> Dict[str, Any]: points: List[Dict[str, Any]] = [] segment_start_times: List[str] = [] segment_end_times: List[str] = [] for session in sessions: for segment in session.get("segments", []) or []: start_time = str(segment.get("start_time") or "").strip() end_time = str(segment.get("end_time") or "").strip() if start_time: segment_start_times.append(start_time) if end_time: segment_end_times.append(end_time) for item in session.get("audience_points", []) or []: point = { "timestamp": str(item.get("timestamp") or "").strip(), "vip_count": int(item.get("vip_count", 0) or 0), "diamond_count": int(item.get("diamond_count", 0) or 0), } if point["timestamp"]: points.append(point) points = self._normalize_audience_points(points, limit=1440) if not points: return {"points": [], "summary": {}} vip_values = [int(item.get("vip_count", 0) or 0) for item in points] diamond_values = [int(item.get("diamond_count", 0) or 0) for item in points] labels = [str(item.get("timestamp") or "")[-8:-3] for item in points] session_start = min(segment_start_times) if segment_start_times else "" session_end = max(segment_end_times) if segment_end_times else str(points[-1].get("timestamp") or "") first_point_time = str(points[0].get("timestamp") or "") last_point_time = str(points[-1].get("timestamp") or "") leading_gap_minutes = 0 if session_start and first_point_time: start_dt = self._parse_session_time(session_start) point_dt = self._parse_session_time(first_point_time) if start_dt and point_dt: leading_gap_minutes = max(int((point_dt - start_dt).total_seconds() // 60), 0) return { "points": points, "summary": { "point_count": len(points), "vip_min": min(vip_values), "vip_max": max(vip_values), "vip_latest": vip_values[-1], "diamond_min": min(diamond_values), "diamond_max": max(diamond_values), "diamond_latest": diamond_values[-1], "labels": labels, "session_start": session_start, "session_end": session_end, "first_point_time": first_point_time, "last_point_time": last_point_time, "leading_gap_minutes": leading_gap_minutes, }, } def _build_daily_report_payload(self, room_id: str, anchor_day: str, sessions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: if not sessions: return None session_payloads: List[Dict[str, Any]] = [] total_message_count = 0 total_noise_filtered_count = 0 total_organized_message_count = 0 total_unique_users: Set[str] = set() merged_templates: List[Dict[str, Any]] = [] repeated_messages: List[Dict[str, Any]] = [] peak_buckets: List[Dict[str, Any]] = [] representative_messages: List[Dict[str, Any]] = [] raw_window_samples: List[Dict[str, Any]] = [] top_terms_counter = Counter() burst_terms_counter = Counter() operator_totals = { "fans_badge_user_count": 0, "fans_badge_message_count": 0, "high_room_level_user_count": 0, "high_fans_level_user_count": 0, "noble_user_count": 0, "noble_message_count": 0, "active_users_5plus": 0, "active_users_10plus": 0, } top_badge_counter = Counter() top_badge_message_counter = Counter() top_active_user_map: Dict[str, Dict[str, Any]] = {} nickname = "" room_name = "" for session in sessions: messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session) if len(messages) < self._daily_report_min_messages: continue payload = DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages) session_payloads.append(payload) meta = payload.get("session_meta", {}) or {} operator_metrics = payload.get("operator_metrics", {}) or {} total_message_count += int(meta.get("message_count", 0) or 0) total_noise_filtered_count += int(meta.get("noise_filtered_count", 0) or 0) total_organized_message_count += int(meta.get("organized_message_count", 0) or 0) nickname = nickname or str(meta.get("nickname") or session.get("nickname") or "") room_name = room_name or str(meta.get("room_name") or session.get("room_name") or "") for key in operator_totals: operator_totals[key] += int(operator_metrics.get(key, 0) or 0) for item in payload.get("merged_templates", []) or []: merged_templates.append(dict(item)) for item in payload.get("repeated_messages", []) or []: repeated_messages.append(dict(item)) for item in payload.get("peak_buckets", []) or []: peak_buckets.append(dict(item)) for item in payload.get("representative_messages", []) or []: representative_messages.append(dict(item)) for item in payload.get("raw_window_samples", []) or []: raw_window_samples.append(dict(item)) for item in payload.get("top_terms", []) or []: term = str(item.get("term") or "").strip() if term: top_terms_counter[term] += int(item.get("count", 0) or 0) for item in payload.get("burst_terms", []) or []: term = str(item.get("text") or "").strip() if term: burst_terms_counter[term] += int(item.get("count", 0) or 0) for item in operator_metrics.get("top_badges", []) or []: badge_name = str(item.get("badge_name") or "").strip() if badge_name: top_badge_counter[badge_name] += int(item.get("user_count", 0) or 0) top_badge_message_counter[badge_name] += int(item.get("message_count", 0) or 0) for item in operator_metrics.get("top_active_users", []) or []: uid = str(item.get("uid") or "").strip() if not uid: continue existing = top_active_user_map.get(uid) message_count = int(item.get("message_count", 0) or 0) organized_message_count = int(item.get("organized_message_count", 0) or 0) room_level = int(item.get("room_level", 0) or 0) fans_level = int(item.get("fans_level", 0) or 0) if not existing: top_active_user_map[uid] = { "uid": uid, "nickname": str(item.get("nickname") or "").strip(), "message_count": message_count, "organized_message_count": organized_message_count, "room_level": room_level, "fans_name": str(item.get("fans_name") or "").strip(), "fans_level": fans_level, "noble_name": str(item.get("noble_name") or "").strip(), } continue existing["message_count"] = int(existing.get("message_count", 0) or 0) + message_count existing["organized_message_count"] = int(existing.get("organized_message_count", 0) or 0) + organized_message_count if not str(existing.get("nickname") or "").strip(): existing["nickname"] = str(item.get("nickname") or "").strip() if room_level > int(existing.get("room_level", 0) or 0): existing["room_level"] = room_level if fans_level > int(existing.get("fans_level", 0) or 0): existing["fans_level"] = fans_level if not str(existing.get("fans_name") or "").strip(): existing["fans_name"] = str(item.get("fans_name") or "").strip() if not str(existing.get("noble_name") or "").strip(): existing["noble_name"] = str(item.get("noble_name") or "").strip() for session_message in messages: uid = str(session_message.get("uid") or "").strip() if uid: total_unique_users.add(uid) if not session_payloads: return None merged_templates.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True) repeated_messages.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True) peak_buckets.sort(key=lambda item: int(item.get("message_count", 0) or 0), reverse=True) artifact_dir = os.path.join("temp", "douyu_materials") os.makedirs(artifact_dir, exist_ok=True) audience_trend = self._build_audience_trend(sessions) room_context = self._build_room_semantic_context(room_id, nickname, room_name, sessions) payload = { "report_meta": { "room_id": room_id, "anchor_day": anchor_day, "nickname": nickname, "room_name": room_name, "session_count": len(session_payloads), "message_count": total_message_count, "noise_filtered_count": total_noise_filtered_count, "organized_message_count": total_organized_message_count, "unique_user_count": len(total_unique_users), }, "operator_metrics": { **operator_totals, "fans_badge_user_ratio": round(operator_totals["fans_badge_user_count"] / max(len(total_unique_users), 1), 4), "top_badges": [ { "badge_name": badge_name, "user_count": user_count, "message_count": int(top_badge_message_counter.get(badge_name, 0) or 0), } for badge_name, user_count in top_badge_counter.most_common(10) ], "top_active_users": sorted( top_active_user_map.values(), key=lambda item: ( int(item.get("message_count", 0) or 0), int(item.get("organized_message_count", 0) or 0), ), reverse=True, )[:12], }, "sessions": [ { "session_id": (item.get("session_meta", {}) or {}).get("session_id", ""), "segments": (item.get("session_meta", {}) or {}).get("segments", []), "message_count": (item.get("session_meta", {}) or {}).get("message_count", 0), "organized_message_count": (item.get("session_meta", {}) or {}).get("organized_message_count", 0), } for item in session_payloads ], # 直播间语义上下文: # 1. 给 LLM 一个“这是什么圈子”的先验; # 2. 主要用于 Dota2 这类重人物关系、重职业生涯梗的直播间; # 3. 不替代真实弹幕,只帮助模型更准确解释黑话和典故。 "room_context": room_context, "audience_trend": audience_trend, "merged_templates": merged_templates[:24], "repeated_messages": repeated_messages[:24], "top_terms": [{"term": term, "count": count} for term, count in top_terms_counter.most_common(24)], "burst_terms": [{"text": term, "count": count} for term, count in burst_terms_counter.most_common(16)], "peak_buckets": peak_buckets[:10], "representative_messages": representative_messages[:24], "raw_window_samples": raw_window_samples[:10], } artifact_path = os.path.join(artifact_dir, f"{room_id}_{anchor_day.replace('-', '')}_daily_report_payload.json") with open(artifact_path, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) return payload def _build_daily_report_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]: meta = payload.get("report_meta", {}) or {} room_context_prompt = self._build_room_context_prompt_block(payload) system_prompt = ( "你是斗鱼直播日报助手。请基于给定的结构化弹幕材料,输出一份适合发群的中文日报。" "要求简洁、自然、信息密度高,不要编造,不要使用代码块。" "如果材料显示这是 Dota2 / 电竞语境,请优先按该圈层理解弹幕中的人物、黑话、历史梗和职业生涯梗。" ) user_prompt = ( "请输出一份斗鱼每日报告,格式要求:\n" "1. 第一行写标题,包含主播名和日期。\n" "2. 用 3-5 条概括直播主线、弹幕情绪、观众关注点。\n" "3. 单独补充运营视角观察,比如带牌活跃用户、高等级用户、核心发言用户、活跃牌子分布。\n" "4. 单独列出高频梗/复读内容(不超过 5 条)。\n" "5. 单独列出 2-3 个热点时段。\n" "6. 整体控制在 600 字以内。\n\n" f"{room_context_prompt}" f"材料如下:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" ) return system_prompt, user_prompt def _build_danmu_summary_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]: meta = payload.get("report_meta", {}) or {} room_context_prompt = self._build_room_context_prompt_block(payload) system_prompt = ( "你是直播弹幕总结助手。请只根据给定材料,总结这场直播的弹幕内容与氛围。" "不要输出运营数据,不要编造,不要写空话套话。" "如果材料表明这是 Dota2 / 电竞直播间,请优先把梗理解为圈内人物、职业经历、赛事记忆和主播关系梗。" ) user_prompt = ( "请输出一段适合放在日报图片上半部分的弹幕总结,要求:\n" "1. 先用 1 段总述直播氛围与主线。\n" "2. 再用 5 条要点总结观众关注点、情绪变化、反复出现的梗、节奏变化和额外反馈,每条只写一句。\n" "3. 另起一行固定写标题:`【粉丝向弹幕萃取】`。\n" "4. 在该标题下输出 4-6 条短句,尽量保留弹幕原话风格(可以保留口头语、玩梗、情绪词)。\n" "5. 整体语气要像“直播间现场记录”,不要写成运营复盘。\n" "6. 不要写“根据数据”“建议”“策略”等词。\n\n" f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n" f"日期:{meta.get('anchor_day', '')}\n" f"{room_context_prompt}" f"材料:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" ) return system_prompt, user_prompt def _build_fans_daily_report_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]: """ 粉丝版日报提示词设计目标: 1. 和运营版彻底区分开,不再强调“策略、复盘、活跃质量”; 2. 保留真实弹幕语境,让输出像“群友拿着回放在整活”; 3. 允许轻微恶搞和夸张,但不能编造未出现的事件,也不能攻击主播或观众。 """ meta = payload.get("report_meta", {}) or {} room_context_prompt = self._build_room_context_prompt_block(payload) system_prompt = ( "你是斗鱼直播间的粉丝向整活日报编辑。" "请只根据提供的真实弹幕材料,输出一份开心、欢乐、带一点恶搞气质的中文总结。" "语气要像群友在复盘名场面,不要写成运营分析,不要编造剧情,不要使用代码块。" "如果这是 Dota2 / 电竞语境直播间,请优先按刀圈/电竞圈人物关系、职业生涯、老比赛和主播互动梗去理解笑点。" ) user_prompt = ( "请输出一份适合给粉丝看的《斗鱼弹幕乐子日报》,严格按下面结构输出:\n" "1. 开头先写 1 段总述,概括今天直播间的整体节目效果和气氛。\n" "2. 另起一行写标题:`【今日笑点】`,下面写 4 条 bullet,每条一句,突出最有节目效果的地方。\n" "3. 另起一行写标题:`【弹幕名场面】`,下面写 4-6 条 bullet,尽量保留弹幕原话风格,像现场回放。\n" "4. 另起一行写标题:`【梗王榜】`,下面写 3 条 bullet,把今天最刷屏、最有共识的梗排出来。\n" "5. 另起一行写标题:`【收尾播报】`,下面只写 1 句收尾,轻松一点,像群里发图后的总结句。\n" "6. 可以夸张一点、调皮一点,但不要低俗,不要攻击主播,不要使用“建议、策略、转化、数据表现”等运营词。\n\n" f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n" f"日期:{meta.get('anchor_day', '')}\n" f"{room_context_prompt}" f"材料:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" ) return system_prompt, user_prompt def _build_funny_scene_lines(self, payload: Dict[str, Any], limit: int = 5) -> List[str]: """ 组装“弹幕名场面”兜底素材。 优先级: 1. 代表性原始弹幕,保证现场感; 2. 重复刷屏梗,保证“今天大家到底在笑什么”能被看出来。 """ lines: List[str] = [] seen = set() def push(text: str) -> None: value = str(text or "").strip() if not value: return normalized = value.lower() if normalized in seen: return seen.add(normalized) lines.append(value) for item in (payload.get("representative_messages", []) or [])[:12]: nickname = str(item.get("nickname") or "").strip() or "观众" content = str(item.get("content") or "").strip() if content: push(f"{nickname}:{content[:48]}") if len(lines) >= limit: return lines[:limit] for item in (payload.get("repeated_messages", []) or [])[:6]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: push(f"复读现场:{text[:40]}(今天被刷了 {count} 次)") if len(lines) >= limit: return lines[:limit] return lines[:limit] def _build_fans_extract_lines(self, payload: Dict[str, Any], limit: int = 6) -> List[str]: # 粉丝向萃取强调“可读、像现场弹幕”,优先取代表发言,再补充重复梗与情绪短词。 representative_messages = payload.get("representative_messages", []) or [] repeated_messages = payload.get("repeated_messages", []) or [] merged_templates = payload.get("merged_templates", []) or [] burst_terms = payload.get("burst_terms", []) or [] lines: List[str] = [] seen = set() def push(text: str) -> None: value = str(text or "").strip() if not value: return key = value.lower() if key in seen: return seen.add(key) lines.append(value) for item in representative_messages[:10]: nickname = str(item.get("nickname") or "").strip() or "观众" content = str(item.get("content") or "").strip() if content: push(f"{nickname}:{content[:56]}") if len(lines) >= limit: return lines[:limit] for item in repeated_messages[:6]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: push(f"复读梗「{text[:36]}」刷了 {count} 次。") if len(lines) >= limit: return lines[:limit] for item in merged_templates[:6]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: push(f"共识弹幕「{text[:36]}」出现 {count} 次。") if len(lines) >= limit: return lines[:limit] for item in burst_terms[:4]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: push(f"情绪短词「{text}」集中出现 {count} 次。") if len(lines) >= limit: return lines[:limit] return lines[:limit] def _build_fallback_fans_daily_report(self, payload: Dict[str, Any]) -> str: """ 当 LLM 不可用或返回空内容时,仍然生成一份可直接发群的粉丝版日报。 兜底文本保持“有梗但不胡编”的原则,所有句子都只从真实弹幕统计结果里取材。 """ meta = payload.get("report_meta", {}) or {} top_terms = [ str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or [])[:5] if str(item.get("term") or "").strip() ] merged_templates = payload.get("merged_templates", []) or [] burst_terms = payload.get("burst_terms", []) or [] peak_buckets = payload.get("peak_buckets", []) or [] repeated_messages = payload.get("repeated_messages", []) or [] anchor_day = str(meta.get("anchor_day", "") or "") lead_parts = [ f"{anchor_day} 这场直播,弹幕区整体处于高能围观状态,大家一边盯着直播内容,一边围着" f"{'、'.join(top_terms[:4]) or '节目效果'}疯狂接梗。" ] if merged_templates: lead_parts.append( f"尤其是「{str(merged_templates[0].get('text') or '').strip()[:26]}」这类共识弹幕,一看就是全场默认会背。" ) lines = [" ".join(lead_parts).strip(), "【今日笑点】"] if peak_buckets: top_bucket = peak_buckets[0] lines.append( f"- {str(top_bucket.get('start_time') or '')[-8:-3]} 前后弹幕密度冲高,直播间像突然集体抢到麦,乐子值直接拉满。" ) if repeated_messages: first_repeat = repeated_messages[0] lines.append( f"- 复读冠军是「{str(first_repeat.get('text') or '').strip()[:32]}」,光这句就被来回刷了 {int(first_repeat.get('count', 0) or 0)} 次。" ) if burst_terms: first_burst = burst_terms[0] lines.append( f"- 情绪词「{str(first_burst.get('text') or '').strip()}」反复出现 {int(first_burst.get('count', 0) or 0)} 次,说明那一段大家已经彻底上头。" ) if top_terms: lines.append(f"- 今天的集体关注点基本围着 {'、'.join(top_terms[:4])} 打转,谁路过都会被梗吸进去。") lines.append("【弹幕名场面】") for item in self._build_funny_scene_lines(payload, limit=5): lines.append(f"- {item}") lines.append("【梗王榜】") rank_items: List[str] = [] for item in merged_templates[:2]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: rank_items.append(f"{text[:30]}|全场 {count} 次") for item in burst_terms[:2]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: rank_items.append(f"{text}|情绪爆发 {count} 次") for item in repeated_messages[:3]: if len(rank_items) >= 3: break text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: candidate = f"{text[:30]}|复读 {count} 次" if candidate not in rank_items: rank_items.append(candidate) for item in rank_items[:3]: lines.append(f"- {item}") lines.append("【收尾播报】") if peak_buckets: lines.append( f"- 今天的直播内容未必全部记住了,但 {str(peak_buckets[0].get('start_time') or '')[-8:-3]} 那波弹幕起哄,已经足够做成群内经典片段。" ) else: lines.append("- 今天的直播总结成一句话就是:画面会结束,梗不会下播。") return "\n".join(lines).strip() def _build_fallback_daily_report(self, payload: Dict[str, Any]) -> str: meta = payload.get("report_meta", {}) or {} title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播") lines = [ f"斗鱼每日报告 | {title_name} | {meta.get('anchor_day', '')}", f"共 {meta.get('session_count', 0)} 场,弹幕 {meta.get('message_count', 0)} 条,参与用户 {meta.get('unique_user_count', 0)} 人。", ] operator_metrics = payload.get("operator_metrics", {}) or {} sessions = payload.get("sessions", []) or [] if sessions: session_parts = [] for item in sessions[:4]: segments = item.get("segments", []) or [] if not segments: continue start_time = str(segments[0].get("start_time", ""))[-8:-3] end_time = str(segments[-1].get("end_time", ""))[-8:-3] session_parts.append(f"{start_time}-{end_time}") if session_parts: lines.append("场次时间:" + " / ".join(session_parts)) top_terms = payload.get("top_terms", []) or [] if top_terms: lines.append("关注焦点:" + "、".join([str(item.get("term") or "") for item in top_terms[:8] if str(item.get("term") or "").strip()])) if operator_metrics: op_parts = [] fans_badge_user_count = int(operator_metrics.get("fans_badge_user_count", 0) or 0) high_room_level_user_count = int(operator_metrics.get("high_room_level_user_count", 0) or 0) high_fans_level_user_count = int(operator_metrics.get("high_fans_level_user_count", 0) or 0) active_users_10plus = int(operator_metrics.get("active_users_10plus", 0) or 0) if fans_badge_user_count: op_parts.append(f"带牌活跃用户 {fans_badge_user_count}") if high_room_level_user_count: op_parts.append(f"30级+活跃用户 {high_room_level_user_count}") if high_fans_level_user_count: op_parts.append(f"10级+粉丝牌用户 {high_fans_level_user_count}") if active_users_10plus: op_parts.append(f"高活跃核心用户 {active_users_10plus}") if op_parts: lines.append("运营侧:" + ",".join(op_parts)) top_badges = operator_metrics.get("top_badges", []) or [] if top_badges: lines.append("活跃粉丝牌:") for item in top_badges[:5]: badge_name = str(item.get("badge_name") or "").strip() user_count = int(item.get("user_count", 0) or 0) message_count = int(item.get("message_count", 0) or 0) if badge_name: lines.append(f"- {badge_name}:{user_count}人,{message_count}条") merged_templates = payload.get("merged_templates", []) or [] if merged_templates: lines.append("高频梗:") for item in merged_templates[:5]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: lines.append(f"- {text[:42]}({count}次)") peak_buckets = payload.get("peak_buckets", []) or [] if peak_buckets: lines.append("热点时段:") for item in peak_buckets[:3]: start_time = str(item.get("start_time") or "")[-8:-3] message_count = int(item.get("message_count", 0) or 0) terms = "、".join( [str(term.get("term") or "") for term in (item.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] ) lines.append(f"- {start_time},{message_count}条,关键词:{terms}") representative_messages = payload.get("representative_messages", []) or [] if representative_messages: lines.append("代表弹幕:") for item in representative_messages[:4]: nickname = str(item.get("nickname") or "").strip() content = str(item.get("content") or "").strip() if content: lines.append(f"- {nickname}:{content[:60]}") text = "\n".join(lines).strip() if len(text) > self._daily_report_max_length: text = text[: self._daily_report_max_length - 20].rstrip() + "\n...(已截断)" return text def _build_fallback_danmu_summary(self, payload: Dict[str, Any]) -> str: meta = payload.get("report_meta", {}) or {} top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or [])[:6] if str(item.get("term") or "").strip()] merged_templates = payload.get("merged_templates", []) or [] peak_buckets = payload.get("peak_buckets", []) or [] representative_messages = payload.get("representative_messages", []) or [] lines = [ f"{meta.get('anchor_day', '')} 这场直播弹幕整体比较密集,讨论重心主要围绕 {'、'.join(top_terms[:4]) or '对局过程'} 展开,观众互动意愿较强,梗和复读内容持续出现。" ] if merged_templates: sample_templates = ";".join( [str(item.get("text") or "").strip()[:26] for item in merged_templates[:3] if str(item.get("text") or "").strip()] ) if sample_templates: lines.append(f"- 主线观察:直播间共识梗很强,重复刷屏内容主要集中在 {sample_templates}。") for item in merged_templates[:4]: break if peak_buckets: top_bucket = peak_buckets[0] terms = "、".join( [str(term.get("term") or "") for term in (top_bucket.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] ) lines.append( f"- 节奏变化:高峰集中在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,单时段弹幕 {int(top_bucket.get('message_count', 0) or 0)} 条,关键词偏向 {terms}。" ) if len(peak_buckets) > 1: second_bucket = peak_buckets[1] second_terms = "、".join( [str(term.get("term") or "") for term in (second_bucket.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] ) lines.append( f"- 热点补充:{str(second_bucket.get('start_time') or '')[-8:-3]} 也出现明显抬升,弹幕讨论继续围绕 {second_terms} 展开。" ) if representative_messages: lines.append("- 情绪特点:代表性发言里既有对操作和决策的即时反馈,也有大量玩梗、调侃和情绪宣泄。") if top_terms: lines.append(f"- 关注焦点:高频词主要落在 {'、'.join(top_terms[:6])},说明观众注意力相对集中。") # 在兜底模式下也强制补出“粉丝向弹幕萃取”,避免图片模板出现空区块。 fans_extract_lines = self._build_fans_extract_lines(payload, limit=6) if fans_extract_lines: lines.append("【粉丝向弹幕萃取】") for item in fans_extract_lines: lines.append(f"- {item}") return "\n".join(lines).strip() def _build_operator_summary_text(self, payload: Dict[str, Any]) -> str: meta = payload.get("report_meta", {}) or {} operator_metrics = payload.get("operator_metrics", {}) or {} total_users = int(meta.get("unique_user_count", 0) or 0) fans_badge_users = int(operator_metrics.get("fans_badge_user_count", 0) or 0) high_room_users = int(operator_metrics.get("high_room_level_user_count", 0) or 0) high_fans_users = int(operator_metrics.get("high_fans_level_user_count", 0) or 0) active_users_5plus = int(operator_metrics.get("active_users_5plus", 0) or 0) active_users_10plus = int(operator_metrics.get("active_users_10plus", 0) or 0) fans_badge_ratio = float(operator_metrics.get("fans_badge_user_ratio", 0) or 0) lines = [ f"- 活跃用户规模:{total_users} 人,其中发言 5 次以上 {active_users_5plus} 人,10 次以上 {active_users_10plus} 人。", f"- 粉丝粘性:带粉丝牌活跃用户 {fans_badge_users} 人,占活跃用户 {fans_badge_ratio * 100:.1f}%;10 级以上粉丝牌用户 {high_fans_users} 人。", f"- 用户质量:房间等级 30 级以上活跃用户 {high_room_users} 人,说明高等级老观众参与度不低。", ] audience_summary = (payload.get("audience_trend", {}) or {}).get("summary", {}) or {} if audience_summary: vip_min = int(audience_summary.get("vip_min", 0) or 0) vip_max = int(audience_summary.get("vip_max", 0) or 0) diamond_latest = int(audience_summary.get("diamond_latest", 0) or 0) point_count = int(audience_summary.get("point_count", 0) or 0) lines.append( f"- 人数走势:WS 侧共采样 {point_count} 个时间点,贵宾在 {vip_min}-{vip_max} 区间波动,钻粉收盘约 {diamond_latest}。" ) top_badges = payload.get("operator_metrics", {}).get("top_badges", []) or [] if top_badges: badge_parts = [] for item in top_badges[:5]: badge_name = str(item.get("badge_name") or "").strip() if not badge_name: continue badge_parts.append(f"{badge_name} {int(item.get('user_count', 0) or 0)}人/{int(item.get('message_count', 0) or 0)}条") if badge_parts: lines.append(f"- 活跃牌子分布:{';'.join(badge_parts)}。") top_active_users = payload.get("operator_metrics", {}).get("top_active_users", []) or [] if top_active_users: core_parts = [] for item in top_active_users[:5]: nickname = str(item.get("nickname") or item.get("uid") or "").strip() msg_count = int(item.get("message_count", 0) or 0) fans_name = str(item.get("fans_name") or "").strip() fans_level = int(item.get("fans_level", 0) or 0) room_level = int(item.get("room_level", 0) or 0) tags = [] if fans_name: if fans_level > 0: tags.append(f"{fans_name} Lv{fans_level}") else: tags.append(fans_name) if room_level > 0: tags.append(f"平台 Lv{room_level}") tags.append(f"{msg_count}条") core_parts.append(f"{nickname}({','.join(tags)})") if core_parts: lines.append(f"- 核心发言用户:{';'.join(core_parts)}。") return "\n".join(lines).strip() def _build_operator_summary_lines(self, payload: Dict[str, Any]) -> List[str]: return [line.strip()[2:].strip() for line in self._build_operator_summary_text(payload).splitlines() if line.strip().startswith("- ")] def _build_dify_daily_report_inputs( self, *, task_type: str, system_prompt: str, user_prompt: str, payload: Dict[str, Any], ) -> Dict[str, Any]: """ 组装斗鱼日报在 Dify Workflow 下的输入参数。 设计目标: 1. 让工作流既能拿到“最终自然语言提示词”,也能拿到“结构化原始载荷”; 2. 让一个工作流通过 task_type 同时处理「日报正文」和「弹幕摘要」两类任务; 3. 保留关键元信息,便于在工作流内做分支、日志与降级兜底。 """ meta = payload.get("report_meta", {}) or {} room_id = str(meta.get("room_id") or "").strip() anchor_day = str(meta.get("anchor_day") or "").strip() nickname = str(meta.get("nickname") or meta.get("room_name") or "").strip() # 说明: # 1. 部分 Dify Workflow 对输入变量类型校验较严格,复杂对象(dict/list)容易触发 400; # 2. 默认只提交精简字符串字段,优先保证链路可用; # 3. 如需在工作流内使用完整载荷,可通过 include_structured_inputs 开关启用。 inputs = { # 任务路由字段:在 Dify 条件分支里用于区分日报正文/弹幕摘要。 "task_type": task_type, # 兼容 Workflow 中直接读取 query 的场景。 "query": user_prompt, # 保留原有两段提示词,便于工作流内部二次拼装或调试。 "system_prompt": system_prompt, "user_prompt": user_prompt, # 关键元信息:用于日志、标题拼接、数据看板或异常追踪。 "room_id": room_id, "anchor_day": anchor_day, "nickname": nickname, # 控制输出长度:Dify 该变量在部分工作流中配置为 paragraph(字符串)类型, # 因此这里统一传字符串,避免出现 “max_length must be a string” 的 400 校验错误。 "max_length": str(int(self._daily_report_max_length or 1800)), } if self._daily_report_include_structured_inputs: inputs["report_payload_json"] = json.dumps(payload, ensure_ascii=False) return inputs def _call_daily_report_llm( self, *, task_type: str, system_prompt: str, user_prompt: str, payload: Dict[str, Any], tag: str, ) -> str: """ 统一封装斗鱼日报 LLM 调用。 - Dify provider:走 run(inputs) 进入 Workflow,确保输入结构稳定可编排; - 其他 provider:保持原 chat(system,user) 行为,兼容现有 OpenAI-compatible 配置。 """ if not self._daily_report_llm_client: return "" meta = payload.get("report_meta", {}) or {} room_id = str(meta.get("room_id") or "").strip() user_id = f"douyu_daily_report_{room_id or 'unknown'}" if self._daily_report_llm_client.provider == "dify": inputs = self._build_dify_daily_report_inputs( task_type=task_type, system_prompt=system_prompt, user_prompt=user_prompt, payload=payload, ) result = self._daily_report_llm_client.run( prompt=user_prompt, user=user_id, inputs=inputs, tag=tag, ) return str((result or {}).get("text", "") or "").strip() return self._daily_report_llm_client.chat( system_prompt, user_prompt, user_id=user_id, ).strip() async def _generate_danmu_summary_text(self, payload: Dict[str, Any]) -> str: if self._daily_report_use_llm and self._daily_report_llm_client: system_prompt, user_prompt = self._build_danmu_summary_prompt(payload) result = await asyncio.to_thread( self._call_daily_report_llm, task_type="danmu_summary", system_prompt=system_prompt, user_prompt=user_prompt, payload=payload, tag=f"douyu_danmu_summary_{(payload.get('report_meta', {}) or {}).get('room_id', '')}", ) if result: return result.strip() logger.warning( f"斗鱼弹幕总结 LLM 生成失败: model={self._daily_report_llm_client.model}, " f"last_error={self._daily_report_llm_client.last_error}" ) return self._build_fallback_danmu_summary(payload) async def _generate_fans_daily_report_text(self, payload: Dict[str, Any]) -> str: """ 生成独立的粉丝向恶搞日报正文。 这里继续复用统一的 LLM 客户端,但通过不同 task_type 和 prompt 把风格切开。 """ if self._daily_report_use_llm and self._daily_report_llm_client: system_prompt, user_prompt = self._build_fans_daily_report_prompt(payload) result = await asyncio.to_thread( self._call_daily_report_llm, task_type="fans_daily_report", system_prompt=system_prompt, user_prompt=user_prompt, payload=payload, tag=f"douyu_fans_daily_report_{(payload.get('report_meta', {}) or {}).get('room_id', '')}", ) if result: text = result.strip() if len(text) > self._daily_report_max_length: return text[: self._daily_report_max_length - 20].rstrip() + "\n...(已截断)" return text logger.warning( f"斗鱼粉丝日报 LLM 生成失败: model={self._daily_report_llm_client.model}, " f"last_error={self._daily_report_llm_client.last_error}" ) return self._build_fallback_fans_daily_report(payload) async def _build_fans_daily_report_markdown(self, payload: Dict[str, Any]) -> str: """ Markdown 版本主要用于图片模板渲染失败时兜底。 即使最终还是走通用 markdown 截图,也要尽量保留粉丝版的结构感。 """ meta = payload.get("report_meta", {}) or {} title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播") fans_report_text = await self._generate_fans_daily_report_text(payload) lines = [ f"# {title_name} 的弹幕乐子日报", f"{meta.get('anchor_day', '')}|弹幕 {meta.get('message_count', 0)}|围观群众 {meta.get('unique_user_count', 0)}", "", fans_report_text, ] return "\n".join(lines).strip() async def _render_fans_daily_report_image(self, payload: Dict[str, Any]) -> Optional[str]: markdown = await self._build_fans_daily_report_markdown(payload) room_id = str((payload.get("report_meta", {}) or {}).get("room_id", "") or "room") anchor_day = str((payload.get("report_meta", {}) or {}).get("anchor_day", "") or "").replace("-", "") filename = f"douyu_fans_daily_report_{room_id}_{anchor_day}.png" try: fans_report_text = await self._generate_fans_daily_report_text(payload) html_content = render_fans_daily_report_html( payload=payload, fans_report_text=fans_report_text, ) output_dir = os.path.join(os.getcwd(), "temp", "md2image") os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, filename) await html_to_image(html_content, output_path) return str(Path(output_path).resolve()) except Exception as e: logger.error(f"斗鱼粉丝日报专用模板图片生成失败(room={room_id}, day={anchor_day}): {e}") try: return await convert_md_str_to_image(markdown, filename) except Exception as e: logger.error(f"斗鱼粉丝日报图片生成失败(room={room_id}, day={anchor_day}): {e}") return None async def _build_daily_report_markdown(self, payload: Dict[str, Any]) -> str: meta = payload.get("report_meta", {}) or {} title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播") danmu_summary = await self._generate_danmu_summary_text(payload) operator_summary = self._build_operator_summary_text(payload) lines = [ f"# {title_name} 直播每日报告", f"{meta.get('anchor_day', '')}|场次 {meta.get('session_count', 0)}|弹幕 {meta.get('message_count', 0)}|活跃用户 {meta.get('unique_user_count', 0)}", "", "## 弹幕总结", danmu_summary, "", "## 运营数据总结", operator_summary, ] peak_buckets = payload.get("peak_buckets", []) or [] if peak_buckets: lines.extend([ "", "## 热点时段", ]) for item in peak_buckets[:3]: terms = "、".join( [str(term.get("term") or "") for term in (item.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] ) lines.append( f"- `{str(item.get('start_time') or '')[-8:-3]}` 弹幕 {int(item.get('message_count', 0) or 0)} 条,关键词:{terms}" ) merged_templates = payload.get("merged_templates", []) or [] if merged_templates: lines.extend([ "", "## 高频梗", ]) for item in merged_templates[:5]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: lines.append(f"- {text[:72]}({count}次)") return "\n".join(lines).strip() async def _render_daily_report_image(self, payload: Dict[str, Any]) -> Optional[str]: markdown = await self._build_daily_report_markdown(payload) room_id = str((payload.get("report_meta", {}) or {}).get("room_id", "") or "room") anchor_day = str((payload.get("report_meta", {}) or {}).get("anchor_day", "") or "").replace("-", "") filename = f"douyu_daily_report_{room_id}_{anchor_day}.png" try: danmu_summary = await self._generate_danmu_summary_text(payload) html_content = render_daily_report_html( payload=payload, danmu_summary=danmu_summary, operator_summary_lines=self._build_operator_summary_lines(payload), ) output_dir = os.path.join(os.getcwd(), "temp", "md2image") os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, filename) await html_to_image(html_content, output_path) return str(Path(output_path).resolve()) except Exception as e: logger.error(f"斗鱼专用模板图片生成失败(room={room_id}, day={anchor_day}): {e}") try: return await convert_md_str_to_image(markdown, filename) except Exception as e: logger.error(f"斗鱼每日报告图片生成失败(room={room_id}, day={anchor_day}): {e}") return None async def _generate_daily_report_text(self, payload: Dict[str, Any]) -> str: if self._daily_report_use_llm and self._daily_report_llm_client: system_prompt, user_prompt = self._build_daily_report_prompt(payload) result = await asyncio.to_thread( self._call_daily_report_llm, task_type="daily_report", system_prompt=system_prompt, user_prompt=user_prompt, payload=payload, tag=f"douyu_daily_report_{(payload.get('report_meta', {}) or {}).get('room_id', '')}", ) if result: text = result.strip() if len(text) > self._daily_report_max_length: return text[: self._daily_report_max_length - 20].rstrip() + "\n...(已截断)" return text logger.warning( f"斗鱼每日报告 LLM 生成失败: model={self._daily_report_llm_client.model}, " f"last_error={self._daily_report_llm_client.last_error}" ) return self._build_fallback_daily_report(payload) async def _get_or_create_daily_report_result( self, room_id: str, anchor_day: str, payload: Dict[str, Any], *, force_regenerate: bool = False, ) -> Dict[str, Any]: # force_regenerate=True 时,跳过本地缓存读取,直接重新生成文本/图片并覆盖缓存。 # 这样可以在模型提示词或模板变更后,通过命令立即验证最新效果。 cached = {} if force_regenerate else ( self._load_daily_report_cache(room_id, anchor_day, report_kind="operator") or {} ) cached_image = self._resolve_existing_report_image(cached.get("report_image")) cached_text = str(cached.get("report_text") or "").strip() cached_version = int(cached.get("cache_version", 0) or 0) if cached_version >= self._DAILY_REPORT_CACHE_VERSION and (cached_image or cached_text): return { "report_text": cached_text, "report_image": cached_image, "cached": True, } report_text = await self._generate_daily_report_text(payload) report_image = None if self._daily_report_send_image: report_image = await self._render_daily_report_image(payload) result = { "room_id": room_id, "anchor_day": anchor_day, "cache_version": self._DAILY_REPORT_CACHE_VERSION, "report_text": report_text, "report_image": report_image, "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } self._save_daily_report_cache(room_id, anchor_day, result, report_kind="operator") result["cached"] = False return result async def _get_or_create_fans_daily_report_result( self, room_id: str, anchor_day: str, payload: Dict[str, Any], *, force_regenerate: bool = False, ) -> Dict[str, Any]: """ 粉丝向日报使用独立缓存: 1. 避免和运营版相互覆盖; 2. 便于后续单独升级风格、模板、提示词; 3. 手动调试时也能明确区分当前命中的到底是哪一类结果。 """ cached = {} if force_regenerate else ( self._load_daily_report_cache(room_id, anchor_day, report_kind="fans") or {} ) cached_image = self._resolve_existing_report_image(cached.get("report_image")) cached_text = str(cached.get("report_text") or "").strip() cached_version = int(cached.get("cache_version", 0) or 0) if cached_version >= self._DAILY_REPORT_CACHE_VERSION and (cached_image or cached_text): return { "report_text": cached_text, "report_image": cached_image, "cached": True, } report_text = await self._generate_fans_daily_report_text(payload) report_image = None if self._daily_report_send_image: report_image = await self._render_fans_daily_report_image(payload) result = { "room_id": room_id, "anchor_day": anchor_day, "cache_version": self._DAILY_REPORT_CACHE_VERSION, "report_text": report_text, "report_image": report_image, "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } self._save_daily_report_cache(room_id, anchor_day, result, report_kind="fans") result["cached"] = False return result async def _send_daily_reports( self, anchor_day: str, target_group_id: Optional[str] = None, force: bool = False, force_regenerate: bool = False, ) -> bool: rooms = ( set(self.redis_manager.list_group_rooms(target_group_id)) if target_group_id else self.redis_manager.all_subscribed_rooms() ) if not rooms: logger.info( f"斗鱼每日报告无可处理房间: day={anchor_day}, target_group={target_group_id or 'ALL'}" ) return False delivered_any = False for room_id in rooms: if not force and self.redis_manager.get_text_value(self._daily_report_room_key(room_id, anchor_day)): logger.info(f"斗鱼每日报告已发送过,跳过: room={room_id}, day={anchor_day}") continue sessions = self._load_sessions_for_anchor_day(room_id, anchor_day) if not sessions: logger.info(f"斗鱼每日报告无 session: room={room_id}, day={anchor_day}") continue if any(bool(session.get("is_live")) for session in sessions): logger.info(f"斗鱼每日报告存在直播中场次,跳过: room={room_id}, day={anchor_day}") continue payload = self._build_daily_report_payload(room_id, anchor_day, sessions) if not payload: logger.info( f"斗鱼每日报告 payload 为空: room={room_id}, day={anchor_day}, " f"sessions={len(sessions)}, min_messages={self._daily_report_min_messages}" ) continue report_result = await self._get_or_create_daily_report_result( room_id, anchor_day, payload, force_regenerate=force_regenerate, ) report_text = str(report_result.get("report_text") or "").strip() report_image = self._resolve_existing_report_image(report_result.get("report_image")) groups = [target_group_id] if target_group_id else self.redis_manager.groups_for_room(room_id) delivered = False for gid in groups: if not gid: continue if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED: continue try: if report_image: await self.bot.send_image_message(gid, Path(report_image)) else: await self.bot.send_text_message(gid, report_text) delivered = True delivered_any = True except Exception as e: logger.error(f"发送斗鱼每日报告失败(room={room_id}, group={gid}): {e}") if delivered: self.redis_manager.set_text_value( self._daily_report_room_key(room_id, anchor_day), datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ) return delivered_any async def _send_fans_daily_reports( self, anchor_day: str, target_group_id: Optional[str] = None, *, force_regenerate: bool = False, ) -> bool: """ 发送粉丝向恶搞日报。 当前刻意不复用“已发送标记”: 1. 它不是原有定时任务的一部分,默认按手动召回理解; 2. 群里想反复看不同版本文案时,不会被“今天已经发过”拦住。 """ rooms = ( set(self.redis_manager.list_group_rooms(target_group_id)) if target_group_id else self.redis_manager.all_subscribed_rooms() ) if not rooms: logger.info( f"斗鱼粉丝日报无可处理房间: day={anchor_day}, target_group={target_group_id or 'ALL'}" ) return False delivered_any = False for room_id in rooms: sessions = self._load_sessions_for_anchor_day(room_id, anchor_day) if not sessions: logger.info(f"斗鱼粉丝日报无 session: room={room_id}, day={anchor_day}") continue if any(bool(session.get("is_live")) for session in sessions): logger.info(f"斗鱼粉丝日报存在直播中场次,跳过: room={room_id}, day={anchor_day}") continue payload = self._build_daily_report_payload(room_id, anchor_day, sessions) if not payload: logger.info( f"斗鱼粉丝日报 payload 为空: room={room_id}, day={anchor_day}, " f"sessions={len(sessions)}, min_messages={self._daily_report_min_messages}" ) continue report_result = await self._get_or_create_fans_daily_report_result( room_id, anchor_day, payload, force_regenerate=force_regenerate, ) report_text = str(report_result.get("report_text") or "").strip() report_image = self._resolve_existing_report_image(report_result.get("report_image")) groups = [target_group_id] if target_group_id else self.redis_manager.groups_for_room(room_id) for gid in groups: if not gid: continue if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED: continue try: if report_image: await self.bot.send_image_message(gid, Path(report_image)) else: await self.bot.send_text_message(gid, report_text) delivered_any = True except Exception as e: logger.error(f"发送斗鱼粉丝日报失败(room={room_id}, group={gid}): {e}") return delivered_any def _start_danmu_record(self, room_id: str): recorder = self._get_danmu_recorder(room_id) recorder.start() def _stop_danmu_record(self, room_id: str): recorder = self._danmu_recorders.get(room_id) if recorder: recorder.stop()