- 将粉丝日报改为信息优先布局,新增重点信息、话题簇、英雄焦点和热点窗口区块 - 让模板直接展示本地提纯出的有效信息,不再只依赖少量乐子文案 - 补充粉丝日报渲染辅助函数,提升证据簇和高频信息的承载能力
968 lines
41 KiB
Python
968 lines
41 KiB
Python
# -*- coding: utf-8 -*-
|
||
import html
|
||
from typing import Any, Dict, List
|
||
|
||
from markupsafe import Markup
|
||
|
||
from utils.html_template_renderer import HtmlTemplateRenderer
|
||
|
||
|
||
def _escape(value: Any) -> str:
    """Return ``value`` as HTML-escaped text.

    Numeric zero is rendered as ``"0"`` (or ``"0.0"``): counters that
    legitimately equal zero must stay visible instead of collapsing into an
    empty cell. Every other falsy value (``None``, ``""``, ``False``, empty
    containers) still becomes an empty string.

    Args:
        value: Any object; it is converted with ``str`` before escaping.

    Returns:
        The escaped text produced by ``html.escape``.
    """
    # bool is a subclass of int, so exclude it to keep False rendering as "".
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return html.escape(str(value))
    return html.escape(str(value or ""))
|
||
|
||
|
||
def _clean_bullet_text(line: str) -> str:
    """Strip a leading list marker from one line of model output.

    Recognised markers are ``- ``, ``•`` and a short ordinal (one to three
    digits) followed by ``.`` or ``、``. Normalising here means the template
    layer receives plain text and does not have to repeat the prefix checks.

    A digit run followed by ``.`` and another digit is treated as a decimal
    number (for example ``3.5 万``) rather than an ordinal and is returned
    unchanged.

    Args:
        line: Raw line; ``None`` and other falsy values are treated as empty.

    Returns:
        The text without its list marker, or ``""`` when the line is blank.
    """
    text = str(line or "").strip()
    if not text:
        return ""
    if text.startswith("- "):
        return text[2:].strip()
    if text.startswith("•"):
        return text[1:].strip()

    # Measure the leading ordinal; the three-digit cap keeps values such as
    # "2024." from being mistaken for a list number.
    digits = 0
    while digits < len(text) and digits < 3 and text[digits].isdigit():
        digits += 1

    # There must be content after the separator ("1." alone is left as is).
    if digits and len(text) > digits + 1 and text[digits] in {".", "、"}:
        looks_decimal = text[digits] == "." and text[digits + 1].isdigit()
        if not looks_decimal:
            return text[digits + 1:].strip()
    return text
|
||
|
||
|
||
def _render_metric_card(label: str, value: Any, hint: str = "") -> str:
    """Build the HTML fragment for one metric card.

    The label, value and hint are each passed through ``_escape`` before
    being placed in their own ``<div>``.
    """
    pieces = [
        '<div class="metric-card">',
        f'<div class="metric-label">{_escape(label)}</div>',
        f'<div class="metric-value">{_escape(value)}</div>',
        f'<div class="metric-hint">{_escape(hint)}</div>',
        "</div>",
    ]
    return "".join(pieces)
|
||
|
||
|
||
def _render_list(items: List[str], item_class: str = "bullet-list") -> str:
    """Render ``items`` as a ``<ul>`` with one escaped ``<li>`` per entry.

    Blank entries are skipped. An empty string is returned when nothing is
    left to render. ``item_class`` is inserted into the ``class`` attribute
    as given (it is not escaped).
    """
    if not items:
        return ""
    rows = []
    for entry in items:
        if not str(entry or "").strip():
            continue
        rows.append(f'<li>{_escape(entry)}</li>')
    if not rows:
        return ""
    body = "".join(rows)
    return f'<ul class="{item_class}">{body}</ul>'
|
||
|
||
|
||
def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str], List[str]]:
    """Split the LLM danmu summary into three parts.

    Returns a tuple ``(lead, insight_items, fans_extract_items)``:

    1. ``lead``: non-bullet lines before the fan section, joined by spaces.
    2. ``insight_items``: ``- `` bullets before the fan section.
    3. ``fans_extract_items``: every non-blank line after a fan-section
       heading, with a leading ``- `` removed when present.

    A line starting with one of the heading markers below switches the
    parser into the fan section; the heading line itself is dropped.
    """
    # Several heading spellings are accepted because models vary in how they
    # title this section.
    fan_headings = ("【粉丝向弹幕萃取】", "粉丝向弹幕萃取", "给粉丝看的弹幕萃取")

    lead_chunks: List[str] = []
    insights: List[str] = []
    fan_lines: List[str] = []
    inside_fan_section = False

    for raw in str(danmu_summary or "").splitlines():
        text = raw.strip()
        if not text:
            continue
        if text.startswith(fan_headings):
            inside_fan_section = True
            continue

        is_bullet = text.startswith("- ")
        entry = text[2:].strip() if is_bullet else text
        if inside_fan_section:
            # Short paragraphs are kept too, so nothing the model wrote in
            # the fan section is lost.
            fan_lines.append(entry)
        elif is_bullet:
            insights.append(entry)
        else:
            lead_chunks.append(entry)

    return " ".join(lead_chunks).strip(), insights, fan_lines
