"""Douyu live-room danmaku (chat) logger.

Connects to one of the Douyu danmaku proxy WebSocket endpoints, logs in to
the configured room, joins its message group and appends every chat message
to a per-day text file while echoing it to stdout.
"""

from datetime import datetime
import ssl
import threading
import time
import zlib

import websocket

# --------------------- Configuration ---------------------
room_id = "52876"  # Room number; make sure the room is currently live
ws_urls = [
    "wss://danmuproxy.douyu.com:8501/",
    "wss://danmuproxy.douyu.com:8502/",
    "wss://danmuproxy.douyu.com:8503/",
    "wss://danmuproxy.douyu.com:8504/",
    "wss://danmuproxy.douyu.com:8505/",
    "wss://danmuproxy.douyu.com:8506/",
]
# ----------------------------------------------------------

# Seconds between keep-alive packets sent by the heartbeat thread.
HEARTBEAT_INTERVAL = 38
# Numeric timestamps above this value are treated as milliseconds.
_MILLISECOND_THRESHOLD = 10 ** 12
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def encode_douyu(msg: str) -> bytes:
    """Wrap *msg* in a Douyu client packet.

    The packet is: the length as a 4-byte little-endian integer written
    twice, the value 689 as a 2-byte little-endian integer, two zero bytes,
    then the UTF-8 payload terminated by a NUL byte. The length counts the
    payload (including its NUL) plus 8 header bytes.
    """
    content = msg.encode('utf-8') + b'\x00'
    length = len(content) + 8
    head = length.to_bytes(4, 'little') * 2
    head += (689).to_bytes(2, 'little')
    head += b'\x00\x00'
    return head + content


def _decode_frame(message) -> str:
    """Return the text of a received WebSocket frame.

    Binary frames are first tried as a raw-deflate stream; if that fails the
    bytes are decoded as-is. Text frames (``str``) are returned unchanged so
    the handler does not crash on them.
    """
    if isinstance(message, str):
        return message
    try:
        raw = zlib.decompress(message, -zlib.MAX_WBITS)
    except zlib.error:
        # Not compressed: fall back to the frame bytes themselves.
        raw = message
    # Undecodable bytes (e.g. binary packet-header residue) are dropped.
    return raw.decode('utf-8', errors='ignore')


def _unescape_stt(value: str) -> str:
    """Undo Douyu STT escaping (``@S`` -> ``/``, ``@A`` -> ``@``).

    ``@S`` must be replaced first: an escaped literal "@S" arrives as "@AS",
    which would be turned into "/" if ``@A`` were handled first.
    """
    return value.replace('@S', '/').replace('@A', '@')


def _parse_stt(line: str) -> dict:
    """Parse a ``key@=value/key@=value/`` record into a dict.

    Segments without ``@=`` are skipped; values are unescaped.
    """
    fields = {}
    for pair in line.split('/'):
        if '@=' in pair:
            key, value = pair.split('@=', 1)
            fields[key] = _unescape_stt(value)
    return fields


def _format_timestamp(time_stamp: str) -> str:
    """Render the message timestamp as ``YYYY-MM-DD HH:MM:SS`` local time.

    Accepts an all-digit epoch value (seconds, or milliseconds when larger
    than 10**12) or an already formatted date string. An empty or unparsable
    value falls back to the current local time.
    """
    if time_stamp:
        try:
            if time_stamp.isdigit():
                ts = int(time_stamp)
                if ts > _MILLISECOND_THRESHOLD:
                    ts = ts / 1000
                return datetime.fromtimestamp(ts).strftime(_TIME_FORMAT)
            return datetime.strptime(time_stamp, _TIME_FORMAT).strftime(_TIME_FORMAT)
        except (ValueError, OverflowError, OSError):
            # Malformed or out-of-range timestamp: use "now" instead.
            pass
    return time.strftime(_TIME_FORMAT, time.localtime())


def _format_chat(fields: dict) -> str:
    """Build the one-line log entry for a parsed chat message."""
    nick = fields.get('nn', '未知')
    txt = fields.get('txt', '')
    uid = fields.get('uid', '未知')
    level = fields.get('level', '0')
    fan_group = fields.get('bnn', '')
    fan_level = fields.get('bl', '0')
    time_str = _format_timestamp(fields.get('cst', ''))

    output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}"
    if fan_group:
        # Fan-badge name and level are only shown when the user wears one.
        output += f" / {fan_group} Lv{fan_level}"
    output += f"):{txt}"
    return output


def on_message(ws, message):
    """Handle one received frame: log every chat message it contains.

    The frame text is split on NUL bytes; each piece containing
    ``type@=chatmsg`` is parsed, formatted, appended to
    ``<room_id>_<YYYYMMDD>.txt`` and printed.
    """
    data = _decode_frame(message)

    for line in data.split('\x00'):
        line = line.strip()
        if not line or 'type@=chatmsg' not in line:
            continue

        output = _format_chat(_parse_stt(line))

        # The file name is recomputed per message so the log rolls over at
        # midnight without restarting the script.
        file_name = f"{room_id}_{datetime.now().strftime('%Y%m%d')}.txt"
        with open(file_name, 'a', encoding='utf-8') as f:
            f.write(output + '\n')

        print(output)


def on_open(ws):
    """Log in, join the room's message group and start the heartbeat thread."""
    print("连接成功!发送登录和入组...")
    ws.send(encode_douyu(f"type@=loginreq/roomid@={room_id}/dmbt@=chrome/dmbv@=0/"))
    ws.send(encode_douyu(f"type@=joingroup/rid@={room_id}/gid@={room_id}/"))

    def heartbeat():
        """Send a keep-alive packet periodically while the socket is connected."""
        while ws.sock and ws.sock.connected:
            try:
                ws.send(encode_douyu("type@=mrkl/"))
            except (websocket.WebSocketException, OSError):
                # The socket closed between the check and the send; stop
                # quietly instead of dying with an unhandled traceback.
                break
            time.sleep(HEARTBEAT_INTERVAL)

    threading.Thread(target=heartbeat, daemon=True).start()


def on_error(ws, error):
    """Print an error reported by the WebSocket client."""
    print(f"错误: {error}")


def on_close(ws, code, msg):
    """Print the close code and message when the connection ends."""
    print(f"连接关闭: {code} {msg}")


def main():
    """Try each endpoint in ``ws_urls`` in order.

    ``run_forever`` blocks until the connection ends, after which the next
    URL is tried; the function returns once the list is exhausted.
    """
    # Tracing is off; pass True for verbose protocol logs when debugging.
    websocket.enableTrace(False)

    # Custom TLS options for compatibility with the proxy servers.
    sslopt = {
        # WARNING: certificate verification is disabled (temporary
        # workaround; do not rely on this in production).
        "cert_reqs": ssl.CERT_NONE,
        "ssl_version": ssl.PROTOCOL_TLS_CLIENT,
        # Lowered OpenSSL security level to allow older cipher suites.
        "ciphers": "DEFAULT@SECLEVEL=1",
    }

    for url in ws_urls:
        print(f"\n尝试连接: {url}")
        try:
            ws = websocket.WebSocketApp(
                url,
                on_open=on_open,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close,
                header={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"},
            )
            ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10)
        except Exception as e:
            # Top-level boundary: report and move on to the next endpoint.
            print(f"连接失败: {e}")


if __name__ == "__main__":
    main()