Files
abot/plugins/douyu/danmu_summary.py
2026-04-08 13:17:29 +08:00

840 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import os
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple
class DouyuDanmuSummaryHelper:
    """Extract Douyu danmu (live chat) for a stream session and condense it.

    The class is a pure namespace: every method is a ``classmethod`` or
    ``staticmethod``.  A "message" throughout is the dict produced by
    :meth:`parse_danmu_line` (keys ``timestamp``, ``timestamp_text``,
    ``nickname``, ``uid``, the profile fields, ``content`` and ``raw``).
    """

    # One stored log line: "[YYYY-MM-DD HH:MM:SS] nickname (UID: uid<profile>)<content>".
    LINE_PATTERN = re.compile(
        r"^\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s+"
        r"(?P<nickname>.*?)\s+\(UID:\s*(?P<uid>[^,\)]+)(?P<profile>.*?)\)(?P<content>.*)$"
    )
    # Tokens ignored when ranking top terms.
    STOPWORDS: Set[str] = {
        "哈哈", "哈哈哈", "hh", "hhh", "啊啊", "啊啊啊", "可以", "这个", "那个", "真的", "就是",
        "一下", "主播", "兄弟", "老铁", "大家", "我们", "你们", "他们", "不是", "什么", "怎么",
        "为啥", "有点", "感觉", "这里", "那里", "然后", "今天", "刚刚", "现在", "一个", "一下子",
    }
    # Short reaction words that always count as burst terms.
    # NOTE(review): the empty-string entries look like characters that did not
    # survive a copy/export of this file - confirm against the upstream source.
    SHORT_BURST_WORDS: Set[str] = {
        "666", "6666", "", "牛逼", "", "", "", "", "", "", "", "", "",
        "哈哈", "哈哈哈", "笑死", "卧槽", "wc", "awsl", "nb", "nbl", "c", "6",
    }
    # Content matching any of these is treated as system/bot noise.
    NOISE_PATTERNS = [
        re.compile(r"本条弹幕.*机器人", re.I),
        re.compile(r"请不要.*统计机器人数", re.I),
        re.compile(r"原来直播间有.*非机器人用户", re.I),
        re.compile(r"如果您的直播内容是以观看他人操作为主", re.I),
    ]
    # Normalised text matching any of these is always merged as a template,
    # regardless of length or repeat count.
    TEMPLATE_HINT_PATTERNS = [
        re.compile(r"闭目不语任由"),
        re.compile(r"你就忍心一辈"),
        re.compile(r"刚刚偷看你直播被老板发现"),
        re.compile(r"还好老板是蝙蝠侠"),
        re.compile(r"原来直播间有这么多姐妹"),
        re.compile(r"之前我一直不敢发弹幕"),
        re.compile(r"强子也就是吃了直播的红利"),
        re.compile(r"你声音好像强子"),
        re.compile(r"怎么回事强子"),
        re.compile(r"你是个人吗强"),
    ]
    TEMPLATE_MIN_LENGTH = 14
    TEMPLATE_MIN_REPEAT = 4
    REPEAT_MIN_COUNT = 3
    # Levels at or above these values are reported as "high" in operator metrics.
    HIGH_ROOM_LEVEL_THRESHOLD = 30
    HIGH_FANS_LEVEL_THRESHOLD = 10
    # Display order of the histogram buckets (ascending by level).
    _ROOM_LEVEL_BUCKETS = ("1-9", "10-19", "20-29", "30-39", "40+")
    _FANS_LEVEL_BUCKETS = ("1-4", "5-9", "10-14", "15-19", "20+")

    # ------------------------------------------------------------------
    # Parsing and loading
    # ------------------------------------------------------------------
    @classmethod
    def parse_danmu_line(cls, line: str) -> Optional[Dict[str, Any]]:
        """Parse one log line into a message dict.

        Returns ``None`` for blank lines, lines that do not match
        ``LINE_PATTERN`` and lines whose timestamp is not a real date/time.
        """
        text = str(line or "").strip()
        if not text:
            return None
        match = cls.LINE_PATTERN.match(text)
        if match is None:
            return None
        timestamp_text = match.group("timestamp")
        try:
            ts = datetime.strptime(timestamp_text, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # Digits matched the pattern but are not a valid date (e.g. month 13).
            return None
        return {
            "timestamp": ts,
            "timestamp_text": timestamp_text,
            "nickname": match.group("nickname").strip(),
            "uid": str(match.group("uid")).strip(),
            **cls._parse_profile_text(str(match.group("profile") or "")),
            "content": match.group("content").strip(),
            "raw": text,
        }

    @staticmethod
    def _danmu_file_path(room_id: str, date_key: str, base_dir: str) -> str:
        """Return the path of the per-day danmu log for ``room_id``."""
        return os.path.join(base_dir, "douyu_danmu", date_key, f"{room_id}_{date_key}.txt")

    @staticmethod
    def _segment_date_keys(segments: List[Dict[str, datetime]]) -> List[str]:
        """Return the sorted ``YYYYMMDD`` keys of every calendar day the segments touch.

        Days strictly between a segment's start and end are included, so a
        segment spanning more than two calendar days does not skip log files.
        """
        keys: Set[str] = set()
        one_day = timedelta(days=1)
        for segment in segments:
            day = segment["start"].date()
            last_day = segment["end"].date()
            while day <= last_day:
                keys.add(day.strftime("%Y%m%d"))
                day += one_day
        return sorted(keys)

    @classmethod
    def load_session_messages(cls, room_id: str, session: Dict[str, Any], base_dir: str = "temp") -> List[Dict[str, Any]]:
        """Load every parsed message that falls inside one of the session's segments.

        Missing files are skipped.  A file that cannot be read or decoded is
        abandoned (messages collected from it so far are kept) and loading
        continues with the next day.
        """
        segments = cls._normalize_segments(session.get("segments", []) or [])
        if not room_id or not segments:
            return []
        collected: List[Dict[str, Any]] = []
        for date_key in cls._segment_date_keys(segments):
            file_path = cls._danmu_file_path(room_id, date_key, base_dir)
            if not os.path.exists(file_path):
                continue
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    for line in f:
                        parsed = cls.parse_danmu_line(line)
                        if parsed and cls._in_any_segment(parsed["timestamp"], segments):
                            collected.append(parsed)
            except (OSError, UnicodeError):
                # Deliberate best effort: one unreadable day must not lose the others.
                continue
        return collected

    @classmethod
    def load_day_messages(cls, room_id: str, date_key: str, base_dir: str = "temp") -> List[Dict[str, Any]]:
        """Load every parseable message from a single day's log file.

        Returns an empty list when the file does not exist.  Unlike
        :meth:`load_session_messages`, read/decoding errors propagate.
        """
        file_path = cls._danmu_file_path(room_id, date_key, base_dir)
        if not os.path.exists(file_path):
            return []
        with open(file_path, "r", encoding="utf-8") as f:
            parsed_lines = (cls.parse_danmu_line(line) for line in f)
            return [parsed for parsed in parsed_lines if parsed]

    # ------------------------------------------------------------------
    # Public summary builders
    # ------------------------------------------------------------------
    @staticmethod
    def _unique_uids(items: List[Dict[str, Any]]) -> Set[str]:
        """Return the set of non-blank ``uid`` values (as strings) found in ``items``."""
        return {
            str(item.get("uid") or "")
            for item in items
            if str(item.get("uid") or "").strip()
        }

    @staticmethod
    def _rank_buckets_by_volume(buckets: List[Dict[str, Any]], count: int) -> List[Dict[str, Any]]:
        """Return the ``count`` buckets with the most messages, busiest first."""
        return sorted(buckets, key=lambda item: item.get("message_count", 0), reverse=True)[:count]

    @classmethod
    def build_summary_material(
        cls,
        room_id: str,
        session: Dict[str, Any],
        messages: List[Dict[str, Any]],
        bucket_minutes: int = 5,
        top_bucket_count: int = 10,
    ) -> Dict[str, Any]:
        """Build the flat statistics dict for one session.

        Messages without content are ignored.  Noise is filtered and template
        spam merged (see :meth:`_prepare_messages`); term, burst, bucket and
        operator statistics are then computed on the remaining messages.
        """
        normalized = [item for item in messages if item and item.get("content")]
        prepared = cls._prepare_messages(normalized)
        organized_messages = prepared["organized_messages"]
        deduped_messages = cls._dedupe_consecutive_messages(organized_messages)
        bucket_stats = cls._build_time_buckets(organized_messages, minutes=bucket_minutes)
        return {
            "session_id": session.get("session_id", ""),
            "room_id": room_id,
            "anchor_day": session.get("anchor_day", ""),
            "nickname": session.get("nickname", ""),
            "room_name": session.get("room_name", ""),
            "segments": cls._serialize_segments(session.get("segments", []) or []),
            "message_count": len(normalized),
            "noise_filtered_count": len(prepared["noise_messages"]),
            "organized_message_count": len(organized_messages),
            "deduped_message_count": len(deduped_messages),
            "unique_user_count": len(cls._unique_uids(normalized)),
            "organized_unique_user_count": len(cls._unique_uids(organized_messages)),
            "merged_templates": prepared["merged_templates"],
            "top_terms": cls._extract_top_terms(organized_messages, limit=30),
            "burst_terms": cls._build_burst_terms(organized_messages),
            "time_buckets": bucket_stats,
            "peak_buckets": cls._rank_buckets_by_volume(bucket_stats, top_bucket_count),
            "representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats),
            "operator_metrics": cls._build_operator_metrics(normalized, organized_messages),
        }

    @classmethod
    def build_llm_payload(
        cls,
        room_id: str,
        session: Dict[str, Any],
        messages: List[Dict[str, Any]],
        bucket_minutes: int = 5,
        top_bucket_count: int = 8,
        top_repeat_count: int = 24,
    ) -> Dict[str, Any]:
        """Build a high-fidelity danmu payload intended for an LLM.

        Rules:
        1. Only platform/bot system noise is filtered out.
        2. Identical or heavily templated content is aggregated, not deleted.
        3. All other distinct content is kept as far as possible and organised
           by time window / hot spot for the model.
        """
        normalized = [item for item in messages if item and item.get("content")]
        prepared = cls._prepare_messages(normalized)
        organized_messages = prepared["organized_messages"]
        bucket_stats = cls._build_time_buckets(organized_messages, minutes=bucket_minutes)
        peak_buckets = cls._rank_buckets_by_volume(bucket_stats, top_bucket_count)
        return {
            "session_meta": {
                "room_id": room_id,
                "session_id": session.get("session_id", ""),
                "anchor_day": session.get("anchor_day", ""),
                "nickname": session.get("nickname", ""),
                "room_name": session.get("room_name", ""),
                "segments": cls._serialize_segments(session.get("segments", []) or []),
                "message_count": len(normalized),
                "noise_filtered_count": len(prepared["noise_messages"]),
                "organized_message_count": len(organized_messages),
                "unique_user_count": len(cls._unique_uids(normalized)),
                "organized_unique_user_count": len(cls._unique_uids(organized_messages)),
            },
            "operator_metrics": cls._build_operator_metrics(normalized, organized_messages),
            "cleaning_rules": [
                "仅过滤系统噪音、机器人探测、平台提示类弹幕。",
                "明显重复的长模板文案按内容聚合,保留出现次数、人数、首末时间。",
                "其他相同内容按重复短语归并,但不抹掉不同观点和不同句式。",
                "高峰时段补充原始弹幕样本,方便 LLM 还原语境。",
            ],
            "merged_templates": prepared["merged_templates"],
            "repeated_messages": cls._build_repeated_messages(
                organized_messages,
                limit=top_repeat_count,
            ),
            "top_terms": cls._extract_top_terms(organized_messages, limit=30),
            "burst_terms": cls._build_burst_terms(organized_messages),
            "peak_buckets": cls._simplify_peak_buckets(peak_buckets),
            "representative_messages": cls._pick_representative_messages(organized_messages, bucket_stats),
            "raw_window_samples": cls._build_raw_window_samples(peak_buckets, per_bucket_limit=8),
        }

    # ------------------------------------------------------------------
    # Profile parsing and operator metrics
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_profile_text(profile_text: str) -> Dict[str, Any]:
        """Extract room level, fan badge (name + level) and noble title from the
        text that follows the UID inside the parentheses of a log line.

        Fields that are not present default to ``0`` / ``""``.
        """
        text = str(profile_text or "").strip()
        room_level = 0
        fans_name = ""
        fans_level = 0
        noble_name = ""

        room_match = re.search(r",\s*Lv\s*(\d+)", text, re.I)
        if room_match:
            try:
                room_level = int(room_match.group(1))
            except ValueError:
                room_level = 0

        fans_match = re.search(r"/\s*([^/]+?)\s+Lv\s*(\d+)", text, re.I)
        if fans_match:
            fans_name = str(fans_match.group(1) or "").strip()
            try:
                fans_level = int(fans_match.group(2))
            except ValueError:
                fans_level = 0

        noble_match = re.search(r"/\s*(骑士|子爵|伯爵|公爵|国王|皇帝|游侠|超级皇帝|幻神)\b", text)
        if noble_match:
            noble_name = str(noble_match.group(1) or "").strip()

        return {
            "room_level": room_level,
            "fans_name": fans_name,
            "fans_level": fans_level,
            "noble_name": noble_name,
            "has_fans_badge": bool(fans_name),
            "has_noble": bool(noble_name),
        }

    @classmethod
    def _build_operator_metrics(cls, messages: List[Dict[str, Any]], organized_messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate per-user audience statistics.

        ``messages`` drives every count; ``organized_messages`` is only used
        for each top user's ``organized_message_count``.  Messages with a blank
        uid are ignored.  Per user, the highest room/fans level seen and the
        first non-empty nickname, badge name and noble title are kept.
        """
        user_profiles: Dict[str, Dict[str, Any]] = {}
        user_message_count = Counter()
        fans_badge_users: Set[str] = set()
        noble_users: Set[str] = set()
        fans_badge_message_count = 0
        noble_message_count = 0
        # Distinct users per badge, so a badge a user switched to later still
        # counts that user (in first-seen badge order).
        badge_users: Dict[str, Set[str]] = defaultdict(set)
        badge_message_counter = Counter()

        for item in messages:
            uid = str(item.get("uid") or "").strip()
            if not uid:
                continue
            user_message_count[uid] += 1
            nickname = str(item.get("nickname") or "").strip()
            profile = user_profiles.setdefault(uid, {
                "uid": uid,
                "nickname": nickname,
                "room_level": 0,
                "fans_name": "",
                "fans_level": 0,
                "noble_name": "",
            })
            room_level = int(item.get("room_level") or 0)
            fans_level = int(item.get("fans_level") or 0)
            fans_name = str(item.get("fans_name") or "").strip()
            noble_name = str(item.get("noble_name") or "").strip()

            profile["room_level"] = max(int(profile.get("room_level") or 0), room_level)
            profile["fans_level"] = max(int(profile.get("fans_level") or 0), fans_level)
            if fans_name and not profile.get("fans_name"):
                profile["fans_name"] = fans_name
            if noble_name and not profile.get("noble_name"):
                profile["noble_name"] = noble_name
            if nickname and not profile.get("nickname"):
                profile["nickname"] = nickname

            if fans_name:
                fans_badge_users.add(uid)
                fans_badge_message_count += 1
                badge_users[fans_name].add(uid)
                badge_message_counter[fans_name] += 1
            if noble_name:
                noble_users.add(uid)
                noble_message_count += 1

        room_level_histogram = Counter()
        fans_level_histogram = Counter()
        high_room_level_users: Set[str] = set()
        high_fans_level_users: Set[str] = set()
        for uid, profile in user_profiles.items():
            room_level = int(profile.get("room_level") or 0)
            fans_level = int(profile.get("fans_level") or 0)
            room_level_histogram[cls._level_bucket(room_level)] += 1
            # Only users who showed a fan badge enter the fans-level histogram.
            if str(profile.get("fans_name") or "").strip():
                fans_level_histogram[cls._fans_level_bucket(fans_level)] += 1
            if room_level >= cls.HIGH_ROOM_LEVEL_THRESHOLD:
                high_room_level_users.add(uid)
            if fans_level >= cls.HIGH_FANS_LEVEL_THRESHOLD:
                high_fans_level_users.add(uid)

        organized_uids = (str(item.get("uid") or "").strip() for item in organized_messages)
        user_organized_count = Counter(uid for uid in organized_uids if uid)

        top_active_users = []
        for uid, count in user_message_count.most_common(12):
            profile = user_profiles.get(uid, {})
            top_active_users.append({
                "uid": uid,
                "nickname": str(profile.get("nickname") or ""),
                "message_count": count,
                "organized_message_count": int(user_organized_count.get(uid, 0) or 0),
                "room_level": int(profile.get("room_level", 0) or 0),
                "fans_name": str(profile.get("fans_name") or ""),
                "fans_level": int(profile.get("fans_level", 0) or 0),
                "noble_name": str(profile.get("noble_name") or ""),
            })

        badge_user_counter = Counter({name: len(users) for name, users in badge_users.items()})
        top_badges = [
            {
                "badge_name": badge_name,
                "user_count": unique_user_count,
                "message_count": int(badge_message_counter.get(badge_name, 0) or 0),
            }
            for badge_name, unique_user_count in badge_user_counter.most_common(10)
            if badge_name
        ]

        # Avoid dividing by zero when there are no identified users.
        total_unique_users = max(len(user_profiles), 1)
        return {
            "active_unique_users": len(user_profiles),
            "active_users_5plus": sum(1 for count in user_message_count.values() if count >= 5),
            "active_users_10plus": sum(1 for count in user_message_count.values() if count >= 10),
            "fans_badge_user_count": len(fans_badge_users),
            "fans_badge_user_ratio": round(len(fans_badge_users) / total_unique_users, 4),
            "fans_badge_message_count": fans_badge_message_count,
            "high_room_level_user_count": len(high_room_level_users),
            "high_room_level_threshold": cls.HIGH_ROOM_LEVEL_THRESHOLD,
            "high_fans_level_user_count": len(high_fans_level_users),
            "high_fans_level_threshold": cls.HIGH_FANS_LEVEL_THRESHOLD,
            "noble_user_count": len(noble_users),
            "noble_message_count": noble_message_count,
            # Emitted in ascending level order (a plain string sort would put
            # "5-9" after "20+").
            "room_level_distribution": [
                {"bucket": bucket, "user_count": room_level_histogram[bucket]}
                for bucket in cls._ROOM_LEVEL_BUCKETS
                if bucket in room_level_histogram
            ],
            "fans_level_distribution": [
                {"bucket": bucket, "user_count": fans_level_histogram[bucket]}
                for bucket in cls._FANS_LEVEL_BUCKETS
                if bucket in fans_level_histogram
            ],
            "top_badges": top_badges,
            "top_active_users": top_active_users,
        }

    @staticmethod
    def _level_bucket(level: int) -> str:
        """Map a room level to its histogram bucket label (anything below 10 is "1-9")."""
        if level >= 40:
            return "40+"
        if level >= 30:
            return "30-39"
        if level >= 20:
            return "20-29"
        if level >= 10:
            return "10-19"
        return "1-9"

    @staticmethod
    def _fans_level_bucket(level: int) -> str:
        """Map a fan-badge level to its histogram bucket label (anything below 5 is "1-4")."""
        if level >= 20:
            return "20+"
        if level >= 15:
            return "15-19"
        if level >= 10:
            return "10-14"
        if level >= 5:
            return "5-9"
        return "1-4"

    # ------------------------------------------------------------------
    # Session inference
    # ------------------------------------------------------------------
    @classmethod
    def infer_sessions_from_messages(
        cls,
        room_id: str,
        messages: List[Dict[str, Any]],
        *,
        session_cutoff_hour: int = 6,
        merge_gap_hours: int = 4,
        min_session_messages: int = 50,
    ) -> List[Dict[str, Any]]:
        """Split messages into sessions wherever the chat is silent for more
        than ``merge_gap_hours``.

        Groups with fewer than ``min_session_messages`` messages are discarded.
        Messages whose ``timestamp`` is not a ``datetime`` are ignored.
        """
        # Filter before sorting so a missing/odd timestamp cannot break the comparison.
        timed = [item for item in messages or [] if isinstance(item.get("timestamp"), datetime)]
        if not timed:
            return []
        timed.sort(key=lambda item: item["timestamp"])

        sessions: List[Dict[str, Any]] = []

        def flush(batch: List[Dict[str, Any]]) -> None:
            if len(batch) < min_session_messages:
                return
            session = cls._build_inferred_session(
                room_id,
                batch,
                session_cutoff_hour=session_cutoff_hour,
            )
            if session:
                sessions.append(session)

        max_gap_seconds = merge_gap_hours * 3600
        current_batch: List[Dict[str, Any]] = []
        prev_dt: Optional[datetime] = None
        for item in timed:
            ts = item["timestamp"]
            if prev_dt is not None and (ts - prev_dt).total_seconds() > max_gap_seconds:
                flush(current_batch)
                current_batch = []
            current_batch.append(item)
            prev_dt = ts
        flush(current_batch)
        return sessions

    @classmethod
    def _build_inferred_session(
        cls,
        room_id: str,
        messages: List[Dict[str, Any]],
        *,
        session_cutoff_hour: int = 6,
        segment_gap_minutes: int = 30,
    ) -> Optional[Dict[str, Any]]:
        """Build a session dict from the messages of one inferred session.

        A session starting before ``session_cutoff_hour`` is anchored to the
        previous calendar day.  A silence longer than ``segment_gap_minutes``
        closes the current segment and opens a new one.  Returns ``None`` when
        no message carries a ``datetime`` timestamp.
        """
        stamps = sorted(
            item["timestamp"]
            for item in messages or []
            if isinstance(item.get("timestamp"), datetime)
        )
        if not stamps:
            return None
        fmt = "%Y-%m-%d %H:%M:%S"
        start_dt = stamps[0]
        end_dt = stamps[-1]
        anchor_dt = start_dt - timedelta(days=1) if start_dt.hour < session_cutoff_hour else start_dt
        anchor_day = anchor_dt.strftime("%Y-%m-%d")

        gap_seconds = segment_gap_minutes * 60
        segments = []
        seg_start = start_dt
        prev_dt = start_dt
        for current_dt in stamps[1:]:
            if (current_dt - prev_dt).total_seconds() > gap_seconds:
                segments.append({
                    "start_time": seg_start.strftime(fmt),
                    "end_time": prev_dt.strftime(fmt),
                })
                seg_start = current_dt
            prev_dt = current_dt
        segments.append({
            "start_time": seg_start.strftime(fmt),
            "end_time": end_dt.strftime(fmt),
        })
        return {
            "session_id": f"{room_id}_{anchor_day.replace('-', '')}_{start_dt.strftime('%H%M%S')}",
            "room_id": room_id,
            "anchor_day": anchor_day,
            "nickname": "",
            "room_name": "",
            "segments": segments,
            "is_live": False,
            "source": "inferred_from_danmu",
        }

    # ------------------------------------------------------------------
    # Segment helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, datetime]]:
        """Convert ``start_time``/``end_time`` strings into ``{"start", "end"}`` datetimes.

        Entries that cannot be parsed, or whose end precedes their start, are dropped.
        """
        normalized = []
        for item in segments:
            try:
                start_dt = datetime.strptime(str(item.get("start_time") or ""), "%Y-%m-%d %H:%M:%S")
                end_dt = datetime.strptime(str(item.get("end_time") or ""), "%Y-%m-%d %H:%M:%S")
            except (AttributeError, TypeError, ValueError):
                continue
            if end_dt < start_dt:
                continue
            normalized.append({"start": start_dt, "end": end_dt})
        return normalized

    @staticmethod
    def _serialize_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Return the segments reduced to stripped ``start_time``/``end_time``
        strings, skipping entries where either is blank."""
        result = []
        for item in segments:
            start_time = str(item.get("start_time") or "").strip()
            end_time = str(item.get("end_time") or "").strip()
            if start_time and end_time:
                result.append({"start_time": start_time, "end_time": end_time})
        return result

    @staticmethod
    def _in_any_segment(target_dt: datetime, segments: List[Dict[str, datetime]]) -> bool:
        """Return True when ``target_dt`` lies inside any segment (bounds inclusive)."""
        return any(segment["start"] <= target_dt <= segment["end"] for segment in segments)

    # ------------------------------------------------------------------
    # Cleaning
    # ------------------------------------------------------------------
    @staticmethod
    def _dedupe_consecutive_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Collapse runs of consecutive messages with the same (uid, content).

        Each kept message is a shallow copy with a ``repeat_count`` giving the
        length of its run; the input dicts are not modified.
        """
        result: List[Dict[str, Any]] = []
        prev_key = None
        for item in messages:
            current_key = (item.get("uid"), item.get("content"))
            if result and current_key == prev_key:
                result[-1]["repeat_count"] += 1
                continue
            copied = dict(item)
            copied["repeat_count"] = 1
            result.append(copied)
            prev_key = current_key
        return result

    @classmethod
    def _filter_noise_messages(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the messages whose content is non-blank and not noise."""
        # _is_noise_message already treats blank content as noise.
        return [
            item for item in messages
            if not cls._is_noise_message(str(item.get("content") or ""))
        ]

    @classmethod
    def _prepare_messages(cls, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Split messages into noise, merged templates and organised messages.

        Blank-content messages are dropped without being counted as noise.
        """
        noise_messages: List[Dict[str, Any]] = []
        candidate_messages: List[Dict[str, Any]] = []
        for item in messages:
            content = str(item.get("content") or "").strip()
            if not content:
                continue
            if cls._is_noise_message(content):
                noise_messages.append(item)
            else:
                candidate_messages.append(item)
        merged_templates, organized_messages = cls._merge_template_messages(candidate_messages)
        return {
            "noise_messages": noise_messages,
            "merged_templates": merged_templates,
            "organized_messages": organized_messages,
        }

    @classmethod
    def _is_noise_message(cls, content: str) -> bool:
        """Return True for blank text, text matching ``NOISE_PATTERNS``, or long
        text (30+ chars) built from at most six distinct characters."""
        text = str(content or "").strip()
        if not text:
            return True
        if any(pattern.search(text) for pattern in cls.NOISE_PATTERNS):
            return True
        return len(text) >= 30 and len(set(text)) <= 6

    @classmethod
    def _merge_template_messages(
        cls,
        messages: List[Dict[str, Any]],
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Separate copy-pasted template spam from ordinary messages.

        Messages are grouped by normalised text.  A group is a template when
        its key is at least ``TEMPLATE_MIN_LENGTH`` long and occurs at least
        ``TEMPLATE_MIN_REPEAT`` times, or when the key matches a
        ``TEMPLATE_HINT_PATTERNS`` entry.

        Returns ``(merged_templates, organized_messages)``: at most 20 template
        summaries ordered by count, and all remaining messages ordered by
        timestamp.
        """
        grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for item in messages:
            # Punctuation-only messages (e.g. "???") normalise to "" and are
            # grouped under that key; "" can never qualify as a template, so
            # they stay in the organised output instead of silently vanishing.
            key = cls._normalize_template_text(str(item.get("content") or ""))
            grouped[key].append(item)

        merged_templates: List[Dict[str, Any]] = []
        organized_messages: List[Dict[str, Any]] = []
        for key, items in grouped.items():
            is_template = (
                len(key) >= cls.TEMPLATE_MIN_LENGTH and len(items) >= cls.TEMPLATE_MIN_REPEAT
            ) or any(pattern.search(key) for pattern in cls.TEMPLATE_HINT_PATTERNS)
            if not is_template:
                organized_messages.extend(items)
                continue
            first = items[0]
            merged_templates.append({
                "text": str(first.get("content") or "").strip()[:120],
                "count": len(items),
                "user_count": len(cls._unique_uids(items)),
                "first_time": str(first.get("timestamp_text") or ""),
                "last_time": str(items[-1].get("timestamp_text") or ""),
            })
        merged_templates.sort(key=lambda item: item.get("count", 0), reverse=True)
        organized_messages.sort(key=lambda item: item.get("timestamp") or datetime.min)
        return merged_templates[:20], organized_messages

    # ------------------------------------------------------------------
    # Term statistics
    # ------------------------------------------------------------------
    @classmethod
    def _build_repeated_messages(cls, messages: List[Dict[str, Any]], limit: int = 24) -> List[Dict[str, Any]]:
        """Summarise messages whose normalised text occurs at least
        ``REPEAT_MIN_COUNT`` times, most frequent first, capped at ``limit``.

        Punctuation-only messages are excluded.
        """
        grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for item in messages:
            content = str(item.get("content") or "").strip()
            if not content:
                continue
            normalized = cls._normalize_template_text(content)
            if not normalized or cls._looks_like_pure_punctuation(content):
                continue
            grouped[normalized].append(item)

        repeated_messages: List[Dict[str, Any]] = []
        for items in grouped.values():
            if len(items) < cls.REPEAT_MIN_COUNT:
                continue
            first = items[0]
            repeated_messages.append({
                "text": str(first.get("content") or "").strip()[:120],
                "count": len(items),
                "user_count": len(cls._unique_uids(items)),
                "first_time": str(first.get("timestamp_text") or ""),
                "last_time": str(items[-1].get("timestamp_text") or ""),
            })
        repeated_messages.sort(key=lambda item: item.get("count", 0), reverse=True)
        return repeated_messages[:limit]

    @classmethod
    def _build_burst_terms(cls, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Count short reaction messages and return the 15 most frequent.

        A message counts when its lower-cased content is in
        ``SHORT_BURST_WORDS`` or is at most six characters long.
        """
        counters: Dict[str, Dict[str, Any]] = {}
        for item in messages:
            content = str(item.get("content") or "").strip().lower()
            if not content:
                continue
            if content not in cls.SHORT_BURST_WORDS and len(content) > 6:
                continue
            target = counters.setdefault(content, {"text": content, "count": 0, "users": set()})
            target["count"] += 1
            target["users"].add(str(item.get("uid") or ""))
        ranked = sorted(counters.values(), key=lambda entry: entry["count"], reverse=True)[:15]
        return [
            {
                "text": entry["text"],
                "count": entry["count"],
                # Messages without a uid contribute "" to the set; do not count it.
                "user_count": len([uid for uid in entry["users"] if uid]),
            }
            for entry in ranked
        ]

    @classmethod
    def _extract_top_terms(cls, messages: List[Dict[str, Any]], limit: int = 30) -> List[Dict[str, Any]]:
        """Return the ``limit`` most frequent tokens across all message contents."""
        counter = Counter()
        for item in messages:
            counter.update(cls._tokenize(str(item.get("content") or "")))
        return [{"term": term, "count": count} for term, count in counter.most_common(limit)]

    @classmethod
    def _tokenize(cls, content: str) -> List[str]:
        """Split content into lower-cased tokens: runs of 2-6 CJK characters
        followed by runs of 3-20 ASCII letters/digits/``_``/``-``.

        Stopwords and 1-2 digit numbers are dropped.
        """
        text = str(content or "").lower().strip()
        if not text:
            return []
        chinese_terms = re.findall(r"[\u4e00-\u9fff]{2,6}", text)
        alpha_terms = re.findall(r"[a-z0-9_\-]{3,20}", text)
        tokens = []
        for token in chinese_terms + alpha_terms:
            normalized = token.strip().lower()
            if not normalized or normalized in cls.STOPWORDS:
                continue
            if normalized.isdigit() and len(normalized) <= 2:
                continue
            tokens.append(normalized)
        return tokens

    @staticmethod
    def _normalize_template_text(content: str) -> str:
        """Return a comparison key for ``content``: lower-cased, with whitespace
        and common ASCII/CJK punctuation removed, and any character repeated
        five or more times in a row collapsed to three."""
        text = str(content or "").strip().lower()
        if not text:
            return ""
        text = re.sub(r"\s+", "", text)
        text = re.sub(r"[`~!@#$%^&*()_\-+=\[\]{}\\|;:'\",.<>/?,。!?、…()【】《》“”‘’·]", "", text)
        text = re.sub(r"(.)\1{4,}", r"\1\1\1", text)
        return text

    # ------------------------------------------------------------------
    # Time buckets and sampling
    # ------------------------------------------------------------------
    @classmethod
    def _build_time_buckets(cls, messages: List[Dict[str, Any]], minutes: int = 5) -> List[Dict[str, Any]]:
        """Group messages into ``minutes``-wide windows aligned within each hour.

        Returns one stats dict per non-empty window in chronological order.
        ``minutes`` below 1 is treated as 1; a width above 59 yields one bucket
        per hour because alignment is done on the minute field only.  Messages
        without a ``datetime`` timestamp are skipped.
        """
        step = max(int(minutes), 1)  # guard against division by zero
        buckets: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for item in messages:
            ts = item.get("timestamp")
            if not isinstance(ts, datetime):
                continue
            bucket_start = ts.replace(minute=(ts.minute // step) * step, second=0)
            buckets[bucket_start.strftime("%Y-%m-%d %H:%M:%S")].append(item)

        results: List[Dict[str, Any]] = []
        # The key format sorts lexicographically in chronological order.
        for bucket_start, items in sorted(buckets.items()):
            results.append({
                "start_time": bucket_start,
                "message_count": len(items),
                "user_count": len(cls._unique_uids(items)),
                "top_terms": cls._extract_top_terms(items, limit=8),
                "burst_terms": cls._build_burst_terms(items)[:5],
                "sample_messages": cls._pick_bucket_samples(items, limit=6),
            })
        return results

    @staticmethod
    def _pick_bucket_samples(items: List[Dict[str, Any]], limit: int = 6) -> List[Dict[str, str]]:
        """Pick distinct sample messages spread across a bucket.

        Only five positions are probed (first, 1/3, 1/2, 2/3, last), so at most
        five samples are returned even when ``limit`` is larger.  Content is
        truncated to 80 characters.
        """
        if not items:
            return []
        size = len(items)
        indexes = sorted({0, size // 3, size // 2, (size * 2) // 3, size - 1})
        selected = []
        seen = set()
        for idx in indexes:
            item = items[idx]
            content = str(item.get("content") or "").strip()
            if not content or content in seen:
                continue
            selected.append({
                "time": str(item.get("timestamp_text") or ""),
                "nickname": str(item.get("nickname") or ""),
                "content": content[:80],
            })
            seen.add(content)
            if len(selected) >= limit:
                break
        return selected

    @classmethod
    def _pick_representative_messages(
        cls,
        messages: List[Dict[str, Any]],
        buckets: List[Dict[str, Any]],
        limit: int = 18,
        top_bucket_count: int = 6,
    ) -> List[Dict[str, str]]:
        """Return up to ``limit`` distinct messages, preferring samples from the
        ``top_bucket_count`` busiest buckets and then filling from ``messages``
        in order."""
        selected: List[Dict[str, str]] = []
        seen = set()
        for bucket in cls._rank_buckets_by_volume(buckets, top_bucket_count):
            for sample in bucket.get("sample_messages", []):
                content = str(sample.get("content") or "").strip()
                if not content or content in seen:
                    continue
                selected.append(sample)
                seen.add(content)
                if len(selected) >= limit:
                    return selected
        for item in messages:
            content = str(item.get("content") or "").strip()
            # Bucket samples are already cut to 80 characters, so compare on the
            # truncated text; otherwise a long message is emitted twice.
            key = content[:80].strip()
            if not key or key in seen:
                continue
            selected.append({
                "time": str(item.get("timestamp_text") or ""),
                "nickname": str(item.get("nickname") or ""),
                "content": content[:80],
            })
            seen.add(key)
            if len(selected) >= limit:
                break
        return selected

    @classmethod
    def _build_raw_window_samples(
        cls,
        peak_buckets: List[Dict[str, Any]],
        per_bucket_limit: int = 8,
    ) -> List[Dict[str, Any]]:
        """Return, for each peak bucket that has samples, its start time, counts
        and up to ``per_bucket_limit`` of its stored sample messages."""
        windows: List[Dict[str, Any]] = []
        for bucket in peak_buckets:
            samples = []
            for sample in bucket.get("sample_messages", [])[:per_bucket_limit]:
                content = str(sample.get("content") or "").strip()
                if not content:
                    continue
                samples.append({
                    "time": str(sample.get("time") or ""),
                    "nickname": str(sample.get("nickname") or ""),
                    "content": content,
                })
            if not samples:
                continue
            windows.append({
                "start_time": str(bucket.get("start_time") or ""),
                "message_count": int(bucket.get("message_count", 0) or 0),
                "user_count": int(bucket.get("user_count", 0) or 0),
                "samples": samples,
            })
        return windows

    @staticmethod
    def _simplify_peak_buckets(buckets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the buckets reduced to start time, counts, six top terms and
        five burst terms (sample messages are omitted)."""
        return [
            {
                "start_time": str(bucket.get("start_time") or ""),
                "message_count": int(bucket.get("message_count", 0) or 0),
                "user_count": int(bucket.get("user_count", 0) or 0),
                "top_terms": bucket.get("top_terms", [])[:6],
                "burst_terms": bucket.get("burst_terms", [])[:5],
            }
            for bucket in buckets
        ]

    @staticmethod
    def _looks_like_pure_punctuation(content: str) -> bool:
        """Return True when the stripped content is empty or contains no
        letters or digits (only non-word characters and underscores)."""
        text = str(content or "").strip()
        if not text:
            return True
        return re.fullmatch(r"[\W_]+", text, re.UNICODE) is not None