|
||
|
||
|
||
def _split_fans_report_blocks(report_text: str) -> Dict[str, Any]:
    """Split the fan-facing parody report into the sections the template needs.

    The model is expected to emit headings such as ``【今日笑点】``,
    ``【弹幕名场面】``, ``【梗王榜】`` and ``【收尾播报】``; several aliases are
    accepted (see ``header_alias_map``). Text that appears before the first
    recognised heading is collected into ``lead``.

    Heading detection ignores surrounding ``【】``, ``#``, spaces and both the
    ASCII and the fullwidth colon, so ``【今日笑点】:`` and ``## 今日笑点:``
    are both recognised.

    Args:
        report_text: Raw report text; ``None`` is treated as empty.

    Returns:
        A dict with ``lead`` (str) and the list-valued keys ``laugh_points``,
        ``famous_scenes``, ``meme_rank`` and ``closing``.
    """
    header_alias_map = {
        "今日笑点": "laugh_points",
        "笑点": "laugh_points",
        "欢乐总结": "laugh_points",
        "弹幕名场面": "famous_scenes",
        "名场面": "famous_scenes",
        "现场整活": "famous_scenes",
        "梗王榜": "meme_rank",
        "梗榜": "meme_rank",
        "复读冠军": "meme_rank",
        "收尾播报": "closing",
        "结尾播报": "closing",
        "结尾": "closing",
    }
    sections: Dict[str, Any] = {
        "lead": "",
        "laugh_points": [],
        "famous_scenes": [],
        "meme_rank": [],
        "closing": [],
    }
    current_key = "lead"
    lead_parts: List[str] = []

    for raw_line in str(report_text or "").splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        # \uff1a is the fullwidth colon; it is written as an escape so the
        # character survives editors that normalise punctuation.
        normalized_title = (
            stripped.strip("【】:#\uff1a ")
            .replace(":", "")
            .replace("\uff1a", "")
        )
        if normalized_title in header_alias_map:
            current_key = header_alias_map[normalized_title]
            continue
        clean_text = _clean_bullet_text(stripped)
        if not clean_text:
            continue
        if current_key == "lead":
            lead_parts.append(clean_text)
        else:
            sections[current_key].append(clean_text)

    sections["lead"] = " ".join(lead_parts).strip()
    return sections
|
||
|
||
|
||
def _normalize_summary_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
    """Return up to ``target_count`` summary bullets.

    Non-blank entries of ``items`` come first. When there are too few,
    fallback sentences are generated from ``top_terms``, ``merged_templates``,
    the first entry of ``peak_buckets`` and ``representative_messages`` in
    ``payload``, skipping any sentence already present.
    """
    bullets: List[str] = []
    for raw in items:
        cleaned = str(raw or "").strip()
        if cleaned:
            bullets.append(cleaned)
    if len(bullets) >= target_count:
        return bullets[:target_count]

    def _collect(records: Any, field: str) -> List[str]:
        # Non-blank, stripped values of `field` across the given records.
        found = []
        for record in records or []:
            value = str(record.get(field) or "").strip()
            if value:
                found.append(value)
        return found

    term_names = _collect(payload.get("top_terms", []), "term")
    template_texts = _collect(payload.get("merged_templates", []), "text")
    buckets = payload.get("peak_buckets", []) or []
    sample_messages = payload.get("representative_messages", []) or []

    fallbacks: List[str] = []
    if term_names:
        fallbacks.append(f"讨论焦点比较集中,弹幕反复围绕 {'、'.join(term_names[:5])} 展开。")
    if template_texts:
        joined_templates = ";".join(text[:24] for text in template_texts[:3])
        fallbacks.append(f"复读和共识梗比较强,重复内容主要集中在 {joined_templates}。")
    if buckets:
        hottest = buckets[0]
        hottest_terms = _collect(hottest.get("top_terms", []), "term")
        if hottest_terms:
            # start_time[-8:-3] keeps characters 8..4 from the end of the string.
            clock = str(hottest.get('start_time') or '')[-8:-3]
            fallbacks.append(
                f"高峰时段出现在 {clock} 前后,话题明显偏向 {'、'.join(hottest_terms[:4])}。"
            )
    if sample_messages:
        fallbacks.append("代表性发言里既有操作反馈,也有玩梗调侃和情绪宣泄,互动意愿比较强。")

    seen = set(bullets)
    for candidate in fallbacks:
        if candidate not in seen:
            seen.add(candidate)
            bullets.append(candidate)
        if len(bullets) >= target_count:
            break
    return bullets[:target_count]
|
||
|
||
|
||
def _normalize_fans_extract_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 6) -> List[str]:
    """Return up to ``target_count`` fan-facing extract bullets.

    The model's own entries are kept first. When there are too few, lines
    built from ``representative_messages``, ``repeated_messages`` and
    ``burst_terms`` in ``payload`` are appended (duplicates skipped) so the
    section does not render empty.
    """
    bullets: List[str] = []
    for raw in items:
        cleaned = str(raw or "").strip()
        if cleaned:
            bullets.append(cleaned)
    if len(bullets) >= target_count:
        return bullets[:target_count]

    fallbacks: List[str] = []
    sample_messages = payload.get("representative_messages", []) or []
    repeats = payload.get("repeated_messages", []) or []
    bursts = payload.get("burst_terms", []) or []

    for message in sample_messages[:8]:
        speaker = str(message.get("nickname") or "").strip() or "观众"
        words = str(message.get("content") or "").strip()
        if words:
            fallbacks.append(f"{speaker}:{words[:46]}")

    for record in repeats[:6]:
        phrase = str(record.get("text") or "").strip()
        times = int(record.get("count", 0) or 0)
        if phrase:
            fallbacks.append(f"复读梗「{phrase[:34]}」出现 {times} 次。")

    for record in bursts[:4]:
        phrase = str(record.get("text") or "").strip()
        times = int(record.get("count", 0) or 0)
        if phrase:
            fallbacks.append(f"情绪短词「{phrase}」集中刷了 {times} 次。")

    seen = set(bullets)
    for candidate in fallbacks:
        if candidate not in seen:
            seen.add(candidate)
            bullets.append(candidate)
        if len(bullets) >= target_count:
            break
    return bullets[:target_count]
|
||
|
||
|
||
def _normalize_funny_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 4) -> List[str]:
    """Return up to ``target_count`` bullets for the "today's laughs" section.

    The model's own entries are preferred. When it wrote too few, sentences
    derived from ``merged_templates`` and ``burst_terms`` in ``payload`` are
    appended, skipping duplicates.
    """
    bullets: List[str] = []
    for raw in items:
        cleaned = str(raw or "").strip()
        if cleaned:
            bullets.append(cleaned)
    if len(bullets) >= target_count:
        return bullets[:target_count]

    fallbacks: List[str] = []
    for record in (payload.get("merged_templates", []) or [])[:4]:
        phrase = str(record.get("text") or "").strip()
        times = int(record.get("count", 0) or 0)
        if phrase:
            fallbacks.append(f"同一句梗反复刷了 {times} 次,直播间默认进入复读机模式:{phrase[:30]}。")

    for record in (payload.get("burst_terms", []) or [])[:4]:
        phrase = str(record.get("text") or "").strip()
        times = int(record.get("count", 0) or 0)
        if phrase:
            fallbacks.append(f"情绪词「{phrase}」高频刷屏 {times} 次,说明这一段大家已经集体上头。")

    seen = set(bullets)
    for candidate in fallbacks:
        if candidate not in seen:
            seen.add(candidate)
            bullets.append(candidate)
        if len(bullets) >= target_count:
            break
    return bullets[:target_count]
