# -*- coding: utf-8 -*- import html from typing import Any, Dict, List from markupsafe import Markup from utils.html_template_renderer import HtmlTemplateRenderer def _escape(value: Any) -> str: return html.escape(str(value or "")) def _render_metric_card(label: str, value: Any, hint: str = "") -> str: return ( '
' f'
{_escape(label)}
' f'
{_escape(value)}
' f'
{_escape(hint)}
' "
" ) def _render_list(items: List[str], item_class: str = "bullet-list") -> str: if not items: return "" lis = "".join(f'
  • {_escape(item)}
  • ' for item in items if str(item or "").strip()) return f'' if lis else "" def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str], List[str]]: # 这里把 LLM 返回的弹幕总结拆成三部分: # 1) lead: 顶部总述段落 # 2) insight_items: 常规的复盘要点(运营/观察视角) # 3) fans_extract_items: 专门给粉丝看的“弹幕萃取”要点 # 约定:当检测到“【粉丝向弹幕萃取】”或同义标记后,后续条目归入 fans_extract_items。 lead_parts = [] insight_items = [] fans_extract_items = [] in_fans_extract_block = False for line in str(danmu_summary or "").splitlines(): stripped = line.strip() if not stripped: continue # 兼容不同模型可能产出的标题样式,尽量把粉丝向内容稳定识别出来。 if stripped.startswith("【粉丝向弹幕萃取】") or stripped.startswith("粉丝向弹幕萃取") or stripped.startswith("给粉丝看的弹幕萃取"): in_fans_extract_block = True continue if stripped.startswith("- "): if in_fans_extract_block: fans_extract_items.append(stripped[2:].strip()) else: insight_items.append(stripped[2:].strip()) else: # 非 bullet 文本在粉丝区块中也保留,避免模型偶发输出短段落导致信息丢失。 if in_fans_extract_block: fans_extract_items.append(stripped) else: lead_parts.append(stripped) lead = " ".join(lead_parts).strip() return lead, insight_items, fans_extract_items def _normalize_summary_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]: normalized = [str(item or "").strip() for item in items if str(item or "").strip()] if len(normalized) >= target_count: return normalized[:target_count] top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or []) if str(item.get("term") or "").strip()] merged_templates = [str(item.get("text") or "").strip() for item in (payload.get("merged_templates", []) or []) if str(item.get("text") or "").strip()] peak_buckets = payload.get("peak_buckets", []) or [] representative_messages = payload.get("representative_messages", []) or [] supplements: List[str] = [] if top_terms: supplements.append(f"讨论焦点比较集中,弹幕反复围绕 {'、'.join(top_terms[:5])} 展开。") if merged_templates: sample_templates = ";".join(text[:24] for text in merged_templates[:3]) supplements.append(f"复读和共识梗比较强,重复内容主要集中在 {sample_templates}。") if peak_buckets: top_bucket = peak_buckets[0] bucket_terms = [str(term.get('term') or '').strip() for term in (top_bucket.get("top_terms", []) or []) if str(term.get('term') or '').strip()] if bucket_terms: supplements.append( f"高峰时段出现在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,话题明显偏向 {'、'.join(bucket_terms[:4])}。" ) if representative_messages: supplements.append("代表性发言里既有操作反馈,也有玩梗调侃和情绪宣泄,互动意愿比较强。") existing = set(normalized) for item in supplements: if item not in existing: normalized.append(item) existing.add(item) if len(normalized) >= target_count: break return normalized[:target_count] def _normalize_fans_extract_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 6) -> List[str]: # 粉丝向萃取强调“现场感”,优先保留模型给出的条目; # 不足时再从代表弹幕/重复梗中补齐,避免页面出现空区块。 normalized = [str(item or "").strip() for item in items if str(item or "").strip()] if len(normalized) >= target_count: return normalized[:target_count] supplements: List[str] = [] representative_messages = payload.get("representative_messages", []) or [] repeated_messages = payload.get("repeated_messages", []) or [] burst_terms = payload.get("burst_terms", []) or [] for item in representative_messages[:8]: nickname = str(item.get("nickname") or "").strip() or "观众" content = str(item.get("content") or "").strip() if not content: continue supplements.append(f"{nickname}:{content[:46]}") for item in repeated_messages[:6]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: supplements.append(f"复读梗「{text[:34]}」出现 {count} 次。") for item in burst_terms[:4]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: supplements.append(f"情绪短词「{text}」集中刷了 {count} 次。") existing = set(normalized) for item in supplements: if item not in existing: normalized.append(item) existing.add(item) if len(normalized) >= target_count: break return normalized[:target_count] def _build_template_items(payload: Dict[str, Any], limit: int = 8) -> List[str]: items: List[str] = [] seen = set() def push(text: str, suffix: str = "") -> None: value = str(text or "").strip() if not value: return normalized_key = value if normalized_key in seen: return seen.add(normalized_key) items.append(f"{value}{suffix}".strip()) for item in (payload.get("merged_templates", []) or [])[:6]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: push(text[:72], f"({count}次)") for item in (payload.get("repeated_messages", []) or [])[:6]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: push(text[:72], f"({count}次)") for item in (payload.get("burst_terms", []) or [])[:6]: text = str(item.get("text") or "").strip() count = int(item.get("count", 0) or 0) if text: push(text[:36], f"(爆发 {count} 次)") for item in (payload.get("top_terms", []) or [])[:6]: term = str(item.get("term") or "").strip() count = int(item.get("count", 0) or 0) if term: push(term, f"({count}次提及)") return items[:limit] def _render_insight_cards(items: List[str]) -> str: labels = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"] blocks = [] for idx, item in enumerate(items[:6]): extra_class = " full-span" if len(items[:6]) % 2 == 1 and idx == len(items[:6]) - 1 else "" blocks.append( f'
    ' f'
    {_escape(labels[idx] if idx < len(labels) else "观察")}
    ' f'
    {_escape(item)}
    ' "
    " ) return "".join(blocks) def _render_badges(top_badges: List[Dict[str, Any]]) -> str: blocks = [] for item in top_badges[:6]: badge_name = str(item.get("badge_name") or "").strip() if not badge_name: continue blocks.append( '
    ' f'{_escape(badge_name)}' f'{_escape(item.get("user_count", 0))}人 / {_escape(item.get("message_count", 0))}条' "
    " ) return "".join(blocks) def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str: blocks = [] for item in peak_buckets[:3]: start_time = str(item.get("start_time") or "")[-8:-3] terms = [str(term.get("term") or "").strip() for term in (item.get("top_terms", []) or [])[:4]] terms = [term for term in terms if term] blocks.append( '
    ' f'
    {_escape(start_time)}
    ' f'
    {_escape(item.get("message_count", 0))} 条弹幕
    ' f'
    {_escape(" / ".join(terms))}
    ' "
    " ) return "".join(blocks) def _render_active_users(top_active_users: List[Dict[str, Any]]) -> str: blocks = [] for item in top_active_users[:10]: nickname = str(item.get("nickname") or item.get("uid") or "").strip() fans_name = str(item.get("fans_name") or "").strip() fans_level = int(item.get("fans_level", 0) or 0) room_level = int(item.get("room_level", 0) or 0) message_count = int(item.get("message_count", 0) or 0) chips = [] if fans_name: fans_label = f"{fans_name} Lv{fans_level}" if fans_level > 0 else fans_name chips.append(f'{_escape(fans_label)}') if room_level > 0: chips.append(f'{_escape(f"平台 Lv{room_level}")}') meta_html = "".join(chips) if chips else '普通观众' blocks.append( '
    ' '
    ' f'
    {_escape(nickname)}
    ' f'
    {_escape(message_count)}
    ' '
    ' f'
    {meta_html}
    ' '
    ' ) return "".join(blocks) def _render_audience_trend_chart(audience_trend: Dict[str, Any]) -> str: points = audience_trend.get("points", []) or [] if len(points) < 2: return ( '
    ' '
    人数趋势
    ' '
    当前直播场次还没有足够的 WS 采样点,暂时无法绘制趋势图。
    ' '
    ' ) width = 820 height = 276 padding_left = 62 padding_right = 64 padding_top = 24 padding_bottom = 38 plot_width = width - padding_left - padding_right plot_height = height - padding_top - padding_bottom summary = audience_trend.get("summary", {}) or {} def _compress_chart_points(raw_points: List[Dict[str, Any]], max_points: int = 36) -> List[Dict[str, Any]]: if len(raw_points) <= max_points: return list(raw_points) bucket_size = max(len(raw_points) / max_points, 1) compressed: List[Dict[str, Any]] = [] for idx in range(max_points): start = int(round(idx * bucket_size)) end = int(round((idx + 1) * bucket_size)) bucket = raw_points[start:end] or raw_points[start:start + 1] if not bucket: continue compressed.append(dict(bucket[-1])) first = raw_points[0] last = raw_points[-1] if compressed: compressed[0] = dict(first) compressed[-1] = dict(last) return compressed chart_points = _compress_chart_points(points, max_points=36) vip_values = [int(item.get("vip_count", 0) or 0) for item in chart_points] diamond_values = [int(item.get("diamond_count", 0) or 0) for item in chart_points] labels = [str(item.get("timestamp") or "")[-8:-3] for item in chart_points] vip_min = min(vip_values) vip_max = max(vip_values) diamond_min = min(diamond_values) diamond_max = max(diamond_values) vip_padding = max(int((vip_max - vip_min) * 0.12), 60) diamond_padding = 1 if diamond_max - diamond_min <= 2 else max(int((diamond_max - diamond_min) * 0.2), 1) vip_display_min = max(vip_min - vip_padding, 0) vip_display_max = vip_max + vip_padding diamond_display_min = max(diamond_min - diamond_padding, 0) diamond_display_max = diamond_max + diamond_padding vip_span = max(vip_display_max - vip_display_min, 1) diamond_span = max(diamond_display_max - diamond_display_min, 1) step_x = plot_width / max(len(chart_points) - 1, 1) bar_width = max(min(step_x * 0.58, 18), 6) def x_at(index: int) -> float: return padding_left + step_x * index def y_vip(value: int) -> float: ratio = (value - vip_display_min) / vip_span if vip_span else 0 return padding_top + plot_height - ratio * plot_height def y_diamond(value: int) -> float: ratio = (value - diamond_display_min) / diamond_span if diamond_span else 0 return padding_top + plot_height - ratio * plot_height grid_lines = [] left_ticks = [] right_ticks = [] for idx in range(5): ratio = idx / 4 y = padding_top + plot_height - ratio * plot_height vip_tick = round(vip_display_min + vip_span * ratio) diamond_tick = round(diamond_display_min + diamond_span * ratio) grid_lines.append( f'' ) left_ticks.append( f'{vip_tick}' ) right_ticks.append( f'{diamond_tick}' ) bars = [] line_points = [] dot_points = [] label_marks = [] annotations = [] if len(chart_points) <= 12: label_indexes = list(range(len(chart_points))) else: label_indexes = sorted(set([0, len(chart_points) - 1] + [int(round((len(chart_points) - 1) * i / 6)) for i in range(1, 6)])) vip_peak_index = vip_values.index(vip_max) vip_valley_index = vip_values.index(vip_min) diamond_latest_index = len(chart_points) - 1 for idx, item in enumerate(chart_points): x = x_at(idx) vip_y = y_vip(int(item.get("vip_count", 0) or 0)) diamond_y = y_diamond(int(item.get("diamond_count", 0) or 0)) bar_height = padding_top + plot_height - vip_y bars.append( f'' ) line_points.append(f"{x:.1f},{diamond_y:.1f}") dot_points.append(f'') if idx in label_indexes: label_marks.append( f'{_escape(labels[idx])}' ) if idx == vip_peak_index: annotations.append( f'' f'峰值 {vip_max}' f'{_escape(labels[idx])}' f'' ) elif idx == vip_valley_index and vip_min != vip_max: annotations.append( f'' f'低点 {vip_min}' f'{_escape(labels[idx])}' f'' ) if idx == diamond_latest_index: annotations.append( f'' f'最新 {int(item.get("diamond_count", 0) or 0)}' f'{_escape(labels[idx])}' f'' ) polyline = f'' vip_latest = int(summary.get("vip_latest", vip_values[-1]) or 0) diamond_latest = int(summary.get("diamond_latest", diamond_values[-1]) or 0) session_start = str(summary.get("session_start") or "") first_point_time = str(summary.get("first_point_time") or "") leading_gap_minutes = int(summary.get("leading_gap_minutes", 0) or 0) note_text = "" if session_start and first_point_time and leading_gap_minutes >= 20: note_text = f"采样起点 {first_point_time[-8:-3]},早于该时段的人数数据未记录" return f"""
    贵宾 / 钻粉趋势
    贵宾柱状 钻粉折线 最新:贵宾 {_escape(vip_latest)} / 钻粉 {_escape(diamond_latest)}
    {'
    ' + _escape(note_text) + '
    ' if note_text else ''} {''.join(grid_lines)} {''.join(left_ticks)} {''.join(right_ticks)} {''.join(bars)} {polyline} {''.join(dot_points)} {''.join(annotations)} {''.join(label_marks)}
    """ def render_daily_report_html( payload: Dict[str, Any], danmu_summary: str, operator_summary_lines: List[str], ) -> str: meta = payload.get("report_meta", {}) or {} operator = payload.get("operator_metrics", {}) or {} title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播") subtitle = ( f"{meta.get('anchor_day', '')} | 场次 {meta.get('session_count', 0)}" f" | 弹幕 {meta.get('message_count', 0)} | 活跃用户 {meta.get('unique_user_count', 0)}" ) metrics_html = "".join([ _render_metric_card("活跃用户", meta.get("unique_user_count", 0), "当天参与弹幕的去重人数"), _render_metric_card("带牌活跃", operator.get("fans_badge_user_count", 0), "带粉丝牌发言用户"), _render_metric_card("10+粉丝牌", operator.get("high_fans_level_user_count", 0), "高粘性活跃用户"), _render_metric_card("30+等级用户", operator.get("high_room_level_user_count", 0), "高等级老观众"), ]) template_items = _build_template_items(payload) top_active_users = payload.get("operator_metrics", {}).get("top_active_users", []) or [] audience_trend = payload.get("audience_trend", {}) or {} lead_summary, danmu_bullets, fans_extract_bullets = _split_summary_blocks(danmu_summary) danmu_bullets = _normalize_summary_bullets(payload, danmu_bullets, target_count=5) fans_extract_bullets = _normalize_fans_extract_bullets(payload, fans_extract_bullets, target_count=6) # 模板化渲染: # 1. 报告样式和结构迁移到独立 HTML 模板文件; # 2. Python 仅负责准备数据与片段,后续改 UI 只改模板即可。 renderer = HtmlTemplateRenderer() return renderer.render( "plugins/douyu/templates/daily_report.html", { "title_name": title_name, "subtitle": subtitle, # 下列片段由当前函数内辅助方法生成,文本内容已做转义,可安全注入模板。 "metrics_html": Markup(metrics_html), "lead_summary": lead_summary, "danmu_insights_html": Markup(_render_insight_cards(danmu_bullets)), "fans_extract_html": Markup(_render_list(fans_extract_bullets, item_class="fans-list")), "template_items_html": Markup(_render_list(template_items)), "hot_times_html": Markup(_render_hot_times(payload.get("peak_buckets", []) or [])), "audience_trend_html": Markup(_render_audience_trend_chart(audience_trend)), "operator_summary_html": Markup(_render_list(operator_summary_lines)), "badges_html": Markup(_render_badges(operator.get("top_badges", []) or [])), "active_users_html": Markup(_render_active_users(top_active_users)), }, )