# -*- coding: utf-8 -*-
import html
from typing import Any, Dict, List
from markupsafe import Markup
from utils.html_template_renderer import HtmlTemplateRenderer
def _escape(value: Any) -> str:
return html.escape(str(value or ""))
def _clean_bullet_text(line: str) -> str:
"""
统一清洗模型输出里的 bullet 前缀。
这样做的目的有两个:
1. 兼容 `-`、`•`、`1.` 这类常见列表格式;
2. 让模板层只拿到干净文本,避免在 HTML 里再做重复判断。
"""
text = str(line or "").strip()
if not text:
return ""
if text.startswith("- "):
return text[2:].strip()
if text.startswith("•"):
return text[1:].strip()
if len(text) > 2 and text[0].isdigit() and text[1] in {".", "、"}:
return text[2:].strip()
return text
# Render a single metric card as an HTML string.
# `label`, `value` and `hint` are each passed through `_escape` before being
# interpolated; `hint` defaults to an empty string.
def _render_metric_card(label: str, value: Any, hint: str = "") -> str:
return (
'
'
f'
{_escape(label)}
'
f'
{_escape(value)}
'
f'
{_escape(hint)}
'
"
"
)
# Render `items` as an HTML list string.
# Returns "" when `items` is empty or when every item is blank after
# stripping; each remaining item is escaped with `_escape`.
# `item_class` defaults to "bullet-list".
def _render_list(items: List[str], item_class: str = "bullet-list") -> str:
if not items:
return ""
lis = "".join(f'
{_escape(item)}
' for item in items if str(item or "").strip())
return f'
{lis}
' if lis else ""
def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str], List[str]]:
# 这里把 LLM 返回的弹幕总结拆成三部分:
# 1) lead: 顶部总述段落
# 2) insight_items: 常规的复盘要点(运营/观察视角)
# 3) fans_extract_items: 专门给粉丝看的“弹幕萃取”要点
# 约定:当检测到“【粉丝向弹幕萃取】”或同义标记后,后续条目归入 fans_extract_items。
lead_parts = []
insight_items = []
fans_extract_items = []
in_fans_extract_block = False
for line in str(danmu_summary or "").splitlines():
stripped = line.strip()
if not stripped:
continue
# 兼容不同模型可能产出的标题样式,尽量把粉丝向内容稳定识别出来。
if stripped.startswith("【粉丝向弹幕萃取】") or stripped.startswith("粉丝向弹幕萃取") or stripped.startswith("给粉丝看的弹幕萃取"):
in_fans_extract_block = True
continue
if stripped.startswith("- "):
if in_fans_extract_block:
fans_extract_items.append(stripped[2:].strip())
else:
insight_items.append(stripped[2:].strip())
else:
# 非 bullet 文本在粉丝区块中也保留,避免模型偶发输出短段落导致信息丢失。
if in_fans_extract_block:
fans_extract_items.append(stripped)
else:
lead_parts.append(stripped)
lead = " ".join(lead_parts).strip()
return lead, insight_items, fans_extract_items
def _split_fans_report_blocks(report_text: str) -> Dict[str, Any]:
    """Split the fan-facing parody daily report into template sections.

    The model is expected to emit headings such as:
    - 【今日笑点】
    - 【弹幕名场面】
    - 【梗王榜】
    - 【收尾播报】

    Several aliases per heading are accepted. Lines that appear before any
    recognised heading are joined with spaces into ``lead``, so the page
    still has content when the model does not follow the convention.

    Returns:
        A dict with ``lead`` (str) and the list-valued keys
        ``laugh_points``, ``famous_scenes``, ``meme_rank`` and ``closing``.
    """
    aliases_by_section = {
        "laugh_points": ("今日笑点", "笑点", "欢乐总结"),
        "famous_scenes": ("弹幕名场面", "名场面", "现场整活"),
        "meme_rank": ("梗王榜", "梗榜", "复读冠军"),
        "closing": ("收尾播报", "结尾播报", "结尾"),
    }
    header_alias_map = {
        alias: section
        for section, aliases in aliases_by_section.items()
        for alias in aliases
    }
    sections: Dict[str, Any] = {
        "lead": "",
        "laugh_points": [],
        "famous_scenes": [],
        "meme_rank": [],
        "closing": [],
    }

    intro: List[str] = []
    # Lines are appended to whichever bucket the most recent heading selected;
    # until a heading is seen they belong to the lead paragraph.
    target = intro
    for raw_line in str(report_text or "").splitlines():
        text = raw_line.strip()
        if not text:
            continue

        # Remove heading decoration (brackets, '#', colons, spaces) before lookup.
        heading = text.strip("【】:#: ").replace(":", "").replace(":", "")
        section_key = header_alias_map.get(heading)
        if section_key is not None:
            target = sections[section_key]
            continue

        cleaned = _clean_bullet_text(text)
        if cleaned:
            target.append(cleaned)

    sections["lead"] = " ".join(intro).strip()
    return sections