|
||
|
||
|
||
def _normalize_scene_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
    """Return up to ``target_count`` bullets for the "famous scenes" section.

    The model's entries come first. Missing slots are filled with
    ``nickname:content`` lines taken from the first ten
    ``representative_messages`` in ``payload``, so the section reads like a
    chat replay rather than a statistics summary.
    """
    bullets: List[str] = []
    for raw in items:
        cleaned = str(raw or "").strip()
        if cleaned:
            bullets.append(cleaned)
    if len(bullets) >= target_count:
        return bullets[:target_count]

    fallbacks: List[str] = []
    for message in (payload.get("representative_messages", []) or [])[:10]:
        speaker = str(message.get("nickname") or "").strip() or "观众"
        words = str(message.get("content") or "").strip()
        if not words:
            continue
        fallbacks.append(f"{speaker}:{words[:42]}")

    seen = set(bullets)
    for candidate in fallbacks:
        if candidate not in seen:
            seen.add(candidate)
            bullets.append(candidate)
        if len(bullets) >= target_count:
            break
    return bullets[:target_count]
|
||
|
||
|
||
def _normalize_rank_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 3) -> List[str]:
    """Return up to ``target_count`` entries for the meme ranking.

    The model's entries come first. Fallback entries are drawn in priority
    order from:

    1. ``merged_templates`` (aggregated long templates),
    2. ``repeated_messages`` (repeated short lines),
    3. ``burst_terms`` (emotion bursts),

    so the ranking still shows something when the model omits it.
    """
    bullets: List[str] = []
    for raw in items:
        cleaned = str(raw or "").strip()
        if cleaned:
            bullets.append(cleaned)
    if len(bullets) >= target_count:
        return bullets[:target_count]

    fallbacks: List[str] = []
    for record in (payload.get("merged_templates", []) or [])[:3]:
        phrase = str(record.get("text") or "").strip()
        times = int(record.get("count", 0) or 0)
        if phrase:
            fallbacks.append(f"{phrase[:30]}|全场 {times} 次")

    for record in (payload.get("repeated_messages", []) or [])[:3]:
        phrase = str(record.get("text") or "").strip()
        times = int(record.get("count", 0) or 0)
        if phrase:
            fallbacks.append(f"{phrase[:30]}|复读 {times} 次")

    for record in (payload.get("burst_terms", []) or [])[:3]:
        phrase = str(record.get("text") or "").strip()
        times = int(record.get("count", 0) or 0)
        if phrase:
            fallbacks.append(f"{phrase}|情绪爆发 {times} 次")

    seen = set(bullets)
    for candidate in fallbacks:
        if candidate not in seen:
            seen.add(candidate)
            bullets.append(candidate)
        if len(bullets) >= target_count:
            break
    return bullets[:target_count]
|
||
|
||
|
||
def _normalize_closing_text(payload: Dict[str, Any], closing_items: List[str]) -> str:
    """Pick the single closing sentence for the report.

    The first non-blank entry of ``closing_items`` wins. Otherwise a sentence
    is assembled from the first ``peak_buckets`` entry (its ``start_time``
    slice and up to three ``top_terms``). A fixed fallback sentence is used
    when no peak bucket exists.
    """
    model_closings = (str(entry or "").strip() for entry in closing_items)
    first_closing = next((text for text in model_closings if text), None)
    if first_closing is not None:
        return first_closing

    buckets = payload.get("peak_buckets", []) or []
    if not buckets:
        return "今天的直播结论很简单:操作未必全记住了,但弹幕梗已经自动住进群友脑回路。"

    hottest = buckets[0]
    hot_terms = []
    # Only the first three term records are considered; blank ones are dropped.
    for record in (hottest.get("top_terms", []) or [])[:3]:
        name = str(record.get("term") or "").strip()
        if name:
            hot_terms.append(name)
    clock = str(hottest.get('start_time') or '')[-8:-3]
    topic = '、'.join(hot_terms) or '节目效果'
    return (
        f"今晚最佳观影时段锁定 {clock},"
        f"大家围着 {topic} 一起起哄,收工时空气里都还是梗。"
    )
|
||
|
||
|
||
def _build_template_items(payload: Dict[str, Any], limit: int = 8) -> List[str]:
    """Collect labelled catch-phrases from ``payload``, at most ``limit``.

    Sources are visited in order: ``merged_templates``, ``repeated_messages``,
    ``burst_terms`` and ``top_terms`` (first six records of each). Text is
    truncated per source, de-duplicated on the truncated text (before the
    count suffix is added) and suffixed with its count.
    """
    # (payload key, record field, max characters or None, suffix format)
    sources = (
        ("merged_templates", "text", 72, "({}次)"),
        ("repeated_messages", "text", 72, "({}次)"),
        ("burst_terms", "text", 36, "(爆发 {} 次)"),
        ("top_terms", "term", None, "({}次提及)"),
    )

    collected: List[str] = []
    seen = set()
    for payload_key, field, max_chars, suffix_format in sources:
        for record in (payload.get(payload_key, []) or [])[:6]:
            raw = str(record.get(field) or "").strip()
            times = int(record.get("count", 0) or 0)
            if not raw:
                continue
            # Slicing with None keeps the whole string (top_terms are not cut).
            value = str(raw[:max_chars] or "").strip()
            if not value or value in seen:
                continue
            seen.add(value)
            collected.append(f"{value}{suffix_format.format(times)}".strip())
    return collected[:limit]
|
||
|
||
|
||
def _render_insight_cards(items: List[str]) -> str:
    """Render up to six insight cards, each with a fixed kicker label.

    When the number of rendered cards is odd, the last card gets the extra
    ``full-span`` class.
    """
    kickers = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"]
    visible = items[:6]
    # Index of the card that should span the full row, or -1 when none does.
    full_span_index = len(visible) - 1 if len(visible) % 2 == 1 else -1

    cards = []
    for position, text in enumerate(visible):
        kicker = kickers[position] if position < len(kickers) else "观察"
        span_class = " full-span" if position == full_span_index else ""
        cards.append(
            f'<div class="insight-card{span_class}">'
            f'<div class="insight-kicker">{_escape(kicker)}</div>'
            f'<div class="insight-text">{_escape(text)}</div>'
            "</div>"
        )
    return "".join(cards)
