feat(douyu): add daily danmu report pipeline

This commit is contained in:
liuwei
2026-04-08 13:17:29 +08:00
parent 6401ec02de
commit 66f4a3e604
5 changed files with 2181 additions and 3 deletions

View File

@@ -4,3 +4,25 @@ command = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表"]
check_interval_minutes = 5
api_url_template = "https://www.douyu.com/betard/{room_id}"
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
session_cutoff_hour = 6
merge_gap_hours = 4
daily_report_enable = true
daily_report_time = "10:05"
daily_report_min_messages = 120
daily_report_use_llm = false
daily_report_max_sessions = 4
daily_report_max_length = 1800
daily_report_send_image = true
[Douyu.report_api]
provider = "openai_compatible"
api_base_url = ""
endpoint = "chat/completions"
api_key = ""
model = ""
timeout_seconds = 45
temperature = 0.3
max_tokens = 900
stream = true
max_retries = 3
retry_delay_seconds = 1.0

View File

@@ -0,0 +1,839 @@
# -*- coding: utf-8 -*-
import os
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple
class DouyuDanmuSummaryHelper:
    """Parse recorded Douyu danmu (chat) logs and condense them into report material.

    Every member is a class or static method; the class only namespaces the
    parsing, session-inference, cleaning and aggregation helpers.
    """

    # One log line: "[YYYY-MM-DD HH:MM:SS] <nickname> (UID: <uid><profile>)<content>".
    LINE_PATTERN = re.compile(
        r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s+"
        r"(?P<nickname>.*?)\s+\(UID:\s*(?P<uid>[^,\)]+)(?P<profile>.*?)\)(?P<content>.*)$"
    )
    # Tokens ignored by _tokenize because they carry no topical signal.
    STOPWORDS: Set[str] = {
        "哈哈", "哈哈哈", "hh", "hhh", "啊啊", "啊啊啊", "可以", "这个", "那个", "真的", "就是",
        "一下", "主播", "兄弟", "老铁", "大家", "我们", "你们", "他们", "不是", "什么", "怎么",
        "为啥", "有点", "感觉", "这里", "那里", "然后", "今天", "刚刚", "现在", "一个", "一下子",
    }
    # Short reaction words counted by _build_burst_terms.
    # NOTE(review): the empty-string entries look like characters lost upstream
    # (possibly emoji) -- confirm against the original file before editing.
    SHORT_BURST_WORDS: Set[str] = {
        "666", "6666", "", "牛逼", "", "", "", "", "", "", "", "", "",
        "哈哈", "哈哈哈", "笑死", "卧槽", "wc", "awsl", "nb", "nbl", "c", "6",
    }
    # Content matching any of these is treated as system/robot noise.
    NOISE_PATTERNS = [
        re.compile(r"本条弹幕.*机器人", re.I),
        re.compile(r"请不要.*统计机器人数", re.I),
        re.compile(r"原来直播间有.*非机器人用户", re.I),
        re.compile(r"如果您的直播内容是以观看他人操作为主", re.I),
    ]
    # Normalized text matching any of these is always merged as a template,
    # regardless of length or repeat count.
    TEMPLATE_HINT_PATTERNS = [
        re.compile(r"闭目不语任由"),
        re.compile(r"你就忍心一辈"),
        re.compile(r"刚刚偷看你直播被老板发现"),
        re.compile(r"还好老板是蝙蝠侠"),
        re.compile(r"原来直播间有这么多姐妹"),
        re.compile(r"之前我一直不敢发弹幕"),
        re.compile(r"强子也就是吃了直播的红利"),
        re.compile(r"你声音好像强子"),
        re.compile(r"怎么回事强子"),
        re.compile(r"你是个人吗强"),
    ]
    TEMPLATE_MIN_LENGTH = 14  # minimum normalized length for automatic template merging
    TEMPLATE_MIN_REPEAT = 4   # minimum occurrences for automatic template merging
    REPEAT_MIN_COUNT = 3      # minimum occurrences to be listed as a repeated message

    @classmethod
    def parse_danmu_line(cls, line: str) -> Optional[Dict[str, Any]]:
        """Parse one log line into a message dict.

        Returns:
            A dict with ``timestamp`` (datetime), ``timestamp_text``, ``nickname``,
            ``uid``, the profile fields from ``_parse_profile_text``, ``content``
            and ``raw``; ``None`` when the line is blank, does not match
            ``LINE_PATTERN`` or carries an invalid timestamp.
        """
        text = str(line or "").strip()
        if not text:
            return None
        match = cls.LINE_PATTERN.match(text)
        if not match:
            return None
        try:
            ts = datetime.strptime(match.group("timestamp"), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # The pattern only checks digit counts, so e.g. month 13 lands here.
            return None
        return {
            "timestamp": ts,
            "timestamp_text": match.group("timestamp"),
            "nickname": match.group("nickname").strip(),
            "uid": str(match.group("uid")).strip(),
            **cls._parse_profile_text(str(match.group("profile") or "")),
            "content": match.group("content").strip(),
            "raw": text,
        }

    @classmethod
    def load_session_messages(
        cls,
        room_id: str,
        session: Dict[str, Any],
        base_dir: str = "temp",
    ) -> List[Dict[str, Any]]:
        """Load the parsed messages that fall inside any segment of ``session``.

        Reads ``<base_dir>/douyu_danmu/<YYYYMMDD>/<room_id>_<YYYYMMDD>.txt`` for
        every calendar day touched by a segment. Missing or unreadable files are
        skipped (best effort); lines read before a read error are kept.
        """
        segments = cls._normalize_segments(session.get("segments", []) or [])
        if not room_id or not segments:
            return []

        # Cover every day between start and end, not only the two endpoints,
        # so a segment spanning more than two calendar days loses nothing.
        date_keys: Set[str] = set()
        for segment in segments:
            day = segment["start"].date()
            last_day = segment["end"].date()
            while day <= last_day:
                date_keys.add(day.strftime("%Y%m%d"))
                day += timedelta(days=1)

        collected: List[Dict[str, Any]] = []
        for date_key in sorted(date_keys):
            file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")
            if not os.path.exists(file_path):
                continue
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    for line in f:
                        parsed = cls.parse_danmu_line(line)
                        if parsed and cls._in_any_segment(parsed["timestamp"], segments):
                            collected.append(parsed)
            except (OSError, UnicodeError):
                # Unreadable or mis-encoded file: skip the rest of it.
                continue
        return collected

    @classmethod
    def load_day_messages(cls, room_id: str, date_key: str, base_dir: str = "temp") -> List[Dict[str, Any]]:
        """Load every parsable message from one day's log file.

        Returns an empty list when the file does not exist. Unlike
        ``load_session_messages``, read errors propagate to the caller.
        """
        file_path = os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")
        if not os.path.exists(file_path):
            return []
        with open(file_path, "r", encoding="utf-8") as f:
            parsed_lines = (cls.parse_danmu_line(line) for line in f)
            return [parsed for parsed in parsed_lines if parsed]

    @classmethod
    def build_summary_material(
        cls,
        room_id: str,
        session: Dict[str, Any],
        messages: List[Dict[str, Any]],
        bucket_minutes: int = 5,
        top_bucket_count: int = 10,
    ) -> Dict[str, Any]:
        """Build the aggregated statistics dict for one session.

        Messages without content are ignored. Noise is filtered and templates
        are merged first; term, bucket and sample statistics are computed on
        the remaining "organized" messages.
        """
        normalized = [item for item in messages if item and item.get("content")]
        prepared = cls._prepare_messages(normalized)
        organized_messages = prepared["organized_messages"]
        deduped_messages = cls._dedupe_consecutive_messages(organized_messages)
        burst_terms = cls._build_burst_terms(organized_messages)
        top_terms = cls._extract_top_terms(organized_messages, limit=30)
        bucket_stats = cls._build_time_buckets(organized_messages, minutes=bucket_minutes)
        peak_buckets = sorted(
            bucket_stats,
            key=lambda item: item.get("message_count", 0),
            reverse=True,
        )[:top_bucket_count]
        return {
            "session_id": session.get("session_id", ""),
            "room_id": room_id,
            "anchor_day": session.get("anchor_day", ""),
            "nickname": session.get("nickname", ""),
            "room_name": session.get("room_name", ""),
            "segments": cls._serialize_segments(session.get("segments", []) or []),
            "message_count": len(normalized),
            "noise_filtered_count": len(prepared["noise_messages"]),
            "organized_message_count": len(organized_messages),
            "deduped_message_count": len(deduped_messages),
            "unique_user_count": len(cls._unique_uids(normalized)),
            "organized_unique_user_count": len(cls._unique_uids(organized_messages)),
            "merged_templates": prepared["merged_templates"],
            "top_terms": top_terms,
            "burst_terms": burst_terms,
            "time_buckets": bucket_stats,
            "peak_buckets": peak_buckets,
            "representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats),
            "operator_metrics": cls._build_operator_metrics(normalized, organized_messages),
        }

    @classmethod
    def build_llm_payload(
        cls,
        room_id: str,
        session: Dict[str, Any],
        messages: List[Dict[str, Any]],
        bucket_minutes: int = 5,
        top_bucket_count: int = 8,
        top_repeat_count: int = 24,
    ) -> Dict[str, Any]:
        """Build the high-fidelity danmu payload intended for an LLM.

        Rules:
        1. Only platform/robot style system noise is filtered out.
        2. Identical or heavily templated content is aggregated, not deleted.
        3. Other distinct content is kept where possible and organized by time
           window and hot spot for the model.
        """
        normalized = [item for item in messages if item and item.get("content")]
        prepared = cls._prepare_messages(normalized)
        organized_messages = prepared["organized_messages"]
        bucket_stats = cls._build_time_buckets(organized_messages, minutes=bucket_minutes)
        peak_buckets = sorted(
            bucket_stats,
            key=lambda item: item.get("message_count", 0),
            reverse=True,
        )[:top_bucket_count]
        return {
            "session_meta": {
                "room_id": room_id,
                "session_id": session.get("session_id", ""),
                "anchor_day": session.get("anchor_day", ""),
                "nickname": session.get("nickname", ""),
                "room_name": session.get("room_name", ""),
                "segments": cls._serialize_segments(session.get("segments", []) or []),
                "message_count": len(normalized),
                "noise_filtered_count": len(prepared["noise_messages"]),
                "organized_message_count": len(organized_messages),
                "unique_user_count": len(cls._unique_uids(normalized)),
                "organized_unique_user_count": len(cls._unique_uids(organized_messages)),
            },
            "operator_metrics": cls._build_operator_metrics(normalized, organized_messages),
            "cleaning_rules": [
                "仅过滤系统噪音、机器人探测、平台提示类弹幕。",
                "明显重复的长模板文案按内容聚合,保留出现次数、人数、首末时间。",
                "其他相同内容按重复短语归并,但不抹掉不同观点和不同句式。",
                "高峰时段补充原始弹幕样本,方便 LLM 还原语境。",
            ],
            "merged_templates": prepared["merged_templates"],
            "repeated_messages": cls._build_repeated_messages(
                organized_messages,
                limit=top_repeat_count,
            ),
            "top_terms": cls._extract_top_terms(organized_messages, limit=30),
            "burst_terms": cls._build_burst_terms(organized_messages),
            "peak_buckets": cls._simplify_peak_buckets(peak_buckets),
            "representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats),
            "raw_window_samples": cls._build_raw_window_samples(peak_buckets, per_bucket_limit=8),
        }

    @staticmethod
    def _unique_uids(items: List[Dict[str, Any]]) -> Set[str]:
        """Return the distinct non-blank ``uid`` values (as strings) found in ``items``."""
        uids = (str(item.get("uid") or "") for item in items)
        return {uid for uid in uids if uid.strip()}

    @staticmethod
    def _parse_profile_text(profile_text: str) -> Dict[str, Any]:
        """Extract room level, fan badge and noble title from the profile part of a line.

        Fields that cannot be found keep their defaults (``0`` / ``""``).
        """
        text = str(profile_text or "").strip()

        # ", Lv <n>" is the room level; "/ <badge> Lv <n>" is the fan badge.
        room_match = re.search(r",\s*Lv\s*(\d+)", text, re.I)
        room_level = int(room_match.group(1)) if room_match else 0

        fans_name = ""
        fans_level = 0
        fans_match = re.search(r"/\s*([^/]+?)\s+Lv\s*(\d+)", text, re.I)
        if fans_match:
            fans_name = str(fans_match.group(1) or "").strip()
            fans_level = int(fans_match.group(2))

        noble_match = re.search(r"/\s*(骑士|子爵|伯爵|公爵|国王|皇帝|游侠|超级皇帝|幻神)\b", text)
        noble_name = str(noble_match.group(1) or "").strip() if noble_match else ""

        return {
            "room_level": room_level,
            "fans_name": fans_name,
            "fans_level": fans_level,
            "noble_name": noble_name,
            "has_fans_badge": bool(fans_name),
            "has_noble": bool(noble_name),
        }

    @classmethod
    def _build_operator_metrics(
        cls,
        messages: List[Dict[str, Any]],
        organized_messages: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Compute audience metrics (activity, levels, badges, nobles) for operators.

        Per-user profile values are merged across that user's messages: the
        highest room/fan level seen, and the first non-empty badge, noble title
        and nickname. Messages without a uid are ignored.
        """
        user_profiles: Dict[str, Dict[str, Any]] = {}
        user_message_count: Counter = Counter()
        fans_badge_users: Set[str] = set()
        noble_users: Set[str] = set()
        fans_badge_message_count = 0
        noble_message_count = 0
        # Distinct users per badge name. A set per badge keeps the count right
        # even when one user shows several badges during a session.
        badge_users: Dict[str, Set[str]] = {}
        badge_message_counter: Counter = Counter()

        for item in messages:
            uid = str(item.get("uid") or "").strip()
            if not uid:
                continue
            user_message_count[uid] += 1
            nickname = str(item.get("nickname") or "").strip()
            profile = user_profiles.setdefault(uid, {
                "uid": uid,
                "nickname": nickname,
                "room_level": 0,
                "fans_name": "",
                "fans_level": 0,
                "noble_name": "",
            })
            room_level = int(item.get("room_level") or 0)
            fans_level = int(item.get("fans_level") or 0)
            fans_name = str(item.get("fans_name") or "").strip()
            noble_name = str(item.get("noble_name") or "").strip()

            profile["room_level"] = max(int(profile.get("room_level") or 0), room_level)
            profile["fans_level"] = max(int(profile.get("fans_level") or 0), fans_level)
            if fans_name and not profile.get("fans_name"):
                profile["fans_name"] = fans_name
            if noble_name and not profile.get("noble_name"):
                profile["noble_name"] = noble_name
            if nickname and not profile.get("nickname"):
                profile["nickname"] = nickname

            if fans_name:
                fans_badge_users.add(uid)
                fans_badge_message_count += 1
                badge_users.setdefault(fans_name, set()).add(uid)
                badge_message_counter[fans_name] += 1
            if noble_name:
                noble_users.add(uid)
                noble_message_count += 1

        room_level_histogram: Counter = Counter()
        fans_level_histogram: Counter = Counter()
        high_room_level_users: Set[str] = set()
        high_fans_level_users: Set[str] = set()
        for uid, profile in user_profiles.items():
            room_level = int(profile.get("room_level") or 0)
            fans_level = int(profile.get("fans_level") or 0)
            room_level_histogram[cls._level_bucket(room_level)] += 1
            if str(profile.get("fans_name") or "").strip():
                # Only badge holders take part in the fan-level distribution.
                fans_level_histogram[cls._fans_level_bucket(fans_level)] += 1
            if room_level >= 30:
                high_room_level_users.add(uid)
            if fans_level >= 10:
                high_fans_level_users.add(uid)

        user_organized_count: Counter = Counter()
        for item in organized_messages:
            uid = str(item.get("uid") or "").strip()
            if uid:
                user_organized_count[uid] += 1

        active_users_5plus = sum(1 for count in user_message_count.values() if count >= 5)
        active_users_10plus = sum(1 for count in user_message_count.values() if count >= 10)

        top_active_users = []
        for uid, count in user_message_count.most_common(12):
            profile = user_profiles.get(uid, {})
            top_active_users.append({
                "uid": uid,
                "nickname": str(profile.get("nickname") or ""),
                "message_count": count,
                "organized_message_count": int(user_organized_count.get(uid, 0) or 0),
                "room_level": int(profile.get("room_level", 0) or 0),
                "fans_name": str(profile.get("fans_name") or ""),
                "fans_level": int(profile.get("fans_level", 0) or 0),
                "noble_name": str(profile.get("noble_name") or ""),
            })

        # Stable sort: badges with equal user counts stay in first-seen order.
        ranked_badges = sorted(badge_users.items(), key=lambda entry: len(entry[1]), reverse=True)[:10]
        top_badges = [
            {
                "badge_name": badge_name,
                "user_count": len(uids),
                "message_count": int(badge_message_counter.get(badge_name, 0) or 0),
            }
            for badge_name, uids in ranked_badges
        ]

        total_unique_users = max(len(user_profiles), 1)  # avoid division by zero
        return {
            "active_unique_users": len(user_profiles),
            "active_users_5plus": active_users_5plus,
            "active_users_10plus": active_users_10plus,
            "fans_badge_user_count": len(fans_badge_users),
            "fans_badge_user_ratio": round(len(fans_badge_users) / total_unique_users, 4),
            "fans_badge_message_count": fans_badge_message_count,
            "high_room_level_user_count": len(high_room_level_users),
            "high_room_level_threshold": 30,
            "high_fans_level_user_count": len(high_fans_level_users),
            "high_fans_level_threshold": 10,
            "noble_user_count": len(noble_users),
            "noble_message_count": noble_message_count,
            "room_level_distribution": [
                {"bucket": bucket, "user_count": count}
                for bucket, count in sorted(room_level_histogram.items())
            ],
            "fans_level_distribution": [
                {"bucket": bucket, "user_count": count}
                for bucket, count in sorted(fans_level_histogram.items())
            ],
            "top_badges": top_badges,
            "top_active_users": top_active_users,
        }

    @staticmethod
    def _level_bucket(level: int) -> str:
        """Map a room level to its histogram label; anything below 10 (including 0) is "1-9"."""
        if level >= 40:
            return "40+"
        if level >= 30:
            return "30-39"
        if level >= 20:
            return "20-29"
        if level >= 10:
            return "10-19"
        return "1-9"

    @staticmethod
    def _fans_level_bucket(level: int) -> str:
        """Map a fan-badge level to its histogram label; anything below 5 (including 0) is "1-4"."""
        if level >= 20:
            return "20+"
        if level >= 15:
            return "15-19"
        if level >= 10:
            return "10-14"
        if level >= 5:
            return "5-9"
        return "1-4"

    @classmethod
    def infer_sessions_from_messages(
        cls,
        room_id: str,
        messages: List[Dict[str, Any]],
        *,
        session_cutoff_hour: int = 6,
        merge_gap_hours: int = 4,
        min_session_messages: int = 50,
    ) -> List[Dict[str, Any]]:
        """Split messages into sessions wherever the silence exceeds ``merge_gap_hours``.

        Groups with fewer than ``min_session_messages`` messages are discarded.
        Messages whose ``timestamp`` is not a ``datetime`` are ignored.
        """
        if not messages:
            return []
        # Filter before sorting so a stray non-datetime timestamp cannot make
        # the comparison raise; such items were never part of a session anyway.
        ordered = sorted(
            (item for item in messages if isinstance(item.get("timestamp"), datetime)),
            key=lambda item: item["timestamp"],
        )
        max_gap_seconds = merge_gap_hours * 3600
        sessions: List[Dict[str, Any]] = []

        def flush(batch: List[Dict[str, Any]]) -> None:
            if len(batch) < min_session_messages:
                return
            session = cls._build_inferred_session(
                room_id,
                batch,
                session_cutoff_hour=session_cutoff_hour,
            )
            if session:
                sessions.append(session)

        current_messages: List[Dict[str, Any]] = []
        for item in ordered:
            ts = item["timestamp"]
            if current_messages:
                gap_seconds = (ts - current_messages[-1]["timestamp"]).total_seconds()
                if gap_seconds > max_gap_seconds:
                    flush(current_messages)
                    current_messages = []
            current_messages.append(item)
        flush(current_messages)
        return sessions

    @classmethod
    def _build_inferred_session(
        cls,
        room_id: str,
        messages: List[Dict[str, Any]],
        *,
        session_cutoff_hour: int = 6,
        segment_gap_minutes: int = 30,
    ) -> Optional[Dict[str, Any]]:
        """Build a session dict from one group of messages.

        A session that starts before ``session_cutoff_hour`` is anchored to the
        previous calendar day. A silence longer than ``segment_gap_minutes``
        starts a new segment inside the session. Every message must carry a
        ``datetime`` under ``"timestamp"``.
        """
        if not messages:
            return None
        ordered = sorted(messages, key=lambda item: item.get("timestamp") or datetime.min)
        start_dt = ordered[0]["timestamp"]
        end_dt = ordered[-1]["timestamp"]

        anchor_dt = start_dt
        if start_dt.hour < session_cutoff_hour:
            anchor_dt = start_dt - timedelta(days=1)
        anchor_day = anchor_dt.strftime("%Y-%m-%d")

        time_format = "%Y-%m-%d %H:%M:%S"
        gap_limit_seconds = segment_gap_minutes * 60
        segments = []
        seg_start = start_dt
        prev_dt = start_dt
        for item in ordered[1:]:
            current_dt = item["timestamp"]
            if (current_dt - prev_dt).total_seconds() > gap_limit_seconds:
                segments.append({
                    "start_time": seg_start.strftime(time_format),
                    "end_time": prev_dt.strftime(time_format),
                })
                seg_start = current_dt
            prev_dt = current_dt
        segments.append({
            "start_time": seg_start.strftime(time_format),
            "end_time": end_dt.strftime(time_format),
        })

        return {
            "session_id": f"{room_id}_{anchor_day.replace('-', '')}_{start_dt.strftime('%H%M%S')}",
            "room_id": room_id,
            "anchor_day": anchor_day,
            "nickname": "",
            "room_name": "",
            "segments": segments,
            "is_live": False,
            "source": "inferred_from_danmu",
        }

    @staticmethod
    def _normalize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, datetime]]:
        """Convert segment dicts with text times into ``{"start", "end"}`` datetimes.

        Malformed entries and entries whose end precedes their start are dropped.
        """
        normalized = []
        for item in segments:
            try:
                start_dt = datetime.strptime(str(item.get("start_time") or ""), "%Y-%m-%d %H:%M:%S")
                end_dt = datetime.strptime(str(item.get("end_time") or ""), "%Y-%m-%d %H:%M:%S")
            except (AttributeError, TypeError, ValueError):
                continue
            if end_dt < start_dt:
                continue
            normalized.append({"start": start_dt, "end": end_dt})
        return normalized

    @staticmethod
    def _serialize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Return segments as plain ``start_time``/``end_time`` strings, skipping incomplete ones."""
        result = []
        for item in segments:
            start_time = str(item.get("start_time") or "").strip()
            end_time = str(item.get("end_time") or "").strip()
            if start_time and end_time:
                result.append({"start_time": start_time, "end_time": end_time})
        return result

    @staticmethod
    def _in_any_segment(target_dt: datetime, segments: List[Dict[str, datetime]]) -> bool:
        """Return True when ``target_dt`` lies inside any segment (both ends inclusive)."""
        return any(segment["start"] <= target_dt <= segment["end"] for segment in segments)

    @staticmethod
    def _dedupe_consecutive_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Collapse runs of identical (uid, content) messages into one copy.

        Each returned item is a shallow copy with ``repeat_count`` set to the
        length of the run it represents.
        """
        result: List[Dict[str, Any]] = []
        prev_key = None
        for item in messages:
            current_key = (item.get("uid"), item.get("content"))
            if result and current_key == prev_key:
                result[-1]["repeat_count"] += 1
                continue
            copied = dict(item)
            copied["repeat_count"] = 1
            result.append(copied)
            prev_key = current_key
        return result

    @classmethod
    def _filter_noise_messages(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the messages that have content and are not classified as noise."""
        # _is_noise_message already reports blank content as noise.
        return [
            item for item in messages
            if not cls._is_noise_message(str(item.get("content") or ""))
        ]

    @classmethod
    def _prepare_messages(cls, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Separate noise, merge templates and return the organized remainder.

        Returns a dict with ``noise_messages``, ``merged_templates`` and
        ``organized_messages``. Messages with blank content are dropped.
        """
        noise_messages: List[Dict[str, Any]] = []
        candidate_messages: List[Dict[str, Any]] = []
        for item in messages:
            content = str(item.get("content") or "").strip()
            if not content:
                continue
            if cls._is_noise_message(content):
                noise_messages.append(item)
            else:
                candidate_messages.append(item)
        merged_templates, organized_messages = cls._merge_template_messages(candidate_messages)
        return {
            "noise_messages": noise_messages,
            "merged_templates": merged_templates,
            "organized_messages": organized_messages,
        }

    @classmethod
    def _is_noise_message(cls, content: str) -> bool:
        """Return True for blank text, known system/robot phrases, or long low-variety spam."""
        text = str(content or "").strip()
        if not text:
            return True
        if any(pattern.search(text) for pattern in cls.NOISE_PATTERNS):
            return True
        # 30+ characters built from at most 6 distinct characters.
        return len(text) >= 30 and len(set(text)) <= 6

    @classmethod
    def _merge_template_messages(
        cls,
        messages: List[Dict[str, Any]],
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Aggregate copy-pasted template messages and return the rest in time order.

        A normalized text is a template when it is long and repeated enough
        (``TEMPLATE_MIN_LENGTH`` / ``TEMPLATE_MIN_REPEAT``) or matches a
        ``TEMPLATE_HINT_PATTERNS`` entry.

        Returns:
            ``(merged_templates, organized_messages)``. Only the 20 most
            frequent templates are reported. Messages whose normalized text is
            empty (for example pure punctuation such as "???") are kept in
            ``organized_messages`` rather than being silently discarded.
        """
        grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        organized_messages: List[Dict[str, Any]] = []
        for item in messages:
            key = cls._normalize_template_text(str(item.get("content") or ""))
            if key:
                grouped[key].append(item)
            else:
                # Nothing to group on, but still a real viewer message.
                organized_messages.append(item)

        merged_templates: List[Dict[str, Any]] = []
        for key, items in grouped.items():
            is_template = (
                (len(key) >= cls.TEMPLATE_MIN_LENGTH and len(items) >= cls.TEMPLATE_MIN_REPEAT)
                or any(pattern.search(key) for pattern in cls.TEMPLATE_HINT_PATTERNS)
            )
            if not is_template:
                organized_messages.extend(items)
                continue
            first = items[0]
            merged_templates.append({
                "text": str(first.get("content") or "").strip()[:120],
                "count": len(items),
                "user_count": len(cls._unique_uids(items)),
                "first_time": str(first.get("timestamp_text") or ""),
                "last_time": str(items[-1].get("timestamp_text") or ""),
            })

        merged_templates.sort(key=lambda item: item.get("count", 0), reverse=True)
        organized_messages.sort(key=lambda item: item.get("timestamp") or datetime.min)
        return merged_templates[:20], organized_messages

    @classmethod
    def _build_repeated_messages(cls, messages: List[Dict[str, Any]], limit: int = 24) -> List[Dict[str, Any]]:
        """List texts repeated at least ``REPEAT_MIN_COUNT`` times, most frequent first.

        Messages are grouped by normalized text; pure punctuation/symbol
        messages are ignored. At most ``limit`` entries are returned.
        """
        grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for item in messages:
            content = str(item.get("content") or "").strip()
            if not content:
                continue
            normalized = cls._normalize_template_text(content)
            if not normalized or cls._looks_like_pure_punctuation(content):
                continue
            grouped[normalized].append(item)

        repeated_messages: List[Dict[str, Any]] = []
        for items in grouped.values():
            if len(items) < cls.REPEAT_MIN_COUNT:
                continue
            first = items[0]
            repeated_messages.append({
                "text": str(first.get("content") or "").strip()[:120],
                "count": len(items),
                "user_count": len(cls._unique_uids(items)),
                "first_time": str(first.get("timestamp_text") or ""),
                "last_time": str(items[-1].get("timestamp_text") or ""),
            })
        repeated_messages.sort(key=lambda item: item.get("count", 0), reverse=True)
        return repeated_messages[:limit]

    @classmethod
    def _build_burst_terms(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Count short reaction messages and return the 15 most frequent.

        A message counts when its lower-cased content is in
        ``SHORT_BURST_WORDS`` or is at most 6 characters long.
        """
        counters: Dict[str, Dict[str, Any]] = {}
        for item in messages:
            content = str(item.get("content") or "").strip().lower()
            if not content:
                continue
            if content not in cls.SHORT_BURST_WORDS and len(content) > 6:
                continue
            target = counters.setdefault(content, {"text": content, "count": 0, "users": set()})
            target["count"] += 1
            target["users"].add(str(item.get("uid") or ""))

        ranked = sorted(counters.values(), key=lambda entry: entry["count"], reverse=True)[:15]
        return [
            {
                "text": entry["text"],
                "count": entry["count"],
                "user_count": len([uid for uid in entry["users"] if uid]),
            }
            for entry in ranked
        ]

    @classmethod
    def _extract_top_terms(cls, messages: List[Dict[str, Any]], limit: int = 30) -> List[Dict[str, Any]]:
        """Return the ``limit`` most frequent tokens as ``{"term", "count"}`` dicts."""
        counter: Counter = Counter()
        for item in messages:
            counter.update(cls._tokenize(str(item.get("content") or "")))
        return [{"term": term, "count": count} for term, count in counter.most_common(limit)]

    @classmethod
    def _tokenize(cls, content: str) -> List[str]:
        """Split content into coarse tokens, dropping ``STOPWORDS``.

        Tokens are runs of 2-6 CJK characters followed by runs of 3-20
        ``[a-z0-9_-]`` characters; no real word segmentation is performed.
        """
        text = str(content or "").lower().strip()
        if not text:
            return []
        chinese_terms = re.findall(r"[\u4e00-\u9fff]{2,6}", text)
        alpha_terms = re.findall(r"[a-z0-9_\-]{3,20}", text)
        return [token for token in chinese_terms + alpha_terms if token not in cls.STOPWORDS]

    @staticmethod
    def _normalize_template_text(content: str) -> str:
        """Reduce content to a comparison key.

        Lower-cases, removes whitespace and common ASCII/CJK punctuation, and
        shortens any run of 5 or more identical characters to 3.
        """
        text = str(content or "").strip().lower()
        if not text:
            return ""
        text = re.sub(r"\s+", "", text)
        text = re.sub(r"[`~!@#$%^&*()_\-+=\[\]{}\\|;:'\",.<>/?,。!?、…()【】《》“”‘’·]", "", text)
        text = re.sub(r"(.)\1{4,}", r"\1\1\1", text)
        return text

    @classmethod
    def _build_time_buckets(cls, messages: List[Dict[str, Any]], minutes: int = 5) -> List[Dict[str, Any]]:
        """Group messages into fixed windows within each hour and summarize each window.

        ``minutes`` below 1 is treated as 1 so the bucket arithmetic cannot
        divide by zero. Messages without a ``datetime`` timestamp are skipped.
        Buckets are returned in chronological order.
        """
        step = max(int(minutes), 1)
        buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for item in messages:
            ts = item.get("timestamp")
            if not isinstance(ts, datetime):
                continue
            bucket_start = ts.replace(minute=(ts.minute // step) * step, second=0, microsecond=0)
            buckets[bucket_start.strftime("%Y-%m-%d %H:%M:%S")].append(item)

        results: List[Dict[str, Any]] = []
        # The key format sorts lexicographically in chronological order.
        for bucket_start_text, items in sorted(buckets.items()):
            results.append({
                "start_time": bucket_start_text,
                "message_count": len(items),
                "user_count": len(cls._unique_uids(items)),
                "top_terms": cls._extract_top_terms(items, limit=8),
                "burst_terms": cls._build_burst_terms(items)[:5],
                "sample_messages": cls._pick_bucket_samples(items, limit=6),
            })
        return results

    @staticmethod
    def _pick_bucket_samples(items: List[Dict[str, Any]], limit: int = 6) -> List[Dict[str, str]]:
        """Pick up to ``limit`` distinct messages spread across a bucket.

        NOTE: only five positions are probed (start, 1/3, 1/2, 2/3, end), so at
        most five samples are returned even when ``limit`` is larger.
        """
        if not items:
            return []
        total = len(items)
        indexes = sorted({0, total // 3, total // 2, (total * 2) // 3, total - 1})
        selected: List[Dict[str, str]] = []
        seen: Set[str] = set()
        for idx in indexes:
            item = items[idx]
            content = str(item.get("content") or "").strip()
            if not content or content in seen:
                continue
            selected.append({
                "time": str(item.get("timestamp_text") or ""),
                "nickname": str(item.get("nickname") or ""),
                "content": content[:80],
            })
            seen.add(content)
            if len(selected) >= limit:
                break
        return selected

    @classmethod
    def _pick_representative_messages(
        cls,
        messages: List[Dict[str, Any]],
        buckets: List[Dict[str, Any]],
    ) -> List[Dict[str, str]]:
        """Return up to 18 distinct sample messages.

        Samples from the six busiest buckets come first; remaining slots are
        filled from ``messages`` in the order given.
        """
        max_samples = 18
        selected: List[Dict[str, str]] = []
        seen: Set[str] = set()

        busiest = sorted(buckets, key=lambda item: item.get("message_count", 0), reverse=True)[:6]
        for bucket in busiest:
            for sample in bucket.get("sample_messages", []):
                content = str(sample.get("content") or "").strip()
                if not content or content in seen:
                    continue
                selected.append(sample)
                seen.add(content)
                if len(selected) >= max_samples:
                    return selected

        for item in messages:
            content = str(item.get("content") or "").strip()
            if not content or content in seen:
                continue
            selected.append({
                "time": str(item.get("timestamp_text") or ""),
                "nickname": str(item.get("nickname") or ""),
                "content": content[:80],
            })
            seen.add(content)
            if len(selected) >= max_samples:
                break
        return selected

    @classmethod
    def _build_raw_window_samples(
        cls,
        peak_buckets: List[Dict[str, Any]],
        per_bucket_limit: int = 8,
    ) -> List[Dict[str, Any]]:
        """Return the sample messages of each peak bucket, skipping buckets without samples."""
        windows: List[Dict[str, Any]] = []
        for bucket in peak_buckets:
            samples = []
            for sample in bucket.get("sample_messages", [])[:per_bucket_limit]:
                content = str(sample.get("content") or "").strip()
                if not content:
                    continue
                samples.append({
                    "time": str(sample.get("time") or ""),
                    "nickname": str(sample.get("nickname") or ""),
                    "content": content,
                })
            if not samples:
                continue
            windows.append({
                "start_time": str(bucket.get("start_time") or ""),
                "message_count": int(bucket.get("message_count", 0) or 0),
                "user_count": int(bucket.get("user_count", 0) or 0),
                "samples": samples,
            })
        return windows

    @staticmethod
    def _simplify_peak_buckets(buckets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return buckets reduced to counts plus their top 6 terms and top 5 burst terms."""
        return [
            {
                "start_time": str(bucket.get("start_time") or ""),
                "message_count": int(bucket.get("message_count", 0) or 0),
                "user_count": int(bucket.get("user_count", 0) or 0),
                "top_terms": bucket.get("top_terms", [])[:6],
                "burst_terms": bucket.get("burst_terms", [])[:5],
            }
            for bucket in buckets
        ]

    @staticmethod
    def _looks_like_pure_punctuation(content: str) -> bool:
        """Return True when content is blank or has no letters, digits or CJK characters."""
        text = str(content or "").strip()
        if not text:
            return True
        return re.fullmatch(r"[\W_]+", text, re.UNICODE) is not None

View File

@@ -1,7 +1,9 @@
import asyncio
import json
from datetime import datetime
from collections import Counter
from datetime import datetime, timedelta
import os
from pathlib import Path
import threading
import time
from typing import Dict, Any, List, Optional, Tuple, Set
@@ -19,9 +21,13 @@ except ImportError:
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from plugins.ai_auto_response.llm_client import LLMClient
from plugins.douyu.danmu_summary import DouyuDanmuSummaryHelper
from plugins.douyu.report_template import render_daily_report_html
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.markdown_to_image import convert_md_str_to_image, html_to_image
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML
@@ -316,6 +322,61 @@ class DouyuRedisManager:
key = f"{self.prefix}room_status:{room_id}"
return self.redis.set(key, json.dumps(status, ensure_ascii=False))
def get_room_session(self, room_id: str, session_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one stored session and decode it from JSON.

    Returns:
        The decoded session, or ``None`` when the key is missing/empty or the
        stored value is not valid JSON.
    """
    raw = self.redis.get(f"{self.prefix}room:{room_id}:session:{session_id}")
    if not raw:
        return None
    # The client may hand back bytes or str depending on its configuration.
    text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    try:
        return json.loads(text)
    except Exception:
        return None
def save_room_session(self, room_id: str, session: Dict[str, Any]) -> bool:
    """Persist a session, mark it as the room's latest, and index its id.

    The index list is kept unique, newest first, and capped at 30 ids. All
    writes are queued on one pipeline.

    Returns:
        ``False`` when the session has no ``session_id``; otherwise whether
        the two SET commands (session payload and latest pointer) reported
        success. The previous ``bool(result)`` was True for any non-empty
        result list, so it carried no information.
    """
    session_id = str(session.get("session_id") or "").strip()
    if not session_id:
        return False
    payload = json.dumps(session, ensure_ascii=False)
    session_key = f"{self.prefix}room:{room_id}:session:{session_id}"
    latest_key = f"{self.prefix}room:{room_id}:latest_session"
    index_key = f"{self.prefix}room:{room_id}:session_ids"
    pipe = self.redis.pipeline()
    pipe.set(session_key, payload)
    pipe.set(latest_key, session_id)
    # Remove any older occurrence before pushing so the id appears only once.
    pipe.lrem(index_key, 0, session_id)
    pipe.lpush(index_key, session_id)
    pipe.ltrim(index_key, 0, 29)
    results = pipe.execute()
    if not isinstance(results, (list, tuple)):
        # Unexpected client behaviour: fall back to plain truthiness.
        return bool(results)
    # LREM legitimately returns 0, so only the two SET replies are checked.
    return len(results) >= 2 and bool(results[0]) and bool(results[1])
def get_latest_room_session(self, room_id: str) -> Optional[Dict[str, Any]]:
    """Return the session referenced by the room's latest-session pointer, or ``None``."""
    pointer = self.redis.get(f"{self.prefix}room:{room_id}:latest_session")
    if not pointer:
        return None
    session_id = pointer.decode("utf-8") if isinstance(pointer, bytes) else pointer
    return self.get_room_session(room_id, str(session_id))
def list_room_session_ids(self, room_id: str, limit: int = 10) -> List[str]:
    """Return up to ``limit`` session ids from the room's index list, in stored order.

    A non-positive ``limit`` returns an empty list. Previously it was clamped
    to the range ``0..0``, which LRANGE treats as inclusive and therefore
    returned one id.
    """
    if limit <= 0:
        return []
    key = f"{self.prefix}room:{room_id}:session_ids"
    # LRANGE's stop index is inclusive, hence limit - 1.
    rows = self.redis.lrange(key, 0, limit - 1) or []
    return [row.decode("utf-8") if isinstance(row, bytes) else str(row) for row in rows]
def get_text_value(self, key: str) -> Optional[str]:
    """Read a key and return its value as text, or ``None`` when missing or empty."""
    raw = self.redis.get(key)
    if raw:
        if isinstance(raw, bytes):
            return raw.decode("utf-8")
        return str(raw)
    return None
def set_text_value(self, key: str, value: str) -> bool:
    """Store ``value`` under ``key`` and report the client's reply as a boolean."""
    reply = self.redis.set(key, value)
    return bool(reply)
class DouyuPlugin(MessagePluginInterface):
FEATURE_KEY = "DOUYU_MONITOR"
@@ -364,18 +425,44 @@ class DouyuPlugin(MessagePluginInterface):
self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList"
self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
self._check_interval = 5
self._session_cutoff_hour = 6
self._merge_gap_hours = 4
self._daily_report_enable = True
self._daily_report_time = "10:05"
self._daily_report_min_messages = 120
self._daily_report_use_llm = False
self._daily_report_max_sessions = 4
self._daily_report_max_length = 1800
self._daily_report_send_image = True
self._daily_report_llm_client: Optional[LLMClient] = None
self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {}
async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job)
async_job.every_minutes(5)(self._scheduled_daily_report_tick)
async def _scheduled_unified_check_job(self):
    """Run the live-status check and then the Yuba feed check, one after the other."""
    await self._scheduled_check_job()
    await self._scheduled_yuba_check_job()
async def _scheduled_daily_report_tick(self):
"""每 5 分钟检查一次,命中配置时间后发送前一天日报。"""
if not self._daily_report_enable or not self.redis_manager or not self.bot:
return
now_dt = datetime.now()
if not self._should_run_daily_report(now_dt):
return
anchor_day = (now_dt - timedelta(days=1)).strftime("%Y-%m-%d")
try:
await self._send_daily_reports(anchor_day)
self.redis_manager.set_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d")), now_dt.strftime("%Y-%m-%d %H:%M:%S"))
except Exception as e:
logger.error(f"斗鱼每日报告任务失败(anchor_day={anchor_day}): {e}")
def initialize(self, context: Dict[str, Any]) -> bool:
try:
dbm = DBConnectionManager.get_instance()
self.redis_manager = DouyuRedisManager(dbm)
self.bot = context.get("bot", self.bot)
cfg = self._config.get("Douyu", {})
cfg_cmds = cfg.get("command", [])
if isinstance(cfg_cmds, list) and cfg_cmds:
@@ -383,6 +470,21 @@ class DouyuPlugin(MessagePluginInterface):
self._api_template = cfg.get("api_url_template", self._api_template)
self._user_agent = cfg.get("user_agent", self._user_agent)
self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval))
self._session_cutoff_hour = int(cfg.get("session_cutoff_hour", self._session_cutoff_hour))
self._merge_gap_hours = int(cfg.get("merge_gap_hours", self._merge_gap_hours))
self._daily_report_enable = bool(cfg.get("daily_report_enable", self._daily_report_enable))
self._daily_report_time = str(cfg.get("daily_report_time", self._daily_report_time) or self._daily_report_time)
self._daily_report_min_messages = int(
cfg.get("daily_report_min_messages", self._daily_report_min_messages)
)
self._daily_report_use_llm = bool(cfg.get("daily_report_use_llm", self._daily_report_use_llm))
self._daily_report_max_sessions = int(cfg.get("daily_report_max_sessions", self._daily_report_max_sessions))
self._daily_report_max_length = int(cfg.get("daily_report_max_length", self._daily_report_max_length))
self._daily_report_send_image = bool(cfg.get("daily_report_send_image", self._daily_report_send_image))
report_api_cfg = cfg.get("report_api", {}) or {}
if report_api_cfg:
self._daily_report_llm_client = LLMClient(report_api_cfg)
return True
except Exception as e:
logger.error(f"{self.name} 初始化失败: {e}")
@@ -544,6 +646,12 @@ class DouyuPlugin(MessagePluginInterface):
continue
if prev_live is True and curr_live is True and room_id not in self._danmu_recorders:
try:
session = self._open_or_resume_session(room_id, nickname, room_name)
if session:
logger.info(
f"检测到持续直播状态,续接斗鱼直播会话({room_id}): "
f"session={session.get('session_id')}"
)
logger.info(f"检测到持续直播状态,补偿启动斗鱼弹幕记录({room_id})")
self._start_danmu_record(room_id)
except Exception as e:
@@ -573,6 +681,12 @@ class DouyuPlugin(MessagePluginInterface):
logger.error(f"发送斗鱼开播提醒失败: {e}")
continue
try:
session = self._open_or_resume_session(room_id, nickname, room_name)
if session:
logger.info(
f"斗鱼直播会话开启/续接: room={room_id}, session={session.get('session_id')}, "
f"segments={len(session.get('segments', []))}, anchor_day={session.get('anchor_day')}"
)
logger.info(f"启动斗鱼弹幕记录({room_id})")
self._start_danmu_record(room_id)
except Exception as e:
@@ -591,6 +705,12 @@ class DouyuPlugin(MessagePluginInterface):
logger.error(f"发送斗鱼下播提醒失败: {e}")
continue
try:
session = self._close_active_session(room_id, nickname, room_name)
if session:
logger.info(
f"斗鱼直播会话关闭片段: room={room_id}, session={session.get('session_id')}, "
f"segments={len(session.get('segments', []))}, is_live={session.get('is_live')}"
)
logger.info(f"停止斗鱼弹幕记录({room_id})")
self._stop_danmu_record(room_id)
except Exception as e:
@@ -685,6 +805,655 @@ class DouyuPlugin(MessagePluginInterface):
self._danmu_recorders[room_id] = recorder
return recorder
def _resolve_anchor_day(self, target_dt: datetime) -> str:
    """Return the ``YYYY-MM-DD`` day a timestamp is attributed to.

    Timestamps whose hour is below ``self._session_cutoff_hour`` are
    attributed to the previous calendar day.
    """
    before_cutoff = target_dt.hour < self._session_cutoff_hour
    anchor_dt = target_dt - timedelta(days=1) if before_cutoff else target_dt
    return anchor_dt.strftime("%Y-%m-%d")
@staticmethod
def _parse_session_time(value: str) -> Optional[datetime]:
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string; return None if empty or unparsable."""
    parsed: Optional[datetime] = None
    if value:
        try:
            parsed = datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S")
        except Exception:
            parsed = None
    return parsed
@staticmethod
def _find_open_segment(session: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the most recent segment whose ``end_time`` is blank, or None."""
    segments = session.get("segments", []) or []
    return next(
        (
            candidate
            for candidate in reversed(segments)
            if not str(candidate.get("end_time") or "").strip()
        ),
        None,
    )
def _should_merge_with_latest_session(self, latest_session: Optional[Dict[str, Any]], now_dt: datetime) -> bool:
    """Decide whether a new broadcast segment belongs to ``latest_session``.

    Returns True when the session is still flagged live, or when its last
    segment ended no more than ``self._merge_gap_hours`` hours before
    ``now_dt`` (and not after it). Returns False for a missing session, a
    session without segments, or an unparsable end time.
    """
    if not latest_session:
        return False
    if latest_session.get("is_live"):
        return True

    history = latest_session.get("segments", []) or []
    closed_at = self._parse_session_time(history[-1].get("end_time", "")) if history else None
    if not closed_at:
        return False

    elapsed = (now_dt - closed_at).total_seconds()
    allowed = self._merge_gap_hours * 3600
    return elapsed >= 0 and elapsed <= allowed
def _open_or_resume_session(self, room_id: str, nickname: str, room_name: str) -> Optional[Dict[str, Any]]:
    """Create a new live session for the room or extend the latest one.

    If ``_should_merge_with_latest_session`` accepts the latest stored
    session, a shallow copy of it is reused and a fresh open segment is
    appended unless one is already open. Otherwise a new session record is
    built for the current anchor day. The result is marked live, saved via
    the Redis manager and returned. Returns None when no Redis manager is set.
    """
    store = self.redis_manager
    if not store:
        return None

    now_dt = datetime.now()
    now_str = now_dt.strftime("%Y-%m-%d %H:%M:%S")
    previous = store.get_latest_room_session(room_id) or {}

    if self._should_merge_with_latest_session(previous, now_dt):
        session = dict(previous)
        if not self._find_open_segment(session):
            # Build a new list so the stored session's segment list is not mutated.
            session["segments"] = [
                *(session.get("segments", []) or []),
                {"start_time": now_str, "end_time": ""},
            ]
    else:
        anchor_day = self._resolve_anchor_day(now_dt)
        session = {
            "session_id": f"{room_id}_{anchor_day.replace('-', '')}_{now_dt.strftime('%H%M%S')}",
            "room_id": room_id,
            "anchor_day": anchor_day,
            "nickname": nickname,
            "room_name": room_name,
            "segments": [{"start_time": now_str, "end_time": ""}],
            "is_live": True,
            "summary_status": "pending",
            "summary_generated_at": "",
            "created_at": now_str,
        }

    # Fresh display names win; otherwise keep whatever the session already has.
    session.update(
        {
            "nickname": nickname or session.get("nickname", ""),
            "room_name": room_name or session.get("room_name", ""),
            "is_live": True,
            "updated_at": now_str,
            "last_live_at": now_str,
        }
    )
    store.save_room_session(room_id, session)
    return session
def _close_active_session(self, room_id: str, nickname: str, room_name: str) -> Optional[Dict[str, Any]]:
    """Mark the room's latest session offline and close its open segment.

    Returns the updated session after saving it, or None when there is no
    Redis manager or no stored session for the room.
    """
    store = self.redis_manager
    if not store:
        return None
    session = store.get_latest_room_session(room_id)
    if not session:
        return None

    closed_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    pending = self._find_open_segment(session)
    if pending:
        pending["end_time"] = closed_at

    session.update(
        {
            "nickname": nickname or session.get("nickname", ""),
            "room_name": room_name or session.get("room_name", ""),
            "is_live": False,
            "updated_at": closed_at,
            "last_offline_at": closed_at,
        }
    )
    store.save_room_session(room_id, session)
    return session
def get_room_session(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Fetch one session of a room: the given ``session_id``, or the latest one.

    Returns None when the Redis manager is missing or ``room_id`` is empty.
    """
    store = self.redis_manager
    if not store or not room_id:
        return None
    if not session_id:
        return store.get_latest_room_session(room_id)
    return store.get_room_session(room_id, session_id)
def build_session_danmu_material(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Side-channel helper: build condensed danmu material for one live session.

    Loads the session's messages through ``DouyuDanmuSummaryHelper``, builds
    the summary material from them and attaches a small ``session`` header.
    Returns None when the session cannot be found.
    """
    session = self.get_room_session(room_id, session_id=session_id)
    if not session:
        return None

    session_messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session)
    material = DouyuDanmuSummaryHelper.build_summary_material(room_id, session, session_messages)

    header = {
        field: session.get(field, "")
        for field in ("session_id", "anchor_day", "nickname", "room_name")
    }
    header["is_live"] = bool(session.get("is_live"))
    material["session"] = header
    return material
def build_session_llm_payload(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Side-channel helper: build the LLM-ready summary payload for one session.

    Returns None when the session cannot be found; otherwise returns whatever
    ``DouyuDanmuSummaryHelper.build_llm_payload`` produces for the session's
    loaded messages.
    """
    session = self.get_room_session(room_id, session_id=session_id)
    if session:
        session_messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session)
        return DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, session_messages)
    return None
def _daily_report_job_key(self, day_key: str) -> str:
    """Return the Redis key that records the daily-report job run for ``day_key``."""
    return "{}daily_report_job:{}".format(self.redis_manager.prefix, day_key)
def _daily_report_room_key(self, room_id: str, anchor_day: str) -> str:
    """Return the Redis key that marks a room's report for ``anchor_day`` as delivered."""
    return "{}daily_report:{}:{}".format(self.redis_manager.prefix, room_id, anchor_day)
def _should_run_daily_report(self, now_dt: datetime) -> bool:
    """Tell whether the daily report job should fire at ``now_dt``.

    The job fires when ``now_dt`` falls inside the window that starts at the
    configured ``HH:MM`` (``self._daily_report_time``) and lasts 4 minutes
    59 seconds, and no run has been recorded yet under today's job key.

    Returns False, instead of raising, when the configured time is malformed
    or out of range (e.g. ``"25:00"``).
    """
    time_text = str(self._daily_report_time or "").strip()
    try:
        hour_text, minute_text = time_text.split(":", 1)
        # ``replace`` validates the hour/minute ranges, so it must sit inside
        # the ``try``: a bad config value would otherwise raise on every tick.
        target_dt = now_dt.replace(
            hour=int(hour_text),
            minute=int(minute_text),
            second=0,
            microsecond=0,
        )
    except ValueError:
        return False

    window_end = target_dt + timedelta(minutes=4, seconds=59)
    if not target_dt <= now_dt <= window_end:
        return False

    last_run = self.redis_manager.get_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d")))
    return not last_run
def _load_sessions_for_anchor_day(self, room_id: str, anchor_day: str) -> List[Dict[str, Any]]:
    """Collect the room's stored sessions that belong to ``anchor_day``.

    Scans up to 30 session ids from the Redis manager, keeps sessions whose
    ``anchor_day`` matches, orders them by the start time of their first
    segment, and returns at most ``self._daily_report_max_sessions`` of them.
    """
    store = self.redis_manager
    if not store:
        return []

    def first_start(item: Dict[str, Any]) -> str:
        return str(((item.get("segments") or [{}])[0]).get("start_time", ""))

    loaded = (
        store.get_room_session(room_id, session_id)
        for session_id in store.list_room_session_ids(room_id, limit=30)
    )
    matching = [
        candidate
        for candidate in loaded
        if candidate and str(candidate.get("anchor_day") or "") == anchor_day
    ]
    return sorted(matching, key=first_start)[: self._daily_report_max_sessions]
def _build_daily_report_payload(self, room_id: str, anchor_day: str, sessions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Merge the per-session LLM payloads of one anchor day into a single report payload.

    For every session, the messages are loaded and a per-session payload is
    built through ``DouyuDanmuSummaryHelper``; sessions with fewer than
    ``self._daily_report_min_messages`` loaded messages are ignored. Counters,
    term frequencies, badge statistics and sample lists are then combined.

    Side effect: the resulting payload is also written as JSON to
    ``temp/douyu_materials/<room_id>_<YYYYMMDD>_daily_report_payload.json``
    (relative to the current working directory).

    Args:
        room_id: Room whose sessions are being summarised.
        anchor_day: Day label in ``YYYY-MM-DD`` form, stored in ``report_meta``.
        sessions: Session records to include.

    Returns:
        The merged payload, or None when ``sessions`` is empty or no session
        reaches the minimum message count.
    """
    if not sessions:
        return None
    session_payloads: List[Dict[str, Any]] = []
    total_message_count = 0
    total_noise_filtered_count = 0
    total_organized_message_count = 0
    total_unique_users: Set[str] = set()
    merged_templates: List[Dict[str, Any]] = []
    repeated_messages: List[Dict[str, Any]] = []
    peak_buckets: List[Dict[str, Any]] = []
    representative_messages: List[Dict[str, Any]] = []
    raw_window_samples: List[Dict[str, Any]] = []
    top_terms_counter = Counter()
    burst_terms_counter = Counter()
    # Operator metrics that are summed key-by-key across sessions.
    # NOTE(review): the *_user_count values are added per session, so a user
    # active in several sessions is presumably counted more than once — confirm
    # against the helper's definition of these metrics.
    operator_totals = {
        "fans_badge_user_count": 0,
        "fans_badge_message_count": 0,
        "high_room_level_user_count": 0,
        "high_fans_level_user_count": 0,
        "noble_user_count": 0,
        "noble_message_count": 0,
        "active_users_5plus": 0,
        "active_users_10plus": 0,
    }
    top_badge_counter = Counter()
    top_badge_message_counter = Counter()
    nickname = ""
    room_name = ""
    for session in sessions:
        messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session)
        # Skip sessions that are too quiet to be worth summarising.
        if len(messages) < self._daily_report_min_messages:
            continue
        payload = DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages)
        session_payloads.append(payload)
        meta = payload.get("session_meta", {}) or {}
        operator_metrics = payload.get("operator_metrics", {}) or {}
        total_message_count += int(meta.get("message_count", 0) or 0)
        total_noise_filtered_count += int(meta.get("noise_filtered_count", 0) or 0)
        total_organized_message_count += int(meta.get("organized_message_count", 0) or 0)
        # The first non-empty nickname / room name seen wins.
        nickname = nickname or str(meta.get("nickname") or session.get("nickname") or "")
        room_name = room_name or str(meta.get("room_name") or session.get("room_name") or "")
        for key in operator_totals:
            operator_totals[key] += int(operator_metrics.get(key, 0) or 0)
        # List-valued sections are concatenated as shallow copies of each item.
        for item in payload.get("merged_templates", []) or []:
            merged_templates.append(dict(item))
        for item in payload.get("repeated_messages", []) or []:
            repeated_messages.append(dict(item))
        for item in payload.get("peak_buckets", []) or []:
            peak_buckets.append(dict(item))
        for item in payload.get("representative_messages", []) or []:
            representative_messages.append(dict(item))
        for item in payload.get("raw_window_samples", []) or []:
            raw_window_samples.append(dict(item))
        # Term and badge frequencies are summed by name across sessions.
        for item in payload.get("top_terms", []) or []:
            term = str(item.get("term") or "").strip()
            if term:
                top_terms_counter[term] += int(item.get("count", 0) or 0)
        for item in payload.get("burst_terms", []) or []:
            term = str(item.get("text") or "").strip()
            if term:
                burst_terms_counter[term] += int(item.get("count", 0) or 0)
        for item in operator_metrics.get("top_badges", []) or []:
            badge_name = str(item.get("badge_name") or "").strip()
            if badge_name:
                top_badge_counter[badge_name] += int(item.get("user_count", 0) or 0)
                top_badge_message_counter[badge_name] += int(item.get("message_count", 0) or 0)
        # Unique users are de-duplicated by uid over the whole day.
        for session_message in messages:
            uid = str(session_message.get("uid") or "").strip()
            if uid:
                total_unique_users.add(uid)
    if not session_payloads:
        return None
    # Only these three lists are re-ranked after merging; representative
    # messages and raw window samples keep their session order before truncation.
    merged_templates.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True)
    repeated_messages.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True)
    peak_buckets.sort(key=lambda item: int(item.get("message_count", 0) or 0), reverse=True)
    artifact_dir = os.path.join("temp", "douyu_materials")
    os.makedirs(artifact_dir, exist_ok=True)
    payload = {
        "report_meta": {
            "room_id": room_id,
            "anchor_day": anchor_day,
            "nickname": nickname,
            "room_name": room_name,
            "session_count": len(session_payloads),
            "message_count": total_message_count,
            "noise_filtered_count": total_noise_filtered_count,
            "organized_message_count": total_organized_message_count,
            "unique_user_count": len(total_unique_users),
        },
        # NOTE(review): no "top_active_users" key is emitted here, although
        # _build_operator_summary_text and render_daily_report_html read
        # operator_metrics["top_active_users"] — confirm whether it should be.
        "operator_metrics": {
            **operator_totals,
            # max(..., 1) avoids division by zero when no uid was collected.
            "fans_badge_user_ratio": round(operator_totals["fans_badge_user_count"] / max(len(total_unique_users), 1), 4),
            "top_badges": [
                {
                    "badge_name": badge_name,
                    "user_count": user_count,
                    "message_count": int(top_badge_message_counter.get(badge_name, 0) or 0),
                }
                for badge_name, user_count in top_badge_counter.most_common(10)
            ],
        },
        "sessions": [
            {
                "session_id": (item.get("session_meta", {}) or {}).get("session_id", ""),
                "segments": (item.get("session_meta", {}) or {}).get("segments", []),
                "message_count": (item.get("session_meta", {}) or {}).get("message_count", 0),
                "organized_message_count": (item.get("session_meta", {}) or {}).get("organized_message_count", 0),
            }
            for item in session_payloads
        ],
        "merged_templates": merged_templates[:24],
        "repeated_messages": repeated_messages[:24],
        "top_terms": [{"term": term, "count": count} for term, count in top_terms_counter.most_common(24)],
        "burst_terms": [{"text": term, "count": count} for term, count in burst_terms_counter.most_common(16)],
        "peak_buckets": peak_buckets[:10],
        "representative_messages": representative_messages[:24],
        "raw_window_samples": raw_window_samples[:10],
    }
    # Persist the merged payload next to other Douyu materials for inspection.
    artifact_path = os.path.join(artifact_dir, f"{room_id}_{anchor_day.replace('-', '')}_daily_report_payload.json")
    with open(artifact_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    return payload
def _build_daily_report_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]:
    """Build the (system, user) prompt pair for the LLM-written daily report.

    The user prompt lists the formatting requirements and embeds ``payload``
    as pretty-printed JSON (non-ASCII characters kept as-is).

    Args:
        payload: The merged daily report payload; it must be JSON-serialisable.

    Returns:
        A ``(system_prompt, user_prompt)`` tuple.
    """
    # The previously computed ``meta`` local was never used and has been removed.
    system_prompt = (
        "你是斗鱼直播日报助手。请基于给定的结构化弹幕材料,输出一份适合发群的中文日报。"
        "要求简洁、自然、信息密度高,不要编造,不要使用代码块。"
    )
    user_prompt = (
        "请输出一份斗鱼每日报告,格式要求:\n"
        "1. 第一行写标题,包含主播名和日期。\n"
        "2. 用 3-5 条概括直播主线、弹幕情绪、观众关注点。\n"
        "3. 单独补充运营视角观察,比如带牌活跃用户、高等级用户、核心发言用户、活跃牌子分布。\n"
        "4. 单独列出高频梗/复读内容(不超过 5 条)。\n"
        "5. 单独列出 2-3 个热点时段。\n"
        "6. 整体控制在 600 字以内。\n\n"
        f"材料如下:\n{json.dumps(payload, ensure_ascii=False, indent=2)}"
    )
    return system_prompt, user_prompt
def _build_danmu_summary_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]:
    """Build the (system, user) prompt pair for the LLM-written danmu summary.

    The user prompt carries the writing requirements, the streamer label
    (nickname, else room name, else room id), the anchor day, and the whole
    ``payload`` as pretty-printed JSON.
    """
    meta = payload.get("report_meta", {}) or {}
    streamer = meta.get('nickname') or meta.get('room_name') or meta.get('room_id')
    day_label = meta.get('anchor_day', '')
    material = json.dumps(payload, ensure_ascii=False, indent=2)

    system_prompt = "".join(
        [
            "你是直播弹幕总结助手。请只根据给定材料,总结这场直播的弹幕内容与氛围。",
            "不要输出运营数据,不要编造,不要写空话套话。",
        ]
    )
    user_prompt = "".join(
        [
            "请输出一段适合放在日报图片上半部分的弹幕总结,要求:\n",
            "1. 先用 1 段总述直播氛围与主线。\n",
            "2. 再用 3-5 条要点总结观众关注点、情绪变化、反复出现的梗。\n",
            "3. 语言像运营复盘,简洁自然。\n",
            "4. 不要写标题,不要写“根据数据”。\n\n",
            f"主播:{streamer}\n",
            f"日期:{day_label}\n",
            f"材料:\n{material}",
        ]
    )
    return system_prompt, user_prompt
def _build_fallback_daily_report(self, payload: Dict[str, Any]) -> str:
    """Compose the plain-text daily report without using an LLM.

    Sections are appended only when the payload has data for them: session
    time ranges, top terms, operator metrics with badge breakdown, frequent
    templates, peak time buckets and representative messages. Text longer
    than ``self._daily_report_max_length`` is cut and given a truncation marker.
    """

    def _as_int(mapping: Dict[str, Any], key: str) -> int:
        return int(mapping.get(key, 0) or 0)

    meta = payload.get("report_meta", {}) or {}
    metrics = payload.get("operator_metrics", {}) or {}
    display_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
    report = [
        f"斗鱼每日报告 | {display_name} | {meta.get('anchor_day', '')}",
        f"共 {meta.get('session_count', 0)} 场,弹幕 {meta.get('message_count', 0)} 条,参与用户 {meta.get('unique_user_count', 0)} 人。",
    ]

    # Session time ranges: first segment start to last segment end, as HH:MM.
    time_ranges = []
    for entry in (payload.get("sessions", []) or [])[:4]:
        parts = entry.get("segments", []) or []
        if not parts:
            continue
        start_time = str(parts[0].get("start_time", ""))[-8:-3]
        end_time = str(parts[-1].get("end_time", ""))[-8:-3]
        time_ranges.append(f"{start_time}-{end_time}")
    if time_ranges:
        report.append("场次时间:" + " / ".join(time_ranges))

    focus_terms = payload.get("top_terms", []) or []
    if focus_terms:
        kept = [str(entry.get("term") or "") for entry in focus_terms[:8] if str(entry.get("term") or "").strip()]
        report.append("关注焦点:" + "".join(kept))

    if metrics:
        fans_badge_user_count = _as_int(metrics, "fans_badge_user_count")
        high_room_level_user_count = _as_int(metrics, "high_room_level_user_count")
        high_fans_level_user_count = _as_int(metrics, "high_fans_level_user_count")
        active_users_10plus = _as_int(metrics, "active_users_10plus")
        labelled = [
            (fans_badge_user_count, f"带牌活跃用户 {fans_badge_user_count}"),
            (high_room_level_user_count, f"30级+活跃用户 {high_room_level_user_count}"),
            (high_fans_level_user_count, f"10级+粉丝牌用户 {high_fans_level_user_count}"),
            (active_users_10plus, f"高活跃核心用户 {active_users_10plus}"),
        ]
        # Zero-valued metrics are left out of the operator line.
        op_parts = [label for amount, label in labelled if amount]
        if op_parts:
            report.append("运营侧:" + "".join(op_parts))

        badges = metrics.get("top_badges", []) or []
        if badges:
            report.append("活跃粉丝牌:")
            for entry in badges[:5]:
                badge_name = str(entry.get("badge_name") or "").strip()
                user_count = _as_int(entry, "user_count")
                message_count = _as_int(entry, "message_count")
                if badge_name:
                    report.append(f"- {badge_name}{user_count}人,{message_count}")

    templates = payload.get("merged_templates", []) or []
    if templates:
        report.append("高频梗:")
        for entry in templates[:5]:
            text = str(entry.get("text") or "").strip()
            count = _as_int(entry, "count")
            if text:
                report.append(f"- {text[:42]}({count}次)")

    buckets = payload.get("peak_buckets", []) or []
    if buckets:
        report.append("热点时段:")
        for entry in buckets[:3]:
            start_time = str(entry.get("start_time") or "")[-8:-3]
            message_count = _as_int(entry, "message_count")
            bucket_terms = (entry.get("top_terms", []) or [])[:4]
            terms = "".join(
                [str(term.get("term") or "") for term in bucket_terms if str(term.get("term") or "").strip()]
            )
            report.append(f"- {start_time}:{message_count}条,关键词:{terms}")

    samples = payload.get("representative_messages", []) or []
    if samples:
        report.append("代表弹幕:")
        for entry in samples[:4]:
            nickname = str(entry.get("nickname") or "").strip()
            content = str(entry.get("content") or "").strip()
            if content:
                report.append(f"- {nickname}:{content[:60]}")

    text = "\n".join(report).strip()
    limit = self._daily_report_max_length
    if len(text) > limit:
        text = text[: limit - 20].rstrip() + "\n...(已截断)"
    return text
def _build_fallback_danmu_summary(self, payload: Dict[str, Any]) -> str:
    """Compose a template-based danmu summary when no LLM text is available.

    The first line is a lead paragraph built from the anchor day and the top
    terms. Bullet lines (prefixed with ``"- "``) follow for frequent
    templates, the two busiest time buckets, representative messages and top
    terms, each only when the payload has data for it.

    Returns:
        The lead line and bullet lines joined by newlines.
    """

    def _bucket_terms(bucket: Dict[str, Any]) -> str:
        # Up to four non-blank keywords of a time bucket, concatenated.
        return "".join(
            [str(term.get("term") or "") for term in (bucket.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()]
        )

    meta = payload.get("report_meta", {}) or {}
    top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or [])[:6] if str(item.get("term") or "").strip()]
    merged_templates = payload.get("merged_templates", []) or []
    peak_buckets = payload.get("peak_buckets", []) or []
    representative_messages = payload.get("representative_messages", []) or []
    lines = [
        f"{meta.get('anchor_day', '')} 这场直播弹幕整体比较密集,讨论重心主要围绕 {''.join(top_terms[:4]) or '对局过程'} 展开,观众互动意愿较强,梗和复读内容持续出现。"
    ]
    if merged_templates:
        sample_templates = "".join(
            [str(item.get("text") or "").strip()[:26] for item in merged_templates[:3] if str(item.get("text") or "").strip()]
        )
        if sample_templates:
            lines.append(f"- 主线观察:直播间共识梗很强,重复刷屏内容主要集中在 {sample_templates}。")
        # A leftover loop that immediately hit ``break`` used to sit here; it
        # had no effect and has been removed.
    if peak_buckets:
        top_bucket = peak_buckets[0]
        terms = _bucket_terms(top_bucket)
        lines.append(
            f"- 节奏变化:高峰集中在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,单时段弹幕 {int(top_bucket.get('message_count', 0) or 0)} 条,关键词偏向 {terms}。"
        )
        if len(peak_buckets) > 1:
            second_bucket = peak_buckets[1]
            second_terms = _bucket_terms(second_bucket)
            lines.append(
                f"- 热点补充:{str(second_bucket.get('start_time') or '')[-8:-3]} 也出现明显抬升,弹幕讨论继续围绕 {second_terms} 展开。"
            )
    if representative_messages:
        lines.append("- 情绪特点:代表性发言里既有对操作和决策的即时反馈,也有大量玩梗、调侃和情绪宣泄。")
    if top_terms:
        lines.append(f"- 关注焦点:高频词主要落在 {''.join(top_terms[:6])},说明观众注意力相对集中。")
    return "\n".join(lines).strip()
def _build_operator_summary_text(self, payload: Dict[str, Any]) -> str:
    """Render the operator-metrics section of the report as bullet lines.

    Always emits three lines (active user scale, fan-badge stickiness, user
    quality). A badge distribution line and a core-speaker line are added
    when ``operator_metrics`` carries ``top_badges`` / ``top_active_users``.

    Args:
        payload: Report payload; ``report_meta`` and ``operator_metrics`` may
            be missing or None.

    Returns:
        Lines prefixed with ``"- "``, joined by newlines.
    """
    meta = payload.get("report_meta", {}) or {}
    operator_metrics = payload.get("operator_metrics", {}) or {}
    total_users = int(meta.get("unique_user_count", 0) or 0)
    fans_badge_users = int(operator_metrics.get("fans_badge_user_count", 0) or 0)
    high_room_users = int(operator_metrics.get("high_room_level_user_count", 0) or 0)
    high_fans_users = int(operator_metrics.get("high_fans_level_user_count", 0) or 0)
    active_users_5plus = int(operator_metrics.get("active_users_5plus", 0) or 0)
    active_users_10plus = int(operator_metrics.get("active_users_10plus", 0) or 0)
    fans_badge_ratio = float(operator_metrics.get("fans_badge_user_ratio", 0) or 0)
    lines = [
        f"- 活跃用户规模:{total_users} 人,其中发言 5 次以上 {active_users_5plus}10 次以上 {active_users_10plus} 人。",
        f"- 粉丝粘性:带粉丝牌活跃用户 {fans_badge_users} 人,占活跃用户 {fans_badge_ratio * 100:.1f}%10 级以上粉丝牌用户 {high_fans_users} 人。",
        f"- 用户质量:房间等级 30 级以上活跃用户 {high_room_users} 人,说明高等级老观众参与度不低。",
    ]
    # Read from the None-safe ``operator_metrics`` local rather than from
    # ``payload`` again: a payload whose "operator_metrics" is None would
    # otherwise raise AttributeError on ``.get``.
    top_badges = operator_metrics.get("top_badges", []) or []
    if top_badges:
        badge_parts = []
        for item in top_badges[:5]:
            badge_name = str(item.get("badge_name") or "").strip()
            if not badge_name:
                continue
            badge_parts.append(f"{badge_name} {int(item.get('user_count', 0) or 0)}人/{int(item.get('message_count', 0) or 0)}")
        if badge_parts:
            lines.append(f"- 活跃牌子分布:{''.join(badge_parts)}")
    top_active_users = operator_metrics.get("top_active_users", []) or []
    if top_active_users:
        core_parts = []
        for item in top_active_users[:5]:
            nickname = str(item.get("nickname") or item.get("uid") or "").strip()
            msg_count = int(item.get("message_count", 0) or 0)
            fans_name = str(item.get("fans_name") or "").strip()
            if fans_name:
                core_parts.append(f"{nickname}{fans_name}{msg_count}条)")
            else:
                core_parts.append(f"{nickname}{msg_count}条)")
        if core_parts:
            lines.append(f"- 核心发言用户:{''.join(core_parts)}")
    return "\n".join(lines).strip()
def _build_operator_summary_lines(self, payload: Dict[str, Any]) -> List[str]:
    """Return the operator summary as a list of bullet texts without the ``"- "`` marker.

    Lines of ``_build_operator_summary_text`` that do not start with ``"- "``
    (after trimming) are dropped.
    """
    bullets: List[str] = []
    for raw in self._build_operator_summary_text(payload).splitlines():
        trimmed = raw.strip()
        if trimmed.startswith("- "):
            bullets.append(trimmed[2:].strip())
    return bullets
async def _generate_danmu_summary_text(self, payload: Dict[str, Any]) -> str:
    """Produce the danmu summary text, preferring the LLM when it is enabled.

    When LLM use is on and a client is configured, the client's ``chat`` is
    run in a worker thread; a non-empty answer is returned stripped. An empty
    answer is logged as a warning. In every other case the template-based
    fallback summary is returned.
    """
    llm_ready = self._daily_report_use_llm and self._daily_report_llm_client
    if llm_ready:
        system_prompt, user_prompt = self._build_danmu_summary_prompt(payload)
        request_tag = f"douyu_danmu_summary_{(payload.get('report_meta', {}) or {}).get('room_id', '')}"
        answer = await asyncio.to_thread(
            self._daily_report_llm_client.chat,
            system_prompt,
            user_prompt,
            request_tag,
        )
        if answer:
            return answer.strip()
        logger.warning(
            f"斗鱼弹幕总结 LLM 生成失败: model={self._daily_report_llm_client.model}, "
            f"last_error={self._daily_report_llm_client.last_error}"
        )
    return self._build_fallback_danmu_summary(payload)
async def _build_daily_report_markdown(self, payload: Dict[str, Any]) -> str:
    """Render the daily report as a Markdown document.

    The document has a title, a one-line stats subtitle, the danmu summary
    and the operator summary, followed by optional "peak time" and "frequent
    template" sections when the payload carries such data.
    """
    meta = payload.get("report_meta", {}) or {}
    title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
    danmu_summary = await self._generate_danmu_summary_text(payload)
    operator_summary = self._build_operator_summary_text(payload)

    document = [
        f"# {title_name} 直播每日报告",
        f"{meta.get('anchor_day', '')}|场次 {meta.get('session_count', 0)}|弹幕 {meta.get('message_count', 0)}|活跃用户 {meta.get('unique_user_count', 0)}",
        "",
        "## 弹幕总结",
        danmu_summary,
        "",
        "## 运营数据总结",
        operator_summary,
    ]

    hot_buckets = payload.get("peak_buckets", []) or []
    if hot_buckets:
        document += ["", "## 热点时段"]
        for bucket in hot_buckets[:3]:
            keywords = (bucket.get("top_terms", []) or [])[:4]
            terms = "".join(
                [str(term.get("term") or "") for term in keywords if str(term.get("term") or "").strip()]
            )
            document.append(
                f"- `{str(bucket.get('start_time') or '')[-8:-3]}` 弹幕 {int(bucket.get('message_count', 0) or 0)} 条,关键词:{terms}"
            )

    frequent = payload.get("merged_templates", []) or []
    if frequent:
        document += ["", "## 高频梗"]
        for entry in frequent[:5]:
            text = str(entry.get("text") or "").strip()
            count = int(entry.get("count", 0) or 0)
            if text:
                document.append(f"- {text[:72]}({count}次)")

    return "\n".join(document).strip()
async def _render_daily_report_image(self, payload: Dict[str, Any]) -> Optional[str]:
    """Render the daily report to a PNG and return the image path.

    The dedicated HTML template is tried first: the danmu summary is
    generated, the HTML is rendered and converted to
    ``<cwd>/temp/md2image/douyu_daily_report_<room>_<YYYYMMDD>.png``. If any
    step fails, the error is logged and the Markdown version of the report is
    converted instead.

    The Markdown is built only when that fallback is needed. Building it
    generates the danmu summary again (possibly another LLM request), so
    doing it up front would double the summary work on the normal path.

    Returns:
        The resolved path from the template path, whatever
        ``convert_md_str_to_image`` returns on the fallback path, or None
        when both attempts fail.
    """
    meta = payload.get("report_meta", {}) or {}
    room_id = str(meta.get("room_id", "") or "room")
    anchor_day = str(meta.get("anchor_day", "") or "").replace("-", "")
    filename = f"douyu_daily_report_{room_id}_{anchor_day}.png"
    try:
        danmu_summary = await self._generate_danmu_summary_text(payload)
        html_content = render_daily_report_html(
            payload=payload,
            danmu_summary=danmu_summary,
            operator_summary_lines=self._build_operator_summary_lines(payload),
        )
        output_dir = os.path.join(os.getcwd(), "temp", "md2image")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, filename)
        await html_to_image(html_content, output_path)
        return str(Path(output_path).resolve())
    except Exception as e:
        logger.error(f"斗鱼专用模板图片生成失败(room={room_id}, day={anchor_day}): {e}")
    try:
        # Fallback: render the Markdown report through the generic converter.
        markdown = await self._build_daily_report_markdown(payload)
        return await convert_md_str_to_image(markdown, filename)
    except Exception as e:
        logger.error(f"斗鱼每日报告图片生成失败(room={room_id}, day={anchor_day}): {e}")
        return None
async def _generate_daily_report_text(self, payload: Dict[str, Any]) -> str:
    """Produce the daily report text, preferring the LLM when it is enabled.

    A non-empty LLM answer is stripped and, when longer than
    ``self._daily_report_max_length``, cut and given a truncation marker.
    An empty answer is logged as a warning. In every other case the
    template-based fallback report is returned.
    """
    llm_ready = self._daily_report_use_llm and self._daily_report_llm_client
    if llm_ready:
        system_prompt, user_prompt = self._build_daily_report_prompt(payload)
        request_tag = f"douyu_daily_report_{(payload.get('report_meta', {}) or {}).get('room_id', '')}"
        answer = await asyncio.to_thread(
            self._daily_report_llm_client.chat,
            system_prompt,
            user_prompt,
            request_tag,
        )
        if answer:
            text = answer.strip()
            limit = self._daily_report_max_length
            if len(text) <= limit:
                return text
            return text[: limit - 20].rstrip() + "\n...(已截断)"
        logger.warning(
            f"斗鱼每日报告 LLM 生成失败: model={self._daily_report_llm_client.model}, "
            f"last_error={self._daily_report_llm_client.last_error}"
        )
    return self._build_fallback_daily_report(payload)
async def _send_daily_reports(self, anchor_day: str):
    """Build and deliver the daily report of ``anchor_day`` for every subscribed room.

    A room is skipped when its report was already marked as delivered, when
    it has no sessions for the day, when any of those sessions is still
    live, or when no payload can be built. The report goes to each
    subscribed group that has the feature enabled, as an image when one was
    rendered and as text otherwise. A room is marked delivered once at least
    one group send succeeds; send failures are logged per group.

    Args:
        anchor_day: Day to report on, in ``YYYY-MM-DD`` form.
    """
    rooms = self.redis_manager.all_subscribed_rooms()
    for room_id in rooms:
        if self.redis_manager.get_text_value(self._daily_report_room_key(room_id, anchor_day)):
            continue
        sessions = self._load_sessions_for_anchor_day(room_id, anchor_day)
        if not sessions:
            continue
        # Do not report on a day whose broadcast is still running.
        if any(bool(session.get("is_live")) for session in sessions):
            continue
        payload = self._build_daily_report_payload(room_id, anchor_day, sessions)
        if not payload:
            continue
        report_image = None
        if self._daily_report_send_image:
            report_image = await self._render_daily_report_image(payload)
        # The text is only sent when there is no image, so generate it (and
        # possibly spend an LLM request) only in that case.
        report_text = None
        if not report_image:
            report_text = await self._generate_daily_report_text(payload)
        groups = self.redis_manager.groups_for_room(room_id)
        delivered = False
        for gid in groups:
            if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
                continue
            try:
                if report_image:
                    await self.bot.send_image_message(gid, Path(report_image))
                else:
                    await self.bot.send_text_message(gid, report_text)
                delivered = True
            except Exception as e:
                logger.error(f"发送斗鱼每日报告失败(room={room_id}, group={gid}): {e}")
        if delivered:
            self.redis_manager.set_text_value(
                self._daily_report_room_key(room_id, anchor_day),
                datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            )
def _start_danmu_record(self, room_id: str):
recorder = self._get_danmu_recorder(room_id)
recorder.start()

View File

@@ -0,0 +1,485 @@
# -*- coding: utf-8 -*-
import html
from typing import Any, Dict, List
def _escape(value: Any) -> str:
    """HTML-escape ``value`` for safe embedding in the report markup.

    ``None`` and the empty string render as ``""``. Every other value,
    including a numeric ``0``, is converted with ``str`` and escaped. The
    earlier ``value or ""`` form turned ``0`` into an empty string, so
    zero-valued metric cards and badge counts were rendered blank.
    """
    if value is None or value == "":
        return ""
    return html.escape(str(value))
def _render_metric_card(label: str, value: Any, hint: str = "") -> str:
    """Return the HTML of one metric card: label, value and hint, each escaped."""
    pieces = [
        '<div class="metric-card">',
        f'<div class="metric-label">{_escape(label)}</div>',
        f'<div class="metric-value">{_escape(value)}</div>',
        f'<div class="metric-hint">{_escape(hint)}</div>',
        "</div>",
    ]
    return "".join(pieces)
def _render_list(items: List[str], item_class: str = "bullet-list") -> str:
    """Return a ``<ul>`` of the non-blank items, or ``""`` when none remain.

    Item texts are escaped; ``item_class`` is inserted into the markup as given.
    """
    if not items:
        return ""
    rendered = [f'<li>{_escape(entry)}</li>' for entry in items if str(entry or "").strip()]
    if not rendered:
        return ""
    body = "".join(rendered)
    return f'<ul class="{item_class}">{body}</ul>'
def _split_summary_blocks(danmu_summary: str) -> tuple[str, List[str]]:
    """Split a summary text into its lead paragraph and its bullet items.

    Blank lines are ignored. Trimmed lines starting with ``"- "`` become
    bullet items (marker removed); all other lines are joined with single
    spaces into the lead text.

    Returns:
        ``(lead, bullet_items)``.
    """
    rows = [raw.strip() for raw in str(danmu_summary or "").splitlines()]
    rows = [row for row in rows if row]
    bullets = [row[2:].strip() for row in rows if row.startswith("- ")]
    prose = [row for row in rows if not row.startswith("- ")]
    return " ".join(prose).strip(), bullets
def _render_insight_cards(items: List[str]) -> str:
    """Return the HTML for up to six insight cards, each with a fixed kicker label."""
    labels = ["主线", "情绪", "梗点", "节奏", "反馈", "补充"]

    def _card(position: int, text: str) -> str:
        kicker = labels[position] if position < len(labels) else "观察"
        return "".join(
            [
                '<div class="insight-card">',
                f'<div class="insight-kicker">{_escape(kicker)}</div>',
                f'<div class="insight-text">{_escape(text)}</div>',
                "</div>",
            ]
        )

    return "".join(_card(idx, item) for idx, item in enumerate(items[:6]))
def _render_badges(top_badges: List[Dict[str, Any]]) -> str:
    """Return the HTML chips for up to six badges, skipping entries without a name."""
    chips = []
    for entry in top_badges[:6]:
        badge_name = str(entry.get("badge_name") or "").strip()
        if badge_name:
            name_html = f'<span class="badge-name">{_escape(badge_name)}</span>'
            meta_html = f'<span class="badge-meta">{_escape(entry.get("user_count", 0))}人 / {_escape(entry.get("message_count", 0))}条</span>'
            chips.append('<div class="badge-chip">' + name_html + meta_html + "</div>")
    return "".join(chips)
def _render_hot_times(peak_buckets: List[Dict[str, Any]]) -> str:
    """Return the HTML cards for the first three peak buckets.

    Each card shows the bucket's start time (the ``HH:MM`` slice of the
    timestamp), its message count and up to four non-blank keywords.
    """

    def _card(bucket: Dict[str, Any]) -> str:
        start_time = str(bucket.get("start_time") or "")[-8:-3]
        keywords = [
            str(term.get("term") or "").strip()
            for term in (bucket.get("top_terms", []) or [])[:4]
        ]
        keywords = [word for word in keywords if word]
        return "".join(
            [
                '<div class="hot-card">',
                f'<div class="hot-time">{_escape(start_time)}</div>',
                f'<div class="hot-count">{_escape(bucket.get("message_count", 0))} 条弹幕</div>',
                f'<div class="hot-terms">{_escape(" / ".join(keywords))}</div>',
                "</div>",
            ]
        )

    return "".join(_card(bucket) for bucket in peak_buckets[:3])
def render_daily_report_html(
    payload: Dict[str, Any],
    danmu_summary: str,
    operator_summary_lines: List[str],
) -> str:
    """Render the Douyu daily report as a complete, self-styled HTML document.

    Args:
        payload: Report data. The keys read here are ``report_meta``
            (``nickname``, ``room_name``, ``room_id``, ``anchor_day``,
            ``session_count``, ``message_count``, ``unique_user_count``),
            ``operator_metrics`` (``fans_badge_user_count``,
            ``high_fans_level_user_count``, ``high_room_level_user_count``,
            ``top_active_users``, ``top_badges``), ``merged_templates`` and
            ``peak_buckets``. Missing or ``None`` sections are treated as
            empty.
        danmu_summary: Summary text; ``_split_summary_blocks`` splits it into
            the lead text and the items rendered as insight cards.
        operator_summary_lines: Lines rendered as a list in the operations
            section.

    Returns:
        The HTML document as a string.
    """
    meta = payload.get("report_meta", {}) or {}
    operator = payload.get("operator_metrics", {}) or {}

    # Title falls back from nickname to room name to room id to a generic label.
    title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
    subtitle = (
        f"{meta.get('anchor_day', '')} | 场次 {meta.get('session_count', 0)}"
        f" | 弹幕 {meta.get('message_count', 0)} | 活跃用户 {meta.get('unique_user_count', 0)}"
    )
    metrics_html = "".join([
        _render_metric_card("活跃用户", meta.get("unique_user_count", 0), "当天参与弹幕的去重人数"),
        _render_metric_card("带牌活跃", operator.get("fans_badge_user_count", 0), "带粉丝牌发言用户"),
        _render_metric_card("10+粉丝牌", operator.get("high_fans_level_user_count", 0), "高粘性活跃用户"),
        _render_metric_card("30+等级用户", operator.get("high_room_level_user_count", 0), "高等级老观众"),
    ])

    # First five templates with non-blank text; text is capped at 72 characters
    # and the repeat count is wrapped in a balanced pair of parentheses.
    merged_templates = payload.get("merged_templates", []) or []
    template_items = [
        f"{str(item.get('text') or '').strip()[:72]}({int(item.get('count', 0) or 0)}次)"
        for item in merged_templates[:5]
        if str(item.get("text") or "").strip()
    ]

    # Reuse the None-safe ``operator`` dict: reading payload["operator_metrics"]
    # again without the ``or {}`` guard fails when the key holds ``None``.
    top_active_users = operator.get("top_active_users", []) or []
    active_user_items = []
    for item in top_active_users[:10]:
        nickname = str(item.get("nickname") or item.get("uid") or "").strip()
        fans_name = str(item.get("fans_name") or "").strip()
        message_count = int(item.get("message_count", 0) or 0)
        if fans_name:
            active_user_items.append(f"{nickname} | {fans_name} | {message_count}")
        else:
            active_user_items.append(f"{nickname} | {message_count}")

    lead_summary, danmu_bullets = _split_summary_blocks(danmu_summary)

    # The template is an f-string: literal CSS braces are doubled ({{ }}) and
    # single braces interpolate pre-rendered or escaped values.
    html_doc = f"""<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
:root {{
--bg-top: #f3efe5;
--bg-bottom: #e6edf5;
--paper: rgba(255, 252, 247, 0.97);
--text: #1f2937;
--muted: #6b7280;
--line: rgba(137, 148, 163, 0.18);
--navy: #14213d;
--blue: #2b59ff;
--cyan: #1fa8a0;
--gold: #c89b3c;
--gold-soft: rgba(200, 155, 60, 0.14);
--red-soft: rgba(210, 84, 61, 0.10);
--shadow: 0 26px 60px rgba(33, 52, 84, 0.14);
}}
* {{ box-sizing: border-box; }}
body {{
margin: 0;
padding: 28px;
background:
radial-gradient(circle at 0% 0%, rgba(43, 89, 255, 0.08), transparent 24%),
radial-gradient(circle at 100% 0%, rgba(31, 168, 160, 0.10), transparent 20%),
linear-gradient(180deg, var(--bg-top) 0%, var(--bg-bottom) 100%);
font-family: 'Microsoft YaHei', 'PingFang SC', 'Segoe UI', sans-serif;
color: var(--text);
}}
.sheet {{
width: 920px;
margin: 0 auto;
background: var(--paper);
border-radius: 34px;
overflow: hidden;
box-shadow: var(--shadow);
border: 1px solid rgba(255,255,255,0.6);
}}
.hero {{
position: relative;
padding: 34px 40px 30px;
background:
radial-gradient(circle at 18% 18%, rgba(255,255,255,0.12), transparent 18%),
radial-gradient(circle at 84% 14%, rgba(255,255,255,0.11), transparent 19%),
linear-gradient(135deg, #111827 0%, #1d4ed8 46%, #0f766e 100%);
color: #fff;
}}
.hero::after {{
content: "";
position: absolute;
right: -52px;
top: -40px;
width: 230px;
height: 230px;
border-radius: 50%;
border: 1px solid rgba(255,255,255,0.14);
box-shadow: 0 0 0 28px rgba(255,255,255,0.04), 0 0 0 60px rgba(255,255,255,0.02);
}}
.eyebrow {{
display: inline-block;
padding: 7px 14px;
border-radius: 999px;
background: rgba(255,255,255,0.12);
border: 1px solid rgba(255,255,255,0.18);
font-size: 12px;
letter-spacing: .08em;
}}
.hero-title {{
margin: 18px 0 10px;
font-size: 44px;
font-weight: 800;
line-height: 1.16;
letter-spacing: -0.03em;
}}
.hero-subtitle {{
color: rgba(240, 246, 255, 0.84);
font-size: 16px;
}}
.content {{
padding: 28px 30px 34px;
}}
.metric-grid {{
display: grid;
grid-template-columns: repeat(4, minmax(0, 1fr));
gap: 14px;
margin-top: -34px;
position: relative;
z-index: 2;
}}
.metric-card {{
background: rgba(255,255,255,0.9);
backdrop-filter: blur(8px);
border: 1px solid rgba(255,255,255,0.8);
border-radius: 22px;
padding: 18px 18px 16px;
box-shadow: 0 10px 24px rgba(17, 24, 39, 0.08);
}}
.metric-label {{
color: var(--muted);
font-size: 13px;
margin-bottom: 8px;
}}
.metric-value {{
font-size: 32px;
font-weight: 800;
color: var(--navy);
line-height: 1;
}}
.metric-hint {{
color: #8090a7;
font-size: 12px;
margin-top: 8px;
}}
.section {{
margin-top: 20px;
padding: 24px;
border-radius: 26px;
border: 1px solid var(--line);
background: linear-gradient(180deg, rgba(255,255,255,0.95), rgba(248,250,252,0.92));
}}
.section.danmu {{
background:
linear-gradient(180deg, rgba(255,255,255,0.95), rgba(247, 249, 255, 0.94));
}}
.section.ops {{
background:
linear-gradient(180deg, rgba(255,251,244,0.96), rgba(255,255,255,0.95));
}}
.section-title {{
display: flex;
align-items: center;
gap: 10px;
font-size: 27px;
font-weight: 800;
margin-bottom: 16px;
color: var(--navy);
}}
.section-title .icon {{
width: 14px;
height: 30px;
border-radius: 999px;
background: linear-gradient(180deg, var(--blue), var(--cyan));
box-shadow: 0 6px 16px rgba(43,89,255,0.24);
}}
.section.ops .section-title .icon {{
background: linear-gradient(180deg, #d6a547, #f59e0b);
box-shadow: 0 6px 16px rgba(200,155,60,0.24);
}}
.summary-grid {{
display: grid;
grid-template-columns: minmax(0, 1.6fr) minmax(255px, 0.78fr);
gap: 18px;
}}
.prose p {{
margin: 0 0 12px;
color: #334155;
font-size: 17px;
line-height: 1.84;
}}
.lead-panel {{
padding: 18px 18px 16px;
border-radius: 20px;
background: linear-gradient(180deg, rgba(242,246,255,0.92), rgba(255,255,255,0.96));
border: 1px solid rgba(125, 145, 186, 0.14);
margin-bottom: 14px;
}}
.lead-title {{
color: #5e6d87;
font-size: 13px;
letter-spacing: .06em;
margin-bottom: 10px;
}}
.lead-text {{
color: #24364c;
font-size: 18px;
line-height: 1.9;
font-weight: 500;
}}
.insight-grid {{
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 12px;
}}
.insight-card {{
padding: 15px 16px;
border-radius: 18px;
background: rgba(255,255,255,0.9);
border: 1px solid rgba(125, 145, 186, 0.14);
min-height: 110px;
}}
.insight-kicker {{
color: #2b59ff;
font-size: 12px;
letter-spacing: .08em;
font-weight: 700;
margin-bottom: 8px;
}}
.insight-text {{
color: #334155;
font-size: 15px;
line-height: 1.76;
}}
.bullet-list {{
margin: 0;
padding-left: 22px;
}}
.bullet-list li {{
color: #334155;
margin: 10px 0;
line-height: 1.72;
font-size: 16px;
}}
.compact-user-list {{
margin: 0;
padding-left: 18px;
column-count: 2;
column-gap: 20px;
}}
.compact-user-list li {{
break-inside: avoid;
color: #475569;
margin: 8px 0;
line-height: 1.62;
font-size: 14px;
}}
.aside-card {{
padding: 18px;
border-radius: 20px;
background: rgba(245, 248, 255, 0.86);
border: 1px solid rgba(125, 145, 186, 0.16);
}}
.aside-card.warm {{
background: rgba(255, 248, 234, 0.82);
border: 1px solid rgba(200, 155, 60, 0.18);
}}
.aside-title {{
font-size: 14px;
letter-spacing: .06em;
color: #68758a;
margin-bottom: 12px;
}}
.badge-wall {{
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 10px;
}}
.badge-chip {{
padding: 10px 12px;
border-radius: 16px;
background: linear-gradient(180deg, rgba(255,255,255,0.92), rgba(242,246,255,0.92));
border: 1px solid rgba(129, 147, 181, 0.16);
min-width: 0;
}}
.badge-name {{
display: block;
font-weight: 700;
color: var(--navy);
margin-bottom: 4px;
}}
.badge-meta {{
display: block;
font-size: 12px;
color: #6b7280;
}}
.hot-grid {{
display: grid;
grid-template-columns: repeat(3, minmax(0, 1fr));
gap: 12px;
margin-top: 16px;
}}
.hot-card {{
padding: 16px;
border-radius: 18px;
background: linear-gradient(180deg, rgba(255,255,255,0.94), rgba(241,246,255,0.92));
border: 1px solid rgba(129,147,181,0.16);
}}
.hot-time {{
font-size: 22px;
font-weight: 800;
color: var(--blue);
margin-bottom: 6px;
}}
.hot-count {{
font-size: 14px;
color: #334155;
margin-bottom: 8px;
}}
.hot-terms {{
font-size: 13px;
color: #64748b;
line-height: 1.56;
}}
.footer-note {{
margin-top: 20px;
text-align: right;
color: #7b8798;
font-size: 12px;
letter-spacing: .04em;
}}
</style>
</head>
<body>
<div class="sheet">
<div class="hero">
<div class="eyebrow">DOUYU DAILY REPORT</div>
<div class="hero-title">{_escape(title_name)}</div>
<div class="hero-subtitle">{_escape(subtitle)}</div>
</div>
<div class="content">
<div class="metric-grid">
{metrics_html}
</div>
<div class="section danmu">
<div class="section-title"><span class="icon"></span><span>弹幕总结</span></div>
<div class="summary-grid">
<div class="prose">
<div class="lead-panel">
<div class="lead-title">整体观察</div>
<div class="lead-text">{_escape(lead_summary)}</div>
</div>
<div class="insight-grid">
{_render_insight_cards(danmu_bullets)}
</div>
</div>
<div class="aside-card">
<div class="aside-title">高频梗</div>
{_render_list(template_items)}
</div>
</div>
<div class="hot-grid">
{_render_hot_times(payload.get("peak_buckets", []) or [])}
</div>
</div>
<div class="section ops">
<div class="section-title"><span class="icon"></span><span>运营数据总结</span></div>
<div class="summary-grid">
<div class="prose">
{_render_list(operator_summary_lines)}
</div>
<div class="aside-card warm">
<div class="aside-title">活跃牌子</div>
<div class="badge-wall">{_render_badges(operator.get("top_badges", []) or [])}</div>
</div>
</div>
<div class="aside-card" style="margin-top: 16px;">
<div class="aside-title">核心发言用户</div>
{_render_list(active_user_items, "compact-user-list")}
</div>
</div>
<div class="footer-note">ABOT · Douyu Report Template</div>
</div>
</div>
</body>
</html>"""
    return html_doc

View File

@@ -1,6 +1,5 @@
import subprocess
import time
import markdown
from pathlib import Path
import psutil
@@ -10,8 +9,69 @@ import asyncio
import re
from loguru import logger
try:
import markdown
except ImportError:
markdown = None
# Keywords (Chinese): group name, time, date, members, messages, statistics,
# summary, source, generated, record.
# NOTE(review): the consumer of this list is outside this chunk — presumably it
# flags metadata-style lines; verify against the caller.
# NOTE(review): the first entry is an empty string, which is a substring of every
# string; confirm it is intentional if this list is used with ``in`` checks.
META_KEYWORDS = ["", "群名", "时间", "日期", "成员", "消息", "统计", "总结", "来源", "生成", "记录"]
def _simple_markdown_to_html(md_content: str) -> str:
    """Convert a small Markdown subset to HTML without third-party packages.

    Supported constructs:

    * ATX headings of level 1-6 (``# `` through ``###### ``) -> ``<h1>``..``<h6>``.
    * Bullet items introduced by ``- ``, ``* `` or ``+ ``; consecutive items
      are grouped into a single ``<ul>``.
    * Plain paragraphs; consecutive text lines are joined with one space.

    A blank line ends the current paragraph and any open list. Text is
    inserted verbatim: there is no HTML escaping and no inline markup
    handling.

    Args:
        md_content: Markdown source; ``None`` or any other falsy value is
            treated as an empty document.

    Returns:
        The generated HTML fragments joined by newlines.
    """
    bullet_markers = ("-", "*", "+")
    fragments = []
    pending_text = []  # stripped lines of the paragraph being collected
    list_open = False

    def end_paragraph() -> None:
        if pending_text:
            fragments.append(f"<p>{' '.join(pending_text)}</p>")
            pending_text.clear()

    def end_list() -> None:
        nonlocal list_open
        if list_open:
            fragments.append("</ul>")
            list_open = False

    for source_line in str(md_content or "").splitlines():
        text = source_line.strip()
        if not text:
            end_paragraph()
            end_list()
            continue

        # A block marker must be followed by a space ("#tag" and "**bold**"
        # stay ordinary text), so split on the first space and inspect the
        # leading token.
        marker, space, remainder = text.partition(" ")
        body = remainder.strip()

        if space and 1 <= len(marker) <= 6 and set(marker) == {"#"}:
            end_paragraph()
            end_list()
            level = len(marker)
            fragments.append(f"<h{level}>{body}</h{level}>")
        elif space and marker in bullet_markers:
            end_paragraph()
            if not list_open:
                fragments.append("<ul>")
                list_open = True
            fragments.append(f"<li>{body}</li>")
        else:
            # Plain text ends an open list and extends the current paragraph.
            end_list()
            pending_text.append(text)

    end_paragraph()
    end_list()
    return "\n".join(fragments)
async def safe_close_browser(browser, timeout: float = 4.0) -> None:
if not browser:
@@ -105,7 +165,10 @@ def _split_hero(html_body: str):
async def md_str_to_html_content(md_content):
html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
if markdown is not None:
html_body = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
else:
html_body = _simple_markdown_to_html(md_content)
hero_title, hero_meta, remain_html, hero_enabled = _split_hero(html_body)
css = """