Files
abot/plugins/douyu/main.py

1824 lines
86 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from collections import Counter
from datetime import datetime, timedelta
import os
from pathlib import Path
import threading
import time
from typing import Dict, Any, List, Optional, Tuple, Set
import aiohttp
from loguru import logger
import ssl
import zlib
try:
import websocket
except ImportError:
websocket = None
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from plugins.ai_auto_response.llm_client import LLMClient
from plugins.douyu.danmu_summary import DouyuDanmuSummaryHelper
from plugins.douyu.report_template import render_daily_report_html
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.markdown_to_image import convert_md_str_to_image, html_to_image
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML
class DouyuDanmuRecorder:
    """Record the chat messages ("danmu") of one Douyu room to daily text files.

    ``start()`` launches a daemon thread that opens a WebSocket to the Douyu
    danmu proxy, sends the login / join-group packets and keeps the
    connection alive with ``mrkl`` heartbeat packets.  ``chatmsg`` lines are
    buffered and appended to
    ``temp/douyu_danmu/<YYYYMMDD>/<room_id>_<YYYYMMDD>.txt``.  The counters
    carried by ``oni`` (field ``vn``) and ``dfnum`` (field ``dfc``) messages
    are forwarded to ``stats_callback(room_id, point)``.
    """

    # Seconds to wait between two heartbeat packets.
    _HEARTBEAT_INTERVAL_SECONDS = 38

    def __init__(self, room_id: str, user_agent: str, stats_callback=None, stats_sample_interval_seconds: int = 60):
        """Create an idle recorder; nothing connects until ``start()``.

        Args:
            room_id: Douyu room number to join.
            user_agent: Value of the ``User-Agent`` header sent on connect.
            stats_callback: Optional callable ``(room_id, point_dict)``.
            stats_sample_interval_seconds: Minimum seconds between two
                non-forced callback invocations; ``0`` (or any falsy /
                negative value) disables the throttle.
        """
        self.room_id = room_id
        self.user_agent = user_agent
        self.stats_callback = stats_callback
        self.stats_sample_interval_seconds = max(0, int(stats_sample_interval_seconds or 0))
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._ws: Optional["websocket.WebSocketApp"] = None
        # Pending chat lines; written out once _buffer_limit is reached.
        self._buffer: List[str] = []
        self._buffer_limit = 10
        # Day (YYYYMMDD) the buffered lines belong to.
        self._buffer_date: Optional[str] = None
        self._lock = threading.Lock()
        self._websocket_available = websocket is not None
        self._latest_vip_count: Optional[int] = None
        self._latest_diamond_count: Optional[int] = None
        self._last_stats_signature: Tuple[Optional[int], Optional[int]] = (None, None)
        # time.monotonic() of the last successful callback, None before the first.
        self._last_stats_emit_at: Optional[float] = None

    def _encode(self, msg: str) -> bytes:
        """Frame ``msg``: 4-byte LE length twice, type 689, two zero bytes, NUL-terminated payload."""
        payload = msg.encode("utf-8") + b"\x00"
        length_field = (len(payload) + 8).to_bytes(4, "little")
        return length_field + length_field + (689).to_bytes(2, "little") + b"\x00\x00" + payload

    @staticmethod
    def _parse_parts(line: str) -> Dict[str, Any]:
        """Split a ``key@=value/key@=value/`` record into a dict (later keys win)."""
        parts: Dict[str, Any] = {}
        for pair in line.split("/"):
            if "@=" in pair:
                key, value = pair.split("@=", 1)
                parts[key] = value
        return parts

    @staticmethod
    def _safe_int(value: Any, default: Optional[int] = None) -> Optional[int]:
        """Return ``int(str(value))`` or ``default`` when the conversion fails."""
        try:
            return int(str(value))
        except (TypeError, ValueError):
            return default

    def _maybe_emit_stats(self, force: bool = False) -> None:
        """Report the latest counters through ``stats_callback``.

        Without ``force`` the call is skipped when the counters are unchanged
        since the last successful report, or when fewer than
        ``stats_sample_interval_seconds`` have elapsed since it.  ``force``
        bypasses both checks.  Callback errors are logged, never raised.
        """
        if not self.stats_callback:
            return
        if self._latest_vip_count is None and self._latest_diamond_count is None:
            return
        signature = (self._latest_vip_count, self._latest_diamond_count)
        now = time.monotonic()
        if not force:
            if signature == self._last_stats_signature:
                return
            interval = self.stats_sample_interval_seconds
            last_emit = self._last_stats_emit_at
            # Honour the configured sample interval (previously stored but ignored).
            if interval > 0 and last_emit is not None and now - last_emit < interval:
                return
        point = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "vip_count": self._latest_vip_count,
            "diamond_count": self._latest_diamond_count,
        }
        try:
            self.stats_callback(self.room_id, point)
        except Exception as e:
            logger.warning(f"斗鱼人数采样回调失败({self.room_id}): {e}")
        else:
            self._last_stats_signature = signature
            self._last_stats_emit_at = now

    @staticmethod
    def _format_chat_time(raw: str) -> str:
        """Render the ``cst`` field as ``YYYY-MM-DD HH:MM:SS``; fall back to now."""
        fmt = "%Y-%m-%d %H:%M:%S"
        if raw:
            try:
                if raw.isdigit():
                    ts = int(raw)
                    # Values above 10**12 are treated as milliseconds.
                    if ts > 10 ** 12:
                        ts = ts / 1000
                    return datetime.fromtimestamp(ts).strftime(fmt)
                return datetime.strptime(raw, fmt).strftime(fmt)
            except Exception:
                pass  # unparsable / out-of-range timestamp: use the current time
        return datetime.now().strftime(fmt)

    @classmethod
    def _format_chat_line(cls, parts: Dict[str, Any]) -> str:
        """Build the text line stored for one ``chatmsg`` record."""
        nick = parts.get("nn", "未知")
        txt = parts.get("txt", "")
        uid = parts.get("uid", "未知")
        level = parts.get("level", "0")
        fan_group = parts.get("bnn", "")
        fan_level = parts.get("bl", "0")
        time_str = cls._format_chat_time(parts.get("cst", ""))
        output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}"
        if fan_group:
            output += f" / {fan_group} Lv{fan_level}"
        output += f"){txt}"
        return output

    def _on_message(self, ws, message):
        """WebSocket message callback: update counters and record chat lines."""
        if isinstance(message, str):
            # Text frames arrive as str; calling .decode() on them used to raise.
            data = message
        else:
            try:
                data = zlib.decompress(message, -zlib.MAX_WBITS).decode("utf-8", errors="ignore")
            except zlib.error:
                # Not raw-deflate data: treat the frame as plain UTF-8.
                data = message.decode("utf-8", errors="ignore")
        for raw_line in data.split("\x00"):
            line = raw_line.strip()
            if not line:
                continue
            parts = self._parse_parts(line)
            msg_type = str(parts.get("type") or "").strip()
            if msg_type == "oni":
                vip_count = self._safe_int(parts.get("vn"))
                if vip_count is not None:
                    self._latest_vip_count = vip_count
                    self._maybe_emit_stats()
            elif msg_type == "dfnum":
                diamond_count = self._safe_int(parts.get("dfc"))
                if diamond_count is not None:
                    self._latest_diamond_count = diamond_count
                    self._maybe_emit_stats()
            elif msg_type == "chatmsg":
                self._append_and_maybe_flush(self._format_chat_line(parts))

    def _flush_locked(self):
        """Append the buffered lines to today's file. Caller must hold ``_lock``."""
        if not self._buffer or self._buffer_date is None:
            return
        dir_path = os.path.join("temp", "douyu_danmu", self._buffer_date)
        os.makedirs(dir_path, exist_ok=True)
        file_name = os.path.join(dir_path, f"{self.room_id}_{self._buffer_date}.txt")
        data = "\n".join(self._buffer) + "\n"
        with open(file_name, "a", encoding="utf-8") as f:
            f.write(data)
        self._buffer.clear()

    def _append_and_maybe_flush(self, line: str):
        """Buffer ``line``; flush on day change or when the buffer is full."""
        date_str = datetime.now().strftime("%Y%m%d")
        with self._lock:
            if self._buffer_date is None:
                self._buffer_date = date_str
            elif date_str != self._buffer_date:
                # Write yesterday's remaining lines to yesterday's file first.
                self._flush_locked()
                self._buffer_date = date_str
            self._buffer.append(line)
            if len(self._buffer) >= self._buffer_limit:
                self._flush_locked()

    def _flush(self):
        """Thread-safe wrapper around ``_flush_locked``."""
        with self._lock:
            self._flush_locked()

    def _on_open(self, ws):
        """WebSocket open callback: log in, join the room group, start the heartbeat."""
        ws.send(self._encode(f"type@=loginreq/roomid@={self.room_id}/dmbt@=chrome/dmbv@=0/"))
        ws.send(self._encode(f"type@=joingroup/rid@={self.room_id}/gid@=-9999/"))

        def heartbeat():
            while ws.sock and ws.sock.connected and not self._stop_event.is_set():
                try:
                    ws.send(self._encode("type@=mrkl/"))
                except Exception:
                    break
                # Event.wait returns early (True) as soon as stop() is called.
                if self._stop_event.wait(self._HEARTBEAT_INTERVAL_SECONDS):
                    break

        threading.Thread(target=heartbeat, daemon=True).start()

    def _on_error(self, ws, error):
        """WebSocket error callback: log only."""
        logger.error(f"斗鱼弹幕错误({self.room_id}): {error}")

    def _on_close(self, ws, code, msg):
        """WebSocket close callback: log only."""
        logger.info(f"斗鱼弹幕连接关闭({self.room_id}): {code} {msg}")

    def _run(self):
        """Thread body: try each proxy endpoint once, in order, until stopped.

        NOTE(review): once every endpoint has been tried the thread exits and
        recording ends until ``start()`` is called again — confirm intended.
        """
        if not self._websocket_available:
            logger.error(f"websocket-client 未安装,无法记录弹幕({self.room_id})")
            return
        try:
            websocket.enableTrace(False)
            ws_urls = [f"wss://danmuproxy.douyu.com:{port}/" for port in range(8501, 8507)]
            # NOTE(review): certificate verification is disabled (CERT_NONE)
            # and the cipher security level is lowered; confirm this is required.
            sslopt = {
                "cert_reqs": ssl.CERT_NONE,
                "ssl_version": ssl.PROTOCOL_TLS_CLIENT,
                "ciphers": "DEFAULT@SECLEVEL=1",
            }
            headers = {"User-Agent": self.user_agent}
            for url in ws_urls:
                if self._stop_event.is_set():
                    break
                try:
                    ws_app = websocket.WebSocketApp(
                        url,
                        on_open=self._on_open,
                        on_message=self._on_message,
                        on_error=self._on_error,
                        on_close=self._on_close,
                        header=headers,
                    )
                    self._ws = ws_app
                    ws_app.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10)
                except Exception as e:
                    logger.error(f"斗鱼弹幕连接失败({self.room_id}): {e}")
                    continue
                finally:
                    self._ws = None
                # Pause one second before the next endpoint; leave at once on stop.
                if self._stop_event.wait(1):
                    break
        finally:
            self._ws = None

    def start(self):
        """Start the recording thread unless one is still alive."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop recording, then report final stats and flush buffered lines.

        The stop flag is set and the socket closed *before* flushing so that
        a flush error cannot leave the connection running and lines received
        just before the close are still written.  Errors from the final
        flush propagate to the caller.
        """
        self._stop_event.set()
        ws_app = self._ws  # snapshot: the worker thread may reset it to None
        if ws_app:
            try:
                ws_app.close()
            except Exception as e:
                logger.debug(f"斗鱼弹幕连接关闭异常({self.room_id}): {e}")
        self._maybe_emit_stats(force=True)
        self._flush()