|
||
|
||
|
||
def _render_fans_scene_cards(items: List[str]) -> str:
    """Render the first six items as escaped quote cards."""
    return "".join(
        '<div class="fans-scene-card">'
        f'<div class="fans-scene-quote">{_escape(quote)}</div>'
        "</div>"
        for quote in items[:6]
    )
|
||
|
||
|
||
def _render_rank_cards(items: List[str]) -> str:
    """Render the first three items as ``TOP 1`` .. ``TOP 3`` ranking cards."""
    cards = []
    rank = 0
    for text in items[:3]:
        rank += 1
        cards.append(
            '<div class="meme-rank-card">'
            f'<div class="meme-rank-no">TOP {rank}</div>'
            f'<div class="meme-rank-text">{_escape(text)}</div>'
            "</div>"
        )
    return "".join(cards)
|
||
|
||
|
||
def _build_fans_fun_metrics(payload: Dict[str, Any]) -> List[Dict[str, str]]:
    """Build the four headline metrics for the fan edition of the report.

    The fan edition uses playful labels instead of the operator-metric
    names, but the values are read from the same ``payload``:
    ``report_meta`` counts, the first ``peak_buckets`` entry, and the first
    ``burst_terms`` / ``repeated_messages`` entries.
    """
    def _first(records: Any) -> Dict[str, Any]:
        # First record of a list, or an empty dict when there is none.
        return records[0] if records else {}

    meta = payload.get("report_meta", {}) or {}
    hottest_bucket = _first(payload.get("peak_buckets", []) or [])
    top_repeat = _first(payload.get("repeated_messages", []) or [])
    top_burst = _first(payload.get("burst_terms", []) or [])

    message_total = str(meta.get("message_count", 0) or 0)
    audience_total = str(meta.get("unique_user_count", 0) or 0)
    peak_clock = str(hottest_bucket.get("start_time") or "")[-8:-3] or "--:--"
    hot_phrase = str(top_burst.get("text") or top_repeat.get("text") or "乐")

    rows = (
        ("今日弹幕量", message_total, "今天一共刷了多少句"),
        ("围观群众", audience_total, "参与一起起哄的人数"),
        ("最高能时段", peak_clock, "弹幕最炸裂的时间点"),
        ("今日爆词", hot_phrase, "刷得最凶的那句"),
    )
    return [{"label": label, "value": value, "hint": hint} for label, value, hint in rows]
|
||
|
||
|
||
def _render_fans_metric_cards(metrics: List[Dict[str, str]]) -> str:
    """Render each metric dict (``label`` / ``value`` / ``hint``) as a card.

    Missing keys fall back to an empty string; every value is escaped.
    """
    return "".join(
        '<div class="fans-metric-card">'
        f'<div class="fans-metric-label">{_escape(metric.get("label", ""))}</div>'
        f'<div class="fans-metric-value">{_escape(metric.get("value", ""))}</div>'
        f'<div class="fans-metric-hint">{_escape(metric.get("hint", ""))}</div>'
        "</div>"
        for metric in metrics
    )
|
||
|
||
|
||
def _build_fans_effective_info_lines(payload: Dict[str, Any], limit: int = 6) -> List[str]:
    """Build the "key facts at a glance" lines for the fan report.

    Facts are taken from data already present in ``payload`` rather than
    from LLM prose, so the template can carry concrete information on its
    own. Sources, in order:

    1. ``topic_evidence_clusters`` (first six clusters),
    2. ``compact_scene_material.semantic_fact_hints.hero_mentions``,
    3. the first entry of ``local_stats.peak_windows``.

    Nested containers that are missing *or* explicitly ``None`` are treated
    as empty, so a partially filled payload never raises.

    Args:
        payload: Report payload dict.
        limit: Maximum number of lines returned.

    Returns:
        De-duplicated fact sentences, at most ``limit`` of them.
    """
    lines: List[str] = []
    seen = set()

    def push(text: str) -> None:
        # Append a non-blank sentence once.
        value = str(text or "").strip()
        if not value or value in seen:
            return
        seen.add(value)
        lines.append(value)

    for item in (payload.get("topic_evidence_clusters", []) or [])[:6]:
        label = str(item.get("label") or "").strip()
        count = int(item.get("count", 0) or 0)
        time_range = str(item.get("time_range") or "").strip()
        samples = item.get("samples", []) or []
        sample_text = ""
        if samples:
            sample_text = str(samples[0].get("content") or "").strip()[:42]
        if label and sample_text:
            push(f"{label}在 {time_range or '全场'} 持续被讨论,相关弹幕约 {count} 条,代表内容是「{sample_text}」。")
        elif label:
            push(f"{label}是今天的高关注话题之一,相关弹幕约 {count} 条。")
        # Stop as soon as the clusters alone fill the quota.
        if len(lines) >= limit:
            return lines[:limit]

    # `or {}` at every level: a key present with value None must not break
    # the chained lookup.
    scene_material = payload.get("compact_scene_material") or {}
    fact_hints = scene_material.get("semantic_fact_hints") or {}
    hero_mentions = fact_hints.get("hero_mentions") or []
    if hero_mentions:
        hero_names = [
            str(item.get("hero") or "").strip()
            for item in hero_mentions[:4]
            if str(item.get("hero") or "").strip()
        ]
        if hero_names:
            push(f"英雄讨论主要集中在 {'、'.join(hero_names)}。")

    local_stats = payload.get("local_stats") or {}
    hot_windows = local_stats.get("peak_windows") or []
    if hot_windows:
        top_window = hot_windows[0]
        push(
            f"最热窗口出现在 {str(top_window.get('start_time') or '')[-8:-3]},"
            f"该时段累计弹幕 {int(top_window.get('message_count', 0) or 0)} 条。"
        )
    return lines[:limit]
|
||
|
||
|
||
def _render_fans_info_cards(items: List[str]) -> str:
    """Render the first six fact lines as escaped info cards."""
    return "".join(
        '<div class="fans-info-card">'
        f'<div class="fans-info-text">{_escape(fact)}</div>'
        '</div>'
        for fact in items[:6]
    )
