From 66f4a3e60498ac0bf8a8004aaa40f27130be6812 Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 8 Apr 2026 13:17:29 +0800 Subject: [PATCH] feat(douyu): add daily danmu report pipeline --- plugins/douyu/config.toml | 22 + plugins/douyu/danmu_summary.py | 839 +++++++++++++++++++++++++++++++ plugins/douyu/main.py | 771 +++++++++++++++++++++++++++- plugins/douyu/report_template.py | 485 ++++++++++++++++++ utils/markdown_to_image.py | 67 ++- 5 files changed, 2181 insertions(+), 3 deletions(-) create mode 100644 plugins/douyu/danmu_summary.py create mode 100644 plugins/douyu/report_template.py diff --git a/plugins/douyu/config.toml b/plugins/douyu/config.toml index 99fe03c..a9f2589 100644 --- a/plugins/douyu/config.toml +++ b/plugins/douyu/config.toml @@ -4,3 +4,25 @@ command = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表"] check_interval_minutes = 5 api_url_template = "https://www.douyu.com/betard/{room_id}" user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +session_cutoff_hour = 6 +merge_gap_hours = 4 +daily_report_enable = true +daily_report_time = "10:05" +daily_report_min_messages = 120 +daily_report_use_llm = false +daily_report_max_sessions = 4 +daily_report_max_length = 1800 +daily_report_send_image = true + +[Douyu.report_api] +provider = "openai_compatible" +api_base_url = "" +endpoint = "chat/completions" +api_key = "" +model = "" +timeout_seconds = 45 +temperature = 0.3 +max_tokens = 900 +stream = true +max_retries = 3 +retry_delay_seconds = 1.0 diff --git a/plugins/douyu/danmu_summary.py b/plugins/douyu/danmu_summary.py new file mode 100644 index 0000000..e6032b0 --- /dev/null +++ b/plugins/douyu/danmu_summary.py @@ -0,0 +1,839 @@ +# -*- coding: utf-8 -*- +import os +import re +from collections import Counter, defaultdict +from datetime import datetime +from typing import Any, Dict, List, Optional, Set + + +class DouyuDanmuSummaryHelper: + """斗鱼弹幕场次抽取与压缩辅助器。""" + + LINE_PATTERN = re.compile( + r"^\[(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s+" + r"(?P.*?)\s+\(UID:\s*(?P[^,\)]+)(?P.*?)\):(?P.*)$" + ) + + STOPWORDS: Set[str] = { + "哈哈", "哈哈哈", "hh", "hhh", "啊啊", "啊啊啊", "可以", "这个", "那个", "真的", "就是", + "一下", "主播", "兄弟", "老铁", "大家", "我们", "你们", "他们", "不是", "什么", "怎么", + "为啥", "有点", "感觉", "这里", "那里", "然后", "今天", "刚刚", "现在", "一个", "一下子", + } + + SHORT_BURST_WORDS: Set[str] = { + "666", "6666", "牛", "牛逼", "稳", "寄", "杀", "帅", "好", "行", "绷", "哭", "乐", + "哈哈", "哈哈哈", "笑死", "卧槽", "wc", "awsl", "nb", "nbl", "c", "6", + } + NOISE_PATTERNS = [ + re.compile(r"本条弹幕.*机器人", re.I), + re.compile(r"请不要.*统计机器人数", re.I), + re.compile(r"原来直播间有.*非机器人用户", re.I), + re.compile(r"如果您的直播内容是以观看他人操作为主", re.I), + ] + TEMPLATE_HINT_PATTERNS = [ + re.compile(r"闭目不语任由"), + re.compile(r"你就忍心一辈"), + re.compile(r"刚刚偷看你直播被老板发现"), + re.compile(r"还好老板是蝙蝠侠"), + re.compile(r"原来直播间有这么多姐妹"), + re.compile(r"之前我一直不敢发弹幕"), + re.compile(r"强子也就是吃了直播的红利"), + re.compile(r"你声音好像强子"), + re.compile(r"怎么回事强子"), + re.compile(r"你是个人吗强"), + ] + TEMPLATE_MIN_LENGTH = 14 + TEMPLATE_MIN_REPEAT = 4 + REPEAT_MIN_COUNT = 3 + + @classmethod + def parse_danmu_line(cls, line: str) -> Optional[Dict[str, Any]]: + text = str(line or "").strip() + if not text: + return None + match = cls.LINE_PATTERN.match(text) + if not match: + return None + try: + ts = datetime.strptime(match.group("timestamp"), "%Y-%m-%d %H:%M:%S") + except Exception: + return None + return { + "timestamp": ts, + "timestamp_text": match.group("timestamp"), + "nickname": match.group("nickname").strip(), + "uid": str(match.group("uid")).strip(), + **cls._parse_profile_text(str(match.group("profile") or "")), + "content": match.group("content").strip(), + "raw": text, + } + + @classmethod + def load_session_messages(cls, room_id: str, session: Dict[str, Any], base_dir: str = "temp") -> List[Dict[str, Any]]: + segments = cls._normalize_segments(session.get("segments", []) or []) + if not room_id or not segments: + return [] + + date_keys = sorted({segment["start"].strftime("%Y%m%d") for segment in segments} | + {segment["end"].strftime("%Y%m%d") for segment in segments}) + collected: List[Dict[str, Any]] = [] + for date_key in date_keys: + file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt") + if not os.path.exists(file_path): + continue + try: + with open(file_path, "r", encoding="utf-8") as f: + for line in f: + parsed = cls.parse_danmu_line(line) + if not parsed: + continue + if cls._in_any_segment(parsed["timestamp"], segments): + collected.append(parsed) + except Exception: + continue + return collected + + @classmethod + def load_day_messages(cls, room_id: str, date_key: str, base_dir: str = "temp") -> List[Dict[str, Any]]: + file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt") + if not os.path.exists(file_path): + return [] + collected: List[Dict[str, Any]] = [] + with open(file_path, "r", encoding="utf-8") as f: + for line in f: + parsed = cls.parse_danmu_line(line) + if parsed: + collected.append(parsed) + return collected + + @classmethod + def build_summary_material( + cls, + room_id: str, + session: Dict[str, Any], + messages: List[Dict[str, Any]], + bucket_minutes: int = 5, + top_bucket_count: int = 10, + ) -> Dict[str, Any]: + normalized = [item for item in messages if item and item.get("content")] + prepared = cls._prepare_messages(normalized) + organized_messages = prepared["organized_messages"] + unique_users = {str(item.get("uid") or "") for item in normalized if str(item.get("uid") or "").strip()} + organized_unique_users = { + str(item.get("uid") or "") for item in organized_messages if str(item.get("uid") or "").strip() + } + deduped_messages = cls._dedupe_consecutive_messages(organized_messages) + burst_terms = cls._build_burst_terms(organized_messages) + top_terms = cls._extract_top_terms(organized_messages, limit=30) + bucket_stats = cls._build_time_buckets(organized_messages, minutes=bucket_minutes) + peak_buckets = sorted(bucket_stats, key=lambda item: item.get("message_count", 0), reverse=True)[:top_bucket_count] + + return { + "session_id": session.get("session_id", ""), + "room_id": room_id, + "anchor_day": session.get("anchor_day", ""), + "nickname": session.get("nickname", ""), + "room_name": session.get("room_name", ""), + "segments": cls._serialize_segments(session.get("segments", []) or []), + "message_count": len(normalized), + "noise_filtered_count": len(prepared["noise_messages"]), + "organized_message_count": len(organized_messages), + "deduped_message_count": len(deduped_messages), + "unique_user_count": len(unique_users), + "organized_unique_user_count": len(organized_unique_users), + "merged_templates": prepared["merged_templates"], + "top_terms": top_terms, + "burst_terms": burst_terms, + "time_buckets": bucket_stats, + "peak_buckets": peak_buckets, + "representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats), + "operator_metrics": cls._build_operator_metrics(normalized, organized_messages), + } + + @classmethod + def build_llm_payload( + cls, + room_id: str, + session: Dict[str, Any], + messages: List[Dict[str, Any]], + bucket_minutes: int = 5, + top_bucket_count: int = 8, + top_repeat_count: int = 24, + ) -> Dict[str, Any]: + """ + 面向 LLM 的高保真弹幕载荷。 + 规则: + 1. 仅过滤平台/机器人类系统噪音。 + 2. 相同或高度模板化的内容做聚合,不直接删除。 + 3. 其他不同内容尽量保留,并按时段/热点组织给模型。 + """ + normalized = [item for item in messages if item and item.get("content")] + prepared = cls._prepare_messages(normalized) + organized_messages = prepared["organized_messages"] + bucket_stats = cls._build_time_buckets(organized_messages, minutes=bucket_minutes) + peak_buckets = sorted( + bucket_stats, + key=lambda item: item.get("message_count", 0), + reverse=True, + )[:top_bucket_count] + + unique_users = {str(item.get("uid") or "") for item in normalized if str(item.get("uid") or "").strip()} + organized_unique_users = { + str(item.get("uid") or "") for item in organized_messages if str(item.get("uid") or "").strip() + } + + return { + "session_meta": { + "room_id": room_id, + "session_id": session.get("session_id", ""), + "anchor_day": session.get("anchor_day", ""), + "nickname": session.get("nickname", ""), + "room_name": session.get("room_name", ""), + "segments": cls._serialize_segments(session.get("segments", []) or []), + "message_count": len(normalized), + "noise_filtered_count": len(prepared["noise_messages"]), + "organized_message_count": len(organized_messages), + "unique_user_count": len(unique_users), + "organized_unique_user_count": len(organized_unique_users), + }, + "operator_metrics": cls._build_operator_metrics(normalized, organized_messages), + "cleaning_rules": [ + "仅过滤系统噪音、机器人探测、平台提示类弹幕。", + "明显重复的长模板文案按内容聚合,保留出现次数、人数、首末时间。", + "其他相同内容按重复短语归并,但不抹掉不同观点和不同句式。", + "高峰时段补充原始弹幕样本,方便 LLM 还原语境。", + ], + "merged_templates": prepared["merged_templates"], + "repeated_messages": cls._build_repeated_messages( + organized_messages, + limit=top_repeat_count, + ), + "top_terms": cls._extract_top_terms(organized_messages, limit=30), + "burst_terms": cls._build_burst_terms(organized_messages), + "peak_buckets": cls._simplify_peak_buckets(peak_buckets), + "representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats), + "raw_window_samples": cls._build_raw_window_samples(peak_buckets, per_bucket_limit=8), + } + + @staticmethod + def _parse_profile_text(profile_text: str) -> Dict[str, Any]: + text = str(profile_text or "").strip() + room_level = 0 + fans_name = "" + fans_level = 0 + noble_name = "" + + room_match = re.search(r",\s*Lv\s*(\d+)", text, re.I) + if room_match: + try: + room_level = int(room_match.group(1)) + except Exception: + room_level = 0 + + fans_match = re.search(r"/\s*([^/]+?)\s+Lv\s*(\d+)", text, re.I) + if fans_match: + fans_name = str(fans_match.group(1) or "").strip() + try: + fans_level = int(fans_match.group(2)) + except Exception: + fans_level = 0 + + noble_match = re.search(r"/\s*(骑士|子爵|伯爵|公爵|国王|皇帝|游侠|超级皇帝|幻神)\b", text) + if noble_match: + noble_name = str(noble_match.group(1) or "").strip() + + return { + "room_level": room_level, + "fans_name": fans_name, + "fans_level": fans_level, + "noble_name": noble_name, + "has_fans_badge": bool(fans_name), + "has_noble": bool(noble_name), + } + + @classmethod + def _build_operator_metrics(cls, messages: List[Dict[str, Any]], organized_messages: List[Dict[str, Any]]) -> Dict[str, Any]: + user_profiles: Dict[str, Dict[str, Any]] = {} + user_message_count = Counter() + user_organized_count = Counter() + fans_badge_users: Set[str] = set() + noble_users: Set[str] = set() + high_room_level_users: Set[str] = set() + high_fans_level_users: Set[str] = set() + fans_badge_message_count = 0 + noble_message_count = 0 + room_level_histogram = Counter() + fans_level_histogram = Counter() + badge_user_counter = Counter() + badge_message_counter = Counter() + + for item in messages: + uid = str(item.get("uid") or "").strip() + if not uid: + continue + user_message_count[uid] += 1 + + profile = user_profiles.setdefault(uid, { + "uid": uid, + "nickname": str(item.get("nickname") or "").strip(), + "room_level": 0, + "fans_name": "", + "fans_level": 0, + "noble_name": "", + }) + room_level = int(item.get("room_level") or 0) + fans_level = int(item.get("fans_level") or 0) + fans_name = str(item.get("fans_name") or "").strip() + noble_name = str(item.get("noble_name") or "").strip() + + if room_level > int(profile.get("room_level") or 0): + profile["room_level"] = room_level + if fans_level > int(profile.get("fans_level") or 0): + profile["fans_level"] = fans_level + if fans_name and not profile.get("fans_name"): + profile["fans_name"] = fans_name + if noble_name and not profile.get("noble_name"): + profile["noble_name"] = noble_name + if profile.get("nickname") == "" and str(item.get("nickname") or "").strip(): + profile["nickname"] = str(item.get("nickname") or "").strip() + + if fans_name: + fans_badge_users.add(uid) + fans_badge_message_count += 1 + badge_user_counter[fans_name] += 0 + badge_message_counter[fans_name] += 1 + if noble_name: + noble_users.add(uid) + noble_message_count += 1 + + for uid, profile in user_profiles.items(): + room_level = int(profile.get("room_level") or 0) + fans_level = int(profile.get("fans_level") or 0) + fans_name = str(profile.get("fans_name") or "").strip() + noble_name = str(profile.get("noble_name") or "").strip() + + room_level_histogram[cls._level_bucket(room_level)] += 1 + if fans_name: + badge_user_counter[fans_name] += 1 + fans_level_histogram[cls._fans_level_bucket(fans_level)] += 1 + if noble_name: + noble_users.add(uid) + if room_level >= 30: + high_room_level_users.add(uid) + if fans_level >= 10: + high_fans_level_users.add(uid) + + for item in organized_messages: + uid = str(item.get("uid") or "").strip() + if uid: + user_organized_count[uid] += 1 + + active_users_5plus = sum(1 for uid, count in user_message_count.items() if count >= 5) + active_users_10plus = sum(1 for uid, count in user_message_count.items() if count >= 10) + + top_active_users = [] + for uid, count in user_message_count.most_common(12): + profile = user_profiles.get(uid, {}) + top_active_users.append({ + "uid": uid, + "nickname": str(profile.get("nickname") or ""), + "message_count": count, + "organized_message_count": int(user_organized_count.get(uid, 0) or 0), + "room_level": int(profile.get("room_level", 0) or 0), + "fans_name": str(profile.get("fans_name") or ""), + "fans_level": int(profile.get("fans_level", 0) or 0), + "noble_name": str(profile.get("noble_name") or ""), + }) + + top_badges = [] + for badge_name, unique_user_count in badge_user_counter.most_common(10): + if not badge_name: + continue + top_badges.append({ + "badge_name": badge_name, + "user_count": unique_user_count, + "message_count": int(badge_message_counter.get(badge_name, 0) or 0), + }) + + total_unique_users = max(len(user_profiles), 1) + return { + "active_unique_users": len(user_profiles), + "active_users_5plus": active_users_5plus, + "active_users_10plus": active_users_10plus, + "fans_badge_user_count": len(fans_badge_users), + "fans_badge_user_ratio": round(len(fans_badge_users) / total_unique_users, 4), + "fans_badge_message_count": fans_badge_message_count, + "high_room_level_user_count": len(high_room_level_users), + "high_room_level_threshold": 30, + "high_fans_level_user_count": len(high_fans_level_users), + "high_fans_level_threshold": 10, + "noble_user_count": len(noble_users), + "noble_message_count": noble_message_count, + "room_level_distribution": [ + {"bucket": bucket, "user_count": count} + for bucket, count in sorted(room_level_histogram.items()) + ], + "fans_level_distribution": [ + {"bucket": bucket, "user_count": count} + for bucket, count in sorted(fans_level_histogram.items()) + ], + "top_badges": top_badges, + "top_active_users": top_active_users, + } + + @staticmethod + def _level_bucket(level: int) -> str: + if level >= 40: + return "40+" + if level >= 30: + return "30-39" + if level >= 20: + return "20-29" + if level >= 10: + return "10-19" + return "1-9" + + @staticmethod + def _fans_level_bucket(level: int) -> str: + if level >= 20: + return "20+" + if level >= 15: + return "15-19" + if level >= 10: + return "10-14" + if level >= 5: + return "5-9" + return "1-4" + + @classmethod + def infer_sessions_from_messages( + cls, + room_id: str, + messages: List[Dict[str, Any]], + *, + session_cutoff_hour: int = 6, + merge_gap_hours: int = 4, + min_session_messages: int = 50, + ) -> List[Dict[str, Any]]: + if not messages: + return [] + ordered = sorted(messages, key=lambda item: item.get("timestamp") or datetime.min) + sessions: List[Dict[str, Any]] = [] + current_messages: List[Dict[str, Any]] = [] + + def flush_current(): + if len(current_messages) < min_session_messages: + return + session = cls._build_inferred_session( + room_id, + current_messages, + session_cutoff_hour=session_cutoff_hour, + ) + if session: + sessions.append(session) + + prev_dt: Optional[datetime] = None + for item in ordered: + ts = item.get("timestamp") + if not isinstance(ts, datetime): + continue + if prev_dt is not None: + gap_seconds = (ts - prev_dt).total_seconds() + if gap_seconds > merge_gap_hours * 3600: + flush_current() + current_messages = [] + current_messages.append(item) + prev_dt = ts + + flush_current() + return sessions + + @classmethod + def _build_inferred_session( + cls, + room_id: str, + messages: List[Dict[str, Any]], + *, + session_cutoff_hour: int = 6, + ) -> Optional[Dict[str, Any]]: + if not messages: + return None + ordered = sorted(messages, key=lambda item: item.get("timestamp") or datetime.min) + start_dt = ordered[0]["timestamp"] + end_dt = ordered[-1]["timestamp"] + anchor_dt = start_dt + if start_dt.hour < session_cutoff_hour: + from datetime import timedelta + anchor_dt = start_dt - timedelta(days=1) + anchor_day = anchor_dt.strftime("%Y-%m-%d") + segments = [] + seg_start = ordered[0]["timestamp"] + prev_dt = ordered[0]["timestamp"] + for item in ordered[1:]: + current_dt = item["timestamp"] + if (current_dt - prev_dt).total_seconds() > 30 * 60: + segments.append({ + "start_time": seg_start.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": prev_dt.strftime("%Y-%m-%d %H:%M:%S"), + }) + seg_start = current_dt + prev_dt = current_dt + segments.append({ + "start_time": seg_start.strftime("%Y-%m-%d %H:%M:%S"), + "end_time": end_dt.strftime("%Y-%m-%d %H:%M:%S"), + }) + return { + "session_id": f"{room_id}_{anchor_day.replace('-', '')}_{start_dt.strftime('%H%M%S')}", + "room_id": room_id, + "anchor_day": anchor_day, + "nickname": "", + "room_name": "", + "segments": segments, + "is_live": False, + "source": "inferred_from_danmu", + } + + @staticmethod + def _normalize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, datetime]]: + normalized = [] + for item in segments: + try: + start_dt = datetime.strptime(str(item.get("start_time") or ""), "%Y-%m-%d %H:%M:%S") + end_dt = datetime.strptime(str(item.get("end_time") or ""), "%Y-%m-%d %H:%M:%S") + except Exception: + continue + if end_dt < start_dt: + continue + normalized.append({"start": start_dt, "end": end_dt}) + return normalized + + @staticmethod + def _serialize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, str]]: + result = [] + for item in segments: + start_time = str(item.get("start_time") or "").strip() + end_time = str(item.get("end_time") or "").strip() + if start_time and end_time: + result.append({"start_time": start_time, "end_time": end_time}) + return result + + @staticmethod + def _in_any_segment(target_dt: datetime, segments: List[Dict[str, datetime]]) -> bool: + for segment in segments: + if segment["start"] <= target_dt <= segment["end"]: + return True + return False + + @staticmethod + def _dedupe_consecutive_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + result: List[Dict[str, Any]] = [] + prev_key = None + repeat_count = 0 + for item in messages: + current_key = (item.get("uid"), item.get("content")) + if current_key == prev_key: + repeat_count += 1 + result[-1]["repeat_count"] = repeat_count + continue + copied = dict(item) + copied["repeat_count"] = 1 + result.append(copied) + prev_key = current_key + repeat_count = 1 + return result + + @classmethod + def _filter_noise_messages(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + result = [] + for item in messages: + content = str(item.get("content") or "").strip() + if not content: + continue + if cls._is_noise_message(content): + continue + result.append(item) + return result + + @classmethod + def _prepare_messages(cls, messages: List[Dict[str, Any]]) -> Dict[str, Any]: + noise_messages: List[Dict[str, Any]] = [] + candidate_messages: List[Dict[str, Any]] = [] + for item in messages: + content = str(item.get("content") or "").strip() + if not content: + continue + if cls._is_noise_message(content): + noise_messages.append(item) + continue + candidate_messages.append(item) + + merged_templates, organized_messages = cls._merge_template_messages(candidate_messages) + return { + "noise_messages": noise_messages, + "merged_templates": merged_templates, + "organized_messages": organized_messages, + } + + @classmethod + def _is_noise_message(cls, content: str) -> bool: + text = str(content or "").strip() + if not text: + return True + for pattern in cls.NOISE_PATTERNS: + if pattern.search(text): + return True + if len(text) >= 30 and len(set(text)) <= 6: + return True + return False + + @classmethod + def _merge_template_messages(cls, messages: List[Dict[str, Any]]) -> (List[Dict[str, Any]], List[Dict[str, Any]]): + normalized_counter = Counter() + normalized_to_messages: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for item in messages: + normalized = cls._normalize_template_text(str(item.get("content") or "")) + if not normalized: + continue + normalized_counter[normalized] += 1 + normalized_to_messages[normalized].append(item) + + template_keys = { + key for key, count in normalized_counter.items() + if len(key) >= cls.TEMPLATE_MIN_LENGTH and count >= cls.TEMPLATE_MIN_REPEAT + } + for key in list(normalized_counter.keys()): + if any(pattern.search(key) for pattern in cls.TEMPLATE_HINT_PATTERNS): + template_keys.add(key) + + merged_templates: List[Dict[str, Any]] = [] + organized_messages: List[Dict[str, Any]] = [] + for normalized, items in normalized_to_messages.items(): + if normalized in template_keys: + first = items[0] + merged_templates.append({ + "text": str(first.get("content") or "").strip()[:120], + "count": len(items), + "user_count": len({str(item.get('uid') or '') for item in items if str(item.get('uid') or '').strip()}), + "first_time": str(first.get("timestamp_text") or ""), + "last_time": str(items[-1].get("timestamp_text") or ""), + }) + else: + organized_messages.extend(items) + + merged_templates.sort(key=lambda item: item.get("count", 0), reverse=True) + organized_messages.sort(key=lambda item: item.get("timestamp") or datetime.min) + return merged_templates[:20], organized_messages + + @classmethod + def _build_repeated_messages(cls, messages: List[Dict[str, Any]], limit: int = 24) -> List[Dict[str, Any]]: + grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for item in messages: + content = str(item.get("content") or "").strip() + if not content: + continue + normalized = cls._normalize_template_text(content) + if not normalized: + continue + if cls._looks_like_pure_punctuation(content): + continue + grouped[normalized].append(item) + + repeated_messages: List[Dict[str, Any]] = [] + for items in grouped.values(): + if len(items) < cls.REPEAT_MIN_COUNT: + continue + first = items[0] + repeated_messages.append({ + "text": str(first.get("content") or "").strip()[:120], + "count": len(items), + "user_count": len({ + str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip() + }), + "first_time": str(first.get("timestamp_text") or ""), + "last_time": str(items[-1].get("timestamp_text") or ""), + }) + repeated_messages.sort(key=lambda item: item.get("count", 0), reverse=True) + return repeated_messages[:limit] + + @classmethod + def _build_burst_terms(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + counters: Dict[str, Dict[str, Any]] = {} + for item in messages: + content = str(item.get("content") or "").strip().lower() + if not content: + continue + if content not in cls.SHORT_BURST_WORDS and len(content) > 6: + continue + target = counters.setdefault(content, {"text": content, "count": 0, "users": set()}) + target["count"] += 1 + target["users"].add(str(item.get("uid") or "")) + result = [] + for item in sorted(counters.values(), key=lambda entry: entry["count"], reverse=True)[:15]: + result.append({ + "text": item["text"], + "count": item["count"], + "user_count": len([uid for uid in item["users"] if uid]), + }) + return result + + @classmethod + def _extract_top_terms(cls, messages: List[Dict[str, Any]], limit: int = 30) -> List[Dict[str, Any]]: + counter = Counter() + for item in messages: + for token in cls._tokenize(str(item.get("content") or "")): + counter[token] += 1 + result = [] + for term, count in counter.most_common(limit): + result.append({"term": term, "count": count}) + return result + + @classmethod + def _tokenize(cls, content: str) -> List[str]: + text = str(content or "").lower().strip() + if not text: + return [] + chinese_terms = re.findall(r"[\u4e00-\u9fff]{2,6}", text) + alpha_terms = re.findall(r"[a-z0-9_\-]{3,20}", text) + tokens = [] + for token in chinese_terms + alpha_terms: + normalized = token.strip().lower() + if not normalized or normalized in cls.STOPWORDS: + continue + if normalized.isdigit() and len(normalized) <= 2: + continue + tokens.append(normalized) + return tokens + + @staticmethod + def _normalize_template_text(content: str) -> str: + text = str(content or "").strip().lower() + if not text: + return "" + text = re.sub(r"\s+", "", text) + text = re.sub(r"[`~!@#$%^&*()_\-+=\[\]{}\\|;:'\",.<>/?,。!?、…()【】《》“”‘’·]", "", text) + text = re.sub(r"(.)\1{4,}", r"\1\1\1", text) + return text + + @classmethod + def _build_time_buckets(cls, messages: List[Dict[str, Any]], minutes: int = 5) -> List[Dict[str, Any]]: + buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + for item in messages: + ts = item.get("timestamp") + if not isinstance(ts, datetime): + continue + bucket_minute = (ts.minute // minutes) * minutes + bucket_key = ts.replace(minute=bucket_minute, second=0) + buckets[bucket_key.strftime("%Y-%m-%d %H:%M:%S")].append(item) + + results: List[Dict[str, Any]] = [] + for bucket_start, items in sorted(buckets.items()): + top_terms = cls._extract_top_terms(items, limit=8) + burst_terms = cls._build_burst_terms(items)[:5] + results.append({ + "start_time": bucket_start, + "message_count": len(items), + "user_count": len({str(item.get("uid") or "") for item in items if str(item.get("uid") or "").strip()}), + "top_terms": top_terms, + "burst_terms": burst_terms, + "sample_messages": cls._pick_bucket_samples(items, limit=6), + }) + return results + + @staticmethod + def _pick_bucket_samples(items: List[Dict[str, Any]], limit: int = 6) -> List[Dict[str, str]]: + if not items: + return [] + indexes = sorted({0, len(items) // 3, len(items) // 2, (len(items) * 2) // 3, len(items) - 1}) + selected = [] + seen = set() + for idx in indexes: + item = items[idx] + content = str(item.get("content") or "").strip() + if not content or content in seen: + continue + selected.append({ + "time": str(item.get("timestamp_text") or ""), + "nickname": str(item.get("nickname") or ""), + "content": content[:80], + }) + seen.add(content) + if len(selected) >= limit: + break + return selected + + @classmethod + def _pick_representative_messages(cls, messages: List[Dict[str, Any]], buckets: List[Dict[str, Any]]) -> List[Dict[str, str]]: + selected: List[Dict[str, str]] = [] + seen = set() + + for bucket in sorted(buckets, key=lambda item: item.get("message_count", 0), reverse=True)[:6]: + for sample in bucket.get("sample_messages", []): + content = str(sample.get("content") or "").strip() + if not content or content in seen: + continue + selected.append(sample) + seen.add(content) + if len(selected) >= 18: + return selected + + for item in messages: + content = str(item.get("content") or "").strip() + if not content or content in seen: + continue + selected.append({ + "time": str(item.get("timestamp_text") or ""), + "nickname": str(item.get("nickname") or ""), + "content": content[:80], + }) + seen.add(content) + if len(selected) >= 18: + break + return selected + + @classmethod + def _build_raw_window_samples( + cls, + peak_buckets: List[Dict[str, Any]], + per_bucket_limit: int = 8, + ) -> List[Dict[str, Any]]: + windows: List[Dict[str, Any]] = [] + for bucket in peak_buckets: + samples = [] + for sample in bucket.get("sample_messages", [])[:per_bucket_limit]: + content = str(sample.get("content") or "").strip() + if not content: + continue + samples.append({ + "time": str(sample.get("time") or ""), + "nickname": str(sample.get("nickname") or ""), + "content": content, + }) + if not samples: + continue + windows.append({ + "start_time": str(bucket.get("start_time") or ""), + "message_count": int(bucket.get("message_count", 0) or 0), + "user_count": int(bucket.get("user_count", 0) or 0), + "samples": samples, + }) + return windows + + @staticmethod + def _simplify_peak_buckets(buckets: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + simplified = [] + for bucket in buckets: + simplified.append({ + "start_time": str(bucket.get("start_time") or ""), + "message_count": int(bucket.get("message_count", 0) or 0), + "user_count": int(bucket.get("user_count", 0) or 0), + "top_terms": bucket.get("top_terms", [])[:6], + "burst_terms": bucket.get("burst_terms", [])[:5], + }) + return simplified + + @staticmethod + def _looks_like_pure_punctuation(content: str) -> bool: + text = str(content or "").strip() + if not text: + return True + return re.fullmatch(r"[\W_]+", text, re.UNICODE) is not None diff --git a/plugins/douyu/main.py b/plugins/douyu/main.py index 6be5ada..8c7b143 100644 --- a/plugins/douyu/main.py +++ b/plugins/douyu/main.py @@ -1,7 +1,9 @@ import asyncio import json -from datetime import datetime +from collections import Counter +from datetime import datetime, timedelta import os +from pathlib import Path import threading import time from typing import Dict, Any, List, Optional, Tuple, Set @@ -19,9 +21,13 @@ except ImportError: from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager +from plugins.ai_auto_response.llm_client import LLMClient +from plugins.douyu.danmu_summary import DouyuDanmuSummaryHelper +from plugins.douyu.report_template import render_daily_report_html from utils.decorator.async_job import async_job from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost +from utils.markdown_to_image import convert_md_str_to_image, html_to_image from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML @@ -316,6 +322,61 @@ class DouyuRedisManager: key = f"{self.prefix}room_status:{room_id}" return self.redis.set(key, json.dumps(status, ensure_ascii=False)) + def get_room_session(self, room_id: str, session_id: str) -> Optional[Dict[str, Any]]: + key = f"{self.prefix}room:{room_id}:session:{session_id}" + data = self.redis.get(key) + if not data: + return None + if isinstance(data, bytes): + data = data.decode("utf-8") + try: + return json.loads(data) + except Exception: + return None + + def save_room_session(self, room_id: str, session: Dict[str, Any]) -> bool: + session_id = str(session.get("session_id") or "").strip() + if not session_id: + return False + payload = json.dumps(session, ensure_ascii=False) + session_key = f"{self.prefix}room:{room_id}:session:{session_id}" + latest_key = f"{self.prefix}room:{room_id}:latest_session" + index_key = f"{self.prefix}room:{room_id}:session_ids" + pipe = self.redis.pipeline() + pipe.set(session_key, payload) + pipe.set(latest_key, session_id) + pipe.lrem(index_key, 0, session_id) + pipe.lpush(index_key, session_id) + pipe.ltrim(index_key, 0, 29) + result = pipe.execute() + return bool(result) + + def get_latest_room_session(self, room_id: str) -> Optional[Dict[str, Any]]: + latest_key = f"{self.prefix}room:{room_id}:latest_session" + session_id = self.redis.get(latest_key) + if not session_id: + return None + if isinstance(session_id, bytes): + session_id = session_id.decode("utf-8") + return self.get_room_session(room_id, str(session_id)) + + def list_room_session_ids(self, room_id: str, limit: int = 10) -> List[str]: + key = f"{self.prefix}room:{room_id}:session_ids" + rows = self.redis.lrange(key, 0, max(limit - 1, 0)) or [] + result = [] + for row in rows: + result.append(row.decode("utf-8") if isinstance(row, bytes) else str(row)) + return result + + def get_text_value(self, key: str) -> Optional[str]: + data = self.redis.get(key) + if not data: + return None + return data.decode("utf-8") if isinstance(data, bytes) else str(data) + + def set_text_value(self, key: str, value: str) -> bool: + return bool(self.redis.set(key, value)) + class DouyuPlugin(MessagePluginInterface): FEATURE_KEY = "DOUYU_MONITOR" @@ -364,18 +425,44 @@ class DouyuPlugin(MessagePluginInterface): self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList" self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" self._check_interval = 5 + self._session_cutoff_hour = 6 + self._merge_gap_hours = 4 + self._daily_report_enable = True + self._daily_report_time = "10:05" + self._daily_report_min_messages = 120 + self._daily_report_use_llm = False + self._daily_report_max_sessions = 4 + self._daily_report_max_length = 1800 + self._daily_report_send_image = True + self._daily_report_llm_client: Optional[LLMClient] = None self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {} async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job) + async_job.every_minutes(5)(self._scheduled_daily_report_tick) async def _scheduled_unified_check_job(self): """统一检查直播和鱼吧动态""" await self._scheduled_check_job() await self._scheduled_yuba_check_job() + async def _scheduled_daily_report_tick(self): + """每 5 分钟检查一次,命中配置时间后发送前一天日报。""" + if not self._daily_report_enable or not self.redis_manager or not self.bot: + return + now_dt = datetime.now() + if not self._should_run_daily_report(now_dt): + return + anchor_day = (now_dt - timedelta(days=1)).strftime("%Y-%m-%d") + try: + await self._send_daily_reports(anchor_day) + self.redis_manager.set_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d")), now_dt.strftime("%Y-%m-%d %H:%M:%S")) + except Exception as e: + logger.error(f"斗鱼每日报告任务失败(anchor_day={anchor_day}): {e}") + def initialize(self, context: Dict[str, Any]) -> bool: try: dbm = DBConnectionManager.get_instance() self.redis_manager = DouyuRedisManager(dbm) + self.bot = context.get("bot", self.bot) cfg = self._config.get("Douyu", {}) cfg_cmds = cfg.get("command", []) if isinstance(cfg_cmds, list) and cfg_cmds: @@ -383,6 +470,21 @@ class DouyuPlugin(MessagePluginInterface): self._api_template = cfg.get("api_url_template", self._api_template) self._user_agent = cfg.get("user_agent", self._user_agent) self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval)) + self._session_cutoff_hour = int(cfg.get("session_cutoff_hour", self._session_cutoff_hour)) + self._merge_gap_hours = int(cfg.get("merge_gap_hours", self._merge_gap_hours)) + self._daily_report_enable = bool(cfg.get("daily_report_enable", self._daily_report_enable)) + self._daily_report_time = str(cfg.get("daily_report_time", self._daily_report_time) or self._daily_report_time) + self._daily_report_min_messages = int( + cfg.get("daily_report_min_messages", self._daily_report_min_messages) + ) + self._daily_report_use_llm = bool(cfg.get("daily_report_use_llm", self._daily_report_use_llm)) + self._daily_report_max_sessions = int(cfg.get("daily_report_max_sessions", self._daily_report_max_sessions)) + self._daily_report_max_length = int(cfg.get("daily_report_max_length", self._daily_report_max_length)) + self._daily_report_send_image = bool(cfg.get("daily_report_send_image", self._daily_report_send_image)) + + report_api_cfg = cfg.get("report_api", {}) or {} + if report_api_cfg: + self._daily_report_llm_client = LLMClient(report_api_cfg) return True except Exception as e: logger.error(f"{self.name} 初始化失败: {e}") @@ -544,6 +646,12 @@ class DouyuPlugin(MessagePluginInterface): continue if prev_live is True and curr_live is True and room_id not in self._danmu_recorders: try: + session = self._open_or_resume_session(room_id, nickname, room_name) + if session: + logger.info( + f"检测到持续直播状态,续接斗鱼直播会话({room_id}): " + f"session={session.get('session_id')}" + ) logger.info(f"检测到持续直播状态,补偿启动斗鱼弹幕记录({room_id})") self._start_danmu_record(room_id) except Exception as e: @@ -573,6 +681,12 @@ class DouyuPlugin(MessagePluginInterface): logger.error(f"发送斗鱼开播提醒失败: {e}") continue try: + session = self._open_or_resume_session(room_id, nickname, room_name) + if session: + logger.info( + f"斗鱼直播会话开启/续接: room={room_id}, session={session.get('session_id')}, " + f"segments={len(session.get('segments', []))}, anchor_day={session.get('anchor_day')}" + ) logger.info(f"启动斗鱼弹幕记录({room_id})") self._start_danmu_record(room_id) except Exception as e: @@ -591,6 +705,12 @@ class DouyuPlugin(MessagePluginInterface): logger.error(f"发送斗鱼下播提醒失败: {e}") continue try: + session = self._close_active_session(room_id, nickname, room_name) + if session: + logger.info( + f"斗鱼直播会话关闭片段: room={room_id}, session={session.get('session_id')}, " + f"segments={len(session.get('segments', []))}, is_live={session.get('is_live')}" + ) logger.info(f"停止斗鱼弹幕记录({room_id})") self._stop_danmu_record(room_id) except Exception as e: @@ -685,6 +805,655 @@ class DouyuPlugin(MessagePluginInterface): self._danmu_recorders[room_id] = recorder return recorder + def _resolve_anchor_day(self, target_dt: datetime) -> str: + if target_dt.hour < self._session_cutoff_hour: + target_dt = target_dt - timedelta(days=1) + return target_dt.strftime("%Y-%m-%d") + + @staticmethod + def _parse_session_time(value: str) -> Optional[datetime]: + if not value: + return None + try: + return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S") + except Exception: + return None + + @staticmethod + def _find_open_segment(session: Dict[str, Any]) -> Optional[Dict[str, Any]]: + for segment in reversed(session.get("segments", []) or []): + if not str(segment.get("end_time") or "").strip(): + return segment + return None + + def _should_merge_with_latest_session(self, latest_session: Optional[Dict[str, Any]], now_dt: datetime) -> bool: + if not latest_session: + return False + if latest_session.get("is_live"): + return True + segments = latest_session.get("segments", []) or [] + if not segments: + return False + last_segment = segments[-1] + end_dt = self._parse_session_time(last_segment.get("end_time", "")) + if not end_dt: + return False + gap_seconds = (now_dt - end_dt).total_seconds() + return 0 <= gap_seconds <= self._merge_gap_hours * 3600 + + def _open_or_resume_session(self, room_id: str, nickname: str, room_name: str) -> Optional[Dict[str, Any]]: + if not self.redis_manager: + return None + now_dt = datetime.now() + now_str = now_dt.strftime("%Y-%m-%d %H:%M:%S") + latest_session = self.redis_manager.get_latest_room_session(room_id) or {} + + if self._should_merge_with_latest_session(latest_session, now_dt): + session = dict(latest_session) + open_segment = self._find_open_segment(session) + if not open_segment: + segments = list(session.get("segments", []) or []) + segments.append({"start_time": now_str, "end_time": ""}) + session["segments"] = segments + else: + anchor_day = self._resolve_anchor_day(now_dt) + session = { + "session_id": f"{room_id}_{anchor_day.replace('-', '')}_{now_dt.strftime('%H%M%S')}", + "room_id": room_id, + "anchor_day": anchor_day, + "nickname": nickname, + "room_name": room_name, + "segments": [{"start_time": now_str, "end_time": ""}], + "is_live": True, + "summary_status": "pending", + "summary_generated_at": "", + "created_at": now_str, + } + + session["nickname"] = nickname or session.get("nickname", "") + session["room_name"] = room_name or session.get("room_name", "") + session["is_live"] = True + session["updated_at"] = now_str + session["last_live_at"] = now_str + self.redis_manager.save_room_session(room_id, session) + return session + + def _close_active_session(self, room_id: str, nickname: str, room_name: str) -> Optional[Dict[str, Any]]: + if not self.redis_manager: + return None + session = self.redis_manager.get_latest_room_session(room_id) + if not session: + return None + + now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + open_segment = self._find_open_segment(session) + if open_segment: + open_segment["end_time"] = now_str + + session["nickname"] = nickname or session.get("nickname", "") + session["room_name"] = room_name or session.get("room_name", "") + session["is_live"] = False + session["updated_at"] = now_str + session["last_offline_at"] = now_str + self.redis_manager.save_room_session(room_id, session) + return session + + def get_room_session(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]: + if not self.redis_manager or not room_id: + return None + if session_id: + return self.redis_manager.get_room_session(room_id, session_id) + return self.redis_manager.get_latest_room_session(room_id) + + def build_session_danmu_material(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]: + """ + 旁路能力:从已有日文件中按直播 session 抽取有效弹幕,并压缩成可供后续总结使用的材料。 + 当前不影响提醒、采集、群消息发送主流程。 + """ + session = self.get_room_session(room_id, session_id=session_id) + if not session: + return None + messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session) + material = DouyuDanmuSummaryHelper.build_summary_material(room_id, session, messages) + material["session"] = { + "session_id": session.get("session_id", ""), + "anchor_day": session.get("anchor_day", ""), + "nickname": session.get("nickname", ""), + "room_name": session.get("room_name", ""), + "is_live": bool(session.get("is_live")), + } + return material + + def build_session_llm_payload(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]: + """ + 旁路能力:构造可直接发送给 LLM 的弹幕总结载荷。 + 不改变现有弹幕采集和通知主流程。 + """ + session = self.get_room_session(room_id, session_id=session_id) + if not session: + return None + messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session) + return DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages) + + def _daily_report_job_key(self, day_key: str) -> str: + return f"{self.redis_manager.prefix}daily_report_job:{day_key}" + + def _daily_report_room_key(self, room_id: str, anchor_day: str) -> str: + return f"{self.redis_manager.prefix}daily_report:{room_id}:{anchor_day}" + + def _should_run_daily_report(self, now_dt: datetime) -> bool: + time_text = str(self._daily_report_time or "").strip() + try: + target_hour, target_minute = [int(part) for part in time_text.split(":", 1)] + except Exception: + return False + target_dt = now_dt.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) + if now_dt < target_dt or now_dt > target_dt + timedelta(minutes=4, seconds=59): + return False + last_run = self.redis_manager.get_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d"))) + return not last_run + + def _load_sessions_for_anchor_day(self, room_id: str, anchor_day: str) -> List[Dict[str, Any]]: + if not self.redis_manager: + return [] + sessions = [] + for session_id in self.redis_manager.list_room_session_ids(room_id, limit=30): + session = self.redis_manager.get_room_session(room_id, session_id) + if not session: + continue + if str(session.get("anchor_day") or "") != anchor_day: + continue + sessions.append(session) + sessions.sort( + key=lambda item: str(((item.get("segments") or [{}])[0]).get("start_time", "")), + ) + return sessions[:self._daily_report_max_sessions] + + def _build_daily_report_payload(self, room_id: str, anchor_day: str, sessions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + if not sessions: + return None + + session_payloads: List[Dict[str, Any]] = [] + total_message_count = 0 + total_noise_filtered_count = 0 + total_organized_message_count = 0 + total_unique_users: Set[str] = set() + merged_templates: List[Dict[str, Any]] = [] + repeated_messages: List[Dict[str, Any]] = [] + peak_buckets: List[Dict[str, Any]] = [] + representative_messages: List[Dict[str, Any]] = [] + raw_window_samples: List[Dict[str, Any]] = [] + top_terms_counter = Counter() + burst_terms_counter = Counter() + operator_totals = { + "fans_badge_user_count": 0, + "fans_badge_message_count": 0, + "high_room_level_user_count": 0, + "high_fans_level_user_count": 0, + "noble_user_count": 0, + "noble_message_count": 0, + "active_users_5plus": 0, + "active_users_10plus": 0, + } + top_badge_counter = Counter() + top_badge_message_counter = Counter() + + nickname = "" + room_name = "" + + for session in sessions: + messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session) + if len(messages) < self._daily_report_min_messages: + continue + payload = DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages) + session_payloads.append(payload) + meta = payload.get("session_meta", {}) or {} + operator_metrics = payload.get("operator_metrics", {}) or {} + total_message_count += int(meta.get("message_count", 0) or 0) + total_noise_filtered_count += int(meta.get("noise_filtered_count", 0) or 0) + total_organized_message_count += int(meta.get("organized_message_count", 0) or 0) + nickname = nickname or str(meta.get("nickname") or session.get("nickname") or "") + room_name = room_name or str(meta.get("room_name") or session.get("room_name") or "") + for key in operator_totals: + operator_totals[key] += int(operator_metrics.get(key, 0) or 0) + + for item in payload.get("merged_templates", []) or []: + merged_templates.append(dict(item)) + for item in payload.get("repeated_messages", []) or []: + repeated_messages.append(dict(item)) + for item in payload.get("peak_buckets", []) or []: + peak_buckets.append(dict(item)) + for item in payload.get("representative_messages", []) or []: + representative_messages.append(dict(item)) + for item in payload.get("raw_window_samples", []) or []: + raw_window_samples.append(dict(item)) + for item in payload.get("top_terms", []) or []: + term = str(item.get("term") or "").strip() + if term: + top_terms_counter[term] += int(item.get("count", 0) or 0) + for item in payload.get("burst_terms", []) or []: + term = str(item.get("text") or "").strip() + if term: + burst_terms_counter[term] += int(item.get("count", 0) or 0) + for item in operator_metrics.get("top_badges", []) or []: + badge_name = str(item.get("badge_name") or "").strip() + if badge_name: + top_badge_counter[badge_name] += int(item.get("user_count", 0) or 0) + top_badge_message_counter[badge_name] += int(item.get("message_count", 0) or 0) + + for session_message in messages: + uid = str(session_message.get("uid") or "").strip() + if uid: + total_unique_users.add(uid) + + if not session_payloads: + return None + + merged_templates.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True) + repeated_messages.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True) + peak_buckets.sort(key=lambda item: int(item.get("message_count", 0) or 0), reverse=True) + + artifact_dir = os.path.join("temp", "douyu_materials") + os.makedirs(artifact_dir, exist_ok=True) + payload = { + "report_meta": { + "room_id": room_id, + "anchor_day": anchor_day, + "nickname": nickname, + "room_name": room_name, + "session_count": len(session_payloads), + "message_count": total_message_count, + "noise_filtered_count": total_noise_filtered_count, + "organized_message_count": total_organized_message_count, + "unique_user_count": len(total_unique_users), + }, + "operator_metrics": { + **operator_totals, + "fans_badge_user_ratio": round(operator_totals["fans_badge_user_count"] / max(len(total_unique_users), 1), 4), + "top_badges": [ + { + "badge_name": badge_name, + "user_count": user_count, + "message_count": int(top_badge_message_counter.get(badge_name, 0) or 0), + } + for badge_name, user_count in top_badge_counter.most_common(10) + ], + }, + "sessions": [ + { + "session_id": (item.get("session_meta", {}) or {}).get("session_id", ""), + "segments": (item.get("session_meta", {}) or {}).get("segments", []), + "message_count": (item.get("session_meta", {}) or {}).get("message_count", 0), + "organized_message_count": (item.get("session_meta", {}) or {}).get("organized_message_count", 0), + } + for item in session_payloads + ], + "merged_templates": merged_templates[:24], + "repeated_messages": repeated_messages[:24], + "top_terms": [{"term": term, "count": count} for term, count in top_terms_counter.most_common(24)], + "burst_terms": [{"text": term, "count": count} for term, count in burst_terms_counter.most_common(16)], + "peak_buckets": peak_buckets[:10], + "representative_messages": representative_messages[:24], + "raw_window_samples": raw_window_samples[:10], + } + artifact_path = os.path.join(artifact_dir, f"{room_id}_{anchor_day.replace('-', '')}_daily_report_payload.json") + with open(artifact_path, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + return payload + + def _build_daily_report_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]: + meta = payload.get("report_meta", {}) or {} + system_prompt = ( + "你是斗鱼直播日报助手。请基于给定的结构化弹幕材料,输出一份适合发群的中文日报。" + "要求简洁、自然、信息密度高,不要编造,不要使用代码块。" + ) + user_prompt = ( + "请输出一份斗鱼每日报告,格式要求:\n" + "1. 第一行写标题,包含主播名和日期。\n" + "2. 用 3-5 条概括直播主线、弹幕情绪、观众关注点。\n" + "3. 单独补充运营视角观察,比如带牌活跃用户、高等级用户、核心发言用户、活跃牌子分布。\n" + "4. 单独列出高频梗/复读内容(不超过 5 条)。\n" + "5. 单独列出 2-3 个热点时段。\n" + "6. 整体控制在 600 字以内。\n\n" + f"材料如下:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" + ) + return system_prompt, user_prompt + + def _build_danmu_summary_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]: + meta = payload.get("report_meta", {}) or {} + system_prompt = ( + "你是直播弹幕总结助手。请只根据给定材料,总结这场直播的弹幕内容与氛围。" + "不要输出运营数据,不要编造,不要写空话套话。" + ) + user_prompt = ( + "请输出一段适合放在日报图片上半部分的弹幕总结,要求:\n" + "1. 先用 1 段总述直播氛围与主线。\n" + "2. 再用 3-5 条要点总结观众关注点、情绪变化、反复出现的梗。\n" + "3. 语言像运营复盘,简洁自然。\n" + "4. 不要写标题,不要写“根据数据”。\n\n" + f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n" + f"日期:{meta.get('anchor_day', '')}\n" + f"材料:\n{json.dumps(payload, ensure_ascii=False, indent=2)}" + ) + return system_prompt, user_prompt + + def _build_fallback_daily_report(self, payload: Dict[str, Any]) -> str: + meta = payload.get("report_meta", {}) or {} + title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播") + lines = [ + f"斗鱼每日报告 | {title_name} | {meta.get('anchor_day', '')}", + f"共 {meta.get('session_count', 0)} 场,弹幕 {meta.get('message_count', 0)} 条,参与用户 {meta.get('unique_user_count', 0)} 人。", + ] + operator_metrics = payload.get("operator_metrics", {}) or {} + + sessions = payload.get("sessions", []) or [] + if sessions: + session_parts = [] + for item in sessions[:4]: + segments = item.get("segments", []) or [] + if not segments: + continue + start_time = str(segments[0].get("start_time", ""))[-8:-3] + end_time = str(segments[-1].get("end_time", ""))[-8:-3] + session_parts.append(f"{start_time}-{end_time}") + if session_parts: + lines.append("场次时间:" + " / ".join(session_parts)) + + top_terms = payload.get("top_terms", []) or [] + if top_terms: + lines.append("关注焦点:" + "、".join([str(item.get("term") or "") for item in top_terms[:8] if str(item.get("term") or "").strip()])) + + if operator_metrics: + op_parts = [] + fans_badge_user_count = int(operator_metrics.get("fans_badge_user_count", 0) or 0) + high_room_level_user_count = int(operator_metrics.get("high_room_level_user_count", 0) or 0) + high_fans_level_user_count = int(operator_metrics.get("high_fans_level_user_count", 0) or 0) + active_users_10plus = int(operator_metrics.get("active_users_10plus", 0) or 0) + if fans_badge_user_count: + op_parts.append(f"带牌活跃用户 {fans_badge_user_count}") + if high_room_level_user_count: + op_parts.append(f"30级+活跃用户 {high_room_level_user_count}") + if high_fans_level_user_count: + op_parts.append(f"10级+粉丝牌用户 {high_fans_level_user_count}") + if active_users_10plus: + op_parts.append(f"高活跃核心用户 {active_users_10plus}") + if op_parts: + lines.append("运营侧:" + ",".join(op_parts)) + + top_badges = operator_metrics.get("top_badges", []) or [] + if top_badges: + lines.append("活跃粉丝牌:") + for item in top_badges[:5]: + badge_name = str(item.get("badge_name") or "").strip() + user_count = int(item.get("user_count", 0) or 0) + message_count = int(item.get("message_count", 0) or 0) + if badge_name: + lines.append(f"- {badge_name}:{user_count}人,{message_count}条") + + merged_templates = payload.get("merged_templates", []) or [] + if merged_templates: + lines.append("高频梗:") + for item in merged_templates[:5]: + text = str(item.get("text") or "").strip() + count = int(item.get("count", 0) or 0) + if text: + lines.append(f"- {text[:42]}({count}次)") + + peak_buckets = payload.get("peak_buckets", []) or [] + if peak_buckets: + lines.append("热点时段:") + for item in peak_buckets[:3]: + start_time = str(item.get("start_time") or "")[-8:-3] + message_count = int(item.get("message_count", 0) or 0) + terms = "、".join( + [str(term.get("term") or "") for term in (item.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] + ) + lines.append(f"- {start_time},{message_count}条,关键词:{terms}") + + representative_messages = payload.get("representative_messages", []) or [] + if representative_messages: + lines.append("代表弹幕:") + for item in representative_messages[:4]: + nickname = str(item.get("nickname") or "").strip() + content = str(item.get("content") or "").strip() + if content: + lines.append(f"- {nickname}:{content[:60]}") + + text = "\n".join(lines).strip() + if len(text) > self._daily_report_max_length: + text = text[: self._daily_report_max_length - 20].rstrip() + "\n...(已截断)" + return text + + def _build_fallback_danmu_summary(self, payload: Dict[str, Any]) -> str: + meta = payload.get("report_meta", {}) or {} + top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or [])[:6] if str(item.get("term") or "").strip()] + merged_templates = payload.get("merged_templates", []) or [] + peak_buckets = payload.get("peak_buckets", []) or [] + representative_messages = payload.get("representative_messages", []) or [] + + lines = [ + f"{meta.get('anchor_day', '')} 这场直播弹幕整体比较密集,讨论重心主要围绕 {'、'.join(top_terms[:4]) or '对局过程'} 展开,观众互动意愿较强,梗和复读内容持续出现。" + ] + if merged_templates: + sample_templates = ";".join( + [str(item.get("text") or "").strip()[:26] for item in merged_templates[:3] if str(item.get("text") or "").strip()] + ) + if sample_templates: + lines.append(f"- 主线观察:直播间共识梗很强,重复刷屏内容主要集中在 {sample_templates}。") + for item in merged_templates[:4]: + break + if peak_buckets: + top_bucket = peak_buckets[0] + terms = "、".join( + [str(term.get("term") or "") for term in (top_bucket.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] + ) + lines.append( + f"- 节奏变化:高峰集中在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,单时段弹幕 {int(top_bucket.get('message_count', 0) or 0)} 条,关键词偏向 {terms}。" + ) + if len(peak_buckets) > 1: + second_bucket = peak_buckets[1] + second_terms = "、".join( + [str(term.get("term") or "") for term in (second_bucket.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] + ) + lines.append( + f"- 热点补充:{str(second_bucket.get('start_time') or '')[-8:-3]} 也出现明显抬升,弹幕讨论继续围绕 {second_terms} 展开。" + ) + if representative_messages: + lines.append("- 情绪特点:代表性发言里既有对操作和决策的即时反馈,也有大量玩梗、调侃和情绪宣泄。") + if top_terms: + lines.append(f"- 关注焦点:高频词主要落在 {'、'.join(top_terms[:6])},说明观众注意力相对集中。") + return "\n".join(lines).strip() + + def _build_operator_summary_text(self, payload: Dict[str, Any]) -> str: + meta = payload.get("report_meta", {}) or {} + operator_metrics = payload.get("operator_metrics", {}) or {} + total_users = int(meta.get("unique_user_count", 0) or 0) + fans_badge_users = int(operator_metrics.get("fans_badge_user_count", 0) or 0) + high_room_users = int(operator_metrics.get("high_room_level_user_count", 0) or 0) + high_fans_users = int(operator_metrics.get("high_fans_level_user_count", 0) or 0) + active_users_5plus = int(operator_metrics.get("active_users_5plus", 0) or 0) + active_users_10plus = int(operator_metrics.get("active_users_10plus", 0) or 0) + fans_badge_ratio = float(operator_metrics.get("fans_badge_user_ratio", 0) or 0) + + lines = [ + f"- 活跃用户规模:{total_users} 人,其中发言 5 次以上 {active_users_5plus} 人,10 次以上 {active_users_10plus} 人。", + f"- 粉丝粘性:带粉丝牌活跃用户 {fans_badge_users} 人,占活跃用户 {fans_badge_ratio * 100:.1f}%;10 级以上粉丝牌用户 {high_fans_users} 人。", + f"- 用户质量:房间等级 30 级以上活跃用户 {high_room_users} 人,说明高等级老观众参与度不低。", + ] + + top_badges = payload.get("operator_metrics", {}).get("top_badges", []) or [] + if top_badges: + badge_parts = [] + for item in top_badges[:5]: + badge_name = str(item.get("badge_name") or "").strip() + if not badge_name: + continue + badge_parts.append(f"{badge_name} {int(item.get('user_count', 0) or 0)}人/{int(item.get('message_count', 0) or 0)}条") + if badge_parts: + lines.append(f"- 活跃牌子分布:{';'.join(badge_parts)}。") + + top_active_users = payload.get("operator_metrics", {}).get("top_active_users", []) or [] + if top_active_users: + core_parts = [] + for item in top_active_users[:5]: + nickname = str(item.get("nickname") or item.get("uid") or "").strip() + msg_count = int(item.get("message_count", 0) or 0) + fans_name = str(item.get("fans_name") or "").strip() + if fans_name: + core_parts.append(f"{nickname}({fans_name},{msg_count}条)") + else: + core_parts.append(f"{nickname}({msg_count}条)") + if core_parts: + lines.append(f"- 核心发言用户:{';'.join(core_parts)}。") + + return "\n".join(lines).strip() + + def _build_operator_summary_lines(self, payload: Dict[str, Any]) -> List[str]: + return [line.strip()[2:].strip() for line in self._build_operator_summary_text(payload).splitlines() if line.strip().startswith("- ")] + + async def _generate_danmu_summary_text(self, payload: Dict[str, Any]) -> str: + if self._daily_report_use_llm and self._daily_report_llm_client: + system_prompt, user_prompt = self._build_danmu_summary_prompt(payload) + result = await asyncio.to_thread( + self._daily_report_llm_client.chat, + system_prompt, + user_prompt, + f"douyu_danmu_summary_{(payload.get('report_meta', {}) or {}).get('room_id', '')}", + ) + if result: + return result.strip() + logger.warning( + f"斗鱼弹幕总结 LLM 生成失败: model={self._daily_report_llm_client.model}, " + f"last_error={self._daily_report_llm_client.last_error}" + ) + return self._build_fallback_danmu_summary(payload) + + async def _build_daily_report_markdown(self, payload: Dict[str, Any]) -> str: + meta = payload.get("report_meta", {}) or {} + title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播") + danmu_summary = await self._generate_danmu_summary_text(payload) + operator_summary = self._build_operator_summary_text(payload) + + lines = [ + f"# {title_name} 直播每日报告", + f"{meta.get('anchor_day', '')}|场次 {meta.get('session_count', 0)}|弹幕 {meta.get('message_count', 0)}|活跃用户 {meta.get('unique_user_count', 0)}", + "", + "## 弹幕总结", + danmu_summary, + "", + "## 运营数据总结", + operator_summary, + ] + + peak_buckets = payload.get("peak_buckets", []) or [] + if peak_buckets: + lines.extend([ + "", + "## 热点时段", + ]) + for item in peak_buckets[:3]: + terms = "、".join( + [str(term.get("term") or "") for term in (item.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()] + ) + lines.append( + f"- `{str(item.get('start_time') or '')[-8:-3]}` 弹幕 {int(item.get('message_count', 0) or 0)} 条,关键词:{terms}" + ) + + merged_templates = payload.get("merged_templates", []) or [] + if merged_templates: + lines.extend([ + "", + "## 高频梗", + ]) + for item in merged_templates[:5]: + text = str(item.get("text") or "").strip() + count = int(item.get("count", 0) or 0) + if text: + lines.append(f"- {text[:72]}({count}次)") + + return "\n".join(lines).strip() + + async def _render_daily_report_image(self, payload: Dict[str, Any]) -> Optional[str]: + markdown = await self._build_daily_report_markdown(payload) + room_id = str((payload.get("report_meta", {}) or {}).get("room_id", "") or "room") + anchor_day = str((payload.get("report_meta", {}) or {}).get("anchor_day", "") or "").replace("-", "") + filename = f"douyu_daily_report_{room_id}_{anchor_day}.png" + try: + danmu_summary = await self._generate_danmu_summary_text(payload) + html_content = render_daily_report_html( + payload=payload, + danmu_summary=danmu_summary, + operator_summary_lines=self._build_operator_summary_lines(payload), + ) + output_dir = os.path.join(os.getcwd(), "temp", "md2image") + os.makedirs(output_dir, exist_ok=True) + output_path = os.path.join(output_dir, filename) + await html_to_image(html_content, output_path) + return str(Path(output_path).resolve()) + except Exception as e: + logger.error(f"斗鱼专用模板图片生成失败(room={room_id}, day={anchor_day}): {e}") + try: + return await convert_md_str_to_image(markdown, filename) + except Exception as e: + logger.error(f"斗鱼每日报告图片生成失败(room={room_id}, day={anchor_day}): {e}") + return None + + async def _generate_daily_report_text(self, payload: Dict[str, Any]) -> str: + if self._daily_report_use_llm and self._daily_report_llm_client: + system_prompt, user_prompt = self._build_daily_report_prompt(payload) + result = await asyncio.to_thread( + self._daily_report_llm_client.chat, + system_prompt, + user_prompt, + f"douyu_daily_report_{(payload.get('report_meta', {}) or {}).get('room_id', '')}", + ) + if result: + text = result.strip() + if len(text) > self._daily_report_max_length: + return text[: self._daily_report_max_length - 20].rstrip() + "\n...(已截断)" + return text + logger.warning( + f"斗鱼每日报告 LLM 生成失败: model={self._daily_report_llm_client.model}, " + f"last_error={self._daily_report_llm_client.last_error}" + ) + return self._build_fallback_daily_report(payload) + + async def _send_daily_reports(self, anchor_day: str): + rooms = self.redis_manager.all_subscribed_rooms() + for room_id in rooms: + if self.redis_manager.get_text_value(self._daily_report_room_key(room_id, anchor_day)): + continue + sessions = self._load_sessions_for_anchor_day(room_id, anchor_day) + if not sessions: + continue + if any(bool(session.get("is_live")) for session in sessions): + continue + payload = self._build_daily_report_payload(room_id, anchor_day, sessions) + if not payload: + continue + report_text = await self._generate_daily_report_text(payload) + report_image = None + if self._daily_report_send_image: + report_image = await self._render_daily_report_image(payload) + groups = self.redis_manager.groups_for_room(room_id) + delivered = False + for gid in groups: + if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED: + continue + try: + if report_image: + await self.bot.send_image_message(gid, Path(report_image)) + else: + await self.bot.send_text_message(gid, report_text) + delivered = True + except Exception as e: + logger.error(f"发送斗鱼每日报告失败(room={room_id}, group={gid}): {e}") + if delivered: + self.redis_manager.set_text_value( + self._daily_report_room_key(room_id, anchor_day), + datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + ) + def _start_danmu_record(self, room_id: str): recorder = self._get_danmu_recorder(room_id) recorder.start() diff --git a/plugins/douyu/report_template.py b/plugins/douyu/report_template.py new file mode 100644 index 0000000..da06d65 --- /dev/null +++ b/plugins/douyu/report_template.py @@ -0,0 +1,485 @@ +# -*- coding: utf-8 -*- +import html +from typing import Any, Dict, List + + +def _escape(value: Any) -> str: + return html.escape(str(value or "")) + + +def _render_metric_card(label: str, value: Any, hint: str = "") -> str: + return ( + '
' + f'
{_escape(label)}
' + f'
{_escape(value)}
' + f'
{_escape(hint)}
' + "
" + ) + + +def _render_list(items: List[str], item_class: str = "bullet-list") -> str: + if not items: + return "" + lis = "".join(f'
  • {_escape(item)}
  • ' for item in items if str(item or "").strip()) + return f'
      {lis}
    ' if lis else "" + + +def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str]]: + lead_parts = [] + insight_items = [] + for line in str(danmu_summary or "").splitlines(): + stripped = line.strip() + if not stripped: + continue + if stripped.startswith("- "): + insight_items.append(stripped[2:].strip()) + else: + lead_parts.append(stripped) + lead = " ".join(lead_parts).strip() + return lead, insight_items + + +def _render_insight_cards(items: List[str]) -> str: + labels = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"] + blocks = [] + for idx, item in enumerate(items[:6]): + blocks.append( + '
    ' + f'
    {_escape(labels[idx] if idx < len(labels) else "观察")}
    ' + f'
    {_escape(item)}
    ' + "
    " + ) + return "".join(blocks) + + +def _render_badges(top_badges: List[Dict[str, Any]]) -> str: + blocks = [] + for item in top_badges[:6]: + badge_name = str(item.get("badge_name") or "").strip() + if not badge_name: + continue + blocks.append( + '
    ' + f'{_escape(badge_name)}' + f'{_escape(item.get("user_count", 0))}人 / {_escape(item.get("message_count", 0))}条' + "
    " + ) + return "".join(blocks) + + +def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str: + blocks = [] + for item in peak_buckets[:3]: + start_time = str(item.get("start_time") or "")[-8:-3] + terms = [str(term.get("term") or "").strip() for term in (item.get("top_terms", []) or [])[:4]] + terms = [term for term in terms if term] + blocks.append( + '
    ' + f'
    {_escape(start_time)}
    ' + f'
    {_escape(item.get("message_count", 0))} 条弹幕
    ' + f'
    {_escape(" / ".join(terms))}
    ' + "
    " + ) + return "".join(blocks) + + +def render_daily_report_html( + payload: Dict[str, Any], + danmu_summary: str, + operator_summary_lines: List[str], +) -> str: + meta = payload.get("report_meta", {}) or {} + operator = payload.get("operator_metrics", {}) or {} + title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播") + subtitle = ( + f"{meta.get('anchor_day', '')} | 场次 {meta.get('session_count', 0)}" + f" | 弹幕 {meta.get('message_count', 0)} | 活跃用户 {meta.get('unique_user_count', 0)}" + ) + + metrics_html = "".join([ + _render_metric_card("活跃用户", meta.get("unique_user_count", 0), "当天参与弹幕的去重人数"), + _render_metric_card("带牌活跃", operator.get("fans_badge_user_count", 0), "带粉丝牌发言用户"), + _render_metric_card("10+粉丝牌", operator.get("high_fans_level_user_count", 0), "高粘性活跃用户"), + _render_metric_card("30+等级用户", operator.get("high_room_level_user_count", 0), "高等级老观众"), + ]) + + merged_templates = payload.get("merged_templates", []) or [] + template_items = [ + f"{str(item.get('text') or '').strip()[:72]}({int(item.get('count', 0) or 0)}次)" + for item in merged_templates[:5] + if str(item.get("text") or "").strip() + ] + top_active_users = payload.get("operator_metrics", {}).get("top_active_users", []) or [] + active_user_items = [] + for item in top_active_users[:10]: + nickname = str(item.get("nickname") or item.get("uid") or "").strip() + fans_name = str(item.get("fans_name") or "").strip() + message_count = int(item.get("message_count", 0) or 0) + if fans_name: + active_user_items.append(f"{nickname} | {fans_name} | {message_count}条") + else: + active_user_items.append(f"{nickname} | {message_count}条") + + lead_summary, danmu_bullets = _split_summary_blocks(danmu_summary) + + html_doc = f""" + + + + + + +
    +
    +
    DOUYU DAILY REPORT
    +
    {_escape(title_name)}
    +
    {_escape(subtitle)}
    +
    +
    +
    + {metrics_html} +
    + +
    +
    弹幕总结
    +
    +
    +
    +
    整体观察
    +
    {_escape(lead_summary)}
    +
    +
    + {_render_insight_cards(danmu_bullets)} +
    +
    +
    +
    高频梗
    + {_render_list(template_items)} +
    +
    + +
    + {_render_hot_times(payload.get("peak_buckets", []) or [])} +
    +
    + +
    +
    运营数据总结
    +
    +
    + {_render_list(operator_summary_lines)} +
    +
    +
    活跃牌子
    +
    {_render_badges(operator.get("top_badges", []) or [])}
    +
    +
    +
    +
    核心发言用户
    + {_render_list(active_user_items, "compact-user-list")} +
    +
    + + +
    +
    + +""" + return html_doc diff --git a/utils/markdown_to_image.py b/utils/markdown_to_image.py index 709e87e..8f59952 100644 --- a/utils/markdown_to_image.py +++ b/utils/markdown_to_image.py @@ -1,6 +1,5 @@ import subprocess import time -import markdown from pathlib import Path import psutil @@ -10,8 +9,69 @@ import asyncio import re from loguru import logger +try: + import markdown +except ImportError: + markdown = None + META_KEYWORDS = ["群", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"] +def _simple_markdown_to_html(md_content: str) -> str: + lines = str(md_content or "").splitlines() + html_parts = [] + in_ul = False + paragraph_lines = [] + + def flush_paragraph(): + nonlocal paragraph_lines + if paragraph_lines: + text = " ".join(item.strip() for item in paragraph_lines if item.strip()) + if text: + html_parts.append(f"

    {text}

    ") + paragraph_lines = [] + + def close_ul(): + nonlocal in_ul + if in_ul: + html_parts.append("") + in_ul = False + + for raw_line in lines: + line = raw_line.rstrip() + stripped = line.strip() + if not stripped: + flush_paragraph() + close_ul() + continue + if stripped.startswith("# "): + flush_paragraph() + close_ul() + html_parts.append(f"

    {stripped[2:].strip()}

    ") + continue + if stripped.startswith("## "): + flush_paragraph() + close_ul() + html_parts.append(f"

    {stripped[3:].strip()}

    ") + continue + if stripped.startswith("### "): + flush_paragraph() + close_ul() + html_parts.append(f"

    {stripped[4:].strip()}

    ") + continue + if stripped.startswith("- "): + flush_paragraph() + if not in_ul: + html_parts.append("
      ") + in_ul = True + html_parts.append(f"
    • {stripped[2:].strip()}
    • ") + continue + close_ul() + paragraph_lines.append(stripped) + + flush_paragraph() + close_ul() + return "\n".join(html_parts) + async def safe_close_browser(browser, timeout: float = 4.0) -> None: if not browser: @@ -105,7 +165,10 @@ def _split_hero(html_body: str): async def md_str_to_html_content(md_content): - html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite']) + if markdown is not None: + html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite']) + else: + html_body = _simple_markdown_to_html(md_content) hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body) css = """