Files
abot/plugins/douyu/report_template.py
liuwei 4386d0df75 重构斗鱼粉丝日报为信息优先结构
1. 更新粉丝日报提示词,优先提炼赛事、位置、英雄、对局和场外有效信息\n2. 扩展模板解析与渲染逻辑,支持今日重点信息、核心讨论话题、英雄与对局焦点等新板块\n3. 优化粉丝日报兜底文案与模板展示,让本地提纯结果和LLM语义总结共同参与输出
2026-04-29 15:06:56 +08:00

1089 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import html
from typing import Any, Dict, List
from markupsafe import Markup
from utils.html_template_renderer import HtmlTemplateRenderer
def _escape(value: Any) -> str:
return html.escape(str(value or ""))
def _clean_bullet_text(line: str) -> str:
"""
统一清洗模型输出里的 bullet 前缀。
这样做的目的有两个:
1. 兼容 `-`、`•`、`1.` 这类常见列表格式;
2. 让模板层只拿到干净文本,避免在 HTML 里再做重复判断。
"""
text = str(line or "").strip()
if not text:
return ""
if text.startswith("- "):
return text[2:].strip()
if text.startswith(""):
return text[1:].strip()
if len(text) > 2 and text[0].isdigit() and text[1] in {".", ""}:
return text[2:].strip()
return text
def _render_metric_card(label: str, value: Any, hint: str = "") -> str:
return (
'<div class="metric-card">'
f'<div class="metric-label">{_escape(label)}</div>'
f'<div class="metric-value">{_escape(value)}</div>'
f'<div class="metric-hint">{_escape(hint)}</div>'
"</div>"
)
def _render_list(items: List[str], item_class: str = "bullet-list") -> str:
if not items:
return ""
lis = "".join(f'<li>{_escape(item)}</li>' for item in items if str(item or "").strip())
return f'<ul class="{item_class}">{lis}</ul>' if lis else ""
def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str], List[str]]:
# 这里把 LLM 返回的弹幕总结拆成三部分:
# 1) lead: 顶部总述段落
# 2) insight_items: 常规的复盘要点(运营/观察视角)
# 3) fans_extract_items: 专门给粉丝看的“弹幕萃取”要点
# 约定:当检测到“【粉丝向弹幕萃取】”或同义标记后,后续条目归入 fans_extract_items。
lead_parts = []
insight_items = []
fans_extract_items = []
in_fans_extract_block = False
for line in str(danmu_summary or "").splitlines():
stripped = line.strip()
if not stripped:
continue
# 兼容不同模型可能产出的标题样式,尽量把粉丝向内容稳定识别出来。
if stripped.startswith("【粉丝向弹幕萃取】") or stripped.startswith("粉丝向弹幕萃取") or stripped.startswith("给粉丝看的弹幕萃取"):
in_fans_extract_block = True
continue
if stripped.startswith("- "):
if in_fans_extract_block:
fans_extract_items.append(stripped[2:].strip())
else:
insight_items.append(stripped[2:].strip())
else:
# 非 bullet 文本在粉丝区块中也保留,避免模型偶发输出短段落导致信息丢失。
if in_fans_extract_block:
fans_extract_items.append(stripped)
else:
lead_parts.append(stripped)
lead = " ".join(lead_parts).strip()
return lead, insight_items, fans_extract_items
def _split_fans_report_blocks(report_text: str) -> Dict[str, Any]:
"""
将“粉丝向恶搞日报”文本拆成模板需要的结构化区块。
约定模型尽量输出如下标题:
- 【今日笑点】
- 【弹幕名场面】
- 【梗王榜】
- 【收尾播报】
即便模型没有完全按约定输出,这里也会尽量兜底,保证页面不空。
"""
header_alias_map = {
"今日重点信息": "key_info",
"重点信息": "key_info",
"有效信息": "key_info",
"核心讨论话题": "topic_focus",
"讨论话题": "topic_focus",
"核心话题": "topic_focus",
"英雄与对局焦点": "hero_focus",
"对局焦点": "hero_focus",
"英雄焦点": "hero_focus",
"今日笑点": "laugh_points",
"笑点": "laugh_points",
"欢乐总结": "laugh_points",
"弹幕名场面": "famous_scenes",
"名场面": "famous_scenes",
"现场整活": "famous_scenes",
"梗王榜": "meme_rank",
"梗榜": "meme_rank",
"复读冠军": "meme_rank",
"收尾播报": "closing",
"结尾播报": "closing",
"结尾": "closing",
}
sections = {
"lead": "",
"key_info": [],
"topic_focus": [],
"hero_focus": [],
"laugh_points": [],
"famous_scenes": [],
"meme_rank": [],
"closing": [],
}
current_key = "lead"
lead_parts: List[str] = []
for raw_line in str(report_text or "").splitlines():
stripped = raw_line.strip()
if not stripped:
continue
normalized_title = stripped.strip("【】:# ").replace("", "").replace(":", "")
if normalized_title in header_alias_map:
current_key = header_alias_map[normalized_title]
continue
clean_text = _clean_bullet_text(stripped)
if not clean_text:
continue
if current_key == "lead":
lead_parts.append(clean_text)
else:
sections[current_key].append(clean_text)
sections["lead"] = " ".join(lead_parts).strip()
return sections
def _normalize_summary_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or []) if str(item.get("term") or "").strip()]
merged_templates = [str(item.get("text") or "").strip() for item in (payload.get("merged_templates", []) or []) if str(item.get("text") or "").strip()]
peak_buckets = payload.get("peak_buckets", []) or []
representative_messages = payload.get("representative_messages", []) or []
supplements: List[str] = []
if top_terms:
supplements.append(f"讨论焦点比较集中,弹幕反复围绕 {''.join(top_terms[:5])} 展开。")
if merged_templates:
sample_templates = "".join(text[:24] for text in merged_templates[:3])
supplements.append(f"复读和共识梗比较强,重复内容主要集中在 {sample_templates}")
if peak_buckets:
top_bucket = peak_buckets[0]
bucket_terms = [str(term.get('term') or '').strip() for term in (top_bucket.get("top_terms", []) or []) if str(term.get('term') or '').strip()]
if bucket_terms:
supplements.append(
f"高峰时段出现在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,话题明显偏向 {''.join(bucket_terms[:4])}"
)
if representative_messages:
supplements.append("代表性发言里既有操作反馈,也有玩梗调侃和情绪宣泄,互动意愿比较强。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_fans_extract_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 6) -> List[str]:
# 粉丝向萃取强调“现场感”,优先保留模型给出的条目;
# 不足时再从代表弹幕/重复梗中补齐,避免页面出现空区块。
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
representative_messages = payload.get("representative_messages", []) or []
repeated_messages = payload.get("repeated_messages", []) or []
burst_terms = payload.get("burst_terms", []) or []
for item in representative_messages[:8]:
nickname = str(item.get("nickname") or "").strip() or "观众"
content = str(item.get("content") or "").strip()
if not content:
continue
supplements.append(f"{nickname}{content[:46]}")
for item in repeated_messages[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"复读梗「{text[:34]}」出现 {count} 次。")
for item in burst_terms[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"情绪短词「{text}」集中刷了 {count} 次。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_funny_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 4) -> List[str]:
"""
“今日笑点”优先保留模型自己写的梗概;
如果模型输出偏保守,就从高频梗、爆发词里补出几条更有现场感的句子。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("merged_templates", []) or [])[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"同一句梗反复刷了 {count} 次,直播间默认进入复读机模式:{text[:30]}")
for item in (payload.get("burst_terms", []) or [])[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"情绪词「{text}」高频刷屏 {count} 次,说明这一段大家已经集体上头。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_scene_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
"""
“弹幕名场面”强调像直播间回放,因此优先从代表弹幕中补句子,
让最终成品看起来更像观众之间的接龙,而不是纯数据总结。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("representative_messages", []) or [])[:10]:
nickname = str(item.get("nickname") or "").strip() or "观众"
content = str(item.get("content") or "").strip()
if content:
supplements.append(f"{nickname}{content[:42]}")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_rank_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 3) -> List[str]:
"""
“梗王榜”兜底来源按优先级走:
1. 已聚合的长模板梗;
2. 重复短句;
3. 爆发情绪词。
这样即便模型漏写榜单,页面也能稳定展示“今天到底大家在刷什么”。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("merged_templates", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text[:30]}|全场 {count}")
for item in (payload.get("repeated_messages", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text[:30]}|复读 {count}")
for item in (payload.get("burst_terms", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text}|情绪爆发 {count}")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_closing_text(payload: Dict[str, Any], closing_items: List[str]) -> str:
"""
收尾句只有一句,优先保留模型原话;
如果模型没给,就用当天最高峰时段和热词拼一个轻松结尾。
"""
for item in closing_items:
value = str(item or "").strip()
if value:
return value
peak_buckets = payload.get("peak_buckets", []) or []
if peak_buckets:
top_bucket = peak_buckets[0]
top_terms = [
str(term.get("term") or "").strip()
for term in (top_bucket.get("top_terms", []) or [])[:3]
if str(term.get("term") or "").strip()
]
return (
f"今晚最佳观影时段锁定 {str(top_bucket.get('start_time') or '')[-8:-3]}"
f"大家围着 {''.join(top_terms) or '节目效果'} 一起起哄,收工时空气里都还是梗。"
)
return "今天的直播结论很简单:操作未必全记住了,但弹幕梗已经自动住进群友脑回路。"
def _build_template_items(payload: Dict[str, Any], limit: int = 8) -> List[str]:
items: List[str] = []
seen = set()
def push(text: str, suffix: str = "") -> None:
value = str(text or "").strip()
if not value:
return
normalized_key = value
if normalized_key in seen:
return
seen.add(normalized_key)
items.append(f"{value}{suffix}".strip())
for item in (payload.get("merged_templates", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"{count}次)")
for item in (payload.get("repeated_messages", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"{count}次)")
for item in (payload.get("burst_terms", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:36], f"(爆发 {count} 次)")
for item in (payload.get("top_terms", []) or [])[:6]:
term = str(item.get("term") or "").strip()
count = int(item.get("count", 0) or 0)
if term:
push(term, f"{count}次提及)")
return items[:limit]
def _render_insight_cards(items: List[str]) -> str:
labels = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"]
blocks = []
for idx, item in enumerate(items[:6]):
extra_class = " full-span" if len(items[:6]) % 2 == 1 and idx == len(items[:6]) - 1 else ""
blocks.append(
f'<div class="insight-card{extra_class}">'
f'<div class="insight-kicker">{_escape(labels[idx] if idx < len(labels) else "观察")}</div>'
f'<div class="insight-text">{_escape(item)}</div>'
"</div>"
)
return "".join(blocks)
def _render_fans_scene_cards(items: List[str]) -> str:
blocks = []
for item in items[:6]:
blocks.append(
'<div class="fans-scene-card">'
f'<div class="fans-scene-quote">{_escape(item)}</div>'
"</div>"
)
return "".join(blocks)
def _render_rank_cards(items: List[str]) -> str:
blocks = []
for idx, item in enumerate(items[:3], start=1):
blocks.append(
'<div class="meme-rank-card">'
f'<div class="meme-rank-no">TOP {idx}</div>'
f'<div class="meme-rank-text">{_escape(item)}</div>'
"</div>"
)
return "".join(blocks)
def _build_fans_fun_metrics(payload: Dict[str, Any]) -> List[Dict[str, str]]:
"""
粉丝版避免直接沿用“运营指标”命名,改成更轻松的展示口径。
底层仍然来自同一份 payload所以既分风格又不损失真实性。
"""
meta = payload.get("report_meta", {}) or {}
peak_buckets = payload.get("peak_buckets", []) or []
repeated_messages = payload.get("repeated_messages", []) or []
burst_terms = payload.get("burst_terms", []) or []
top_bucket = peak_buckets[0] if peak_buckets else {}
top_repeat = repeated_messages[0] if repeated_messages else {}
top_burst = burst_terms[0] if burst_terms else {}
return [
{
"label": "今日弹幕量",
"value": str(meta.get("message_count", 0) or 0),
"hint": "今天一共刷了多少句",
},
{
"label": "围观群众",
"value": str(meta.get("unique_user_count", 0) or 0),
"hint": "参与一起起哄的人数",
},
{
"label": "最高能时段",
"value": str(top_bucket.get("start_time") or "")[-8:-3] or "--:--",
"hint": "弹幕最炸裂的时间点",
},
{
"label": "今日爆词",
"value": str(top_burst.get("text") or top_repeat.get("text") or ""),
"hint": "刷得最凶的那句",
},
]
def _render_fans_metric_cards(metrics: List[Dict[str, str]]) -> str:
blocks = []
for item in metrics:
blocks.append(
'<div class="fans-metric-card">'
f'<div class="fans-metric-label">{_escape(item.get("label", ""))}</div>'
f'<div class="fans-metric-value">{_escape(item.get("value", ""))}</div>'
f'<div class="fans-metric-hint">{_escape(item.get("hint", ""))}</div>'
"</div>"
)
return "".join(blocks)
def _build_fans_effective_info_lines(payload: Dict[str, Any], limit: int = 6) -> List[str]:
"""
为粉丝日报补一层“有效信息速览”。
这里优先从本地提纯后的主题证据簇中拿事实,不依赖 LLM 自己归纳,
这样模板本身就能稳定承载更多有效信息。
"""
lines: List[str] = []
seen = set()
def push(text: str) -> None:
value = str(text or "").strip()
if not value or value in seen:
return
seen.add(value)
lines.append(value)
for item in (payload.get("topic_evidence_clusters", []) or [])[:6]:
label = str(item.get("label") or "").strip()
count = int(item.get("count", 0) or 0)
time_range = str(item.get("time_range") or "").strip()
samples = item.get("samples", []) or []
sample_text = ""
if samples:
first_sample = samples[0]
sample_text = str(first_sample.get("content") or "").strip()[:42]
if label and sample_text:
push(f"{label}{time_range or '全场'} 持续被讨论,相关弹幕约 {count} 条,代表内容是「{sample_text}」。")
elif label:
push(f"{label}是今天的高关注话题之一,相关弹幕约 {count} 条。")
if len(lines) >= limit:
return lines[:limit]
hero_mentions = payload.get("compact_scene_material", {}).get("semantic_fact_hints", {}).get("hero_mentions", []) or []
if hero_mentions:
hero_names = [str(item.get("hero") or "").strip() for item in hero_mentions[:4] if str(item.get("hero") or "").strip()]
if hero_names:
push(f"英雄讨论主要集中在 {''.join(hero_names)}")
hot_windows = payload.get("local_stats", {}).get("peak_windows", []) or []
if hot_windows:
top_window = hot_windows[0]
push(
f"最热窗口出现在 {str(top_window.get('start_time') or '')[-8:-3]}"
f"该时段累计弹幕 {int(top_window.get('message_count', 0) or 0)} 条。"
)
return lines[:limit]
def _build_local_topic_focus_lines(payload: Dict[str, Any], limit: int = 4) -> List[str]:
"""
为“核心讨论话题”补充本地可直接确定的摘要句。
这里故意不让模型自己重新发明事实,而是把主题簇已经聚好的结果转成人能读懂的话。
"""
lines: List[str] = []
seen = set()
def push(text: str) -> None:
value = str(text or "").strip()
if not value or value in seen:
return
seen.add(value)
lines.append(value)
for item in (payload.get("topic_evidence_clusters", []) or [])[:4]:
label = str(item.get("label") or "").strip()
keywords = [str(keyword).strip() for keyword in (item.get("keywords", []) or [])[:5] if str(keyword).strip()]
count = int(item.get("count", 0) or 0)
if label and keywords:
push(f"{label}是高频主线,相关讨论约 {count} 条,关键词集中在 {''.join(keywords)}")
elif label:
push(f"{label}是今天反复被拉出来聊的主线之一,相关讨论约 {count} 条。")
if len(lines) >= limit:
return lines[:limit]
return lines[:limit]
def _build_local_hero_focus_lines(payload: Dict[str, Any], limit: int = 4) -> List[str]:
"""
为“英雄与对局焦点”准备本地兜底。
这部分直接复用英雄提及聚类,优先强调出现频次和代表发言,方便粉丝快速看懂今天在聊什么英雄。
"""
hero_mentions = (
payload.get("compact_scene_material", {})
.get("semantic_fact_hints", {})
.get("hero_mentions", [])
or []
)
lines: List[str] = []
seen = set()
def push(text: str) -> None:
value = str(text or "").strip()
if not value or value in seen:
return
seen.add(value)
lines.append(value)
for item in hero_mentions[:4]:
hero_name = str(item.get("hero") or "").strip()
mention_count = int(item.get("mention_count", 0) or 0)
samples = item.get("samples", []) or []
sample_text = ""
if samples:
sample_text = str(samples[0].get("content") or "").strip()[:36]
if hero_name and sample_text:
push(f"{hero_name}被提到 {mention_count} 次,现场典型弹幕是「{sample_text}」。")
elif hero_name:
push(f"{hero_name}是今天的主要英雄讨论点之一,被提到 {mention_count} 次。")
if len(lines) >= limit:
return lines[:limit]
return lines[:limit]
def _normalize_information_section_items(
llm_items: List[str],
local_items: List[str],
target_count: int,
) -> List[str]:
"""
将模型提炼结果与本地事实兜底合并。
设计目标:
1. 先尊重模型已经总结好的“可读句子”;
2. 如果模型漏了,就用本地证据补足;
3. 始终保证最终区块有信息量,而不是空标题。
"""
normalized: List[str] = []
seen = set()
for source in (llm_items, local_items):
for item in source:
value = str(item or "").strip()
if not value or value in seen:
continue
seen.add(value)
normalized.append(value)
if len(normalized) >= target_count:
return normalized[:target_count]
return normalized[:target_count]
def _render_fans_info_cards(items: List[str]) -> str:
blocks = []
for item in items[:6]:
blocks.append(
'<div class="fans-info-card">'
f'<div class="fans-info-text">{_escape(item)}</div>'
'</div>'
)
return "".join(blocks)
def _render_topic_clusters(topic_clusters: List[Dict[str, Any]]) -> str:
blocks = []
for item in topic_clusters[:5]:
label = str(item.get("label") or "").strip()
if not label:
continue
count = int(item.get("count", 0) or 0)
user_count = int(item.get("user_count", 0) or 0)
time_range = str(item.get("time_range") or "").strip()
samples = item.get("samples", []) or []
sample_lines = []
for sample in samples[:3]:
content = str(sample.get("content") or "").strip()
nickname = str(sample.get("nickname") or "").strip() or "观众"
hm = str(sample.get("hm") or "").strip()
if content:
sample_lines.append(f"{hm} {nickname}{content[:42]}")
sample_html = "".join(f'<li>{_escape(line)}</li>' for line in sample_lines)
blocks.append(
'<div class="topic-cluster-card">'
f'<div class="topic-cluster-title">{_escape(label)}</div>'
f'<div class="topic-cluster-meta">{_escape(time_range or "全场")} · {count} 条讨论 · {user_count} 人参与</div>'
f'<ul class="topic-cluster-list">{sample_html}</ul>'
'</div>'
)
return "".join(blocks)
def _render_hero_mentions(hero_mentions: List[Dict[str, Any]]) -> str:
blocks = []
for item in hero_mentions[:4]:
hero_name = str(item.get("hero") or "").strip()
if not hero_name:
continue
mention_count = int(item.get("mention_count", 0) or 0)
user_count = int(item.get("user_count", 0) or 0)
sample_text = ""
samples = item.get("samples", []) or []
if samples:
sample = samples[0]
sample_text = str(sample.get("content") or "").strip()[:46]
blocks.append(
'<div class="hero-mention-card">'
f'<div class="hero-mention-name">{_escape(hero_name)}</div>'
f'<div class="hero-mention-meta">{mention_count} 次提及 / {user_count} 人讨论</div>'
f'<div class="hero-mention-sample">{_escape(sample_text)}</div>'
'</div>'
)
return "".join(blocks)
def _render_hot_window_cards(hot_windows: List[Dict[str, Any]]) -> str:
blocks = []
for item in hot_windows[:4]:
start_time = str(item.get("start_time") or "")[-8:-3]
message_count = int(item.get("message_count", 0) or 0)
user_count = int(item.get("user_count", 0) or 0)
blocks.append(
'<div class="fans-hot-window-card">'
f'<div class="fans-hot-window-time">{_escape(start_time)}</div>'
f'<div class="fans-hot-window-meta">{message_count} 条弹幕 / {user_count} 人参与</div>'
'</div>'
)
return "".join(blocks)
def _render_repeat_digest(payload: Dict[str, Any]) -> str:
local_stats = payload.get("local_stats", {}) or {}
repeated_messages = local_stats.get("top_repeated_messages", []) or []
emotion_bursts = local_stats.get("top_emotion_bursts", []) or []
blocks = []
for item in repeated_messages[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
blocks.append(
'<div class="repeat-chip">'
f'<span class="repeat-chip-text">{_escape(text[:28])}</span>'
f'<span class="repeat-chip-count">{count} 次</span>'
'</div>'
)
for item in emotion_bursts[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
blocks.append(
'<div class="repeat-chip emotion">'
f'<span class="repeat-chip-text">{_escape(text[:18])}</span>'
f'<span class="repeat-chip-count">{count} 次</span>'
'</div>'
)
return "".join(blocks)
def _render_badges(top_badges: List[Dict[str, Any]]) -> str:
blocks = []
for item in top_badges[:6]:
badge_name = str(item.get("badge_name") or "").strip()
if not badge_name:
continue
blocks.append(
'<div class="badge-chip">'
f'<span class="badge-name">{_escape(badge_name)}</span>'
f'<span class="badge-meta">{_escape(item.get("user_count", 0))}人 / {_escape(item.get("message_count", 0))}条</span>'
"</div>"
)
return "".join(blocks)
def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str:
blocks = []
for item in peak_buckets[:3]:
start_time = str(item.get("start_time") or "")[-8:-3]
terms = [str(term.get("term") or "").strip() for term in (item.get("top_terms", []) or [])[:4]]
terms = [term for term in terms if term]
blocks.append(
'<div class="hot-card">'
f'<div class="hot-time">{_escape(start_time)}</div>'
f'<div class="hot-count">{_escape(item.get("message_count", 0))} 条弹幕</div>'
f'<div class="hot-terms">{_escape(" / ".join(terms))}</div>'
"</div>"
)
return "".join(blocks)
def _render_active_users(top_active_users: List[Dict[str, Any]]) -> str:
blocks = []
for item in top_active_users[:10]:
nickname = str(item.get("nickname") or item.get("uid") or "").strip()
fans_name = str(item.get("fans_name") or "").strip()
fans_level = int(item.get("fans_level", 0) or 0)
room_level = int(item.get("room_level", 0) or 0)
message_count = int(item.get("message_count", 0) or 0)
chips = []
if fans_name:
fans_label = f"{fans_name} Lv{fans_level}" if fans_level > 0 else fans_name
chips.append(f'<span class="user-chip fans">{_escape(fans_label)}</span>')
if room_level > 0:
chips.append(f'<span class="user-chip room">{_escape(f"平台 Lv{room_level}")}</span>')
meta_html = "".join(chips) if chips else '<span class="user-chip plain">普通观众</span>'
blocks.append(
'<div class="active-user-card">'
'<div class="active-user-top">'
f'<div class="active-user-name">{_escape(nickname)}</div>'
f'<div class="active-user-count">{_escape(message_count)}<span>条</span></div>'
'</div>'
f'<div class="active-user-meta">{meta_html}</div>'
'</div>'
)
return "".join(blocks)
def _render_audience_trend_chart(audience_trend: Dict[str, Any]) -> str:
points = audience_trend.get("points", []) or []
if len(points) < 2:
return (
'<div class="chart-empty">'
'<div class="chart-empty-title">人数趋势</div>'
'<div class="chart-empty-desc">当前直播场次还没有足够的 WS 采样点,暂时无法绘制趋势图。</div>'
'</div>'
)
width = 820
height = 276
padding_left = 62
padding_right = 64
padding_top = 24
padding_bottom = 38
plot_width = width - padding_left - padding_right
plot_height = height - padding_top - padding_bottom
summary = audience_trend.get("summary", {}) or {}
def _compress_chart_points(raw_points: List[Dict[str, Any]], max_points: int = 36) -> List[Dict[str, Any]]:
if len(raw_points) <= max_points:
return list(raw_points)
bucket_size = max(len(raw_points) / max_points, 1)
compressed: List[Dict[str, Any]] = []
for idx in range(max_points):
start = int(round(idx * bucket_size))
end = int(round((idx + 1) * bucket_size))
bucket = raw_points[start:end] or raw_points[start:start + 1]
if not bucket:
continue
compressed.append(dict(bucket[-1]))
first = raw_points[0]
last = raw_points[-1]
if compressed:
compressed[0] = dict(first)
compressed[-1] = dict(last)
return compressed
chart_points = _compress_chart_points(points, max_points=36)
vip_values = [int(item.get("vip_count", 0) or 0) for item in chart_points]
diamond_values = [int(item.get("diamond_count", 0) or 0) for item in chart_points]
labels = [str(item.get("timestamp") or "")[-8:-3] for item in chart_points]
vip_min = min(vip_values)
vip_max = max(vip_values)
diamond_min = min(diamond_values)
diamond_max = max(diamond_values)
vip_padding = max(int((vip_max - vip_min) * 0.12), 60)
diamond_padding = 1 if diamond_max - diamond_min <= 2 else max(int((diamond_max - diamond_min) * 0.2), 1)
vip_display_min = max(vip_min - vip_padding, 0)
vip_display_max = vip_max + vip_padding
diamond_display_min = max(diamond_min - diamond_padding, 0)
diamond_display_max = diamond_max + diamond_padding
vip_span = max(vip_display_max - vip_display_min, 1)
diamond_span = max(diamond_display_max - diamond_display_min, 1)
step_x = plot_width / max(len(chart_points) - 1, 1)
bar_width = max(min(step_x * 0.58, 18), 6)
def x_at(index: int) -> float:
return padding_left + step_x * index
def y_vip(value: int) -> float:
ratio = (value - vip_display_min) / vip_span if vip_span else 0
return padding_top + plot_height - ratio * plot_height
def y_diamond(value: int) -> float:
ratio = (value - diamond_display_min) / diamond_span if diamond_span else 0
return padding_top + plot_height - ratio * plot_height
grid_lines = []
left_ticks = []
right_ticks = []
for idx in range(5):
ratio = idx / 4
y = padding_top + plot_height - ratio * plot_height
vip_tick = round(vip_display_min + vip_span * ratio)
diamond_tick = round(diamond_display_min + diamond_span * ratio)
grid_lines.append(
f'<line x1="{padding_left}" y1="{y:.1f}" x2="{width - padding_right}" y2="{y:.1f}" class="chart-grid" />'
)
left_ticks.append(
f'<text x="{padding_left - 10}" y="{y + 4:.1f}" text-anchor="end" class="axis-label axis-left">{vip_tick}</text>'
)
right_ticks.append(
f'<text x="{width - padding_right + 10}" y="{y + 4:.1f}" text-anchor="start" class="axis-label axis-right">{diamond_tick}</text>'
)
bars = []
line_points = []
dot_points = []
label_marks = []
annotations = []
if len(chart_points) <= 12:
label_indexes = list(range(len(chart_points)))
else:
label_indexes = sorted(set([0, len(chart_points) - 1] + [int(round((len(chart_points) - 1) * i / 6)) for i in range(1, 6)]))
vip_peak_index = vip_values.index(vip_max)
vip_valley_index = vip_values.index(vip_min)
diamond_latest_index = len(chart_points) - 1
for idx, item in enumerate(chart_points):
x = x_at(idx)
vip_y = y_vip(int(item.get("vip_count", 0) or 0))
diamond_y = y_diamond(int(item.get("diamond_count", 0) or 0))
bar_height = padding_top + plot_height - vip_y
bars.append(
f'<rect x="{x - bar_width / 2:.1f}" y="{vip_y:.1f}" width="{bar_width:.1f}" height="{bar_height:.1f}" rx="7" class="vip-bar" />'
)
line_points.append(f"{x:.1f},{diamond_y:.1f}")
dot_points.append(f'<circle cx="{x:.1f}" cy="{diamond_y:.1f}" r="3.5" class="diamond-dot" />')
if idx in label_indexes:
label_marks.append(
f'<text x="{x:.1f}" y="{height - 16}" text-anchor="middle" class="time-label">{_escape(labels[idx])}</text>'
)
if idx == vip_peak_index:
annotations.append(
f'<g class="chart-annotation">'
f'<text x="{x:.1f}" y="{max(vip_y - 16, 18):.1f}" text-anchor="middle" class="value-label vip">峰值 {vip_max}</text>'
f'<text x="{x:.1f}" y="{max(vip_y - 2, 30):.1f}" text-anchor="middle" class="value-sub-label vip">{_escape(labels[idx])}</text>'
f'</g>'
)
elif idx == vip_valley_index and vip_min != vip_max:
annotations.append(
f'<g class="chart-annotation">'
f'<text x="{x:.1f}" y="{min(vip_y + 18, padding_top + plot_height - 18):.1f}" text-anchor="middle" class="value-label vip soft">低点 {vip_min}</text>'
f'<text x="{x:.1f}" y="{min(vip_y + 32, padding_top + plot_height - 6):.1f}" text-anchor="middle" class="value-sub-label vip soft">{_escape(labels[idx])}</text>'
f'</g>'
)
if idx == diamond_latest_index:
annotations.append(
f'<g class="chart-annotation">'
f'<text x="{min(x + 10, width - padding_right - 6):.1f}" y="{max(diamond_y - 14, 18):.1f}" text-anchor="start" class="value-label diamond">最新 {int(item.get("diamond_count", 0) or 0)}</text>'
f'<text x="{min(x + 10, width - padding_right - 6):.1f}" y="{max(diamond_y, 30):.1f}" text-anchor="start" class="value-sub-label diamond">{_escape(labels[idx])}</text>'
f'</g>'
)
polyline = f'<polyline points="{" ".join(line_points)}" class="diamond-line" />'
vip_latest = int(summary.get("vip_latest", vip_values[-1]) or 0)
diamond_latest = int(summary.get("diamond_latest", diamond_values[-1]) or 0)
session_start = str(summary.get("session_start") or "")
first_point_time = str(summary.get("first_point_time") or "")
leading_gap_minutes = int(summary.get("leading_gap_minutes", 0) or 0)
note_text = ""
if session_start and first_point_time and leading_gap_minutes >= 20:
note_text = f"采样起点 {first_point_time[-8:-3]},早于该时段的人数数据未记录"
return f"""
<div class="chart-wrap">
<div class="chart-head">
<div class="chart-title">贵宾 / 钻粉趋势</div>
<div class="chart-legend">
<span class="legend-item"><span class="legend-swatch vip"></span>贵宾柱状</span>
<span class="legend-item"><span class="legend-swatch diamond"></span>钻粉折线</span>
<span class="legend-meta">最新:贵宾 {_escape(vip_latest)} / 钻粉 {_escape(diamond_latest)}</span>
</div>
</div>
{'<div class="chart-note">' + _escape(note_text) + '</div>' if note_text else ''}
<svg viewBox="0 0 {width} {height}" class="audience-chart" role="img" aria-label="贵宾与钻粉趋势图">
<defs>
<linearGradient id="vipBarGradient" x1="0" y1="0" x2="0" y2="1">
<stop offset="0%" stop-color="#2b59ff" stop-opacity="0.95" />
<stop offset="100%" stop-color="#7aa2ff" stop-opacity="0.72" />
</linearGradient>
<linearGradient id="diamondLineGradient" x1="0" y1="0" x2="1" y2="0">
<stop offset="0%" stop-color="#f59e0b" />
<stop offset="100%" stop-color="#ffd166" />
</linearGradient>
</defs>
<rect x="0" y="0" width="{width}" height="{height}" rx="24" class="chart-bg" />
{''.join(grid_lines)}
<line x1="{padding_left}" y1="{padding_top + plot_height:.1f}" x2="{width - padding_right}" y2="{padding_top + plot_height:.1f}" class="chart-axis" />
<line x1="{padding_left}" y1="{padding_top}" x2="{padding_left}" y2="{padding_top + plot_height:.1f}" class="chart-axis soft" />
<line x1="{width - padding_right}" y1="{padding_top}" x2="{width - padding_right}" y2="{padding_top + plot_height:.1f}" class="chart-axis soft" />
{''.join(left_ticks)}
{''.join(right_ticks)}
{''.join(bars)}
{polyline}
{''.join(dot_points)}
{''.join(annotations)}
{''.join(label_marks)}
</svg>
</div>
"""
def render_daily_report_html(
payload: Dict[str, Any],
danmu_summary: str,
operator_summary_lines: List[str],
) -> str:
meta = payload.get("report_meta", {}) or {}
operator = payload.get("operator_metrics", {}) or {}
title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
subtitle = (
f"{meta.get('anchor_day', '')} | 场次 {meta.get('session_count', 0)}"
f" | 弹幕 {meta.get('message_count', 0)} | 活跃用户 {meta.get('unique_user_count', 0)}"
)
metrics_html = "".join([
_render_metric_card("活跃用户", meta.get("unique_user_count", 0), "当天参与弹幕的去重人数"),
_render_metric_card("带牌活跃", operator.get("fans_badge_user_count", 0), "带粉丝牌发言用户"),
_render_metric_card("10+粉丝牌", operator.get("high_fans_level_user_count", 0), "高粘性活跃用户"),
_render_metric_card("30+等级用户", operator.get("high_room_level_user_count", 0), "高等级老观众"),
])
template_items = _build_template_items(payload)
top_active_users = payload.get("operator_metrics", {}).get("top_active_users", []) or []
audience_trend = payload.get("audience_trend", {}) or {}
lead_summary, danmu_bullets, fans_extract_bullets = _split_summary_blocks(danmu_summary)
danmu_bullets = _normalize_summary_bullets(payload, danmu_bullets, target_count=5)
fans_extract_bullets = _normalize_fans_extract_bullets(payload, fans_extract_bullets, target_count=6)
# 模板化渲染:
# 1. 报告样式和结构迁移到独立 HTML 模板文件;
# 2. Python 仅负责准备数据与片段,后续改 UI 只改模板即可。
renderer = HtmlTemplateRenderer()
return renderer.render(
"plugins/douyu/templates/daily_report.html",
{
"title_name": title_name,
"subtitle": subtitle,
# 下列片段由当前函数内辅助方法生成,文本内容已做转义,可安全注入模板。
"metrics_html": Markup(metrics_html),
"lead_summary": lead_summary,
"danmu_insights_html": Markup(_render_insight_cards(danmu_bullets)),
"fans_extract_html": Markup(_render_list(fans_extract_bullets, item_class="fans-list")),
"template_items_html": Markup(_render_list(template_items)),
"hot_times_html": Markup(_render_hot_times(payload.get("peak_buckets", []) or [])),
"audience_trend_html": Markup(_render_audience_trend_chart(audience_trend)),
"operator_summary_html": Markup(_render_list(operator_summary_lines)),
"badges_html": Markup(_render_badges(operator.get("top_badges", []) or [])),
"active_users_html": Markup(_render_active_users(top_active_users)),
},
)
def render_fans_daily_report_html(
payload: Dict[str, Any],
fans_report_text: str,
) -> str:
"""
渲染“粉丝向恶搞日报”。
这份报告的设计重点不是运营洞察,而是把当天最有节目效果的弹幕氛围单独做成一页,
方便群里直接转发、看图找乐子。
"""
meta = payload.get("report_meta", {}) or {}
title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
subtitle = (
f"{meta.get('anchor_day', '')} | 弹幕 {meta.get('message_count', 0)}"
f" | 围观群众 {meta.get('unique_user_count', 0)}"
)
sections = _split_fans_report_blocks(fans_report_text)
effective_info_lines = _normalize_information_section_items(
sections.get("key_info", []),
_build_fans_effective_info_lines(payload),
target_count=6,
)
topic_focus_lines = _normalize_information_section_items(
sections.get("topic_focus", []),
_build_local_topic_focus_lines(payload),
target_count=4,
)
hero_focus_lines = _normalize_information_section_items(
sections.get("hero_focus", []),
_build_local_hero_focus_lines(payload),
target_count=4,
)
laugh_points = _normalize_funny_bullets(payload, sections.get("laugh_points", []), target_count=4)
famous_scenes = _normalize_scene_bullets(payload, sections.get("famous_scenes", []), target_count=5)
meme_rank = _normalize_rank_bullets(payload, sections.get("meme_rank", []), target_count=3)
closing_text = _normalize_closing_text(payload, sections.get("closing", []))
lead_text = str(sections.get("lead") or "").strip()
if not lead_text:
lead_text = "今天这场直播的弹幕主打一个集体上头,观众一边盯着画面,一边忙着把梗越刷越离谱。"
topic_clusters = payload.get("topic_evidence_clusters", []) or []
hero_mentions = (
payload.get("compact_scene_material", {})
.get("semantic_fact_hints", {})
.get("hero_mentions", [])
or []
)
local_stats = payload.get("local_stats", {}) or {}
renderer = HtmlTemplateRenderer()
return renderer.render(
"plugins/douyu/templates/daily_fans_report.html",
{
"title_name": title_name,
"subtitle": subtitle,
"lead_text": lead_text,
# 粉丝版不再只做“乐子文案展示”,而是补进本地提纯后的有效信息区。
"fans_metrics_html": Markup(_render_fans_metric_cards(_build_fans_fun_metrics(payload))),
"effective_summary_html": Markup(_render_list(effective_info_lines, item_class="section-summary-list")),
"effective_info_html": Markup(_render_fans_info_cards(effective_info_lines)),
"topic_focus_html": Markup(_render_list(topic_focus_lines, item_class="section-summary-list")),
"topic_clusters_html": Markup(_render_topic_clusters(topic_clusters)),
"hero_focus_html": Markup(_render_list(hero_focus_lines, item_class="section-summary-list")),
"hero_mentions_html": Markup(_render_hero_mentions(hero_mentions)),
"hot_windows_html": Markup(_render_hot_window_cards(local_stats.get("peak_windows", []) or [])),
"repeat_digest_html": Markup(_render_repeat_digest(payload)),
"laugh_points_html": Markup(_render_list(laugh_points, item_class="funny-list")),
"famous_scenes_html": Markup(_render_fans_scene_cards(famous_scenes)),
"meme_rank_html": Markup(_render_rank_cards(meme_rank)),
"closing_text": closing_text,
},
)