|
||
|
||
|
||
def _render_topic_clusters(topic_clusters: List[Dict[str, Any]]) -> str:
    """Render up to five topic clusters as cards with sample messages.

    Clusters without a label are skipped. Each card shows the label, a meta
    line (time range, message count, user count) and up to three non-blank
    sample messages formatted as ``hm nickname:content``.
    """
    cards = []
    for cluster in topic_clusters[:5]:
        title = str(cluster.get("label") or "").strip()
        if not title:
            continue
        message_total = int(cluster.get("count", 0) or 0)
        user_total = int(cluster.get("user_count", 0) or 0)
        span_text = str(cluster.get("time_range") or "").strip()

        sample_rows = []
        for sample in (cluster.get("samples", []) or [])[:3]:
            words = str(sample.get("content") or "").strip()
            speaker = str(sample.get("nickname") or "").strip() or "观众"
            clock = str(sample.get("hm") or "").strip()
            if not words:
                continue
            sample_rows.append(f'<li>{_escape(f"{clock} {speaker}:{words[:42]}")}</li>')
        sample_html = "".join(sample_rows)

        cards.append(
            '<div class="topic-cluster-card">'
            f'<div class="topic-cluster-title">{_escape(title)}</div>'
            f'<div class="topic-cluster-meta">{_escape(span_text or "全场")} · {message_total} 条讨论 · {user_total} 人参与</div>'
            f'<ul class="topic-cluster-list">{sample_html}</ul>'
            '</div>'
        )
    return "".join(cards)
|
||
|
||
|
||
def _render_hero_mentions(hero_mentions: List[Dict[str, Any]]) -> str:
    """Render up to four hero-mention cards.

    Entries without a hero name are skipped. Each card shows the name, the
    mention and user counts, and the first sample's content cut to 46
    characters (empty when there is no sample).
    """
    cards = []
    for mention in hero_mentions[:4]:
        name = str(mention.get("hero") or "").strip()
        if not name:
            continue
        mention_total = int(mention.get("mention_count", 0) or 0)
        user_total = int(mention.get("user_count", 0) or 0)
        sample_list = mention.get("samples", []) or []
        quote = str(sample_list[0].get("content") or "").strip()[:46] if sample_list else ""
        cards.append(
            '<div class="hero-mention-card">'
            f'<div class="hero-mention-name">{_escape(name)}</div>'
            f'<div class="hero-mention-meta">{mention_total} 次提及 / {user_total} 人讨论</div>'
            f'<div class="hero-mention-sample">{_escape(quote)}</div>'
            '</div>'
        )
    return "".join(cards)
|
||
|
||
|
||
def _render_hot_window_cards(hot_windows: List[Dict[str, Any]]) -> str:
    """Render up to four hot-window cards (time slice plus message/user counts)."""
    cards = []
    for window in hot_windows[:4]:
        # start_time[-8:-3] keeps characters 8..4 from the end of the string.
        clock = str(window.get("start_time") or "")[-8:-3]
        message_total = int(window.get("message_count", 0) or 0)
        user_total = int(window.get("user_count", 0) or 0)
        card = "".join((
            '<div class="fans-hot-window-card">',
            f'<div class="fans-hot-window-time">{_escape(clock)}</div>',
            f'<div class="fans-hot-window-meta">{message_total} 条弹幕 / {user_total} 人参与</div>',
            '</div>',
        ))
        cards.append(card)
    return "".join(cards)
|
||
|
||
|
||
def _render_repeat_digest(payload: Dict[str, Any]) -> str:
    """Render chips for repeated messages and emotion bursts.

    Reads ``local_stats.top_repeated_messages`` and
    ``local_stats.top_emotion_bursts`` from ``payload`` (first four records
    of each). Records with blank text are skipped. Repeated messages are cut
    to 28 characters, emotion bursts to 18.
    """
    stats = payload.get("local_stats", {}) or {}
    # (records, CSS class of the chip, max characters of the text)
    chip_groups = (
        (stats.get("top_repeated_messages", []) or [], "repeat-chip", 28),
        (stats.get("top_emotion_bursts", []) or [], "repeat-chip emotion", 18),
    )

    chips = []
    for records, css_class, max_chars in chip_groups:
        for record in records[:4]:
            phrase = str(record.get("text") or "").strip()
            times = int(record.get("count", 0) or 0)
            if not phrase:
                continue
            chips.append(
                f'<div class="{css_class}">'
                f'<span class="repeat-chip-text">{_escape(phrase[:max_chars])}</span>'
                f'<span class="repeat-chip-count">{times} 次</span>'
                '</div>'
            )
    return "".join(chips)
|
||
|
||
|
||
def _render_badges(top_badges: List[Dict[str, Any]]) -> str:
    """Render up to six fan-badge chips; entries without a badge name are skipped."""
    chips = []
    for badge in top_badges[:6]:
        name = str(badge.get("badge_name") or "").strip()
        if name:
            user_total = _escape(badge.get("user_count", 0))
            message_total = _escape(badge.get("message_count", 0))
            chips.append(
                '<div class="badge-chip">'
                f'<span class="badge-name">{_escape(name)}</span>'
                f'<span class="badge-meta">{user_total}人 / {message_total}条</span>'
                "</div>"
            )
    return "".join(chips)
|
||
|
||
|
||
def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str:
    """Render the first three peak buckets as hot-time cards.

    Each card shows the bucket's ``start_time`` slice, its message count and
    the non-blank names among its first four ``top_terms``.
    """
    cards = []
    for bucket in peak_buckets[:3]:
        clock = str(bucket.get("start_time") or "")[-8:-3]
        term_names = []
        for record in (bucket.get("top_terms", []) or [])[:4]:
            name = str(record.get("term") or "").strip()
            if name:
                term_names.append(name)
        cards.append(
            '<div class="hot-card">'
            f'<div class="hot-time">{_escape(clock)}</div>'
            f'<div class="hot-count">{_escape(bucket.get("message_count", 0))} 条弹幕</div>'
            f'<div class="hot-terms">{_escape(" / ".join(term_names))}</div>'
            "</div>"
        )
    return "".join(cards)