class DouyuRedisManager:
    """Redis storage for Douyu subscriptions, room status and live sessions.

    Every key is built under the ``bot:douyu:`` prefix.  Values read back
    from Redis may be ``bytes`` or ``str``; they are normalised to ``str``.
    """

    def __init__(self, db_manager: "DBConnectionManager"):
        """Obtain the Redis connection from ``db_manager.get_redis_connection()``."""
        self.redis = db_manager.get_redis_connection()
        self.prefix = "bot:douyu:"

    # --- shared helpers ---
    @staticmethod
    def _text(value: Any) -> str:
        """Decode ``bytes`` as UTF-8; pass anything else through ``str``."""
        return value.decode("utf-8") if isinstance(value, bytes) else str(value)

    def _group_key(self, group_id: str, kind: str) -> str:
        """Key of a per-group set (``rooms``, ``yubas`` or ``subscribers``)."""
        return f"{self.prefix}group:{group_id}:{kind}"

    def _sorted_members(self, key: str) -> List[str]:
        """Members of the set at ``key`` as a sorted list of strings."""
        return sorted(self._text(member) for member in (self.redis.smembers(key) or set()))

    def _get_text(self, key: str) -> Optional[str]:
        """String value at ``key``; ``None`` when missing or empty."""
        data = self.redis.get(key)
        if not data:
            return None
        return self._text(data)

    def _get_json(self, key: str) -> Optional[Dict[str, Any]]:
        """JSON-decoded value at ``key``; ``None`` when missing or not valid JSON."""
        raw = self._get_text(key)
        if raw is None:
            return None
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            return None

    # --- live-room subscriptions ---
    def add_group_room(self, group_id: str, room_id: str) -> bool:
        """Add ``room_id`` to the group's room set."""
        return self.redis.sadd(self._group_key(group_id, "rooms"), room_id) >= 0

    def remove_group_room(self, group_id: str, room_id: str) -> bool:
        """Remove ``room_id`` from the group's room set."""
        return self.redis.srem(self._group_key(group_id, "rooms"), room_id) >= 0

    def list_group_rooms(self, group_id: str) -> List[str]:
        """Sorted room ids subscribed by the group."""
        return self._sorted_members(self._group_key(group_id, "rooms"))

    def all_subscribed_rooms(self) -> Set[str]:
        """Union of the room sets of every group from ``GroupBotManager.get_group_list()``."""
        rooms: Set[str] = set()
        for gid in GroupBotManager.get_group_list():
            rooms.update(self.list_group_rooms(gid))
        return rooms

    def groups_for_room(self, room_id: str) -> List[str]:
        """Groups (in ``get_group_list`` order) whose room set contains ``room_id``."""
        return [gid for gid in GroupBotManager.get_group_list() if room_id in self.list_group_rooms(gid)]

    # --- yuba (fan forum) subscriptions ---
    def add_group_yuba(self, group_id: str, hash_id: str) -> bool:
        """Add ``hash_id`` to the group's yuba set."""
        return self.redis.sadd(self._group_key(group_id, "yubas"), hash_id) >= 0

    def remove_group_yuba(self, group_id: str, hash_id: str) -> bool:
        """Remove ``hash_id`` from the group's yuba set."""
        return self.redis.srem(self._group_key(group_id, "yubas"), hash_id) >= 0

    def list_group_yubas(self, group_id: str) -> List[str]:
        """Sorted yuba hash ids subscribed by the group."""
        return self._sorted_members(self._group_key(group_id, "yubas"))

    def all_subscribed_yubas(self) -> Set[str]:
        """Union of the yuba sets of every group from ``GroupBotManager.get_group_list()``."""
        yubas: Set[str] = set()
        for gid in GroupBotManager.get_group_list():
            yubas.update(self.list_group_yubas(gid))
        return yubas

    def groups_for_yuba(self, hash_id: str) -> List[str]:
        """Groups (in ``get_group_list`` order) whose yuba set contains ``hash_id``."""
        return [gid for gid in GroupBotManager.get_group_list() if hash_id in self.list_group_yubas(gid)]

    def get_yuba_last_id(self, hash_id: str) -> Optional[str]:
        """Last feed id stored for the yuba, or ``None``."""
        return self._get_text(f"{self.prefix}yuba_last_id:{hash_id}")

    def set_yuba_last_id(self, hash_id: str, feed_id: str) -> bool:
        """Store ``feed_id`` as the last feed id for the yuba."""
        return bool(self.redis.set(f"{self.prefix}yuba_last_id:{hash_id}", feed_id))

    # --- reminder list ---
    def add_group_subscriber(self, group_id: str, user_id: str) -> bool:
        """Add ``user_id`` to the group's subscriber set."""
        return self.redis.sadd(self._group_key(group_id, "subscribers"), user_id) >= 0

    def remove_group_subscriber(self, group_id: str, user_id: str) -> bool:
        """Remove ``user_id`` from the group's subscriber set."""
        return self.redis.srem(self._group_key(group_id, "subscribers"), user_id) >= 0

    def list_group_subscribers(self, group_id: str) -> List[str]:
        """Sorted user ids in the group's subscriber set."""
        return self._sorted_members(self._group_key(group_id, "subscribers"))

    # --- room status ---
    def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]:
        """Stored status object of the room, or ``None``."""
        return self._get_json(f"{self.prefix}room_status:{room_id}")

    def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool:
        """Store ``status`` as JSON (non-ASCII kept verbatim)."""
        payload = json.dumps(status, ensure_ascii=False)
        return bool(self.redis.set(f"{self.prefix}room_status:{room_id}", payload))

    # --- live sessions ---
    def get_room_session(self, room_id: str, session_id: str) -> Optional[Dict[str, Any]]:
        """Stored session object, or ``None``."""
        return self._get_json(f"{self.prefix}room:{room_id}:session:{session_id}")

    def save_room_session(self, room_id: str, session: Dict[str, Any]) -> bool:
        """Store ``session`` and mark it as the room's latest one.

        One pipeline writes the session JSON, updates ``latest_session`` and
        moves the id to the head of ``session_ids``, which is trimmed to 30
        entries.  Returns ``False`` when ``session`` has no ``session_id``.

        NOTE(review): session keys dropped from the trimmed index are not
        deleted here and no expiry is set in this class.
        """
        session_id = str(session.get("session_id") or "").strip()
        if not session_id:
            return False
        payload = json.dumps(session, ensure_ascii=False)
        room_prefix = f"{self.prefix}room:{room_id}:"
        index_key = f"{room_prefix}session_ids"
        pipe = self.redis.pipeline()
        pipe.set(f"{room_prefix}session:{session_id}", payload)
        pipe.set(f"{room_prefix}latest_session", session_id)
        # Remove any previous occurrence so the id appears once, at the head.
        pipe.lrem(index_key, 0, session_id)
        pipe.lpush(index_key, session_id)
        pipe.ltrim(index_key, 0, 29)
        return bool(pipe.execute())

    def get_latest_room_session(self, room_id: str) -> Optional[Dict[str, Any]]:
        """Session referenced by the room's ``latest_session`` key, or ``None``."""
        session_id = self._get_text(f"{self.prefix}room:{room_id}:latest_session")
        if not session_id:
            return None
        return self.get_room_session(room_id, session_id)

    def list_room_session_ids(self, room_id: str, limit: int = 10) -> List[str]:
        """Up to ``limit`` session ids from the head of the room's index.

        A non-positive ``limit`` yields an empty list (it used to return one
        id because the range end was clamped to 0).
        """
        if limit <= 0:
            return []
        rows = self.redis.lrange(f"{self.prefix}room:{room_id}:session_ids", 0, limit - 1) or []
        return [self._text(row) for row in rows]

    # --- generic text values ---
    def get_text_value(self, key: str) -> Optional[str]:
        """String value at the caller-supplied ``key`` (no prefix added)."""
        return self._get_text(key)

    def set_text_value(self, key: str, value: str) -> bool:
        """Set the caller-supplied ``key`` (no prefix added) to ``value``."""
        return bool(self.redis.set(key, value))
