斗鱼加入了一个弹幕记录功能。会自动记录开播的弹幕信息

This commit is contained in:
liuwei
2026-02-24 16:36:16 +08:00
parent cd561bbbf9
commit eabea2b4aa
2 changed files with 328 additions and 0 deletions

View File

@@ -1,9 +1,16 @@
import asyncio
import json
from datetime import datetime
import os
import threading
import time
from typing import Dict, Any, List, Optional, Tuple, Set
import aiohttp
from loguru import logger
import ssl
import websocket
import zlib
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
@@ -16,6 +23,149 @@ from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML
class DouyuDanmuRecorder:
def __init__(self, room_id: str, user_agent: str):
self.room_id = room_id
self.user_agent = user_agent
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._ws: Optional[websocket.WebSocketApp] = None
def _encode(self, msg: str) -> bytes:
content = msg.encode("utf-8") + b"\x00"
length = len(content) + 8
head = length.to_bytes(4, "little") * 2
head += (689).to_bytes(2, "little")
head += b"\x00\x00"
return head + content
def _on_message(self, ws, message):
try:
decompressed = zlib.decompress(message, -zlib.MAX_WBITS)
data = decompressed.decode("utf-8", errors="ignore")
except Exception:
data = message.decode("utf-8", errors="ignore")
for line in data.split("\x00"):
line = line.strip()
if not line:
continue
if "type@=chatmsg" not in line:
continue
parts: Dict[str, Any] = {}
for pair in line.split("/"):
if "@=" in pair:
key, value = pair.split("@=", 1)
parts[key] = value
nick = parts.get("nn", "未知")
txt = parts.get("txt", "")
uid = parts.get("uid", "未知")
level = parts.get("level", "0")
fan_group = parts.get("bnn", "")
fan_level = parts.get("bl", "0")
time_stamp = parts.get("cst", "")
if time_stamp:
try:
if time_stamp.isdigit():
ts = int(time_stamp)
if ts > 10 ** 12:
ts = ts / 1000
dt = datetime.fromtimestamp(ts)
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
else:
dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S")
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
else:
time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}"
if fan_group:
output += f" / {fan_group} Lv{fan_level}"
output += f"){txt}"
date_str = datetime.now().strftime("%Y%m%d")
dir_path = os.path.join("temp", "douyu_danmu", date_str)
os.makedirs(dir_path, exist_ok=True)
file_name = os.path.join(dir_path, f"{self.room_id}_{date_str}.txt")
with open(file_name, "a", encoding="utf-8") as f:
f.write(output + "\n")
def _on_open(self, ws):
ws.send(self._encode(f"type@=loginreq/roomid@={self.room_id}/dmbt@=chrome/dmbv@=0/"))
ws.send(self._encode(f"type@=joingroup/rid@={self.room_id}/gid@={self.room_id}/"))
def heartbeat():
while ws.sock and ws.sock.connected and not self._stop_event.is_set():
try:
ws.send(self._encode("type@=mrkl/"))
except Exception:
break
time.sleep(38)
threading.Thread(target=heartbeat, daemon=True).start()
def _on_error(self, ws, error):
logger.error(f"斗鱼弹幕错误({self.room_id}): {error}")
def _on_close(self, ws, code, msg):
logger.info(f"斗鱼弹幕连接关闭({self.room_id}): {code} {msg}")
def _run(self):
try:
websocket.enableTrace(False)
ws_urls = [
"wss://danmuproxy.douyu.com:8501/",
"wss://danmuproxy.douyu.com:8502/",
"wss://danmuproxy.douyu.com:8503/",
"wss://danmuproxy.douyu.com:8504/",
"wss://danmuproxy.douyu.com:8505/",
"wss://danmuproxy.douyu.com:8506/",
]
sslopt = {
"cert_reqs": ssl.CERT_NONE,
"ssl_version": ssl.PROTOCOL_TLS_CLIENT,
"ciphers": "DEFAULT@SECLEVEL=1",
}
headers = {"User-Agent": self.user_agent}
for url in ws_urls:
if self._stop_event.is_set():
break
try:
self._ws = websocket.WebSocketApp(
url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
header=headers,
)
self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10)
except Exception as e:
logger.error(f"斗鱼弹幕连接失败({self.room_id}): {e}")
continue
finally:
self._ws = None
if self._stop_event.is_set():
break
time.sleep(1)
finally:
self._ws = None
def start(self):
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._stop_event.set()
if self._ws:
try:
self._ws.close()
except Exception:
pass
class DouyuRedisManager:
def __init__(self, db_manager: DBConnectionManager):
self.redis = db_manager.get_redis_connection()
@@ -178,6 +328,7 @@ class DouyuPlugin(MessagePluginInterface):
self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList"
self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
self._check_interval = 5
self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {}
async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job)
async def _scheduled_unified_check_job(self):
@@ -206,6 +357,8 @@ class DouyuPlugin(MessagePluginInterface):
return True
def stop(self) -> bool:
for recorder in getattr(self, "_danmu_recorders", {}).values():
recorder.stop()
self.status = PluginStatus.STOPPED
return True
@@ -359,6 +512,7 @@ class DouyuPlugin(MessagePluginInterface):
logger.error(f"斗鱼定时任务异常: {e}")
async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str, thumb_url: str):
self._start_danmu_record(room_id)
groups = self.redis_manager.groups_for_room(room_id)
text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看https://www.douyu.com/{room_id}"
xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url)
@@ -376,6 +530,7 @@ class DouyuPlugin(MessagePluginInterface):
continue
async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str, is_loop: bool = False):
self._stop_danmu_record(room_id)
groups = self.redis_manager.groups_for_room(room_id)
text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}"
if is_loop:
@@ -466,3 +621,19 @@ class DouyuPlugin(MessagePluginInterface):
except Exception as e:
logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}")
continue
def _get_danmu_recorder(self, room_id: str) -> DouyuDanmuRecorder:
recorder = self._danmu_recorders.get(room_id)
if not recorder:
recorder = DouyuDanmuRecorder(room_id, self._user_agent)
self._danmu_recorders[room_id] = recorder
return recorder
def _start_danmu_record(self, room_id: str):
recorder = self._get_danmu_recorder(room_id)
recorder.start()
def _stop_danmu_record(self, room_id: str):
recorder = self._danmu_recorders.get(room_id)
if recorder:
recorder.stop()