def _normalize_summary_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or []) if str(item.get("term") or "").strip()]
merged_templates = [str(item.get("text") or "").strip() for item in (payload.get("merged_templates", []) or []) if str(item.get("text") or "").strip()]
peak_buckets = payload.get("peak_buckets", []) or []
representative_messages = payload.get("representative_messages", []) or []
supplements: List[str] = []
if top_terms:
supplements.append(f"讨论焦点比较集中,弹幕反复围绕 {'、'.join(top_terms[:5])} 展开。")
if merged_templates:
sample_templates = ";".join(text[:24] for text in merged_templates[:3])
supplements.append(f"复读和共识梗比较强,重复内容主要集中在 {sample_templates}。")
if peak_buckets:
top_bucket = peak_buckets[0]
bucket_terms = [str(term.get('term') or '').strip() for term in (top_bucket.get("top_terms", []) or []) if str(term.get('term') or '').strip()]
if bucket_terms:
supplements.append(
f"高峰时段出现在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,话题明显偏向 {'、'.join(bucket_terms[:4])}。"
)
if representative_messages:
supplements.append("代表性发言里既有操作反馈,也有玩梗调侃和情绪宣泄,互动意愿比较强。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_fans_extract_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 6) -> List[str]:
# 粉丝向萃取强调“现场感”,优先保留模型给出的条目;
# 不足时再从代表弹幕/重复梗中补齐,避免页面出现空区块。
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
representative_messages = payload.get("representative_messages", []) or []
repeated_messages = payload.get("repeated_messages", []) or []
burst_terms = payload.get("burst_terms", []) or []
for item in representative_messages[:8]:
nickname = str(item.get("nickname") or "").strip() or "观众"
content = str(item.get("content") or "").strip()
if not content:
continue
supplements.append(f"{nickname}:{content[:46]}")
for item in repeated_messages[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"复读梗「{text[:34]}」出现 {count} 次。")
for item in burst_terms[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"情绪短词「{text}」集中刷了 {count} 次。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_funny_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 4) -> List[str]:
"""
“今日笑点”优先保留模型自己写的梗概;
如果模型输出偏保守,就从高频梗、爆发词里补出几条更有现场感的句子。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("merged_templates", []) or [])[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"同一句梗反复刷了 {count} 次,直播间默认进入复读机模式:{text[:30]}。")
for item in (payload.get("burst_terms", []) or [])[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"情绪词「{text}」高频刷屏 {count} 次,说明这一段大家已经集体上头。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_scene_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
"""
“弹幕名场面”强调像直播间回放,因此优先从代表弹幕中补句子,
让最终成品看起来更像观众之间的接龙,而不是纯数据总结。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("representative_messages", []) or [])[:10]:
nickname = str(item.get("nickname") or "").strip() or "观众"
content = str(item.get("content") or "").strip()
if content:
supplements.append(f"{nickname}:{content[:42]}")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_rank_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 3) -> List[str]:
"""
“梗王榜”兜底来源按优先级走:
1. 已聚合的长模板梗;
2. 重复短句;
3. 爆发情绪词。
这样即便模型漏写榜单,页面也能稳定展示“今天到底大家在刷什么”。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("merged_templates", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text[:30]}|全场 {count} 次")
for item in (payload.get("repeated_messages", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text[:30]}|复读 {count} 次")
for item in (payload.get("burst_terms", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text}|情绪爆发 {count} 次")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_closing_text(payload: Dict[str, Any], closing_items: List[str]) -> str:
"""
收尾句只有一句,优先保留模型原话;
如果模型没给,就用当天最高峰时段和热词拼一个轻松结尾。
"""
for item in closing_items:
value = str(item or "").strip()
if value:
return value
peak_buckets = payload.get("peak_buckets", []) or []
if peak_buckets:
top_bucket = peak_buckets[0]
top_terms = [
str(term.get("term") or "").strip()
for term in (top_bucket.get("top_terms", []) or [])[:3]
if str(term.get("term") or "").strip()
]
return (
f"今晚最佳观影时段锁定 {str(top_bucket.get('start_time') or '')[-8:-3]},"
f"大家围着 {'、'.join(top_terms) or '节目效果'} 一起起哄,收工时空气里都还是梗。"
)
return "今天的直播结论很简单:操作未必全记住了,但弹幕梗已经自动住进群友脑回路。"
def _build_template_items(payload: Dict[str, Any], limit: int = 8) -> List[str]:
items: List[str] = []
seen = set()
def push(text: str, suffix: str = "") -> None:
value = str(text or "").strip()
if not value:
return
normalized_key = value
if normalized_key in seen:
return
seen.add(normalized_key)
items.append(f"{value}{suffix}".strip())
for item in (payload.get("merged_templates", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"({count}次)")
for item in (payload.get("repeated_messages", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"({count}次)")
for item in (payload.get("burst_terms", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:36], f"(爆发 {count} 次)")
for item in (payload.get("top_terms", []) or [])[:6]:
term = str(item.get("term") or "").strip()
count = int(item.get("count", 0) or 0)
if term:
push(term, f"({count}次提及)")
return items[:limit]
# Render up to six insight items as an HTML string of cards.
# Each card gets a label from `labels` by position ("观察" if the index runs
# past the list) and the escaped item text. When the number of rendered
# items is odd, the last card additionally gets the " full-span" class.
def _render_insight_cards(items: List[str]) -> str:
labels = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"]
blocks = []
for idx, item in enumerate(items[:6]):
extra_class = " full-span" if len(items[:6]) % 2 == 1 and idx == len(items[:6]) - 1 else ""
blocks.append(
f'
'
f'
{_escape(labels[idx] if idx < len(labels) else "观察")}
'
f'
{_escape(item)}
'
"
"
)
return "".join(blocks)
# Render up to six fan "scene" items as an HTML string of cards.
# Each item is escaped with `_escape`; the per-item fragments are concatenated.
def _render_fans_scene_cards(items: List[str]) -> str:
blocks = []
for item in items[:6]:
blocks.append(
'
'
f'
{_escape(item)}
'
"
"
)
return "".join(blocks)
# Render the first three ranking items as an HTML string of cards.
# Cards are numbered from 1 ("TOP 1", "TOP 2", ...) and the item text is escaped.
def _render_rank_cards(items: List[str]) -> str:
blocks = []
for idx, item in enumerate(items[:3], start=1):
blocks.append(
'
'
f'
TOP {idx}
'
f'
{_escape(item)}
'
"
"
)
return "".join(blocks)
def _build_fans_fun_metrics(payload: Dict[str, Any]) -> List[Dict[str, str]]:
"""
粉丝版避免直接沿用“运营指标”命名,改成更轻松的展示口径。
底层仍然来自同一份 payload,所以既分风格,又不损失真实性。
"""
meta = payload.get("report_meta", {}) or {}
peak_buckets = payload.get("peak_buckets", []) or []
repeated_messages = payload.get("repeated_messages", []) or []
burst_terms = payload.get("burst_terms", []) or []
top_bucket = peak_buckets[0] if peak_buckets else {}
top_repeat = repeated_messages[0] if repeated_messages else {}
top_burst = burst_terms[0] if burst_terms else {}
return [
{
"label": "今日弹幕量",
"value": str(meta.get("message_count", 0) or 0),
"hint": "今天一共刷了多少句",
},
{
"label": "围观群众",
"value": str(meta.get("unique_user_count", 0) or 0),
"hint": "参与一起起哄的人数",
},
{
"label": "最高能时段",
"value": str(top_bucket.get("start_time") or "")[-8:-3] or "--:--",
"hint": "弹幕最炸裂的时间点",
},
{
"label": "今日爆词",
"value": str(top_burst.get("text") or top_repeat.get("text") or "乐"),
"hint": "刷得最凶的那句",
},
]
# Render a list of metric dicts as an HTML string of cards.
# For each dict the "label", "value" and "hint" entries (defaulting to "")
# are escaped with `_escape`; the per-metric fragments are concatenated.
def _render_fans_metric_cards(metrics: List[Dict[str, str]]) -> str:
blocks = []
for item in metrics:
blocks.append(
'
'
f'
{_escape(item.get("label", ""))}
'
f'
{_escape(item.get("value", ""))}
'
f'
{_escape(item.get("hint", ""))}
'
"
"
)
return "".join(blocks)
# Render the first six entries of `top_badges` as an HTML string.
# Entries whose "badge_name" is missing or blank after stripping are skipped.
def _render_badges(top_badges: List[Dict[str, Any]]) -> str:
blocks = []
for item in top_badges[:6]:
badge_name = str(item.get("badge_name") or "").strip()
if not badge_name:
continue
blocks.append(
'
"
)
return "".join(blocks)
# Render the first three peak buckets as an HTML string.
# Per bucket: the [-8:-3] slice of the stringified "start_time", the
# "message_count" value, and up to four non-blank terms from "top_terms"
# joined with " / ". All three are escaped with `_escape`.
# NOTE(review): the [-8:-3] slice presumably extracts HH:MM from a
# "... HH:MM:SS" timestamp — confirm against the payload producer.
def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str:
blocks = []
for item in peak_buckets[:3]:
start_time = str(item.get("start_time") or "")[-8:-3]
terms = [str(term.get("term") or "").strip() for term in (item.get("top_terms", []) or [])[:4]]
terms = [term for term in terms if term]
blocks.append(
'
'
f'
{_escape(start_time)}
'
f'
{_escape(item.get("message_count", 0))} 条弹幕
'
f'
{_escape(" / ".join(terms))}
'
"
"
)
return "".join(blocks)
def _render_active_users(top_active_users: List[Dict[str, Any]]) -> str:
blocks = []
for item in top_active_users[:10]:
nickname = str(item.get("nickname") or item.get("uid") or "").strip()
fans_name = str(item.get("fans_name") or "").strip()
fans_level = int(item.get("fans_level", 0) or 0)
room_level = int(item.get("room_level", 0) or 0)
message_count = int(item.get("message_count", 0) or 0)
chips = []
if fans_name:
fans_label = f"{fans_name} Lv{fans_level}" if fans_level > 0 else fans_name
chips.append(f'{_escape(fans_label)}')
if room_level > 0:
chips.append(f'{_escape(f"平台 Lv{room_level}")}')
meta_html = "".join(chips) if chips else '普通观众'
blocks.append(
'