class DouyuPlugin(MessagePluginInterface):
_DAILY_REPORT_CACHE_VERSION = 3
FEATURE_KEY = "DOUYU_MONITOR"
FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]"
@property
def name(self) -> str:
return "斗鱼直播"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "斗鱼主播开播下播提醒与群订阅管理"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
    """Set built-in defaults and register the two periodic jobs.

    ``initialize()`` may later replace most of these values from config.
    NOTE(review): the unified check job is registered here with the default
    ``_check_interval``; a value loaded afterwards by ``initialize()`` does
    not change this registration.
    """
    super().__init__()
    self.bot: WechatAPIClient = None
    self.feature = self.register_feature()
    self.redis_manager: Optional[DouyuRedisManager] = None

    # Command words recognised by can_process / process_message.
    self._commands = [
        "斗鱼订阅",
        "取消斗鱼订阅",
        "斗鱼订阅列表",
        "斗鱼订阅提醒",
        "取消斗鱼订阅提醒",
        "订阅鱼吧",
        "取消订阅鱼吧",
        "鱼吧订阅列表",
        "#斗鱼弹幕日报",
        "斗鱼弹幕日报",
    ]

    # Remote endpoints and request identity.
    self._api_template = "https://www.douyu.com/betard/{room_id}"
    self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList"
    self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"

    # Polling and session-merging parameters.
    self._check_interval = 5
    self._session_cutoff_hour = 6
    self._merge_gap_hours = 4

    # Daily report settings.
    self._daily_report_enable = True
    self._daily_report_time = "10:05"
    self._daily_report_min_messages = 120
    self._daily_report_use_llm = False
    self._daily_report_max_sessions = 4
    self._daily_report_max_length = 1800
    self._daily_report_send_image = True
    self._audience_stats_sample_interval_seconds = 60
    self._daily_report_llm_client: Optional[LLMClient] = None

    # One recorder per room id.
    self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {}

    register_unified = async_job.every_minutes(self._check_interval)
    register_unified(self._scheduled_unified_check_job)
    register_report_tick = async_job.every_minutes(5)
    register_report_tick(self._scheduled_daily_report_tick)
async def _scheduled_unified_check_job(self):
"""统一检查直播和鱼吧动态"""
await self._scheduled_check_job()
await self._scheduled_yuba_check_job()
async def _scheduled_daily_report_tick(self):
"""每 5 分钟检查一次,命中配置时间后发送前一天日报。"""
if not self._daily_report_enable or not self.redis_manager or not self.bot:
return
now_dt = datetime.now()
if not self._should_run_daily_report(now_dt):
return
anchor_day = (now_dt - timedelta(days=1)).strftime("%Y-%m-%d")
try:
await self._send_daily_reports(anchor_day)
self.redis_manager.set_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d")), now_dt.strftime("%Y-%m-%d %H:%M:%S"))
except Exception as e:
logger.error(f"斗鱼每日报告任务失败(anchor_day={anchor_day}): {e}")
def initialize(self, context: Dict[str, Any]) -> bool:
    """Connect to Redis and load the ``Douyu`` section of the plugin config.

    Args:
        context: Start-up context; its ``bot`` entry, when present,
            replaces ``self.bot``.

    Returns:
        ``True`` on success; ``False`` (after logging) when anything raises.
        Settings applied before the failure stay in place.
    """
    try:
        self.redis_manager = DouyuRedisManager(DBConnectionManager.get_instance())
        self.bot = context.get("bot", self.bot)
        cfg = self._config.get("Douyu", {})

        # Configured command words go first; duplicates are dropped in order.
        extra_commands = cfg.get("command", [])
        if isinstance(extra_commands, list) and extra_commands:
            self._commands = list(dict.fromkeys(extra_commands + self._commands))

        def keep(value):
            return value

        def report_time(value):
            # An empty configured value falls back to the current setting.
            return str(value or self._daily_report_time)

        # (attribute, config key, converter), applied in this exact order.
        settings = (
            ("_api_template", "api_url_template", keep),
            ("_user_agent", "user_agent", keep),
            ("_check_interval", "check_interval_minutes", int),
            ("_session_cutoff_hour", "session_cutoff_hour", int),
            ("_merge_gap_hours", "merge_gap_hours", int),
            ("_daily_report_enable", "daily_report_enable", bool),
            ("_daily_report_time", "daily_report_time", report_time),
            ("_daily_report_min_messages", "daily_report_min_messages", int),
            ("_daily_report_use_llm", "daily_report_use_llm", bool),
            ("_daily_report_max_sessions", "daily_report_max_sessions", int),
            ("_daily_report_max_length", "daily_report_max_length", int),
            ("_daily_report_send_image", "daily_report_send_image", bool),
            ("_audience_stats_sample_interval_seconds", "audience_stats_sample_interval_seconds", int),
        )
        for attr, key, convert in settings:
            current = getattr(self, attr)
            setattr(self, attr, convert(cfg.get(key, current)))

        report_api_cfg = cfg.get("report_api", {}) or {}
        if report_api_cfg:
            self._daily_report_llm_client = LLMClient(report_api_cfg)
        return True
    except Exception as e:
        logger.error(f"{self.name} 初始化失败: {e}")
        return False
def start(self) -> bool:
    """Mark the plugin as running; always returns ``True``."""
    self.status = PluginStatus.RUNNING
    return True
def stop(self) -> bool:
    """Stop every danmu recorder and mark the plugin as stopped.

    Each recorder is stopped independently: a failure in one (for example
    an I/O error while flushing its buffer) is logged and does not prevent
    the remaining recorders from stopping or the status from being updated.

    Returns:
        Always ``True``.
    """
    # getattr guards against stop() being called on a half-constructed plugin.
    recorders = getattr(self, "_danmu_recorders", {})
    for room_id, recorder in list(recorders.items()):
        try:
            recorder.stop()
        except Exception as e:
            logger.error(f"停止斗鱼弹幕记录失败({room_id}): {e}")
    self.status = PluginStatus.STOPPED
    return True
