Files
abot/plugins/douyu/report_template.py
liuwei 515361c34f 新增斗鱼粉丝向恶搞弹幕日报并拆分运营版
1. 新增斗鱼粉丝日报和强制粉丝日报命令,手动触发时走独立发送链路。\n2. 为粉丝向日报补充独立提示词、兜底文案、缓存分类和图片渲染逻辑。\n3. 新增粉丝向日报 HTML 模板与模板解析函数,整体风格调整为开心欢乐的整活总结。\n4. 保留原有斗鱼运营日报定时与发送逻辑,避免两种日报互相污染。
2026-04-27 12:22:30 +08:00

802 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import html
from typing import Any, Dict, List
from markupsafe import Markup
from utils.html_template_renderer import HtmlTemplateRenderer
def _escape(value: Any) -> str:
return html.escape(str(value or ""))
def _clean_bullet_text(line: str) -> str:
"""
统一清洗模型输出里的 bullet 前缀。
这样做的目的有两个:
1. 兼容 `-`、`•`、`1.` 这类常见列表格式;
2. 让模板层只拿到干净文本,避免在 HTML 里再做重复判断。
"""
text = str(line or "").strip()
if not text:
return ""
if text.startswith("- "):
return text[2:].strip()
if text.startswith(""):
return text[1:].strip()
if len(text) > 2 and text[0].isdigit() and text[1] in {".", ""}:
return text[2:].strip()
return text
def _render_metric_card(label: str, value: Any, hint: str = "") -> str:
    """Build the HTML for one metric card: label, value and hint, all escaped."""
    rows = (
        ("metric-label", label),
        ("metric-value", value),
        ("metric-hint", hint),
    )
    inner = "".join(
        f'<div class="{css_class}">{_escape(content)}</div>'
        for css_class, content in rows
    )
    return f'<div class="metric-card">{inner}</div>'
def _render_list(items: List[str], item_class: str = "bullet-list") -> str:
    """Render non-blank ``items`` as an escaped ``<ul>``.

    Returns an empty string when there is nothing to show, so the template
    never receives an empty list element.
    """
    if not items:
        return ""
    rendered = [
        f"<li>{_escape(entry)}</li>"
        for entry in items
        if str(entry or "").strip()
    ]
    if not rendered:
        return ""
    return f'<ul class="{item_class}">' + "".join(rendered) + "</ul>"