|
||
|
||
|
||
def _render_active_users(top_active_users: List[Dict[str, Any]]) -> str:
    """Render up to ten active-user cards.

    Each card shows the nickname (falling back to ``uid``), the message
    count and level chips: a fan-badge chip when ``fans_name`` is set and a
    platform-level chip when ``room_level`` is positive. Users with neither
    get a single "plain viewer" chip.
    """
    cards = []
    for user in top_active_users[:10]:
        display_name = str(user.get("nickname") or user.get("uid") or "").strip()
        badge_name = str(user.get("fans_name") or "").strip()
        badge_level = int(user.get("fans_level", 0) or 0)
        platform_level = int(user.get("room_level", 0) or 0)
        message_total = int(user.get("message_count", 0) or 0)

        chip_html = ""
        if badge_name:
            badge_text = badge_name
            if badge_level > 0:
                badge_text = f"{badge_name} Lv{badge_level}"
            chip_html += f'<span class="user-chip fans">{_escape(badge_text)}</span>'
        if platform_level > 0:
            platform_text = f"平台 Lv{platform_level}"
            chip_html += f'<span class="user-chip room">{_escape(platform_text)}</span>'
        if not chip_html:
            chip_html = '<span class="user-chip plain">普通观众</span>'

        cards.append(
            '<div class="active-user-card">'
            '<div class="active-user-top">'
            f'<div class="active-user-name">{_escape(display_name)}</div>'
            f'<div class="active-user-count">{_escape(message_total)}<span>条</span></div>'
            '</div>'
            f'<div class="active-user-meta">{chip_html}</div>'
            '</div>'
        )
    return "".join(cards)