def can_process(self, message: Dict[str, Any]) -> bool:
content = str(message.get("content", "")).strip()
if not content:
return False
first_token = content.split()[0]
return first_token in self._commands
@plugin_stats_decorator(plugin_name="斗鱼直播")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Handle one chat command of this plugin.

    Reads ``content``, ``sender``, ``roomid``, ``gbm`` and ``bot`` from
    ``message``.  Replies go to the group when ``roomid`` is set, otherwise
    to the sender.  Supported commands: room subscribe / unsubscribe / list,
    reminder-list join / leave (group only), yuba subscribe / unsubscribe /
    list, and the danmu daily report (group only, optional ``YYYY-MM-DD``).

    Returns:
        ``(handled, note)``.  ``(False, "没有权限")`` when the feature is
        disabled for the group; ``(False, None)`` when the content is empty
        or matches no command.
    """
    content = str(message.get("content", "")).strip()
    sender = message.get("sender")
    roomid = message.get("roomid", "")
    gbm: GroupBotManager = message.get("gbm")
    self.bot: WechatAPIClient = message.get("bot")
    if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
        return False, "没有权限"

    parts = content.split()
    if not parts:
        # Empty content used to raise IndexError on parts[0].
        return False, None
    first_token = parts[0]
    target = roomid or sender

    if content == "斗鱼订阅列表":
        rooms = self.redis_manager.list_group_rooms(target)
        if not rooms:
            await self.bot.send_text_message(target, "暂无订阅", sender)
            return True, "暂无订阅"
        text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms)
        await self.bot.send_text_message(target, text, sender)
        return True, "列表已发送"

    if first_token == "斗鱼订阅提醒":
        if not roomid:
            await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender)
            return True, "仅支持群聊"
        ok = self.redis_manager.add_group_subscriber(roomid, sender)
        await self.bot.send_at_message(roomid, "已加入斗鱼订阅提醒名单", [sender])
        return True, "加入提醒名单成功" if ok else "加入提醒名单失败"

    if first_token == "取消斗鱼订阅提醒":
        if not roomid:
            await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender)
            return True, "仅支持群聊"
        ok = self.redis_manager.remove_group_subscriber(roomid, sender)
        await self.bot.send_at_message(roomid, "已取消斗鱼订阅提醒", [sender])
        return True, "取消提醒成功" if ok else "取消提醒失败"

    if first_token == "斗鱼订阅":
        if len(parts) < 2:
            # The hint now shows the command this handler actually accepts.
            await self.bot.send_text_message(target, "请提供房间号,例如:斗鱼订阅 7718843", sender)
            return True, "命令格式错误"
        room_id = parts[1].strip()
        if not room_id.isdigit():
            await self.bot.send_text_message(target, "房间号必须是数字,例如:斗鱼订阅 52876", sender)
            return True, "命令格式错误"
        ok = self.redis_manager.add_group_room(target, room_id)
        await self.bot.send_text_message(target, f"✅ 已订阅斗鱼房间 {room_id}", sender)
        return True, "订阅成功" if ok else "订阅失败"

    if first_token == "取消斗鱼订阅":
        if len(parts) < 2:
            # The hint now shows the command this handler actually accepts.
            await self.bot.send_text_message(target, "请提供房间号,例如:取消斗鱼订阅 7718843", sender)
            return True, "命令格式错误"
        room_id = parts[1].strip()
        if not room_id.isdigit():
            await self.bot.send_text_message(target, "房间号必须是数字,例如:取消斗鱼订阅 52876", sender)
            return True, "命令格式错误"
        ok = self.redis_manager.remove_group_room(target, room_id)
        await self.bot.send_text_message(target, f"✅ 已取消订阅斗鱼房间 {room_id}", sender)
        return True, "取消成功" if ok else "取消失败"

    if content == "鱼吧订阅列表":
        yubas = self.redis_manager.list_group_yubas(target)
        if not yubas:
            await self.bot.send_text_message(target, "暂无鱼吧订阅", sender)
            return True, "暂无鱼吧订阅"
        text = "当前订阅的斗鱼鱼吧:\n" + "\n".join(yubas)
        await self.bot.send_text_message(target, text, sender)
        return True, "列表已发送"

    if first_token == "订阅鱼吧":
        if len(parts) < 2:
            await self.bot.send_text_message(target, "请提供鱼吧 hash_id例如订阅鱼吧 PDAP2zEk3nwx",
                                             sender)
            return True, "命令格式错误"
        hash_id = parts[1].strip()
        ok = self.redis_manager.add_group_yuba(target, hash_id)
        await self.bot.send_text_message(target, f"✅ 已订阅斗鱼鱼吧 {hash_id}", sender)
        return True, "订阅成功" if ok else "订阅失败"

    if first_token == "取消订阅鱼吧":
        if len(parts) < 2:
            await self.bot.send_text_message(target, "请提供鱼吧 hash_id例如取消订阅鱼吧 PDAP2zEk3nwx",
                                             sender)
            return True, "命令格式错误"
        hash_id = parts[1].strip()
        ok = self.redis_manager.remove_group_yuba(target, hash_id)
        await self.bot.send_text_message(target, f"✅ 已取消订阅斗鱼鱼吧 {hash_id}", sender)
        return True, "取消成功" if ok else "取消失败"

    if first_token in {"#斗鱼弹幕日报", "斗鱼弹幕日报"}:
        if not roomid:
            await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender)
            return True, "仅支持群聊"
        # Default to yesterday; an explicit YYYY-MM-DD argument overrides it.
        anchor_day = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        if len(parts) >= 2:
            day_text = parts[1].strip()
            try:
                anchor_day = datetime.strptime(day_text, "%Y-%m-%d").strftime("%Y-%m-%d")
            except ValueError:
                await self.bot.send_text_message(roomid, "日期格式错误,请使用:#斗鱼弹幕日报 2026-04-07", sender)
                return True, "日期格式错误"
        await self.bot.send_text_message(roomid, f"⏳ 正在生成斗鱼弹幕日报:{anchor_day}", sender)
        delivered = await self._send_daily_reports(anchor_day, target_group_id=roomid, force=True)
        if delivered:
            return True, f"斗鱼弹幕日报已发送:{anchor_day}"
        await self.bot.send_text_message(roomid, f"暂无可发送的斗鱼弹幕日报:{anchor_day}", sender)
        return True, "暂无日报"

    return False, None
async def _scheduled_check_job(self):
    """Poll every subscribed room and react to live / offline transitions.

    For each room the status endpoint is fetched, the new status is stored
    in Redis, and the previous ``is_live`` value decides what happens:
    unknown or offline -> live sends the go-live notice, live -> offline
    sends the offline notice, and live -> live without a recorder for the
    room resumes the session and starts recording.  A room counts as live
    only when ``show_status == 1`` and ``videoLoop`` is 0.  Errors are
    logged per room and never raised.
    """
    try:
        subscribed = self.redis_manager.all_subscribed_rooms()
        if not subscribed:
            return
        async with aiohttp.ClientSession() as http:
            for room_id in subscribed:
                try:
                    url = self._api_template.format(room_id=room_id)
                    request_headers = {
                        "User-Agent": self._user_agent,
                        "Referer": f"https://www.douyu.com/{room_id}"
                    }
                    async with http.get(url, headers=request_headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                        data = await resp.json(content_type=None)
                        room_info = data.get("room", {}) if isinstance(data, dict) else {}
                        show_status = room_info.get("show_status")
                        nickname = room_info.get("nickname", "")
                        room_name = room_info.get("room_name", "")
                        avatar = room_info.get("avatar", {}) or {}
                        thumb_url = str(avatar.get("small", "") or "").strip().strip("`").strip()
                        try:
                            video_loop = int(str(room_info.get("videoLoop", 0)))
                        except Exception:
                            video_loop = 0

                        previous = self.redis_manager.get_room_status(room_id) or {}
                        prev_live = previous.get("is_live")
                        curr_live = show_status == 1 and video_loop == 0
                        self.redis_manager.set_room_status(room_id, {
                            "is_live": curr_live,
                            "nickname": nickname,
                            "room_name": room_name,
                            "is_loop": video_loop == 1
                        })

                        first_seen = prev_live is None
                        if curr_live and (first_seen or prev_live is False):
                            await self._notify_groups_live(room_id, nickname, room_name, thumb_url)
                            continue
                        if not curr_live and first_seen:
                            # First observation of an offline room: nothing to announce.
                            continue
                        if not curr_live and prev_live is True:
                            await self._notify_groups_offline(room_id, nickname, room_name, video_loop == 1)
                            continue
                        if curr_live and prev_live is True and room_id not in self._danmu_recorders:
                            # Still live but nothing is recording: resume and record.
                            try:
                                room_session = self._open_or_resume_session(room_id, nickname, room_name)
                                if room_session:
                                    logger.info(
                                        f"检测到持续直播状态,续接斗鱼直播会话({room_id}): "
                                        f"session={room_session.get('session_id')}"
                                    )
                                logger.info(f"检测到持续直播状态,补偿启动斗鱼弹幕记录({room_id})")
                                self._start_danmu_record(room_id)
                            except Exception as e:
                                logger.error(f"补偿启动斗鱼弹幕记录失败({room_id}): {e}")
                            continue
                        # Reached only when no transition was handled above.
                        await asyncio.sleep(0.1)
                except Exception as e:
                    logger.error(f"斗鱼检查失败: {e}")
    except Exception as e:
        logger.error(f"斗鱼定时任务异常: {e}")
async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str, thumb_url: str):
    """Announce that a room went live, then open its session and start recording.

    Every subscribed group with the feature enabled gets the text notice
    (as an @-message when the group has reminder subscribers) followed by
    the link card.  Send failures are logged per group.  Session and
    recorder start-up run afterwards regardless of delivery results.
    """
    groups = self.redis_manager.groups_for_room(room_id)
    text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看https://www.douyu.com/{room_id}"
    # NOTE(review): values are inserted into the XML template as-is; whether
    # the template needs them escaped is not visible here.
    xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url)
    for gid in groups:
        if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
            continue
        try:
            mention_list = self.redis_manager.list_group_subscribers(gid)
            if mention_list:
                await self.bot.send_at_message(gid, text, mention_list)
            else:
                await self.bot.send_text_message(gid, text)
            await self.bot.send_link_xml_message(xml_content, gid)
        except Exception as e:
            logger.error(f"发送斗鱼开播提醒失败: {e}")
    try:
        session = self._open_or_resume_session(room_id, nickname, room_name)
        if session:
            logger.info(
                f"斗鱼直播会话开启/续接: room={room_id}, session={session.get('session_id')}, "
                f"segments={len(session.get('segments', []))}, anchor_day={session.get('anchor_day')}"
            )
        logger.info(f"启动斗鱼弹幕记录({room_id})")
        self._start_danmu_record(room_id)
    except Exception as e:
        logger.error(f"启动斗鱼弹幕记录失败({room_id}): {e}")
async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str, is_loop: bool = False):
    """Announce that a room went offline, then close its session and stop recording.

    Args:
        is_loop: When true, a note that the room is now on replay loop is
            appended to the notice.

    Send failures are logged per group; session / recorder shutdown runs
    afterwards regardless of delivery results.
    """
    groups = self.redis_manager.groups_for_room(room_id)
    loop_note = "(当前为轮播)" if is_loop else ""
    text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}" + loop_note
    for gid in groups:
        if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
            continue
        try:
            await self.bot.send_text_message(gid, text)
        except Exception as e:
            logger.error(f"发送斗鱼下播提醒失败: {e}")
    try:
        session = self._close_active_session(room_id, nickname, room_name)
        if session:
            logger.info(
                f"斗鱼直播会话关闭片段: room={room_id}, session={session.get('session_id')}, "
                f"segments={len(session.get('segments', []))}, is_live={session.get('is_live')}"
            )
        logger.info(f"停止斗鱼弹幕记录({room_id})")
        self._stop_danmu_record(room_id)
    except Exception as e:
        logger.error(f"停止斗鱼弹幕记录失败({room_id}): {e}")
async def _scheduled_yuba_check_job(self):
    """Poll every subscribed yuba and announce its newest non-pinned post.

    For each hash id the first feed whose ``home_feed_top`` is not 1 is
    compared with the feed id stored in Redis; a different id triggers a
    group notification and replaces the stored id.  Errors are logged per
    yuba and never raised.

    Compared with the previous version: the redundant in-loop
    ``from datetime import datetime`` is gone, null ``data`` / ``publisher``
    / ``text`` fields no longer abort the check, and a feed without
    ``feed_id`` is skipped instead of being stored as the string "None".
    """
    try:
        yubas = self.redis_manager.all_subscribed_yubas()
        if not yubas:
            return
        async with aiohttp.ClientSession() as session:
            for hash_id in yubas:
                try:
                    params = {
                        "filter_type": 1,
                        "hash_id": hash_id,
                        "limit": 10,
                        "offset": 0
                    }
                    headers = {
                        "User-Agent": self._user_agent,
                        "Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news",
                    }
                    async with session.get(self._yuba_api, headers=headers, params=params,
                                           timeout=aiohttp.ClientTimeout(total=10)) as resp:
                        data = await resp.json(content_type=None)
                        if data.get("error") != 0:
                            logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {data.get('msg')}")
                            continue
                        feed_list = (data.get("data") or {}).get("feed_list") or []
                        # First feed that is not pinned to the top.
                        target_feed = next(
                            (feed for feed in feed_list if feed.get("home_feed_top") != 1),
                            None,
                        )
                        if not target_feed:
                            continue
                        raw_feed_id = target_feed.get("feed_id")
                        if raw_feed_id is None or raw_feed_id == "":
                            continue
                        feed_id = str(raw_feed_id)
                        last_id = self.redis_manager.get_yuba_last_id(hash_id)
                        if last_id and feed_id == last_id:
                            continue
                        # A new post was found.
                        nickname = (target_feed.get("publisher") or {}).get("nickname", "未知主播")
                        content = target_feed.get("text") or ""
                        ctime = target_feed.get("ctime")
                        publish_time = datetime.fromtimestamp(int(ctime)).strftime(
                            '%Y-%m-%d %H:%M:%S') if ctime else "未知时间"
                        # Keep the notification short.
                        if len(content) > 200:
                            content = content[:200] + "..."
                        full_url = f"https://yuba.douyu.com/feed/{feed_id}"
                        await self._notify_groups_yuba(hash_id, nickname, content, full_url, publish_time)
                        # Remember the announced post.
                        self.redis_manager.set_yuba_last_id(hash_id, feed_id)
                        await asyncio.sleep(0.5)
                except Exception as e:
                    logger.error(f"检查斗鱼鱼吧 ({hash_id}) 失败: {e}")
    except Exception as e:
        logger.error(f"斗鱼鱼吧定时任务异常: {e}")
async def _notify_groups_yuba(self, hash_id: str, nickname: str, content: str, url: str,
                              publish_time: str = "未知时间"):
    """Send the new-post notice to every enabled group subscribed to the yuba.

    A failed send is logged and the remaining groups are still notified.
    """
    text = f"🌟 斗鱼鱼吧动态提醒 \n👤 主播:{nickname}\n⏰ 时间:{publish_time}\n📝 内容:{content}\n🔗 链接:{url}"
    for gid in self.redis_manager.groups_for_yuba(hash_id):
        if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
            continue
        try:
            await self.bot.send_text_message(gid, text)
        except Exception as e:
            logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}")
def _get_danmu_recorder(self, room_id: str) -> DouyuDanmuRecorder:
    """Return the recorder cached for ``room_id``, creating and caching it on first use."""
    cached = self._danmu_recorders.get(room_id)
    if cached:
        return cached
    created = DouyuDanmuRecorder(
        room_id,
        self._user_agent,
        stats_callback=self._record_room_audience_point,
        stats_sample_interval_seconds=self._audience_stats_sample_interval_seconds,
    )
    self._danmu_recorders[room_id] = created
    return created
@staticmethod
def _normalize_audience_points(points: List[Dict[str, Any]], limit: int = 720) -> List[Dict[str, Any]]:
normalized: List[Dict[str, Any]] = []
seen = set()
for item in points or []:
timestamp = str(item.get("timestamp") or "").strip()
if not timestamp or timestamp in seen:
continue
seen.add(timestamp)
normalized.append({
"timestamp": timestamp,
"vip_count": int(item.get("vip_count", 0) or 0),
"diamond_count": int(item.get("diamond_count", 0) or 0),
})
normalized.sort(key=lambda row: row.get("timestamp", ""))
if len(normalized) > limit:
normalized = normalized[-limit:]
return normalized
def _record_room_audience_point(self, room_id: str, point: Dict[str, Any]) -> None:
if not self.redis_manager or not room_id:
return
session = self.redis_manager.get_latest_room_session(room_id)
if not session or not bool(session.get("is_live")):
return
current_points = self._normalize_audience_points(list(session.get("audience_points", []) or []))
merged_points = self._normalize_audience_points(current_points + [point])
if merged_points == current_points:
return
session["audience_points"] = merged_points
session["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.redis_manager.save_room_session(room_id, session)
def _resolve_anchor_day(self, target_dt: datetime) -> str:
if target_dt.hour < self._session_cutoff_hour:
target_dt = target_dt - timedelta(days=1)
return target_dt.strftime("%Y-%m-%d")
@staticmethod
def _parse_session_time(value: str) -> Optional[datetime]:
if not value:
return None
try:
return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S")
except Exception:
return None
@staticmethod
def _find_open_segment(session: Dict[str, Any]) -> Optional[Dict[str, Any]]:
for segment in reversed(session.get("segments", []) or []):
if not str(segment.get("end_time") or "").strip():
return segment
return None
def _should_merge_with_latest_session(self, latest_session: Optional[Dict[str, Any]], now_dt: datetime) -> bool:
if not latest_session:
return False
if latest_session.get("is_live"):
return True
segments = latest_session.get("segments", []) or []
if not segments:
return False
last_segment = segments[-1]
end_dt = self._parse_session_time(last_segment.get("end_time", ""))
if not end_dt:
return False
gap_seconds = (now_dt - end_dt).total_seconds()
return 0 <= gap_seconds <= self._merge_gap_hours * 3600
def _open_or_resume_session(self, room_id: str, nickname: str, room_name: str) -> Optional[Dict[str, Any]]:
if not self.redis_manager:
return None
now_dt = datetime.now()
now_str = now_dt.strftime("%Y-%m-%d %H:%M:%S")
latest_session = self.redis_manager.get_latest_room_session(room_id) or {}
if self._should_merge_with_latest_session(latest_session, now_dt):
session = dict(latest_session)
open_segment = self._find_open_segment(session)
if not open_segment:
segments = list(session.get("segments", []) or [])
segments.append({"start_time": now_str, "end_time": ""})
session["segments"] = segments
else:
anchor_day = self._resolve_anchor_day(now_dt)
session = {
"session_id": f"{room_id}_{anchor_day.replace('-', '')}_{now_dt.strftime('%H%M%S')}",
"room_id": room_id,
"anchor_day": anchor_day,
"nickname": nickname,
"room_name": room_name,
"segments": [{"start_time": now_str, "end_time": ""}],
"audience_points": [],
"is_live": True,
"summary_status": "pending",
"summary_generated_at": "",
"created_at": now_str,
}
session["nickname"] = nickname or session.get("nickname", "")
session["room_name"] = room_name or session.get("room_name", "")
session["audience_points"] = self._normalize_audience_points(list(session.get("audience_points", []) or []))
session["is_live"] = True
session["updated_at"] = now_str
session["last_live_at"] = now_str
self.redis_manager.save_room_session(room_id, session)
return session
def _close_active_session(self, room_id: str, nickname: str, room_name: str) -> Optional[Dict[str, Any]]:
if not self.redis_manager:
return None
session = self.redis_manager.get_latest_room_session(room_id)
if not session:
return None
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
open_segment = self._find_open_segment(session)
if open_segment:
open_segment["end_time"] = now_str
session["nickname"] = nickname or session.get("nickname", "")
session["room_name"] = room_name or session.get("room_name", "")
session["is_live"] = False
session["updated_at"] = now_str
session["last_offline_at"] = now_str
self.redis_manager.save_room_session(room_id, session)
return session
def get_room_session(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Fetch a stored session for a room.

    Args:
        room_id: Room to look up.
        session_id: Specific session to fetch; when omitted or empty the
            latest session of the room is returned.

    Returns:
        Whatever the redis manager returns for the lookup, or ``None``
        when there is no redis manager or no room id.
    """
    store = self.redis_manager
    if not (store and room_id):
        return None
    if not session_id:
        return store.get_latest_room_session(room_id)
    return store.get_room_session(room_id, session_id)