157
test/douyu_danmu.py Normal file
View File

@@ -0,0 +1,157 @@
from datetime import datetime
import websocket
import threading
import time
import zlib
import ssl # 新增
# --------------------- 配置 ---------------------
room_id = "52876" # 你的房间号,确认在开播
ws_urls = [
"wss://danmuproxy.douyu.com:8501/",
"wss://danmuproxy.douyu.com:8502/",
"wss://danmuproxy.douyu.com:8503/",
"wss://danmuproxy.douyu.com:8504/",
"wss://danmuproxy.douyu.com:8505/",
"wss://danmuproxy.douyu.com:8506/",
]
# --------------------------------------------------
def encode_douyu(msg: str) -> bytes:
content = msg.encode('utf-8') + b'\x00'
length = len(content) + 8
head = length.to_bytes(4, 'little') * 2
head += (689).to_bytes(2, 'little')
head += b'\x00\x00'
return head + content
def on_message(ws, message):
try:
decompressed = zlib.decompress(message, -zlib.MAX_WBITS)
data = decompressed.decode('utf-8', errors='ignore')
except:
data = message.decode('utf-8', errors='ignore')
for line in data.split('\x00'):
line = line.strip()
if not line:
continue
# 打印原始消息(调试用)
# print(f"原始消息: {line}")
if 'type@=chatmsg' in line:
parts = {}
for pair in line.split('/'):
if '@=' in pair:
key, value = pair.split('@=', 1)
parts[key] = value
msg_rid = parts.get('rid', '0')
msg_brid = parts.get('brid', '0')
# print(f" rid={msg_rid} brid={msg_brid} {line[:100]}...")
# 提取关键字段
nick = parts.get('nn', '未知')
txt = parts.get('txt', '')
uid = parts.get('uid', '未知')
level = parts.get('level', '0')
fan_group = parts.get('bnn', '')
fan_level = parts.get('bl', '0')
color_hash = parts.get('hc', '')
time_stamp = parts.get('cst', '')
avatar = parts.get('ic', '')
# 处理时间
time_stamp = parts.get('cst', '')
if time_stamp:
try:
# 如果是纯数字(毫秒时间戳)
if time_stamp.isdigit():
ts = int(time_stamp)
# 判断是毫秒还是秒
if ts > 10 ** 12:
ts = ts / 1000
dt = datetime.fromtimestamp(ts)
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
else:
# 字符串格式时间
dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S")
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
else:
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 格式化输出
output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}"
if fan_group:
output += f" / {fan_group} Lv{fan_level}"
output += f"){txt}"
# if color_hash:
# output += f" [彩虹弹幕]"
# if avatar:
# output += f" [头像: {avatar}]"
file_name = f"{room_id}_{datetime.now().strftime('%Y%m%d')}.txt"
with open(file_name, 'a', encoding='utf-8') as f:
f.write(output + '\n')
print(output)
def on_open(ws):
print("连接成功!发送登录和入组...")
ws.send(encode_douyu(f"type@=loginreq/roomid@={room_id}/dmbt@=chrome/dmbv@=0/"))
ws.send(encode_douyu(f"type@=joingroup/rid@={room_id}/gid@={room_id}/"))
def heartbeat():
while ws.sock and ws.sock.connected:
ws.send(encode_douyu("type@=mrkl/"))
# print("心跳发送...")
time.sleep(38)
threading.Thread(target=heartbeat, daemon=True).start()
def on_error(ws, error):
print(f"错误: {error}")
def on_close(ws, code, msg):
print(f"连接关闭: {code} {msg}")
# 主程序 - 强制TLS 1.2/1.3兼容 + 无代理
websocket.enableTrace(False) # 开启详细日志,便于调试
for url in ws_urls:
print(f"\n尝试连接: {url}")
try:
ws = websocket.WebSocketApp(
url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
header={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
)
# 关键自定义SSL上下文禁用旧协议允许更多套件
sslopt = {
"cert_reqs": ssl.CERT_NONE, # 临时跳过证书验证(生产慎用)
"ssl_version": ssl.PROTOCOL_TLS_CLIENT,
"ciphers": "DEFAULT@SECLEVEL=1" # 降低安全级别兼容旧套件(如果必要)
}
ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10)
except Exception as e:
print(f"连接失败: {e}")
continue