|
||
|
||
|
||
def _render_audience_trend_chart(audience_trend: Dict[str, Any]) -> str:
    """Render the VIP / diamond-fan trend as an inline SVG chart.

    ``audience_trend["points"]`` supplies the samples (``vip_count``,
    ``diamond_count``, ``timestamp``); ``audience_trend["summary"]`` supplies
    optional latest values and sampling-gap information. VIP counts are drawn
    as bars against the left axis, diamond counts as a polyline against the
    right axis. With fewer than two samples a placeholder block is returned
    instead of a chart.
    """
    samples = audience_trend.get("points", []) or []
    if len(samples) < 2:
        return "".join((
            '<div class="chart-empty">',
            '<div class="chart-empty-title">人数趋势</div>',
            '<div class="chart-empty-desc">当前直播场次还没有足够的 WS 采样点,暂时无法绘制趋势图。</div>',
            '</div>',
        ))

    # Canvas geometry, in SVG user units.
    width, height = 820, 276
    padding_left, padding_right = 62, 64
    padding_top, padding_bottom = 24, 38
    plot_width = width - padding_left - padding_right
    plot_height = height - padding_top - padding_bottom
    baseline_y = padding_top + plot_height
    right_edge = width - padding_right

    summary = audience_trend.get("summary", {}) or {}

    def _downsample(raw_points: List[Dict[str, Any]], max_points: int = 36) -> List[Dict[str, Any]]:
        # Keep at most `max_points` samples: the last sample of each bucket,
        # with the very first and very last raw samples forced into place.
        total = len(raw_points)
        if total <= max_points:
            return list(raw_points)
        stride = max(total / max_points, 1)
        picked: List[Dict[str, Any]] = []
        for slot in range(max_points):
            lo = int(round(slot * stride))
            hi = int(round((slot + 1) * stride))
            window = raw_points[lo:hi] or raw_points[lo:lo + 1]
            if window:
                picked.append(dict(window[-1]))
        if picked:
            picked[0] = dict(raw_points[0])
            picked[-1] = dict(raw_points[-1])
        return picked

    chart_points = _downsample(samples, max_points=36)
    point_count = len(chart_points)
    vip_series = [int(point.get("vip_count", 0) or 0) for point in chart_points]
    diamond_series = [int(point.get("diamond_count", 0) or 0) for point in chart_points]
    time_labels = [str(point.get("timestamp") or "")[-8:-3] for point in chart_points]

    vip_low, vip_high = min(vip_series), max(vip_series)
    diamond_low, diamond_high = min(diamond_series), max(diamond_series)

    # Head-room around each series so bars and dots do not touch the frame.
    vip_margin = max(int((vip_high - vip_low) * 0.12), 60)
    diamond_range = diamond_high - diamond_low
    if diamond_range <= 2:
        diamond_margin = 1
    else:
        diamond_margin = max(int(diamond_range * 0.2), 1)

    vip_floor = max(vip_low - vip_margin, 0)
    vip_ceiling = vip_high + vip_margin
    diamond_floor = max(diamond_low - diamond_margin, 0)
    diamond_ceiling = diamond_high + diamond_margin
    # Spans are clamped to at least 1, so the divisions below are always safe.
    vip_span = max(vip_ceiling - vip_floor, 1)
    diamond_span = max(diamond_ceiling - diamond_floor, 1)

    step_x = plot_width / max(point_count - 1, 1)
    bar_width = max(min(step_x * 0.58, 18), 6)

    def _to_y(value: int, floor: int, span: int) -> float:
        # Map a data value onto the vertical pixel axis (larger value -> smaller y).
        fraction = (value - floor) / span
        return baseline_y - fraction * plot_height

    grid_lines = []
    left_ticks = []
    right_ticks = []
    for tick in range(5):
        fraction = tick / 4
        y = baseline_y - fraction * plot_height
        vip_tick = round(vip_floor + vip_span * fraction)
        diamond_tick = round(diamond_floor + diamond_span * fraction)
        grid_lines.append(
            f'<line x1="{padding_left}" y1="{y:.1f}" x2="{right_edge}" y2="{y:.1f}" class="chart-grid" />'
        )
        left_ticks.append(
            f'<text x="{padding_left - 10}" y="{y + 4:.1f}" text-anchor="end" class="axis-label axis-left">{vip_tick}</text>'
        )
        right_ticks.append(
            f'<text x="{right_edge + 10}" y="{y + 4:.1f}" text-anchor="start" class="axis-label axis-right">{diamond_tick}</text>'
        )

    # Time labels: every point for short series, otherwise seven spread-out ones.
    last_index = point_count - 1
    if point_count <= 12:
        labelled = set(range(point_count))
    else:
        labelled = {0, last_index}
        labelled.update(int(round(last_index * part / 6)) for part in range(1, 6))

    peak_index = vip_series.index(vip_high)
    valley_index = vip_series.index(vip_low)

    bars = []
    line_points = []
    dot_points = []
    label_marks = []
    annotations = []
    for idx in range(point_count):
        x = padding_left + step_x * idx
        vip_y = _to_y(vip_series[idx], vip_floor, vip_span)
        diamond_y = _to_y(diamond_series[idx], diamond_floor, diamond_span)
        bar_height = baseline_y - vip_y
        time_text = _escape(time_labels[idx])

        bars.append(
            f'<rect x="{x - bar_width / 2:.1f}" y="{vip_y:.1f}" width="{bar_width:.1f}" height="{bar_height:.1f}" rx="7" class="vip-bar" />'
        )
        line_points.append(f"{x:.1f},{diamond_y:.1f}")
        dot_points.append(f'<circle cx="{x:.1f}" cy="{diamond_y:.1f}" r="3.5" class="diamond-dot" />')

        if idx in labelled:
            label_marks.append(
                f'<text x="{x:.1f}" y="{height - 16}" text-anchor="middle" class="time-label">{time_text}</text>'
            )

        if idx == peak_index:
            annotations.append(
                '<g class="chart-annotation">'
                f'<text x="{x:.1f}" y="{max(vip_y - 16, 18):.1f}" text-anchor="middle" class="value-label vip">峰值 {vip_high}</text>'
                f'<text x="{x:.1f}" y="{max(vip_y - 2, 30):.1f}" text-anchor="middle" class="value-sub-label vip">{time_text}</text>'
                '</g>'
            )
        elif idx == valley_index and vip_low != vip_high:
            annotations.append(
                '<g class="chart-annotation">'
                f'<text x="{x:.1f}" y="{min(vip_y + 18, baseline_y - 18):.1f}" text-anchor="middle" class="value-label vip soft">低点 {vip_low}</text>'
                f'<text x="{x:.1f}" y="{min(vip_y + 32, baseline_y - 6):.1f}" text-anchor="middle" class="value-sub-label vip soft">{time_text}</text>'
                '</g>'
            )

        if idx == last_index:
            anchor_x = min(x + 10, right_edge - 6)
            annotations.append(
                '<g class="chart-annotation">'
                f'<text x="{anchor_x:.1f}" y="{max(diamond_y - 14, 18):.1f}" text-anchor="start" class="value-label diamond">最新 {diamond_series[idx]}</text>'
                f'<text x="{anchor_x:.1f}" y="{max(diamond_y, 30):.1f}" text-anchor="start" class="value-sub-label diamond">{time_text}</text>'
                '</g>'
            )

    polyline_points = " ".join(line_points)
    polyline = f'<polyline points="{polyline_points}" class="diamond-line" />'

    vip_latest = int(summary.get("vip_latest", vip_series[-1]) or 0)
    diamond_latest = int(summary.get("diamond_latest", diamond_series[-1]) or 0)
    session_start = str(summary.get("session_start") or "")
    first_point_time = str(summary.get("first_point_time") or "")
    leading_gap_minutes = int(summary.get("leading_gap_minutes", 0) or 0)

    # A note is shown only when the summary reports a leading gap of 20+ minutes.
    note_html = ""
    if session_start and first_point_time and leading_gap_minutes >= 20:
        note_text = f"采样起点 {first_point_time[-8:-3]},早于该时段的人数数据未记录"
        note_html = '<div class="chart-note">' + _escape(note_text) + '</div>'

    grid_html = "".join(grid_lines)
    left_tick_html = "".join(left_ticks)
    right_tick_html = "".join(right_ticks)
    bar_html = "".join(bars)
    dot_html = "".join(dot_points)
    annotation_html = "".join(annotations)
    label_html = "".join(label_marks)

    return f"""
<div class="chart-wrap">
<div class="chart-head">
<div class="chart-title">贵宾 / 钻粉趋势</div>
<div class="chart-legend">
<span class="legend-item"><span class="legend-swatch vip"></span>贵宾柱状</span>
<span class="legend-item"><span class="legend-swatch diamond"></span>钻粉折线</span>
<span class="legend-meta">最新:贵宾 {_escape(vip_latest)} / 钻粉 {_escape(diamond_latest)}</span>
</div>
</div>
{note_html}
<svg viewBox="0 0 {width} {height}" class="audience-chart" role="img" aria-label="贵宾与钻粉趋势图">
<defs>
<linearGradient id="vipBarGradient" x1="0" y1="0" x2="0" y2="1">
<stop offset="0%" stop-color="#2b59ff" stop-opacity="0.95" />
<stop offset="100%" stop-color="#7aa2ff" stop-opacity="0.72" />
</linearGradient>
<linearGradient id="diamondLineGradient" x1="0" y1="0" x2="1" y2="0">
<stop offset="0%" stop-color="#f59e0b" />
<stop offset="100%" stop-color="#ffd166" />
</linearGradient>
</defs>
<rect x="0" y="0" width="{width}" height="{height}" rx="24" class="chart-bg" />
{grid_html}
<line x1="{padding_left}" y1="{baseline_y:.1f}" x2="{right_edge}" y2="{baseline_y:.1f}" class="chart-axis" />
<line x1="{padding_left}" y1="{padding_top}" x2="{padding_left}" y2="{baseline_y:.1f}" class="chart-axis soft" />
<line x1="{right_edge}" y1="{padding_top}" x2="{right_edge}" y2="{baseline_y:.1f}" class="chart-axis soft" />
{left_tick_html}
{right_tick_html}
{bar_html}
{polyline}
{dot_html}
{annotation_html}
{label_html}
</svg>
</div>
"""
|
||
|
||
|
||
def render_daily_report_html(
    payload: Dict[str, Any],
    danmu_summary: str,
    operator_summary_lines: List[str],
) -> str:
    """Render the operator-facing daily report through the HTML template.

    Args:
        payload: Report data. This function reads the ``report_meta``,
            ``operator_metrics``, ``audience_trend`` and ``peak_buckets``
            keys; the whole payload is also handed to the bullet/template
            helper functions.
        danmu_summary: Raw summary text, split by ``_split_summary_blocks``
            into a lead paragraph, insight bullets and fan-extract bullets.
        operator_summary_lines: Lines rendered as a plain bullet list.

    Returns:
        Whatever ``HtmlTemplateRenderer.render`` returns for
        ``plugins/douyu/templates/daily_report.html`` (annotated as ``str``).
    """
    # ``or {}`` also covers keys that are present but hold ``None``.
    meta = payload.get("report_meta", {}) or {}
    operator = payload.get("operator_metrics", {}) or {}
    title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
    subtitle = (
        f"{meta.get('anchor_day', '')} | 场次 {meta.get('session_count', 0)}"
        f" | 弹幕 {meta.get('message_count', 0)} | 活跃用户 {meta.get('unique_user_count', 0)}"
    )

    metrics_html = "".join([
        _render_metric_card("活跃用户", meta.get("unique_user_count", 0), "当天参与弹幕的去重人数"),
        _render_metric_card("带牌活跃", operator.get("fans_badge_user_count", 0), "带粉丝牌发言用户"),
        _render_metric_card("10+粉丝牌", operator.get("high_fans_level_user_count", 0), "高粘性活跃用户"),
        _render_metric_card("30+等级用户", operator.get("high_room_level_user_count", 0), "高等级老观众"),
    ])

    template_items = _build_template_items(payload)
    # Reuse the None-safe ``operator`` dict instead of chaining ``.get`` on
    # the raw payload, which raised AttributeError when ``operator_metrics``
    # was present but ``None``.
    top_active_users = operator.get("top_active_users", []) or []
    audience_trend = payload.get("audience_trend", {}) or {}

    lead_summary, danmu_bullets, fans_extract_bullets = _split_summary_blocks(danmu_summary)
    danmu_bullets = _normalize_summary_bullets(payload, danmu_bullets, target_count=5)
    fans_extract_bullets = _normalize_fans_extract_bullets(payload, fans_extract_bullets, target_count=6)

    # Template-based rendering:
    # 1. The report's styling and structure live in a standalone HTML template file.
    # 2. Python only prepares data and fragments, so later UI changes only touch the template.
    renderer = HtmlTemplateRenderer()
    return renderer.render(
        "plugins/douyu/templates/daily_report.html",
        {
            "title_name": title_name,
            "subtitle": subtitle,
            # The fragments below are produced by this module's render helpers,
            # which escape their text content, so they are wrapped in Markup
            # and injected into the template as-is.
            "metrics_html": Markup(metrics_html),
            "lead_summary": lead_summary,
            "danmu_insights_html": Markup(_render_insight_cards(danmu_bullets)),
            "fans_extract_html": Markup(_render_list(fans_extract_bullets, item_class="fans-list")),
            "template_items_html": Markup(_render_list(template_items)),
            "hot_times_html": Markup(_render_hot_times(payload.get("peak_buckets", []) or [])),
            "audience_trend_html": Markup(_render_audience_trend_chart(audience_trend)),
            "operator_summary_html": Markup(_render_list(operator_summary_lines)),
            "badges_html": Markup(_render_badges(operator.get("top_badges", []) or [])),
            "active_users_html": Markup(_render_active_users(top_active_users)),
        },
    )