def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str], List[str]]:
# 这里把 LLM 返回的弹幕总结拆成三部分:
# 1) lead: 顶部总述段落
# 2) insight_items: 常规的复盘要点(运营/观察视角)
# 3) fans_extract_items: 专门给粉丝看的“弹幕萃取”要点
# 约定:当检测到“【粉丝向弹幕萃取】”或同义标记后,后续条目归入 fans_extract_items。
lead_parts = []
insight_items = []
fans_extract_items = []
in_fans_extract_block = False
for line in str(danmu_summary or "").splitlines():
stripped = line.strip()
if not stripped:
continue
# 兼容不同模型可能产出的标题样式,尽量把粉丝向内容稳定识别出来。
if stripped.startswith("【粉丝向弹幕萃取】") or stripped.startswith("粉丝向弹幕萃取") or stripped.startswith("给粉丝看的弹幕萃取"):
in_fans_extract_block = True
continue
if stripped.startswith("- "):
if in_fans_extract_block:
fans_extract_items.append(stripped[2:].strip())
else:
insight_items.append(stripped[2:].strip())
else:
# 非 bullet 文本在粉丝区块中也保留,避免模型偶发输出短段落导致信息丢失。
if in_fans_extract_block:
fans_extract_items.append(stripped)
else:
lead_parts.append(stripped)
lead = " ".join(lead_parts).strip()
return lead, insight_items, fans_extract_items
def _split_fans_report_blocks(report_text: str) -> Dict[str, Any]:
"""
将“粉丝向恶搞日报”文本拆成模板需要的结构化区块。
约定模型尽量输出如下标题:
- 【今日笑点】
- 【弹幕名场面】
- 【梗王榜】
- 【收尾播报】
即便模型没有完全按约定输出,这里也会尽量兜底,保证页面不空。
"""
header_alias_map = {
"今日笑点": "laugh_points",
"笑点": "laugh_points",
"欢乐总结": "laugh_points",
"弹幕名场面": "famous_scenes",
"名场面": "famous_scenes",
"现场整活": "famous_scenes",
"梗王榜": "meme_rank",
"梗榜": "meme_rank",
"复读冠军": "meme_rank",
"收尾播报": "closing",
"结尾播报": "closing",
"结尾": "closing",
}
sections = {
"lead": "",
"laugh_points": [],
"famous_scenes": [],
"meme_rank": [],
"closing": [],
}
current_key = "lead"
lead_parts: List[str] = []
for raw_line in str(report_text or "").splitlines():
stripped = raw_line.strip()
if not stripped:
continue
normalized_title = stripped.strip("【】:# ").replace("", "").replace(":", "")
if normalized_title in header_alias_map:
current_key = header_alias_map[normalized_title]
continue
clean_text = _clean_bullet_text(stripped)
if not clean_text:
continue
if current_key == "lead":
lead_parts.append(clean_text)
else:
sections[current_key].append(clean_text)
sections["lead"] = " ".join(lead_parts).strip()
return sections
def _normalize_summary_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or []) if str(item.get("term") or "").strip()]
merged_templates = [str(item.get("text") or "").strip() for item in (payload.get("merged_templates", []) or []) if str(item.get("text") or "").strip()]
peak_buckets = payload.get("peak_buckets", []) or []
representative_messages = payload.get("representative_messages", []) or []
supplements: List[str] = []
if top_terms:
supplements.append(f"讨论焦点比较集中,弹幕反复围绕 {''.join(top_terms[:5])} 展开。")
if merged_templates:
sample_templates = "".join(text[:24] for text in merged_templates[:3])
supplements.append(f"复读和共识梗比较强,重复内容主要集中在 {sample_templates}")
if peak_buckets:
top_bucket = peak_buckets[0]
bucket_terms = [str(term.get('term') or '').strip() for term in (top_bucket.get("top_terms", []) or []) if str(term.get('term') or '').strip()]
if bucket_terms:
supplements.append(
f"高峰时段出现在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,话题明显偏向 {''.join(bucket_terms[:4])}"
)
if representative_messages:
supplements.append("代表性发言里既有操作反馈,也有玩梗调侃和情绪宣泄,互动意愿比较强。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_fans_extract_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 6) -> List[str]:
# 粉丝向萃取强调“现场感”,优先保留模型给出的条目;
# 不足时再从代表弹幕/重复梗中补齐,避免页面出现空区块。
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
representative_messages = payload.get("representative_messages", []) or []
repeated_messages = payload.get("repeated_messages", []) or []
burst_terms = payload.get("burst_terms", []) or []
for item in representative_messages[:8]:
nickname = str(item.get("nickname") or "").strip() or "观众"
content = str(item.get("content") or "").strip()
if not content:
continue
supplements.append(f"{nickname}{content[:46]}")
for item in repeated_messages[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"复读梗「{text[:34]}」出现 {count} 次。")
for item in burst_terms[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"情绪短词「{text}」集中刷了 {count} 次。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_funny_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 4) -> List[str]:
"""
“今日笑点”优先保留模型自己写的梗概;
如果模型输出偏保守,就从高频梗、爆发词里补出几条更有现场感的句子。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("merged_templates", []) or [])[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"同一句梗反复刷了 {count} 次,直播间默认进入复读机模式:{text[:30]}")
for item in (payload.get("burst_terms", []) or [])[:4]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"情绪词「{text}」高频刷屏 {count} 次,说明这一段大家已经集体上头。")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_scene_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 5) -> List[str]:
"""
“弹幕名场面”强调像直播间回放,因此优先从代表弹幕中补句子,
让最终成品看起来更像观众之间的接龙,而不是纯数据总结。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("representative_messages", []) or [])[:10]:
nickname = str(item.get("nickname") or "").strip() or "观众"
content = str(item.get("content") or "").strip()
if content:
supplements.append(f"{nickname}{content[:42]}")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_rank_bullets(payload: Dict[str, Any], items: List[str], target_count: int = 3) -> List[str]:
"""
“梗王榜”兜底来源按优先级走:
1. 已聚合的长模板梗;
2. 重复短句;
3. 爆发情绪词。
这样即便模型漏写榜单,页面也能稳定展示“今天到底大家在刷什么”。
"""
normalized = [str(item or "").strip() for item in items if str(item or "").strip()]
if len(normalized) >= target_count:
return normalized[:target_count]
supplements: List[str] = []
for item in (payload.get("merged_templates", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text[:30]}|全场 {count}")
for item in (payload.get("repeated_messages", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text[:30]}|复读 {count}")
for item in (payload.get("burst_terms", []) or [])[:3]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
supplements.append(f"{text}|情绪爆发 {count}")
existing = set(normalized)
for item in supplements:
if item not in existing:
normalized.append(item)
existing.add(item)
if len(normalized) >= target_count:
break
return normalized[:target_count]
def _normalize_closing_text(payload: Dict[str, Any], closing_items: List[str]) -> str:
"""
收尾句只有一句,优先保留模型原话;
如果模型没给,就用当天最高峰时段和热词拼一个轻松结尾。
"""
for item in closing_items:
value = str(item or "").strip()
if value:
return value
peak_buckets = payload.get("peak_buckets", []) or []
if peak_buckets:
top_bucket = peak_buckets[0]
top_terms = [
str(term.get("term") or "").strip()
for term in (top_bucket.get("top_terms", []) or [])[:3]
if str(term.get("term") or "").strip()
]
return (
f"今晚最佳观影时段锁定 {str(top_bucket.get('start_time') or '')[-8:-3]}"
f"大家围着 {''.join(top_terms) or '节目效果'} 一起起哄,收工时空气里都还是梗。"
)
return "今天的直播结论很简单:操作未必全记住了,但弹幕梗已经自动住进群友脑回路。"
def _build_template_items(payload: Dict[str, Any], limit: int = 8) -> List[str]:
items: List[str] = []
seen = set()
def push(text: str, suffix: str = "") -> None:
value = str(text or "").strip()
if not value:
return
normalized_key = value
if normalized_key in seen:
return
seen.add(normalized_key)
items.append(f"{value}{suffix}".strip())
for item in (payload.get("merged_templates", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"{count}次)")
for item in (payload.get("repeated_messages", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:72], f"{count}次)")
for item in (payload.get("burst_terms", []) or [])[:6]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
push(text[:36], f"(爆发 {count} 次)")
for item in (payload.get("top_terms", []) or [])[:6]:
term = str(item.get("term") or "").strip()
count = int(item.get("count", 0) or 0)
if term:
push(term, f"{count}次提及)")
return items[:limit]
def _render_insight_cards(items: List[str]) -> str:
    """Render up to six insight cards, each with a fixed kicker label.

    When an odd number of cards is shown, the last one gets the extra
    ``full-span`` class.
    """
    kickers = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"]
    shown = items[:6]
    # Index of the card that receives "full-span"; -1 means no card does.
    widened = len(shown) - 1 if len(shown) % 2 == 1 else -1

    cards: List[str] = []
    for position, text in enumerate(shown):
        span_class = " full-span" if position == widened else ""
        kicker = kickers[position] if position < len(kickers) else "观察"
        cards.append(
            f'<div class="insight-card{span_class}">'
            f'<div class="insight-kicker">{_escape(kicker)}</div>'
            f'<div class="insight-text">{_escape(text)}</div>'
            "</div>"
        )
    return "".join(cards)
def _render_fans_scene_cards(items: List[str]) -> str:
    """Render up to six "famous scene" quotes as escaped cards."""
    return "".join(
        '<div class="fans-scene-card">'
        f'<div class="fans-scene-quote">{_escape(quote)}</div>'
        "</div>"
        for quote in items[:6]
    )
def _render_rank_cards(items: List[str]) -> str:
    """Render the first three entries as "TOP n" ranking cards (n starts at 1)."""
    cards: List[str] = []
    for offset, entry in enumerate(items[:3]):
        place = offset + 1
        cards.append(
            '<div class="meme-rank-card">'
            f'<div class="meme-rank-no">TOP {place}</div>'
            f'<div class="meme-rank-text">{_escape(entry)}</div>'
            "</div>"
        )
    return "".join(cards)
def _build_fans_fun_metrics(payload: Dict[str, Any]) -> List[Dict[str, str]]:
"""
粉丝版避免直接沿用“运营指标”命名,改成更轻松的展示口径。
底层仍然来自同一份 payload所以既分风格又不损失真实性。
"""
meta = payload.get("report_meta", {}) or {}
peak_buckets = payload.get("peak_buckets", []) or []
repeated_messages = payload.get("repeated_messages", []) or []
burst_terms = payload.get("burst_terms", []) or []
top_bucket = peak_buckets[0] if peak_buckets else {}
top_repeat = repeated_messages[0] if repeated_messages else {}
top_burst = burst_terms[0] if burst_terms else {}
return [
{
"label": "今日弹幕量",
"value": str(meta.get("message_count", 0) or 0),
"hint": "今天一共刷了多少句",
},
{
"label": "围观群众",
"value": str(meta.get("unique_user_count", 0) or 0),
"hint": "参与一起起哄的人数",
},
{
"label": "最高能时段",
"value": str(top_bucket.get("start_time") or "")[-8:-3] or "--:--",
"hint": "弹幕最炸裂的时间点",
},
{
"label": "今日爆词",
"value": str(top_burst.get("text") or top_repeat.get("text") or ""),
"hint": "刷得最凶的那句",
},
]
def _render_fans_metric_cards(metrics: List[Dict[str, str]]) -> str:
    """Render each metric dict (``label``/``value``/``hint``) as an escaped card."""
    fields = ("label", "value", "hint")
    cards: List[str] = []
    for metric in metrics:
        inner = "".join(
            f'<div class="fans-metric-{field}">{_escape(metric.get(field, ""))}</div>'
            for field in fields
        )
        cards.append(f'<div class="fans-metric-card">{inner}</div>')
    return "".join(cards)
def _render_badges(top_badges: List[Dict[str, Any]]) -> str:
    """Render up to six fan badges as chips showing user and message counts.

    Entries without a badge name are skipped (they still count towards the
    six that are inspected).
    """
    chips: List[str] = []
    for badge in top_badges[:6]:
        name = str(badge.get("badge_name") or "").strip()
        if name:
            users = _escape(badge.get("user_count", 0))
            messages = _escape(badge.get("message_count", 0))
            chips.append(
                '<div class="badge-chip">'
                f'<span class="badge-name">{_escape(name)}</span>'
                f'<span class="badge-meta">{users}人 / {messages}条</span>'
                "</div>"
            )
    return "".join(chips)
def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str:
    """Render the first three peak buckets as cards: time, message count, top terms."""
    cards: List[str] = []
    for bucket in peak_buckets[:3]:
        clock = str(bucket.get("start_time") or "")[-8:-3]
        # Only the first four term entries are considered; blank ones are dropped.
        hot_terms = [
            word
            for word in (
                str(entry.get("term") or "").strip()
                for entry in (bucket.get("top_terms", []) or [])[:4]
            )
            if word
        ]
        joined_terms = " / ".join(hot_terms)
        cards.append(
            '<div class="hot-card">'
            f'<div class="hot-time">{_escape(clock)}</div>'
            f'<div class="hot-count">{_escape(bucket.get("message_count", 0))} 条弹幕</div>'
            f'<div class="hot-terms">{_escape(joined_terms)}</div>'
            "</div>"
        )
    return "".join(cards)
def _render_active_users(top_active_users: List[Dict[str, Any]]) -> str:
    """Render the first ten active users as cards.

    Each card shows the nickname (falling back to the uid), the message
    count, and chips for the fan badge and platform level; users with
    neither get a single "plain viewer" chip.
    """
    cards: List[str] = []
    for user in top_active_users[:10]:
        display_name = str(user.get("nickname") or user.get("uid") or "").strip()
        badge = str(user.get("fans_name") or "").strip()
        badge_level = int(user.get("fans_level", 0) or 0)
        platform_level = int(user.get("room_level", 0) or 0)
        sent = int(user.get("message_count", 0) or 0)

        chip_html = ""
        if badge:
            if badge_level > 0:
                badge = f"{badge} Lv{badge_level}"
            chip_html += f'<span class="user-chip fans">{_escape(badge)}</span>'
        if platform_level > 0:
            platform_label = f"平台 Lv{platform_level}"
            chip_html += f'<span class="user-chip room">{_escape(platform_label)}</span>'
        if not chip_html:
            chip_html = '<span class="user-chip plain">普通观众</span>'

        cards.append(
            '<div class="active-user-card">'
            '<div class="active-user-top">'
            f'<div class="active-user-name">{_escape(display_name)}</div>'
            f'<div class="active-user-count">{_escape(sent)}<span>条</span></div>'
            "</div>"
            f'<div class="active-user-meta">{chip_html}</div>'
            "</div>"
        )
    return "".join(cards)
def _render_audience_trend_chart(audience_trend: Dict[str, Any]) -> str:
    """Render the audience trend as an inline SVG chart wrapped in HTML.

    ``audience_trend["points"]`` supplies the samples; each point's
    ``vip_count`` is drawn as a bar (left axis) and its ``diamond_count`` as
    a polyline with dots (right axis). ``audience_trend["summary"]`` supplies
    the latest values shown in the legend and the optional sampling note.

    Returns a placeholder ``<div>`` when fewer than two points are available.
    """
    points = audience_trend.get("points", []) or []
    if len(points) < 2:
        return (
            '<div class="chart-empty">'
            '<div class="chart-empty-title">人数趋势</div>'
            '<div class="chart-empty-desc">当前直播场次还没有足够的 WS 采样点,暂时无法绘制趋势图。</div>'
            '</div>'
        )
    # Fixed canvas size and margins; the plot area is what remains inside them.
    width = 820
    height = 276
    padding_left = 62
    padding_right = 64
    padding_top = 24
    padding_bottom = 38
    plot_width = width - padding_left - padding_right
    plot_height = height - padding_top - padding_bottom
    summary = audience_trend.get("summary", {}) or {}
    def _compress_chart_points(raw_points: List[Dict[str, Any]], max_points: int = 36) -> List[Dict[str, Any]]:
        """Downsample ``raw_points`` to at most ``max_points`` entries.

        The list is cut into ``max_points`` consecutive buckets and the last
        point of each bucket is kept; the first and last results are then
        replaced by copies of the very first and very last raw points.
        """
        if len(raw_points) <= max_points:
            return list(raw_points)
        bucket_size = max(len(raw_points) / max_points, 1)
        compressed: List[Dict[str, Any]] = []
        for idx in range(max_points):
            start = int(round(idx * bucket_size))
            end = int(round((idx + 1) * bucket_size))
            # Fall back to a single-element slice if rounding produced an empty bucket.
            bucket = raw_points[start:end] or raw_points[start:start + 1]
            if not bucket:
                continue
            compressed.append(dict(bucket[-1]))
        first = raw_points[0]
        last = raw_points[-1]
        if compressed:
            compressed[0] = dict(first)
            compressed[-1] = dict(last)
        return compressed
    chart_points = _compress_chart_points(points, max_points=36)
    vip_values = [int(item.get("vip_count", 0) or 0) for item in chart_points]
    diamond_values = [int(item.get("diamond_count", 0) or 0) for item in chart_points]
    # Characters 8..4 from the end of the timestamp
    # (presumably "HH:MM" of a "... HH:MM:SS" string — confirm).
    labels = [str(item.get("timestamp") or "")[-8:-3] for item in chart_points]
    vip_min = min(vip_values)
    vip_max = max(vip_values)
    diamond_min = min(diamond_values)
    diamond_max = max(diamond_values)
    # Headroom added above/below each series; the lower bound is clamped at 0.
    vip_padding = max(int((vip_max - vip_min) * 0.12), 60)
    diamond_padding = 1 if diamond_max - diamond_min <= 2 else max(int((diamond_max - diamond_min) * 0.2), 1)
    vip_display_min = max(vip_min - vip_padding, 0)
    vip_display_max = vip_max + vip_padding
    diamond_display_min = max(diamond_min - diamond_padding, 0)
    diamond_display_max = diamond_max + diamond_padding
    # Spans are at least 1 so the ratio computations below never divide by zero.
    vip_span = max(vip_display_max - vip_display_min, 1)
    diamond_span = max(diamond_display_max - diamond_display_min, 1)
    step_x = plot_width / max(len(chart_points) - 1, 1)
    # Bar width is 58% of the x step, limited to the range 6..18.
    bar_width = max(min(step_x * 0.58, 18), 6)
    def x_at(index: int) -> float:
        """Return the x coordinate of the point at ``index``."""
        return padding_left + step_x * index
    def y_vip(value: int) -> float:
        """Map a vip_count value to a y coordinate on the left axis."""
        ratio = (value - vip_display_min) / vip_span if vip_span else 0
        return padding_top + plot_height - ratio * plot_height
    def y_diamond(value: int) -> float:
        """Map a diamond_count value to a y coordinate on the right axis."""
        ratio = (value - diamond_display_min) / diamond_span if diamond_span else 0
        return padding_top + plot_height - ratio * plot_height
    grid_lines = []
    left_ticks = []
    right_ticks = []
    # Five evenly spaced horizontal grid lines, each with a left (vip) and right (diamond) tick label.
    for idx in range(5):
        ratio = idx / 4
        y = padding_top + plot_height - ratio * plot_height
        vip_tick = round(vip_display_min + vip_span * ratio)
        diamond_tick = round(diamond_display_min + diamond_span * ratio)
        grid_lines.append(
            f'<line x1="{padding_left}" y1="{y:.1f}" x2="{width - padding_right}" y2="{y:.1f}" class="chart-grid" />'
        )
        left_ticks.append(
            f'<text x="{padding_left - 10}" y="{y + 4:.1f}" text-anchor="end" class="axis-label axis-left">{vip_tick}</text>'
        )
        right_ticks.append(
            f'<text x="{width - padding_right + 10}" y="{y + 4:.1f}" text-anchor="start" class="axis-label axis-right">{diamond_tick}</text>'
        )
    bars = []
    line_points = []
    dot_points = []
    label_marks = []
    annotations = []
    # Time labels: every point when there are at most 12, otherwise the first,
    # the last and five evenly spaced points in between.
    if len(chart_points) <= 12:
        label_indexes = list(range(len(chart_points)))
    else:
        label_indexes = sorted(set([0, len(chart_points) - 1] + [int(round((len(chart_points) - 1) * i / 6)) for i in range(1, 6)]))
    # list.index returns the first occurrence of the maximum / minimum.
    vip_peak_index = vip_values.index(vip_max)
    vip_valley_index = vip_values.index(vip_min)
    diamond_latest_index = len(chart_points) - 1
    for idx, item in enumerate(chart_points):
        x = x_at(idx)
        vip_y = y_vip(int(item.get("vip_count", 0) or 0))
        diamond_y = y_diamond(int(item.get("diamond_count", 0) or 0))
        # Bars grow upwards from the bottom edge of the plot area.
        bar_height = padding_top + plot_height - vip_y
        bars.append(
            f'<rect x="{x - bar_width / 2:.1f}" y="{vip_y:.1f}" width="{bar_width:.1f}" height="{bar_height:.1f}" rx="7" class="vip-bar" />'
        )
        line_points.append(f"{x:.1f},{diamond_y:.1f}")
        dot_points.append(f'<circle cx="{x:.1f}" cy="{diamond_y:.1f}" r="3.5" class="diamond-dot" />')
        if idx in label_indexes:
            label_marks.append(
                f'<text x="{x:.1f}" y="{height - 16}" text-anchor="middle" class="time-label">{_escape(labels[idx])}</text>'
            )
        # Peak annotation; the valley is only annotated when it differs from the peak.
        if idx == vip_peak_index:
            annotations.append(
                f'<g class="chart-annotation">'
                f'<text x="{x:.1f}" y="{max(vip_y - 16, 18):.1f}" text-anchor="middle" class="value-label vip">峰值 {vip_max}</text>'
                f'<text x="{x:.1f}" y="{max(vip_y - 2, 30):.1f}" text-anchor="middle" class="value-sub-label vip">{_escape(labels[idx])}</text>'
                f'</g>'
            )
        elif idx == vip_valley_index and vip_min != vip_max:
            annotations.append(
                f'<g class="chart-annotation">'
                f'<text x="{x:.1f}" y="{min(vip_y + 18, padding_top + plot_height - 18):.1f}" text-anchor="middle" class="value-label vip soft">低点 {vip_min}</text>'
                f'<text x="{x:.1f}" y="{min(vip_y + 32, padding_top + plot_height - 6):.1f}" text-anchor="middle" class="value-sub-label vip soft">{_escape(labels[idx])}</text>'
                f'</g>'
            )
        # The last point always carries the "latest" diamond annotation.
        if idx == diamond_latest_index:
            annotations.append(
                f'<g class="chart-annotation">'
                f'<text x="{min(x + 10, width - padding_right - 6):.1f}" y="{max(diamond_y - 14, 18):.1f}" text-anchor="start" class="value-label diamond">最新 {int(item.get("diamond_count", 0) or 0)}</text>'
                f'<text x="{min(x + 10, width - padding_right - 6):.1f}" y="{max(diamond_y, 30):.1f}" text-anchor="start" class="value-sub-label diamond">{_escape(labels[idx])}</text>'
                f'</g>'
            )
    polyline = f'<polyline points="{" ".join(line_points)}" class="diamond-line" />'
    # Legend values prefer the summary and fall back to the last plotted point.
    vip_latest = int(summary.get("vip_latest", vip_values[-1]) or 0)
    diamond_latest = int(summary.get("diamond_latest", diamond_values[-1]) or 0)
    session_start = str(summary.get("session_start") or "")
    first_point_time = str(summary.get("first_point_time") or "")
    leading_gap_minutes = int(summary.get("leading_gap_minutes", 0) or 0)
    note_text = ""
    # The note is only shown when the summary reports a leading gap of 20 minutes or more.
    if session_start and first_point_time and leading_gap_minutes >= 20:
        note_text = f"采样起点 {first_point_time[-8:-3]},早于该时段的人数数据未记录"
    return f"""
<div class="chart-wrap">
<div class="chart-head">
<div class="chart-title">贵宾 / 钻粉趋势</div>
<div class="chart-legend">
<span class="legend-item"><span class="legend-swatch vip"></span>贵宾柱状</span>
<span class="legend-item"><span class="legend-swatch diamond"></span>钻粉折线</span>
<span class="legend-meta">最新:贵宾 {_escape(vip_latest)} / 钻粉 {_escape(diamond_latest)}</span>
</div>
</div>
{'<div class="chart-note">' + _escape(note_text) + '</div>' if note_text else ''}
<svg viewBox="0 0 {width} {height}" class="audience-chart" role="img" aria-label="贵宾与钻粉趋势图">
<defs>
<linearGradient id="vipBarGradient" x1="0" y1="0" x2="0" y2="1">
<stop offset="0%" stop-color="#2b59ff" stop-opacity="0.95" />
<stop offset="100%" stop-color="#7aa2ff" stop-opacity="0.72" />
</linearGradient>
<linearGradient id="diamondLineGradient" x1="0" y1="0" x2="1" y2="0">
<stop offset="0%" stop-color="#f59e0b" />
<stop offset="100%" stop-color="#ffd166" />
</linearGradient>
</defs>
<rect x="0" y="0" width="{width}" height="{height}" rx="24" class="chart-bg" />
{''.join(grid_lines)}
<line x1="{padding_left}" y1="{padding_top + plot_height:.1f}" x2="{width - padding_right}" y2="{padding_top + plot_height:.1f}" class="chart-axis" />
<line x1="{padding_left}" y1="{padding_top}" x2="{padding_left}" y2="{padding_top + plot_height:.1f}" class="chart-axis soft" />
<line x1="{width - padding_right}" y1="{padding_top}" x2="{width - padding_right}" y2="{padding_top + plot_height:.1f}" class="chart-axis soft" />
{''.join(left_ticks)}
{''.join(right_ticks)}
{''.join(bars)}
{polyline}
{''.join(dot_points)}
{''.join(annotations)}
{''.join(label_marks)}
</svg>
</div>
"""
def render_daily_report_html(
    payload: Dict[str, Any],
    danmu_summary: str,
    operator_summary_lines: List[str],
) -> str:
    """Render the operator-facing daily report as HTML.

    Args:
        payload: Aggregated report data; this function reads ``report_meta``,
            ``operator_metrics``, ``audience_trend`` and ``peak_buckets``
            and passes the whole payload on to the helper builders.
        danmu_summary: LLM-written danmu summary text, split into a lead
            paragraph, insight bullets and fan-facing extract bullets.
        operator_summary_lines: Lines rendered as the operator summary list.

    Returns:
        The HTML produced by ``HtmlTemplateRenderer`` for
        ``plugins/douyu/templates/daily_report.html``.
    """
    meta = payload.get("report_meta", {}) or {}
    operator = payload.get("operator_metrics", {}) or {}
    title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
    subtitle = (
        f"{meta.get('anchor_day', '')} | 场次 {meta.get('session_count', 0)}"
        f" | 弹幕 {meta.get('message_count', 0)} | 活跃用户 {meta.get('unique_user_count', 0)}"
    )
    metrics_html = "".join([
        _render_metric_card("活跃用户", meta.get("unique_user_count", 0), "当天参与弹幕的去重人数"),
        _render_metric_card("带牌活跃", operator.get("fans_badge_user_count", 0), "带粉丝牌发言用户"),
        _render_metric_card("10+粉丝牌", operator.get("high_fans_level_user_count", 0), "高粘性活跃用户"),
        _render_metric_card("30+等级用户", operator.get("high_room_level_user_count", 0), "高等级老观众"),
    ])
    template_items = _build_template_items(payload)
    # Reuse the guarded ``operator`` dict: chaining .get() on the raw payload
    # value raises AttributeError when "operator_metrics" is present but None.
    top_active_users = operator.get("top_active_users", []) or []
    audience_trend = payload.get("audience_trend", {}) or {}
    lead_summary, danmu_bullets, fans_extract_bullets = _split_summary_blocks(danmu_summary)
    danmu_bullets = _normalize_summary_bullets(payload, danmu_bullets, target_count=5)
    fans_extract_bullets = _normalize_fans_extract_bullets(payload, fans_extract_bullets, target_count=6)
    # Template-based rendering:
    # 1. layout and styling live in a separate HTML template file;
    # 2. Python only prepares data and fragments, so UI changes touch the template alone.
    renderer = HtmlTemplateRenderer()
    return renderer.render(
        "plugins/douyu/templates/daily_report.html",
        {
            "title_name": title_name,
            "subtitle": subtitle,
            # The fragments below are built by this module's helpers, which
            # escape their text content, so they are passed as Markup.
            "metrics_html": Markup(metrics_html),
            "lead_summary": lead_summary,
            "danmu_insights_html": Markup(_render_insight_cards(danmu_bullets)),
            "fans_extract_html": Markup(_render_list(fans_extract_bullets, item_class="fans-list")),
            "template_items_html": Markup(_render_list(template_items)),
            "hot_times_html": Markup(_render_hot_times(payload.get("peak_buckets", []) or [])),
            "audience_trend_html": Markup(_render_audience_trend_chart(audience_trend)),
            "operator_summary_html": Markup(_render_list(operator_summary_lines)),
            "badges_html": Markup(_render_badges(operator.get("top_badges", []) or [])),
            "active_users_html": Markup(_render_active_users(top_active_users)),
        },
    )
def render_fans_daily_report_html(
    payload: Dict[str, Any],
    fans_report_text: str,
) -> str:
    """Render the fan-facing "prank" daily report as HTML.

    This page is not about operator insight: it turns the day's funniest
    chat atmosphere into a single page that is easy to forward in group
    chats. ``fans_report_text`` is split into sections, and each section is
    padded from ``payload`` when the model wrote too little.
    """
    report_meta = payload.get("report_meta", {}) or {}
    streamer = str(
        report_meta.get("nickname")
        or report_meta.get("room_name")
        or report_meta.get("room_id")
        or "主播"
    )
    headline = (
        f"{report_meta.get('anchor_day', '')} | 弹幕 {report_meta.get('message_count', 0)}"
        f" | 围观群众 {report_meta.get('unique_user_count', 0)}"
    )

    parsed = _split_fans_report_blocks(fans_report_text)
    opening = str(parsed.get("lead") or "").strip()
    if not opening:
        opening = "今天这场直播的弹幕主打一个集体上头,观众一边盯着画面,一边忙着把梗越刷越离谱。"

    funny_lines = _normalize_funny_bullets(payload, parsed.get("laugh_points", []), target_count=4)
    scene_lines = _normalize_scene_bullets(payload, parsed.get("famous_scenes", []), target_count=5)
    rank_lines = _normalize_rank_bullets(payload, parsed.get("meme_rank", []), target_count=3)
    sign_off = _normalize_closing_text(payload, parsed.get("closing", []))

    # The fan edition deliberately plays down the "analysis" feel; every
    # fragment below shows only the entertainment-style result.
    context = {
        "title_name": streamer,
        "subtitle": headline,
        "lead_text": opening,
        "fans_metrics_html": Markup(_render_fans_metric_cards(_build_fans_fun_metrics(payload))),
        "laugh_points_html": Markup(_render_list(funny_lines, item_class="funny-list")),
        "famous_scenes_html": Markup(_render_fans_scene_cards(scene_lines)),
        "meme_rank_html": Markup(_render_rank_cards(rank_lines)),
        "closing_text": sign_off,
    }
    return HtmlTemplateRenderer().render(
        "plugins/douyu/templates/daily_fans_report.html",
        context,
    )