def build_session_danmu_material(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Side-channel capability: extract the valid danmu of one live session
    from the existing daily files and condense them into material that a
    later summary step can use.
    It currently does not affect the main reminder, collection or
    group-message flows.
    """
    session = self.get_room_session(room_id, session_id=session_id)
    if not session:
        return None

    helper = DouyuDanmuSummaryHelper
    messages = helper.load_session_messages(room_id, session)
    material = helper.build_summary_material(room_id, session, messages)

    # Attach a compact description of the session next to the material.
    session_brief = {
        field: session.get(field, "")
        for field in ("session_id", "anchor_day", "nickname", "room_name")
    }
    session_brief["is_live"] = bool(session.get("is_live"))
    material["session"] = session_brief
    return material
def build_session_llm_payload(self, room_id: str, session_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """
    Side-channel capability: build a danmu-summary payload that can be
    sent to an LLM directly.
    It does not change the existing danmu collection or notification flows.
    """
    session = self.get_room_session(room_id, session_id=session_id)
    if session:
        messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session)
        return DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages)
    return None
def _daily_report_job_key(self, day_key: str) -> str:
return f"{self.redis_manager.prefix}daily_report_job:{day_key}"
def _daily_report_room_key(self, room_id: str, anchor_day: str) -> str:
return f"{self.redis_manager.prefix}daily_report:{room_id}:{anchor_day}"
@staticmethod
def _daily_report_cache_dir() -> str:
path = os.path.join("temp", "douyu_materials")
os.makedirs(path, exist_ok=True)
return path
def _daily_report_cache_path(self, room_id: str, anchor_day: str) -> str:
return os.path.join(
self._daily_report_cache_dir(),
f"{room_id}_{anchor_day.replace('-', '')}_daily_report_result.json",
)
def _load_daily_report_cache(self, room_id: str, anchor_day: str) -> Optional[Dict[str, Any]]:
cache_path = self._daily_report_cache_path(room_id, anchor_day)
if not os.path.exists(cache_path):
return None
try:
with open(cache_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
return data
except Exception as e:
logger.warning(f"读取斗鱼每日报告缓存失败(room={room_id}, day={anchor_day}): {e}")
return None
def _save_daily_report_cache(self, room_id: str, anchor_day: str, data: Dict[str, Any]) -> None:
cache_path = self._daily_report_cache_path(room_id, anchor_day)
try:
with open(cache_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.warning(f"保存斗鱼每日报告缓存失败(room={room_id}, day={anchor_day}): {e}")
@staticmethod
def _resolve_existing_report_image(image_path: Optional[str]) -> Optional[str]:
path = str(image_path or "").strip()
if not path:
return None
return path if os.path.exists(path) else None
def _should_run_daily_report(self, now_dt: datetime) -> bool:
time_text = str(self._daily_report_time or "").strip()
try:
target_hour, target_minute = [int(part) for part in time_text.split(":", 1)]
except Exception:
return False
target_dt = now_dt.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
if now_dt < target_dt or now_dt > target_dt + timedelta(minutes=4, seconds=59):
return False
last_run = self.redis_manager.get_text_value(self._daily_report_job_key(now_dt.strftime("%Y-%m-%d")))
return not last_run
def _load_sessions_for_anchor_day(self, room_id: str, anchor_day: str) -> List[Dict[str, Any]]:
if not self.redis_manager:
return []
sessions = []
for session_id in self.redis_manager.list_room_session_ids(room_id, limit=30):
session = self.redis_manager.get_room_session(room_id, session_id)
if not session:
continue
if str(session.get("anchor_day") or "") != anchor_day:
continue
sessions.append(session)
sessions.sort(
key=lambda item: str(((item.get("segments") or [{}])[0]).get("start_time", "")),
)
if sessions:
return sessions[:self._daily_report_max_sessions]
inferred_sessions = self._infer_sessions_for_anchor_day(room_id, anchor_day)
if inferred_sessions:
logger.info(
f"斗鱼每日报告使用弹幕文件回推 session: room={room_id}, day={anchor_day}, "
f"count={len(inferred_sessions)}"
)
return inferred_sessions[:self._daily_report_max_sessions]
return []
def _infer_sessions_for_anchor_day(self, room_id: str, anchor_day: str) -> List[Dict[str, Any]]:
    """Reconstruct sessions for a day from that day's recorded danmu.

    Sessions inferred by ``DouyuDanmuSummaryHelper`` are kept when their
    ``anchor_day`` matches. If none match but the day holds at least
    ``self._daily_report_min_messages`` messages, a single fallback
    session spanning the first to the last message timestamp is returned.

    Returns:
        Inferred sessions, a one-element fallback list, or an empty list.
    """
    date_key = anchor_day.replace("-", "")
    day_messages = DouyuDanmuSummaryHelper.load_day_messages(room_id, date_key)
    if not day_messages:
        return []

    candidates = DouyuDanmuSummaryHelper.infer_sessions_from_messages(
        room_id,
        day_messages,
        session_cutoff_hour=self._session_cutoff_hour,
        merge_gap_hours=self._merge_gap_hours,
        min_session_messages=min(50, self._daily_report_min_messages),
    )
    same_day = [
        candidate for candidate in candidates
        if str(candidate.get("anchor_day") or "") == anchor_day
    ]
    if same_day:
        return same_day

    if len(day_messages) < self._daily_report_min_messages:
        return []

    # Messages without a timestamp sort first via the datetime.min default.
    ordered = sorted(day_messages, key=lambda item: item.get("timestamp") or datetime.min)
    first_ts = ordered[0].get("timestamp")
    last_ts = ordered[-1].get("timestamp")
    if not (isinstance(first_ts, datetime) and isinstance(last_ts, datetime)):
        return []

    time_format = "%Y-%m-%d %H:%M:%S"
    fallback_session = {
        "session_id": f"{room_id}_{date_key}_fallback",
        "room_id": room_id,
        "anchor_day": anchor_day,
        "nickname": "",
        "room_name": "",
        "segments": [{
            "start_time": first_ts.strftime(time_format),
            "end_time": last_ts.strftime(time_format),
        }],
        "is_live": False,
        "source": "fallback_full_day",
    }
    return [fallback_session]
def _build_audience_trend(self, sessions: List[Dict[str, Any]]) -> Dict[str, Any]:
points: List[Dict[str, Any]] = []
for session in sessions:
for item in session.get("audience_points", []) or []:
point = {
"timestamp": str(item.get("timestamp") or "").strip(),
"vip_count": int(item.get("vip_count", 0) or 0),
"diamond_count": int(item.get("diamond_count", 0) or 0),
}
if point["timestamp"]:
points.append(point)
points = self._normalize_audience_points(points, limit=1440)
if not points:
return {"points": [], "summary": {}}
vip_values = [int(item.get("vip_count", 0) or 0) for item in points]
diamond_values = [int(item.get("diamond_count", 0) or 0) for item in points]
labels = [str(item.get("timestamp") or "")[-8:-3] for item in points]
return {
"points": points,
"summary": {
"point_count": len(points),
"vip_min": min(vip_values),
"vip_max": max(vip_values),
"vip_latest": vip_values[-1],
"diamond_min": min(diamond_values),
"diamond_max": max(diamond_values),
"diamond_latest": diamond_values[-1],
"labels": labels,
},
}
def _build_daily_report_payload(self, room_id: str, anchor_day: str, sessions: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not sessions:
return None
session_payloads: List[Dict[str, Any]] = []
total_message_count = 0
total_noise_filtered_count = 0
total_organized_message_count = 0
total_unique_users: Set[str] = set()
merged_templates: List[Dict[str, Any]] = []
repeated_messages: List[Dict[str, Any]] = []
peak_buckets: List[Dict[str, Any]] = []
representative_messages: List[Dict[str, Any]] = []
raw_window_samples: List[Dict[str, Any]] = []
top_terms_counter = Counter()
burst_terms_counter = Counter()
operator_totals = {
"fans_badge_user_count": 0,
"fans_badge_message_count": 0,
"high_room_level_user_count": 0,
"high_fans_level_user_count": 0,
"noble_user_count": 0,
"noble_message_count": 0,
"active_users_5plus": 0,
"active_users_10plus": 0,
}
top_badge_counter = Counter()
top_badge_message_counter = Counter()
top_active_user_map: Dict[str, Dict[str, Any]] = {}
nickname = ""
room_name = ""
for session in sessions:
messages = DouyuDanmuSummaryHelper.load_session_messages(room_id, session)
if len(messages) < self._daily_report_min_messages:
continue
payload = DouyuDanmuSummaryHelper.build_llm_payload(room_id, session, messages)
session_payloads.append(payload)
meta = payload.get("session_meta", {}) or {}
operator_metrics = payload.get("operator_metrics", {}) or {}
total_message_count += int(meta.get("message_count", 0) or 0)
total_noise_filtered_count += int(meta.get("noise_filtered_count", 0) or 0)
total_organized_message_count += int(meta.get("organized_message_count", 0) or 0)
nickname = nickname or str(meta.get("nickname") or session.get("nickname") or "")
room_name = room_name or str(meta.get("room_name") or session.get("room_name") or "")
for key in operator_totals:
operator_totals[key] += int(operator_metrics.get(key, 0) or 0)
for item in payload.get("merged_templates", []) or []:
merged_templates.append(dict(item))
for item in payload.get("repeated_messages", []) or []:
repeated_messages.append(dict(item))
for item in payload.get("peak_buckets", []) or []:
peak_buckets.append(dict(item))
for item in payload.get("representative_messages", []) or []:
representative_messages.append(dict(item))
for item in payload.get("raw_window_samples", []) or []:
raw_window_samples.append(dict(item))
for item in payload.get("top_terms", []) or []:
term = str(item.get("term") or "").strip()
if term:
top_terms_counter[term] += int(item.get("count", 0) or 0)
for item in payload.get("burst_terms", []) or []:
term = str(item.get("text") or "").strip()
if term:
burst_terms_counter[term] += int(item.get("count", 0) or 0)
for item in operator_metrics.get("top_badges", []) or []:
badge_name = str(item.get("badge_name") or "").strip()
if badge_name:
top_badge_counter[badge_name] += int(item.get("user_count", 0) or 0)
top_badge_message_counter[badge_name] += int(item.get("message_count", 0) or 0)
for item in operator_metrics.get("top_active_users", []) or []:
uid = str(item.get("uid") or "").strip()
if not uid:
continue
existing = top_active_user_map.get(uid)
message_count = int(item.get("message_count", 0) or 0)
organized_message_count = int(item.get("organized_message_count", 0) or 0)
room_level = int(item.get("room_level", 0) or 0)
fans_level = int(item.get("fans_level", 0) or 0)
if not existing:
top_active_user_map[uid] = {
"uid": uid,
"nickname": str(item.get("nickname") or "").strip(),
"message_count": message_count,
"organized_message_count": organized_message_count,
"room_level": room_level,
"fans_name": str(item.get("fans_name") or "").strip(),
"fans_level": fans_level,
"noble_name": str(item.get("noble_name") or "").strip(),
}
continue
existing["message_count"] = int(existing.get("message_count", 0) or 0) + message_count
existing["organized_message_count"] = int(existing.get("organized_message_count", 0) or 0) + organized_message_count
if not str(existing.get("nickname") or "").strip():
existing["nickname"] = str(item.get("nickname") or "").strip()
if room_level > int(existing.get("room_level", 0) or 0):
existing["room_level"] = room_level
if fans_level > int(existing.get("fans_level", 0) or 0):
existing["fans_level"] = fans_level
if not str(existing.get("fans_name") or "").strip():
existing["fans_name"] = str(item.get("fans_name") or "").strip()
if not str(existing.get("noble_name") or "").strip():
existing["noble_name"] = str(item.get("noble_name") or "").strip()
for session_message in messages:
uid = str(session_message.get("uid") or "").strip()
if uid:
total_unique_users.add(uid)
if not session_payloads:
return None
merged_templates.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True)
repeated_messages.sort(key=lambda item: int(item.get("count", 0) or 0), reverse=True)
peak_buckets.sort(key=lambda item: int(item.get("message_count", 0) or 0), reverse=True)
artifact_dir = os.path.join("temp", "douyu_materials")
os.makedirs(artifact_dir, exist_ok=True)
audience_trend = self._build_audience_trend(sessions)
payload = {
"report_meta": {
"room_id": room_id,
"anchor_day": anchor_day,
"nickname": nickname,
"room_name": room_name,
"session_count": len(session_payloads),
"message_count": total_message_count,
"noise_filtered_count": total_noise_filtered_count,
"organized_message_count": total_organized_message_count,
"unique_user_count": len(total_unique_users),
},
"operator_metrics": {
**operator_totals,
"fans_badge_user_ratio": round(operator_totals["fans_badge_user_count"] / max(len(total_unique_users), 1), 4),
"top_badges": [
{
"badge_name": badge_name,
"user_count": user_count,
"message_count": int(top_badge_message_counter.get(badge_name, 0) or 0),
}
for badge_name, user_count in top_badge_counter.most_common(10)
],
"top_active_users": sorted(
top_active_user_map.values(),
key=lambda item: (
int(item.get("message_count", 0) or 0),
int(item.get("organized_message_count", 0) or 0),
),
reverse=True,
)[:12],
},
"sessions": [
{
"session_id": (item.get("session_meta", {}) or {}).get("session_id", ""),
"segments": (item.get("session_meta", {}) or {}).get("segments", []),
"message_count": (item.get("session_meta", {}) or {}).get("message_count", 0),
"organized_message_count": (item.get("session_meta", {}) or {}).get("organized_message_count", 0),
}
for item in session_payloads
],
"audience_trend": audience_trend,
"merged_templates": merged_templates[:24],
"repeated_messages": repeated_messages[:24],
"top_terms": [{"term": term, "count": count} for term, count in top_terms_counter.most_common(24)],
"burst_terms": [{"text": term, "count": count} for term, count in burst_terms_counter.most_common(16)],
"peak_buckets": peak_buckets[:10],
"representative_messages": representative_messages[:24],
"raw_window_samples": raw_window_samples[:10],
}
artifact_path = os.path.join(artifact_dir, f"{room_id}_{anchor_day.replace('-', '')}_daily_report_payload.json")
with open(artifact_path, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
return payload
def _build_daily_report_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]:
meta = payload.get("report_meta", {}) or {}
system_prompt = (
"你是斗鱼直播日报助手。请基于给定的结构化弹幕材料,输出一份适合发群的中文日报。"
"要求简洁、自然、信息密度高,不要编造,不要使用代码块。"
)
user_prompt = (
"请输出一份斗鱼每日报告,格式要求:\n"
"1. 第一行写标题,包含主播名和日期。\n"
"2. 用 3-5 条概括直播主线、弹幕情绪、观众关注点。\n"
"3. 单独补充运营视角观察,比如带牌活跃用户、高等级用户、核心发言用户、活跃牌子分布。\n"
"4. 单独列出高频梗/复读内容(不超过 5 条)。\n"
"5. 单独列出 2-3 个热点时段。\n"
"6. 整体控制在 600 字以内。\n\n"
f"材料如下:\n{json.dumps(payload, ensure_ascii=False, indent=2)}"
)
return system_prompt, user_prompt
def _build_danmu_summary_prompt(self, payload: Dict[str, Any]) -> Tuple[str, str]:
meta = payload.get("report_meta", {}) or {}
system_prompt = (
"你是直播弹幕总结助手。请只根据给定材料,总结这场直播的弹幕内容与氛围。"
"不要输出运营数据,不要编造,不要写空话套话。"
)
user_prompt = (
"请输出一段适合放在日报图片上半部分的弹幕总结,要求:\n"
"1. 先用 1 段总述直播氛围与主线。\n"
"2. 再用 5 条要点总结观众关注点、情绪变化、反复出现的梗、节奏变化和额外反馈,每条只写一句。\n"
"3. 语言像运营复盘,简洁自然。\n"
"4. 不要写标题,不要写“根据数据”。\n\n"
f"主播:{meta.get('nickname') or meta.get('room_name') or meta.get('room_id')}\n"
f"日期:{meta.get('anchor_day', '')}\n"
f"材料:\n{json.dumps(payload, ensure_ascii=False, indent=2)}"
)
return system_prompt, user_prompt
def _build_fallback_daily_report(self, payload: Dict[str, Any]) -> str:
meta = payload.get("report_meta", {}) or {}
title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
lines = [
f"斗鱼每日报告 | {title_name} | {meta.get('anchor_day', '')}",
f"{meta.get('session_count', 0)} 场,弹幕 {meta.get('message_count', 0)} 条,参与用户 {meta.get('unique_user_count', 0)} 人。",
]
operator_metrics = payload.get("operator_metrics", {}) or {}
sessions = payload.get("sessions", []) or []
if sessions:
session_parts = []
for item in sessions[:4]:
segments = item.get("segments", []) or []
if not segments:
continue
start_time = str(segments[0].get("start_time", ""))[-8:-3]
end_time = str(segments[-1].get("end_time", ""))[-8:-3]
session_parts.append(f"{start_time}-{end_time}")
if session_parts:
lines.append("场次时间:" + " / ".join(session_parts))
top_terms = payload.get("top_terms", []) or []
if top_terms:
lines.append("关注焦点:" + "".join([str(item.get("term") or "") for item in top_terms[:8] if str(item.get("term") or "").strip()]))
if operator_metrics:
op_parts = []
fans_badge_user_count = int(operator_metrics.get("fans_badge_user_count", 0) or 0)
high_room_level_user_count = int(operator_metrics.get("high_room_level_user_count", 0) or 0)
high_fans_level_user_count = int(operator_metrics.get("high_fans_level_user_count", 0) or 0)
active_users_10plus = int(operator_metrics.get("active_users_10plus", 0) or 0)
if fans_badge_user_count:
op_parts.append(f"带牌活跃用户 {fans_badge_user_count}")
if high_room_level_user_count:
op_parts.append(f"30级+活跃用户 {high_room_level_user_count}")
if high_fans_level_user_count:
op_parts.append(f"10级+粉丝牌用户 {high_fans_level_user_count}")
if active_users_10plus:
op_parts.append(f"高活跃核心用户 {active_users_10plus}")
if op_parts:
lines.append("运营侧:" + "".join(op_parts))
top_badges = operator_metrics.get("top_badges", []) or []
if top_badges:
lines.append("活跃粉丝牌:")
for item in top_badges[:5]:
badge_name = str(item.get("badge_name") or "").strip()
user_count = int(item.get("user_count", 0) or 0)
message_count = int(item.get("message_count", 0) or 0)
if badge_name:
lines.append(f"- {badge_name}{user_count}人,{message_count}")
merged_templates = payload.get("merged_templates", []) or []
if merged_templates:
lines.append("高频梗:")
for item in merged_templates[:5]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
lines.append(f"- {text[:42]}{count}次)")
peak_buckets = payload.get("peak_buckets", []) or []
if peak_buckets:
lines.append("热点时段:")
for item in peak_buckets[:3]:
start_time = str(item.get("start_time") or "")[-8:-3]
message_count = int(item.get("message_count", 0) or 0)
terms = "".join(
[str(term.get("term") or "") for term in (item.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()]
)
lines.append(f"- {start_time}{message_count}条,关键词:{terms}")
representative_messages = payload.get("representative_messages", []) or []
if representative_messages:
lines.append("代表弹幕:")
for item in representative_messages[:4]:
nickname = str(item.get("nickname") or "").strip()
content = str(item.get("content") or "").strip()
if content:
lines.append(f"- {nickname}{content[:60]}")
text = "\n".join(lines).strip()
if len(text) > self._daily_report_max_length:
text = text[: self._daily_report_max_length - 20].rstrip() + "\n...(已截断)"
return text
def _build_fallback_danmu_summary(self, payload: Dict[str, Any]) -> str:
meta = payload.get("report_meta", {}) or {}
top_terms = [str(item.get("term") or "").strip() for item in (payload.get("top_terms", []) or [])[:6] if str(item.get("term") or "").strip()]
merged_templates = payload.get("merged_templates", []) or []
peak_buckets = payload.get("peak_buckets", []) or []
representative_messages = payload.get("representative_messages", []) or []
lines = [
f"{meta.get('anchor_day', '')} 这场直播弹幕整体比较密集,讨论重心主要围绕 {''.join(top_terms[:4]) or '对局过程'} 展开,观众互动意愿较强,梗和复读内容持续出现。"
]
if merged_templates:
sample_templates = "".join(
[str(item.get("text") or "").strip()[:26] for item in merged_templates[:3] if str(item.get("text") or "").strip()]
)
if sample_templates:
lines.append(f"- 主线观察:直播间共识梗很强,重复刷屏内容主要集中在 {sample_templates}")
for item in merged_templates[:4]:
break
if peak_buckets:
top_bucket = peak_buckets[0]
terms = "".join(
[str(term.get("term") or "") for term in (top_bucket.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()]
)
lines.append(
f"- 节奏变化:高峰集中在 {str(top_bucket.get('start_time') or '')[-8:-3]} 前后,单时段弹幕 {int(top_bucket.get('message_count', 0) or 0)} 条,关键词偏向 {terms}"
)
if len(peak_buckets) > 1:
second_bucket = peak_buckets[1]
second_terms = "".join(
[str(term.get("term") or "") for term in (second_bucket.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()]
)
lines.append(
f"- 热点补充:{str(second_bucket.get('start_time') or '')[-8:-3]} 也出现明显抬升,弹幕讨论继续围绕 {second_terms} 展开。"
)
if representative_messages:
lines.append("- 情绪特点:代表性发言里既有对操作和决策的即时反馈,也有大量玩梗、调侃和情绪宣泄。")
if top_terms:
lines.append(f"- 关注焦点:高频词主要落在 {''.join(top_terms[:6])},说明观众注意力相对集中。")
return "\n".join(lines).strip()
def _build_operator_summary_text(self, payload: Dict[str, Any]) -> str:
meta = payload.get("report_meta", {}) or {}
operator_metrics = payload.get("operator_metrics", {}) or {}
total_users = int(meta.get("unique_user_count", 0) or 0)
fans_badge_users = int(operator_metrics.get("fans_badge_user_count", 0) or 0)
high_room_users = int(operator_metrics.get("high_room_level_user_count", 0) or 0)
high_fans_users = int(operator_metrics.get("high_fans_level_user_count", 0) or 0)
active_users_5plus = int(operator_metrics.get("active_users_5plus", 0) or 0)
active_users_10plus = int(operator_metrics.get("active_users_10plus", 0) or 0)
fans_badge_ratio = float(operator_metrics.get("fans_badge_user_ratio", 0) or 0)
lines = [
f"- 活跃用户规模:{total_users} 人,其中发言 5 次以上 {active_users_5plus}10 次以上 {active_users_10plus} 人。",
f"- 粉丝粘性:带粉丝牌活跃用户 {fans_badge_users} 人,占活跃用户 {fans_badge_ratio * 100:.1f}%10 级以上粉丝牌用户 {high_fans_users} 人。",
f"- 用户质量:房间等级 30 级以上活跃用户 {high_room_users} 人,说明高等级老观众参与度不低。",
]
audience_summary = (payload.get("audience_trend", {}) or {}).get("summary", {}) or {}
if audience_summary:
vip_min = int(audience_summary.get("vip_min", 0) or 0)
vip_max = int(audience_summary.get("vip_max", 0) or 0)
diamond_latest = int(audience_summary.get("diamond_latest", 0) or 0)
point_count = int(audience_summary.get("point_count", 0) or 0)
lines.append(
f"- 人数走势WS 侧共采样 {point_count} 个时间点,贵宾在 {vip_min}-{vip_max} 区间波动,钻粉收盘约 {diamond_latest}"
)
top_badges = payload.get("operator_metrics", {}).get("top_badges", []) or []
if top_badges:
badge_parts = []
for item in top_badges[:5]:
badge_name = str(item.get("badge_name") or "").strip()
if not badge_name:
continue
badge_parts.append(f"{badge_name} {int(item.get('user_count', 0) or 0)}人/{int(item.get('message_count', 0) or 0)}")
if badge_parts:
lines.append(f"- 活跃牌子分布:{''.join(badge_parts)}")
top_active_users = payload.get("operator_metrics", {}).get("top_active_users", []) or []
if top_active_users:
core_parts = []
for item in top_active_users[:5]:
nickname = str(item.get("nickname") or item.get("uid") or "").strip()
msg_count = int(item.get("message_count", 0) or 0)
fans_name = str(item.get("fans_name") or "").strip()
fans_level = int(item.get("fans_level", 0) or 0)
room_level = int(item.get("room_level", 0) or 0)
tags = []
if fans_name:
if fans_level > 0:
tags.append(f"{fans_name} Lv{fans_level}")
else:
tags.append(fans_name)
if room_level > 0:
tags.append(f"平台 Lv{room_level}")
tags.append(f"{msg_count}")
core_parts.append(f"{nickname}{''.join(tags)}")
if core_parts:
lines.append(f"- 核心发言用户:{''.join(core_parts)}")
return "\n".join(lines).strip()
def _build_operator_summary_lines(self, payload: Dict[str, Any]) -> List[str]:
return [line.strip()[2:].strip() for line in self._build_operator_summary_text(payload).splitlines() if line.strip().startswith("- ")]
async def _generate_danmu_summary_text(self, payload: Dict[str, Any]) -> str:
if self._daily_report_use_llm and self._daily_report_llm_client:
system_prompt, user_prompt = self._build_danmu_summary_prompt(payload)
result = await asyncio.to_thread(
self._daily_report_llm_client.chat,
system_prompt,
user_prompt,
f"douyu_danmu_summary_{(payload.get('report_meta', {}) or {}).get('room_id', '')}",
)
if result:
return result.strip()
logger.warning(
f"斗鱼弹幕总结 LLM 生成失败: model={self._daily_report_llm_client.model}, "
f"last_error={self._daily_report_llm_client.last_error}"
)
return self._build_fallback_danmu_summary(payload)
async def _build_daily_report_markdown(self, payload: Dict[str, Any]) -> str:
meta = payload.get("report_meta", {}) or {}
title_name = str(meta.get("nickname") or meta.get("room_name") or meta.get("room_id") or "主播")
danmu_summary = await self._generate_danmu_summary_text(payload)
operator_summary = self._build_operator_summary_text(payload)
lines = [
f"# {title_name} 直播每日报告",
f"{meta.get('anchor_day', '')}|场次 {meta.get('session_count', 0)}|弹幕 {meta.get('message_count', 0)}|活跃用户 {meta.get('unique_user_count', 0)}",
"",
"## 弹幕总结",
danmu_summary,
"",
"## 运营数据总结",
operator_summary,
]
peak_buckets = payload.get("peak_buckets", []) or []
if peak_buckets:
lines.extend([
"",
"## 热点时段",
])
for item in peak_buckets[:3]:
terms = "".join(
[str(term.get("term") or "") for term in (item.get("top_terms", []) or [])[:4] if str(term.get("term") or "").strip()]
)
lines.append(
f"- `{str(item.get('start_time') or '')[-8:-3]}` 弹幕 {int(item.get('message_count', 0) or 0)} 条,关键词:{terms}"
)
merged_templates = payload.get("merged_templates", []) or []
if merged_templates:
lines.extend([
"",
"## 高频梗",
])
for item in merged_templates[:5]:
text = str(item.get("text") or "").strip()
count = int(item.get("count", 0) or 0)
if text:
lines.append(f"- {text[:72]}{count}次)")
return "\n".join(lines).strip()
async def _render_daily_report_image(self, payload: Dict[str, Any]) -> Optional[str]:
    """Render the daily report to a PNG and return its path.

    The dedicated HTML template is tried first. If that fails, the
    Markdown version of the report is built and converted instead. The
    Markdown is only built on this fallback path, so a successful template
    render triggers a single danmu-summary generation rather than two.

    Args:
        payload: Aggregated report payload.

    Returns:
        The image path, or ``None`` when both render paths fail. Failures
        are logged, not raised.
    """
    meta = payload.get("report_meta", {}) or {}
    room_id = str(meta.get("room_id", "") or "room")
    anchor_day = str(meta.get("anchor_day", "") or "").replace("-", "")
    filename = f"douyu_daily_report_{room_id}_{anchor_day}.png"

    try:
        danmu_summary = await self._generate_danmu_summary_text(payload)
        html_content = render_daily_report_html(
            payload=payload,
            danmu_summary=danmu_summary,
            operator_summary_lines=self._build_operator_summary_lines(payload),
        )
        output_dir = os.path.join(os.getcwd(), "temp", "md2image")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, filename)
        await html_to_image(html_content, output_path)
        return str(Path(output_path).resolve())
    except Exception as e:
        logger.error(f"斗鱼专用模板图片生成失败(room={room_id}, day={anchor_day}): {e}")

    try:
        # Built lazily: this is the only consumer of the Markdown text.
        markdown = await self._build_daily_report_markdown(payload)
        return await convert_md_str_to_image(markdown, filename)
    except Exception as e:
        logger.error(f"斗鱼每日报告图片生成失败(room={room_id}, day={anchor_day}): {e}")
        return None
async def _generate_daily_report_text(self, payload: Dict[str, Any]) -> str:
if self._daily_report_use_llm and self._daily_report_llm_client:
system_prompt, user_prompt = self._build_daily_report_prompt(payload)
result = await asyncio.to_thread(
self._daily_report_llm_client.chat,
system_prompt,
user_prompt,
f"douyu_daily_report_{(payload.get('report_meta', {}) or {}).get('room_id', '')}",
)
if result:
text = result.strip()
if len(text) > self._daily_report_max_length:
return text[: self._daily_report_max_length - 20].rstrip() + "\n...(已截断)"
return text
logger.warning(
f"斗鱼每日报告 LLM 生成失败: model={self._daily_report_llm_client.model}, "
f"last_error={self._daily_report_llm_client.last_error}"
)
return self._build_fallback_daily_report(payload)
async def _get_or_create_daily_report_result(self, room_id: str, anchor_day: str, payload: Dict[str, Any]) -> Dict[str, Any]:
cached = self._load_daily_report_cache(room_id, anchor_day) or {}
cached_image = self._resolve_existing_report_image(cached.get("report_image"))
cached_text = str(cached.get("report_text") or "").strip()
cached_version = int(cached.get("cache_version", 0) or 0)
if cached_version >= self._DAILY_REPORT_CACHE_VERSION and (cached_image or cached_text):
return {
"report_text": cached_text,
"report_image": cached_image,
"cached": True,
}
report_text = await self._generate_daily_report_text(payload)
report_image = None
if self._daily_report_send_image:
report_image = await self._render_daily_report_image(payload)
result = {
"room_id": room_id,
"anchor_day": anchor_day,
"cache_version": self._DAILY_REPORT_CACHE_VERSION,
"report_text": report_text,
"report_image": report_image,
"generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
self._save_daily_report_cache(room_id, anchor_day, result)
result["cached"] = False
return result
async def _send_daily_reports(
    self,
    anchor_day: str,
    target_group_id: Optional[str] = None,
    force: bool = False,
) -> bool:
    """Generate and deliver the daily reports for one anchor day.

    Rooms come from the target group's room list when ``target_group_id``
    is given, otherwise from all subscribed rooms. A room is skipped when
    its report was already sent (unless ``force``), when it has no
    sessions, when one of its sessions is still live, or when no payload
    can be built. Each report goes out as an image when one exists, else
    as text, to every recipient group that has the feature enabled. After
    at least one successful delivery the room is marked as sent.

    Args:
        anchor_day: Report day as ``YYYY-MM-DD``.
        target_group_id: Restrict delivery to this group.
        force: Ignore the "already sent" marker.

    Returns:
        ``True`` if at least one message was delivered.
    """
    store = self.redis_manager
    if target_group_id:
        rooms = set(store.list_group_rooms(target_group_id))
    else:
        rooms = store.all_subscribed_rooms()
    if not rooms:
        logger.info(
            f"斗鱼每日报告无可处理房间: day={anchor_day}, target_group={target_group_id or 'ALL'}"
        )
        return False

    delivered_any = False
    for room_id in rooms:
        sent_key = self._daily_report_room_key(room_id, anchor_day)
        if not force and store.get_text_value(sent_key):
            logger.info(f"斗鱼每日报告已发送过,跳过: room={room_id}, day={anchor_day}")
            continue

        sessions = self._load_sessions_for_anchor_day(room_id, anchor_day)
        if not sessions:
            logger.info(f"斗鱼每日报告无 session: room={room_id}, day={anchor_day}")
            continue
        if any(bool(session.get("is_live")) for session in sessions):
            logger.info(f"斗鱼每日报告存在直播中场次,跳过: room={room_id}, day={anchor_day}")
            continue

        payload = self._build_daily_report_payload(room_id, anchor_day, sessions)
        if not payload:
            logger.info(
                f"斗鱼每日报告 payload 为空: room={room_id}, day={anchor_day}, "
                f"sessions={len(sessions)}, min_messages={self._daily_report_min_messages}"
            )
            continue

        report_result = await self._get_or_create_daily_report_result(room_id, anchor_day, payload)
        report_text = str(report_result.get("report_text") or "").strip()
        report_image = self._resolve_existing_report_image(report_result.get("report_image"))

        recipients = [target_group_id] if target_group_id else store.groups_for_room(room_id)
        delivered = False
        for gid in recipients:
            if not gid:
                continue
            if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
                continue
            try:
                if report_image:
                    await self.bot.send_image_message(gid, Path(report_image))
                else:
                    await self.bot.send_text_message(gid, report_text)
            except Exception as exc:
                # One failing group must not block the remaining groups.
                logger.error(f"发送斗鱼每日报告失败(room={room_id}, group={gid}): {exc}")
            else:
                delivered = True
                delivered_any = True

        if delivered:
            store.set_text_value(sent_key, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    return delivered_any
def _start_danmu_record(self, room_id: str):
recorder = self._get_danmu_recorder(room_id)
recorder.start()
def _stop_danmu_record(self, room_id: str):
recorder = self._danmu_recorders.get(room_id)
if recorder:
recorder.stop()