Files
abot/test/douyu_danmu.py

158 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime
import websocket
import threading
import time
import zlib
import ssl # 新增
# --------------------- 配置 ---------------------
room_id = "52876" # 你的房间号,确认在开播
ws_urls = [
"wss://danmuproxy.douyu.com:8501/",
"wss://danmuproxy.douyu.com:8502/",
"wss://danmuproxy.douyu.com:8503/",
"wss://danmuproxy.douyu.com:8504/",
"wss://danmuproxy.douyu.com:8505/",
"wss://danmuproxy.douyu.com:8506/",
]
# --------------------------------------------------
def encode_douyu(msg: str) -> bytes:
content = msg.encode('utf-8') + b'\x00'
length = len(content) + 8
head = length.to_bytes(4, 'little') * 2
head += (689).to_bytes(2, 'little')
head += b'\x00\x00'
return head + content
def on_message(ws, message):
try:
decompressed = zlib.decompress(message, -zlib.MAX_WBITS)
data = decompressed.decode('utf-8', errors='ignore')
except:
data = message.decode('utf-8', errors='ignore')
for line in data.split('\x00'):
line = line.strip()
if not line:
continue
# 打印原始消息(调试用)
# print(f"原始消息: {line}")
if 'type@=chatmsg' in line:
parts = {}
for pair in line.split('/'):
if '@=' in pair:
key, value = pair.split('@=', 1)
parts[key] = value
msg_rid = parts.get('rid', '0')
msg_brid = parts.get('brid', '0')
# print(f" rid={msg_rid} brid={msg_brid} {line[:100]}...")
# 提取关键字段
nick = parts.get('nn', '未知')
txt = parts.get('txt', '')
uid = parts.get('uid', '未知')
level = parts.get('level', '0')
fan_group = parts.get('bnn', '')
fan_level = parts.get('bl', '0')
color_hash = parts.get('hc', '')
time_stamp = parts.get('cst', '')
avatar = parts.get('ic', '')
# 处理时间
time_stamp = parts.get('cst', '')
if time_stamp:
try:
# 如果是纯数字(毫秒时间戳)
if time_stamp.isdigit():
ts = int(time_stamp)
# 判断是毫秒还是秒
if ts > 10 ** 12:
ts = ts / 1000
dt = datetime.fromtimestamp(ts)
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
else:
# 字符串格式时间
dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S")
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
else:
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 格式化输出
output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}"
if fan_group:
output += f" / {fan_group} Lv{fan_level}"
output += f"){txt}"
# if color_hash:
# output += f" [彩虹弹幕]"
# if avatar:
# output += f" [头像: {avatar}]"
file_name = f"{room_id}_{datetime.now().strftime('%Y%m%d')}.txt"
with open(file_name, 'a', encoding='utf-8') as f:
f.write(output + '\n')
print(output)
def on_open(ws):
print("连接成功!发送登录和入组...")
ws.send(encode_douyu(f"type@=loginreq/roomid@={room_id}/dmbt@=chrome/dmbv@=0/"))
ws.send(encode_douyu(f"type@=joingroup/rid@={room_id}/gid@={room_id}/"))
def heartbeat():
while ws.sock and ws.sock.connected:
ws.send(encode_douyu("type@=mrkl/"))
# print("心跳发送...")
time.sleep(38)
threading.Thread(target=heartbeat, daemon=True).start()
def on_error(ws, error):
print(f"错误: {error}")
def on_close(ws, code, msg):
print(f"连接关闭: {code} {msg}")
# 主程序 - 强制TLS 1.2/1.3兼容 + 无代理
websocket.enableTrace(False) # 开启详细日志,便于调试
for url in ws_urls:
print(f"\n尝试连接: {url}")
try:
ws = websocket.WebSocketApp(
url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
header={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
)
# 关键自定义SSL上下文禁用旧协议允许更多套件
sslopt = {
"cert_reqs": ssl.CERT_NONE, # 临时跳过证书验证(生产慎用)
"ssl_version": ssl.PROTOCOL_TLS_CLIENT,
"ciphers": "DEFAULT@SECLEVEL=1" # 降低安全级别兼容旧套件(如果必要)
}
ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10)
except Exception as e:
print(f"连接失败: {e}")
continue