# -*- coding: utf-8 -*-
import html
from typing import Any, Dict, List
from markupsafe import Markup
from utils.html_template_renderer import HtmlTemplateRenderer
def _escape(value: Any) -> str:
return html.escape(str(value or ""))
def _render_metric_card(label: str, value: Any, hint: str = "") -> str:
return (
'
'
f'
{_escape(label)}
'
f'
{_escape(value)}
'
f'
{_escape(hint)}
'
"
"
)
def _render_list(items: List[str], item_class: str = "bullet-list") -> str:
if not items:
return ""
lis = "".join(f'
{_escape(item)}
' for item in items if str(item or "").strip())
return f'
{lis}
' if lis else ""
def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str], List[str]]:
# 这里把 LLM 返回的弹幕总结拆成三部分:
# 1) lead: 顶部总述段落
# 2) insight_items: 常规的复盘要点(运营/观察视角)
# 3) fans_extract_items: 专门给粉丝看的“弹幕萃取”要点
# 约定:当检测到“【粉丝向弹幕萃取】”或同义标记后,后续条目归入 fans_extract_items。
lead_parts = []
insight_items = []
fans_extract_items = []
in_fans_extract_block = False
for line in str(danmu_summary or "").splitlines():
stripped = line.strip()
if not stripped:
continue
# 兼容不同模型可能产出的标题样式,尽量把粉丝向内容稳定识别出来。
if stripped.startswith("【粉丝向弹幕萃取】") or stripped.startswith("粉丝向弹幕萃取") or stripped.startswith("给粉丝看的弹幕萃取"):
in_fans_extract_block = True
continue
if stripped.startswith("- "):
if in_fans_extract_block:
fans_extract_items.append(stripped[2:].strip())
else:
insight_items.append(stripped[2:].strip())
else:
# 非 bullet 文本在粉丝区块中也保留,避免模型偶发输出短段落导致信息丢失。
if in_fans_extract_block:
fans_extract_items.append(stripped)
else:
lead_parts.append(stripped)
lead = " ".join(lead_parts).strip()
return lead, insight_items, fans_extract_items
def _normalize_summary_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or []) if str(item.get("term") or "").strip()]
merged_templates = [str(item.get("text") or "").strip() for item in (payload.get("merged_templates", []) or []) if str(item.get("text") or "").strip()]
peak_buckets = payload.get("peak_buckets", []) or []
representative_messages = payload.get("representative_messages", []) or []
supplements: List[str] = []
if top_terms:
supplements.append(f"讨论焦点比较集中,弹幕反复围绕 {'、'.join(top_terms[:5])} 展开。")
if merged_templates:
sample_templates = ";".join(text[:24] for text in merged_templates[:3])
supplements.append(f"复读和共识梗比较强,重复内容主要集中在 {sample_templates}。")
if peak_buckets:
top_bucket = peak_buckets[0]
bucket_terms = [str(term.get('term') or '').strip() for term in (top_bucket.get("top_terms", []) or []) if str(term.get('term') or '').strip()]
if bucket_terms:
supplements.append(
f"高峰时段出现在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,话题明显偏向 {'、'.join(bucket_terms[:4])}。"
)
if representative_messages:
supplements.append("代表性发言里既有操作反馈,也有玩梗调侃和情绪宣泄,互动意愿比较强。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_fans_extract_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 6) -> List[str]:
# 粉丝向萃取强调“现场感”,优先保留模型给出的条目;
# 不足时再从代表弹幕/重复梗中补齐,避免页面出现空区块。
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
representative_messages = payload.get("representative_messages", []) or []
repeated_messages = payload.get("repeated_messages", []) or []
burst_terms = payload.get("burst_terms", []) or []
for item in representative_messages[:8]:
nickname = str(item.get("nickname") or "").strip() or "观众"
content = str(item.get("content") or "").strip()
if not content:
continue
supplements.append(f"{nickname}:{content[:46]}")
for item in repeated_messages[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"复读梗「{text[:34]}」出现 {count} 次。")
for item in burst_terms[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"情绪短词「{text}」集中刷了 {count} 次。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _build_template_items(payload: Dict[str, Any], limit: int = 8) -> List[str]:
items: List[str] = []
seen = set()
def push(text: str, suffix: str = "") -> None:
value = str(text or "").strip()
if not value:
return
normalized_key = value
if normalized_key in seen:
return
seen.add(normalized_key)
items.append(f"{value}{suffix}".strip())
for item in (payload.get("merged_templates", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"({count}次)")
for item in (payload.get("repeated_messages", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"({count}次)")
for item in (payload.get("burst_terms", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:36], f"(爆发 {count} 次)")
for item in (payload.get("top_terms", []) or [])[:6]:
term = str(item.get("term") or "").strip()
count = int(item.get("count", 0) or 0)
if term:
push(term, f"({count}次提及)")
return items[:limit]
def _render_insight_cards(items: List[str]) -> str:
labels = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"]
blocks = []
for idx, item in enumerate(items[:6]):
extra_class = " full-span" if len(items[:6]) % 2 == 1 and idx == len(items[:6]) - 1 else ""
blocks.append(
f'
'
f'
{_escape(labels[idx] if idx < len(labels) else "观察")}
'
f'
{_escape(item)}
'
"
"
)
return "".join(blocks)
def _render_badges(top_badges: List[Dict[str, Any]]) -> str:
blocks = []
for item in top_badges[:6]:
badge_name = str(item.get("badge_name") or "").strip()
if not badge_name:
continue
blocks.append(
'
"
)
return "".join(blocks)
def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str:
blocks = []
for item in peak_buckets[:3]:
start_time = str(item.get("start_time") or "")[-8:-3]
terms = [str(term.get("term") or "").strip() for term in (item.get("top_terms", []) or [])[:4]]
terms = [term for term in terms if term]
blocks.append(
'
'
f'
{_escape(start_time)}
'
f'
{_escape(item.get("message_count", 0))} 条弹幕
'
f'
{_escape(" / ".join(terms))}
'
"
"
)
return "".join(blocks)
def _render_active_users(top_active_users: List[Dict[str, Any]]) -> str:
blocks = []
for item in top_active_users[:10]:
nickname = str(item.get("nickname") or item.get("uid") or "").strip()
fans_name = str(item.get("fans_name") or "").strip()
fans_level = int(item.get("fans_level", 0) or 0)
room_level = int(item.get("room_level", 0) or 0)
message_count = int(item.get("message_count", 0) or 0)
chips = []
if fans_name:
fans_label = f"{fans_name} Lv{fans_level}" if fans_level > 0 else fans_name
chips.append(f'{_escape(fans_label)}')
if room_level > 0:
chips.append(f'{_escape(f"平台 Lv{room_level}")}')
meta_html = "".join(chips) if chips else '普通观众'
blocks.append(
'