Files
abot/plugins/douyu/main.py

682 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from datetime import datetime
import os
import threading
import time
from typing import Dict, Any, List, Optional, Tuple, Set
import aiohttp
from loguru import logger
import ssl
import zlib
try:
import websocket
except ImportError:
websocket = None
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML
class DouyuDanmuRecorder:
def __init__(self, room_id: str, user_agent: str):
self.room_id = room_id
self.user_agent = user_agent
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._ws: Optional[websocket.WebSocketApp] = None
self._buffer: List[str] = []
self._buffer_limit = 50
self._buffer_date: Optional[str] = None
self._lock = threading.Lock()
self._websocket_available = websocket is not None
def _encode(self, msg: str) -> bytes:
content = msg.encode("utf-8") + b"\x00"
length = len(content) + 8
head = length.to_bytes(4, "little") * 2
head += (689).to_bytes(2, "little")
head += b"\x00\x00"
return head + content
def _on_message(self, ws, message):
try:
decompressed = zlib.decompress(message, -zlib.MAX_WBITS)
data = decompressed.decode("utf-8", errors="ignore")
except Exception:
data = message.decode("utf-8", errors="ignore")
for line in data.split("\x00"):
line = line.strip()
if not line:
continue
if "type@=chatmsg" not in line:
continue
parts: Dict[str, Any] = {}
for pair in line.split("/"):
if "@=" in pair:
key, value = pair.split("@=", 1)
parts[key] = value
nick = parts.get("nn", "未知")
txt = parts.get("txt", "")
uid = parts.get("uid", "未知")
level = parts.get("level", "0")
fan_group = parts.get("bnn", "")
fan_level = parts.get("bl", "0")
time_stamp = parts.get("cst", "")
if time_stamp:
try:
if time_stamp.isdigit():
ts = int(time_stamp)
if ts > 10 ** 12:
ts = ts / 1000
dt = datetime.fromtimestamp(ts)
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
else:
dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S")
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
else:
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}"
if fan_group:
output += f" / {fan_group} Lv{fan_level}"
output += f"){txt}"
self._append_and_maybe_flush(output)
def _flush_locked(self):
if not self._buffer or self._buffer_date is None:
return
dir_path = os.path.join("temp", "douyu_danmu", self._buffer_date)
os.makedirs(dir_path, exist_ok=True)
file_name = os.path.join(dir_path, f"{self.room_id}_{self._buffer_date}.txt")
data = "\n".join(self._buffer) + "\n"
with open(file_name, "a", encoding="utf-8") as f:
f.write(data)
self._buffer.clear()
def _append_and_maybe_flush(self, line: str):
now = datetime.now()
date_str = now.strftime("%Y%m%d")
with self._lock:
if self._buffer_date is None:
self._buffer_date = date_str
elif date_str != self._buffer_date:
self._flush_locked()
self._buffer_date = date_str
self._buffer.append(line)
if len(self._buffer) >= self._buffer_limit:
self._flush_locked()
def _flush(self):
with self._lock:
self._flush_locked()
def _on_open(self, ws):
ws.send(self._encode(f"type@=loginreq/roomid@={self.room_id}/dmbt@=chrome/dmbv@=0/"))
ws.send(self._encode(f"type@=joingroup/rid@={self.room_id}/gid@={self.room_id}/"))
def heartbeat():
while ws.sock and ws.sock.connected and not self._stop_event.is_set():
try:
ws.send(self._encode("type@=mrkl/"))
except Exception:
break
time.sleep(38)
threading.Thread(target=heartbeat, daemon=True).start()
def _on_error(self, ws, error):
logger.error(f"斗鱼弹幕错误({self.room_id}): {error}")
def _on_close(self, ws, code, msg):
logger.info(f"斗鱼弹幕连接关闭({self.room_id}): {code} {msg}")
def _run(self):
if not self._websocket_available:
logger.error(f"websocket-client 未安装,无法记录弹幕({self.room_id})")
return
try:
websocket.enableTrace(False)
ws_urls = [
"wss://danmuproxy.douyu.com:8501/",
"wss://danmuproxy.douyu.com:8502/",
"wss://danmuproxy.douyu.com:8503/",
"wss://danmuproxy.douyu.com:8504/",
"wss://danmuproxy.douyu.com:8505/",
"wss://danmuproxy.douyu.com:8506/",
]
sslopt = {
"cert_reqs": ssl.CERT_NONE,
"ssl_version": ssl.PROTOCOL_TLS_CLIENT,
"ciphers": "DEFAULT@SECLEVEL=1",
}
headers = {"User-Agent": self.user_agent}
for url in ws_urls:
if self._stop_event.is_set():
break
try:
self._ws = websocket.WebSocketApp(
url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
header=headers,
)
self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10)
except Exception as e:
logger.error(f"斗鱼弹幕连接失败({self.room_id}): {e}")
continue
finally:
self._ws = None
if self._stop_event.is_set():
break
time.sleep(1)
finally:
self._ws = None
def start(self):
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._flush()
self._stop_event.set()
if self._ws:
try:
self._ws.close()
except Exception:
pass
class DouyuRedisManager:
def __init__(self, db_manager: DBConnectionManager):
self.redis = db_manager.get_redis_connection()
self.prefix = "bot:douyu:"
def add_group_room(self, group_id: str, room_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:rooms"
return self.redis.sadd(key, room_id) >= 0
def remove_group_room(self, group_id: str, room_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:rooms"
return self.redis.srem(key, room_id) >= 0
def list_group_rooms(self, group_id: str) -> List[str]:
key = f"{self.prefix}group:{group_id}:rooms"
rooms = self.redis.smembers(key) or set()
result = []
for r in rooms:
result.append(r.decode("utf-8") if isinstance(r, bytes) else r)
return sorted(result)
def all_subscribed_rooms(self) -> Set[str]:
groups = GroupBotManager.get_group_list()
rooms: Set[str] = set()
for gid in groups:
for r in self.list_group_rooms(gid):
rooms.add(r)
return rooms
def groups_for_room(self, room_id: str) -> List[str]:
groups = GroupBotManager.get_group_list()
res = []
for gid in groups:
if room_id in set(self.list_group_rooms(gid)):
res.append(gid)
return res
# --- 鱼吧相关方法 ---
def add_group_yuba(self, group_id: str, hash_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:yubas"
return self.redis.sadd(key, hash_id) >= 0
def remove_group_yuba(self, group_id: str, hash_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:yubas"
return self.redis.srem(key, hash_id) >= 0
def list_group_yubas(self, group_id: str) -> List[str]:
key = f"{self.prefix}group:{group_id}:yubas"
yubas = self.redis.smembers(key) or set()
result = []
for y in yubas:
result.append(y.decode("utf-8") if isinstance(y, bytes) else y)
return sorted(result)
def all_subscribed_yubas(self) -> Set[str]:
groups = GroupBotManager.get_group_list()
yubas: Set[str] = set()
for gid in groups:
for y in self.list_group_yubas(gid):
yubas.add(y)
return yubas
def groups_for_yuba(self, hash_id: str) -> List[str]:
groups = GroupBotManager.get_group_list()
res = []
for gid in groups:
if hash_id in set(self.list_group_yubas(gid)):
res.append(gid)
return res
def get_yuba_last_id(self, hash_id: str) -> Optional[str]:
key = f"{self.prefix}yuba_last_id:{hash_id}"
data = self.redis.get(key)
if not data:
return None
return data.decode("utf-8") if isinstance(data, bytes) else data
def set_yuba_last_id(self, hash_id: str, feed_id: str) -> bool:
key = f"{self.prefix}yuba_last_id:{hash_id}"
return self.redis.set(key, feed_id)
# --- 提醒名单方法 ---
def add_group_subscriber(self, group_id: str, user_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:subscribers"
return self.redis.sadd(key, user_id) >= 0
def remove_group_subscriber(self, group_id: str, user_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:subscribers"
return self.redis.srem(key, user_id) >= 0
def list_group_subscribers(self, group_id: str) -> List[str]:
key = f"{self.prefix}group:{group_id}:subscribers"
subs = self.redis.smembers(key) or set()
result = []
for s in subs:
result.append(s.decode("utf-8") if isinstance(s, bytes) else s)
return sorted(result)
def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]:
key = f"{self.prefix}room_status:{room_id}"
data = self.redis.get(key)
if not data:
return None
if isinstance(data, bytes):
data = data.decode("utf-8")
try:
return json.loads(data)
except Exception:
return None
def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool:
key = f"{self.prefix}room_status:{room_id}"
return self.redis.set(key, json.dumps(status, ensure_ascii=False))
class DouyuPlugin(MessagePluginInterface):
FEATURE_KEY = "DOUYU_MONITOR"
FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]"
@property
def name(self) -> str:
return "斗鱼直播"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "斗鱼主播开播下播提醒与群订阅管理"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
self.feature = self.register_feature()
self.redis_manager: Optional[DouyuRedisManager] = None
self._commands = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表", "斗鱼订阅提醒", "取消斗鱼订阅提醒",
"订阅鱼吧", "取消订阅鱼吧", "鱼吧订阅列表"]
self._api_template = "https://www.douyu.com/betard/{room_id}"
self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList"
self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
self._check_interval = 5
self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {}
async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job)
async def _scheduled_unified_check_job(self):
"""统一检查直播和鱼吧动态"""
await self._scheduled_check_job()
await self._scheduled_yuba_check_job()
def initialize(self, context: Dict[str, Any]) -> bool:
try:
dbm = DBConnectionManager.get_instance()
self.redis_manager = DouyuRedisManager(dbm)
cfg = self._config.get("Douyu", {})
cfg_cmds = cfg.get("command", [])
if isinstance(cfg_cmds, list) and cfg_cmds:
self._commands = list(dict.fromkeys(cfg_cmds + self._commands))
self._api_template = cfg.get("api_url_template", self._api_template)
self._user_agent = cfg.get("user_agent", self._user_agent)
self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval))
return True
except Exception as e:
logger.error(f"{self.name} 初始化失败: {e}")
return False
def start(self) -> bool:
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
for recorder in getattr(self, "_danmu_recorders", {}).values():
recorder.stop()
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
content = str(message.get("content", "")).strip()
if not content:
return False
first_token = content.split()[0]
return first_token in self._commands
@plugin_stats_decorator(plugin_name="斗鱼直播")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
content = str(message.get("content", "")).strip()
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
self.bot: WechatAPIClient = message.get("bot")
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
first_token = content.split()[0]
if content == "斗鱼订阅列表":
rooms = self.redis_manager.list_group_rooms(roomid or sender)
if not rooms:
await self.bot.send_text_message(roomid or sender, "暂无订阅", sender)
return True, "暂无订阅"
text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms)
await self.bot.send_text_message(roomid or sender, text, sender)
return True, "列表已发送"
if first_token == "斗鱼订阅提醒":
if not roomid:
await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender)
return True, "仅支持群聊"
ok = self.redis_manager.add_group_subscriber(roomid, sender)
await self.bot.send_at_message(roomid, "已加入斗鱼订阅提醒名单", [sender])
return True, "加入提醒名单成功" if ok else "加入提醒名单失败"
if first_token == "取消斗鱼订阅提醒":
if not roomid:
await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender)
return True, "仅支持群聊"
ok = self.redis_manager.remove_group_subscriber(roomid, sender)
await self.bot.send_at_message(roomid, "已取消斗鱼订阅提醒", [sender])
return True, "取消提醒成功" if ok else "取消提醒失败"
if first_token == "斗鱼订阅":
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:订阅斗鱼 7718843", sender)
return True, "命令格式错误"
room_id = parts[1].strip()
if not room_id.isdigit():
await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:斗鱼订阅 52876", sender)
return True, "命令格式错误"
ok = self.redis_manager.add_group_room(roomid or sender, room_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼房间 {room_id}", sender)
return True, "订阅成功" if ok else "订阅失败"
if first_token == "取消斗鱼订阅":
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:取消订阅斗鱼 7718843", sender)
return True, "命令格式错误"
room_id = parts[1].strip()
if not room_id.isdigit():
await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:取消斗鱼订阅 52876", sender)
return True, "命令格式错误"
ok = self.redis_manager.remove_group_room(roomid or sender, room_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼房间 {room_id}", sender)
return True, "取消成功" if ok else "取消失败"
if content == "鱼吧订阅列表":
yubas = self.redis_manager.list_group_yubas(roomid or sender)
if not yubas:
await self.bot.send_text_message(roomid or sender, "暂无鱼吧订阅", sender)
return True, "暂无鱼吧订阅"
text = "当前订阅的斗鱼鱼吧:\n" + "\n".join(yubas)
await self.bot.send_text_message(roomid or sender, text, sender)
return True, "列表已发送"
if first_token == "订阅鱼吧":
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id例如订阅鱼吧 PDAP2zEk3nwx", sender)
return True, "命令格式错误"
hash_id = parts[1].strip()
ok = self.redis_manager.add_group_yuba(roomid or sender, hash_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼鱼吧 {hash_id}", sender)
return True, "订阅成功" if ok else "订阅失败"
if first_token == "取消订阅鱼吧":
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id例如取消订阅鱼吧 PDAP2zEk3nwx", sender)
return True, "命令格式错误"
hash_id = parts[1].strip()
ok = self.redis_manager.remove_group_yuba(roomid or sender, hash_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼鱼吧 {hash_id}", sender)
return True, "取消成功" if ok else "取消失败"
return False, None
async def _scheduled_check_job(self):
try:
rooms = self.redis_manager.all_subscribed_rooms()
if not rooms:
return
async with aiohttp.ClientSession() as session:
for room_id in rooms:
try:
url = self._api_template.format(room_id=room_id)
headers = {
"User-Agent": self._user_agent,
"Referer": f"https://www.douyu.com/{room_id}"
}
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
data = await resp.json(content_type=None)
room_info = data.get("room", {}) if isinstance(data, dict) else {}
show_status = room_info.get("show_status")
nickname = room_info.get("nickname", "")
room_name = room_info.get("room_name", "")
avatar = room_info.get("avatar", {}) or {}
thumb_url = str(avatar.get("small", "") or "").strip().strip("`").strip()
video_loop_raw = room_info.get("videoLoop", 0)
try:
video_loop = int(str(video_loop_raw))
except Exception:
video_loop = 0
prev = self.redis_manager.get_room_status(room_id) or {}
prev_live = prev.get("is_live")
curr_live = True if show_status == 1 and video_loop == 0 else False
status_obj = {
"is_live": curr_live,
"nickname": nickname,
"room_name": room_name,
"is_loop": True if video_loop == 1 else False
}
self.redis_manager.set_room_status(room_id, status_obj)
if prev_live is None and curr_live is False:
continue
if prev_live is None and curr_live is True:
await self._notify_groups_live(room_id, nickname, room_name, thumb_url)
continue
if prev_live is False and curr_live is True:
await self._notify_groups_live(room_id, nickname, room_name, thumb_url)
continue
if prev_live is True and curr_live is False:
await self._notify_groups_offline(room_id, nickname, room_name, video_loop == 1)
continue
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"斗鱼检查失败: {e}")
continue
except Exception as e:
logger.error(f"斗鱼定时任务异常: {e}")
async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str, thumb_url: str):
try:
self._start_danmu_record(room_id)
except Exception as e:
logger.error(f"启动斗鱼弹幕记录失败({room_id}): {e}")
groups = self.redis_manager.groups_for_room(room_id)
text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看https://www.douyu.com/{room_id}"
xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url)
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
subs = self.redis_manager.list_group_subscribers(gid)
if subs:
await self.bot.send_at_message(gid, text, subs)
else:
await self.bot.send_text_message(gid, text)
await self.bot.send_link_xml_message(xml_content, gid)
except Exception as e:
logger.error(f"发送斗鱼开播提醒失败: {e}")
continue
async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str, is_loop: bool = False):
try:
self._stop_danmu_record(room_id)
except Exception as e:
logger.error(f"停止斗鱼弹幕记录失败({room_id}): {e}")
groups = self.redis_manager.groups_for_room(room_id)
text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}"
if is_loop:
text += "(当前为轮播)"
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
await self.bot.send_text_message(gid, text)
except Exception as e:
logger.error(f"发送斗鱼下播提醒失败: {e}")
continue
async def _scheduled_yuba_check_job(self):
try:
yubas = self.redis_manager.all_subscribed_yubas()
if not yubas:
return
async with aiohttp.ClientSession() as session:
for hash_id in yubas:
try:
params = {
"filter_type": 1,
"hash_id": hash_id,
"limit": 10,
"offset": 0
}
headers = {
"User-Agent": self._user_agent,
"Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news",
}
async with session.get(self._yuba_api, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
data = await resp.json(content_type=None)
if data.get("error") != 0:
logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {data.get('msg')}")
continue
feed_list = data.get("data", {}).get("feed_list", [])
# 查找第一条【非置顶】动态
target_feed = None
for feed in feed_list:
if feed.get("home_feed_top") == 1:
continue
target_feed = feed
break
if not target_feed:
continue
feed_id = str(target_feed.get("feed_id"))
last_id = self.redis_manager.get_yuba_last_id(hash_id)
if last_id and feed_id == last_id:
continue
# 发现新动态
nickname = target_feed.get("publisher", {}).get("nickname", "未知主播")
content = target_feed.get("text", "")
ctime = target_feed.get("ctime")
from datetime import datetime
publish_time = datetime.fromtimestamp(int(ctime)).strftime('%Y-%m-%d %H:%M:%S') if ctime else "未知时间"
# 限制内容长度
if len(content) > 200:
content = content[:200] + "..."
full_url = f"https://yuba.douyu.com/feed/{feed_id}"
await self._notify_groups_yuba(hash_id, nickname, content, full_url, publish_time)
# 保存标记
self.redis_manager.set_yuba_last_id(hash_id, feed_id)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"检查斗鱼鱼吧 ({hash_id}) 失败: {e}")
continue
except Exception as e:
logger.error(f"斗鱼鱼吧定时任务异常: {e}")
async def _notify_groups_yuba(self, hash_id: str, nickname: str, content: str, url: str, publish_time: str = "未知时间"):
groups = self.redis_manager.groups_for_yuba(hash_id)
text = f"🌟 斗鱼鱼吧动态提醒 \n👤 主播:{nickname}\n⏰ 时间:{publish_time}\n📝 内容:{content}\n🔗 链接:{url}"
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
await self.bot.send_text_message(gid, text)
except Exception as e:
logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}")
continue
def _get_danmu_recorder(self, room_id: str) -> DouyuDanmuRecorder:
recorder = self._danmu_recorders.get(room_id)
if not recorder:
recorder = DouyuDanmuRecorder(room_id, self._user_agent)
self._danmu_recorders[room_id] = recorder
return recorder
def _start_danmu_record(self, room_id: str):
recorder = self._get_danmu_recorder(room_id)
recorder.start()
def _stop_danmu_record(self, room_id: str):
recorder = self._danmu_recorders.get(room_id)
if recorder:
recorder.stop()