|
||
|
||
|
||
def render_fans_daily_report_html(
    payload: Dict[str, Any],
    fans_report_text: str,
) -> str:
    """Render the fan-facing "parody" daily report.

    This report is not about operational insight: it puts the day's most
    entertaining danmu atmosphere on a page of its own, so it can be
    forwarded in group chats and skimmed for fun.

    Args:
        payload: Report data. This function reads ``report_meta``,
            ``topic_evidence_clusters``, ``local_stats`` and
            ``compact_scene_material -> semantic_fact_hints -> hero_mentions``;
            the whole payload is also passed to the normalising/building
            helpers.
        fans_report_text: Raw fan-report text, split into sections by
            ``_split_fans_report_blocks``.

    Returns:
        Whatever ``HtmlTemplateRenderer.render`` returns for
        ``plugins/douyu/templates/daily_fans_report.html`` (annotated as
        ``str``).
    """
    meta = payload.get("report_meta", {}) or {}
    title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
    subtitle = (
        f"{meta.get('anchor_day', '')} | 弹幕 {meta.get('message_count', 0)} 条"
        f" | 围观群众 {meta.get('unique_user_count', 0)} 人"
    )
    sections = _split_fans_report_blocks(fans_report_text)
    laugh_points = _normalize_funny_bullets(payload, sections.get("laugh_points", []), target_count=4)
    famous_scenes = _normalize_scene_bullets(payload, sections.get("famous_scenes", []), target_count=5)
    meme_rank = _normalize_rank_bullets(payload, sections.get("meme_rank", []), target_count=3)
    closing_text = _normalize_closing_text(payload, sections.get("closing", []))
    lead_text = str(sections.get("lead") or "").strip()
    if not lead_text:
        # Fallback lead used when the report text has no usable lead section.
        lead_text = "今天这场直播的弹幕主打一个集体上头,观众一边盯着画面,一边忙着把梗越刷越离谱。"
    topic_clusters = payload.get("topic_evidence_clusters", []) or []
    # Guard every level of the nested lookup: a key that is present but holds
    # ``None`` would otherwise raise AttributeError on the next ``.get``.
    scene_material = payload.get("compact_scene_material", {}) or {}
    fact_hints = scene_material.get("semantic_fact_hints", {}) or {}
    hero_mentions = fact_hints.get("hero_mentions", []) or []
    local_stats = payload.get("local_stats", {}) or {}

    renderer = HtmlTemplateRenderer()
    return renderer.render(
        "plugins/douyu/templates/daily_fans_report.html",
        {
            "title_name": title_name,
            "subtitle": subtitle,
            "lead_text": lead_text,
            # The fan edition no longer shows only "fun copy"; it also includes
            # the locally distilled effective-information sections.
            "fans_metrics_html": Markup(_render_fans_metric_cards(_build_fans_fun_metrics(payload))),
            "effective_info_html": Markup(_render_fans_info_cards(_build_fans_effective_info_lines(payload))),
            "topic_clusters_html": Markup(_render_topic_clusters(topic_clusters)),
            "hero_mentions_html": Markup(_render_hero_mentions(hero_mentions)),
            "hot_windows_html": Markup(_render_hot_window_cards(local_stats.get("peak_windows", []) or [])),
            "repeat_digest_html": Markup(_render_repeat_digest(payload)),
            "laugh_points_html": Markup(_render_list(laugh_points, item_class="funny-list")),
            "famous_scenes_html": Markup(_render_fans_scene_cards(famous_scenes)),
            "meme_rank_html": Markup(_render_rank_cards(meme_rank)),
            "closing_text": closing_text,
        },
    )
|