diff --git a/plugins/douyu/main.py b/plugins/douyu/main.py index ed8e965..78c982f 100644 --- a/plugins/douyu/main.py +++ b/plugins/douyu/main.py @@ -1,9 +1,16 @@ import asyncio import json +from datetime import datetime +import os +import threading +import time from typing import Dict, Any, List, Optional, Tuple, Set import aiohttp from loguru import logger +import ssl +import websocket +import zlib from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus @@ -16,6 +23,149 @@ from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML +class DouyuDanmuRecorder: + def __init__(self, room_id: str, user_agent: str): + self.room_id = room_id + self.user_agent = user_agent + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._ws: Optional[websocket.WebSocketApp] = None + + def _encode(self, msg: str) -> bytes: + content = msg.encode("utf-8") + b"\x00" + length = len(content) + 8 + head = length.to_bytes(4, "little") * 2 + head += (689).to_bytes(2, "little") + head += b"\x00\x00" + return head + content + + def _on_message(self, ws, message): + try: + decompressed = zlib.decompress(message, -zlib.MAX_WBITS) + data = decompressed.decode("utf-8", errors="ignore") + except Exception: + data = message.decode("utf-8", errors="ignore") + for line in data.split("\x00"): + line = line.strip() + if not line: + continue + if "type@=chatmsg" not in line: + continue + parts: Dict[str, Any] = {} + for pair in line.split("/"): + if "@=" in pair: + key, value = pair.split("@=", 1) + parts[key] = value + nick = parts.get("nn", "未知") + txt = parts.get("txt", "") + uid = parts.get("uid", "未知") + level = parts.get("level", "0") + fan_group = parts.get("bnn", "") + fan_level = parts.get("bl", "0") + time_stamp = parts.get("cst", "") + if time_stamp: + try: + if time_stamp.isdigit(): + ts = int(time_stamp) + if ts > 10 ** 12: + ts = ts / 1000 + dt = datetime.fromtimestamp(ts) + time_str = dt.strftime("%Y-%m-%d %H:%M:%S") + else: + dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S") + time_str = dt.strftime("%Y-%m-%d %H:%M:%S") + except Exception: + time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + else: + time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}" + if fan_group: + output += f" / {fan_group} Lv{fan_level}" + output += f"):{txt}" + date_str = datetime.now().strftime("%Y%m%d") + dir_path = os.path.join("temp", "douyu_danmu", date_str) + os.makedirs(dir_path, exist_ok=True) + file_name = os.path.join(dir_path, f"{self.room_id}_{date_str}.txt") + with open(file_name, "a", encoding="utf-8") as f: + f.write(output + "\n") + + def _on_open(self, ws): + ws.send(self._encode(f"type@=loginreq/roomid@={self.room_id}/dmbt@=chrome/dmbv@=0/")) + ws.send(self._encode(f"type@=joingroup/rid@={self.room_id}/gid@={self.room_id}/")) + + def heartbeat(): + while ws.sock and ws.sock.connected and not self._stop_event.is_set(): + try: + ws.send(self._encode("type@=mrkl/")) + except Exception: + break + time.sleep(38) + + threading.Thread(target=heartbeat, daemon=True).start() + + def _on_error(self, ws, error): + logger.error(f"斗鱼弹幕错误({self.room_id}): {error}") + + def _on_close(self, ws, code, msg): + logger.info(f"斗鱼弹幕连接关闭({self.room_id}): {code} {msg}") + + def _run(self): + try: + websocket.enableTrace(False) + ws_urls = [ + "wss://danmuproxy.douyu.com:8501/", + "wss://danmuproxy.douyu.com:8502/", + "wss://danmuproxy.douyu.com:8503/", + "wss://danmuproxy.douyu.com:8504/", + "wss://danmuproxy.douyu.com:8505/", + "wss://danmuproxy.douyu.com:8506/", + ] + sslopt = { + "cert_reqs": ssl.CERT_NONE, + "ssl_version": ssl.PROTOCOL_TLS_CLIENT, + "ciphers": "DEFAULT@SECLEVEL=1", + } + headers = {"User-Agent": self.user_agent} + for url in ws_urls: + if self._stop_event.is_set(): + break + try: + self._ws = websocket.WebSocketApp( + url, + on_open=self._on_open, + on_message=self._on_message, + on_error=self._on_error, + on_close=self._on_close, + header=headers, + ) + self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10) + except Exception as e: + logger.error(f"斗鱼弹幕连接失败({self.room_id}): {e}") + continue + finally: + self._ws = None + if self._stop_event.is_set(): + break + time.sleep(1) + finally: + self._ws = None + + def start(self): + if self._thread and self._thread.is_alive(): + return + self._stop_event.clear() + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + + def stop(self): + self._stop_event.set() + if self._ws: + try: + self._ws.close() + except Exception: + pass + + class DouyuRedisManager: def __init__(self, db_manager: DBConnectionManager): self.redis = db_manager.get_redis_connection() @@ -178,6 +328,7 @@ class DouyuPlugin(MessagePluginInterface): self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList" self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" self._check_interval = 5 + self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {} async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job) async def _scheduled_unified_check_job(self): @@ -206,6 +357,8 @@ class DouyuPlugin(MessagePluginInterface): return True def stop(self) -> bool: + for recorder in getattr(self, "_danmu_recorders", {}).values(): + recorder.stop() self.status = PluginStatus.STOPPED return True @@ -359,6 +512,7 @@ class DouyuPlugin(MessagePluginInterface): logger.error(f"斗鱼定时任务异常: {e}") async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str, thumb_url: str): + self._start_danmu_record(room_id) groups = self.redis_manager.groups_for_room(room_id) text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看:https://www.douyu.com/{room_id}" xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url) @@ -376,6 +530,7 @@ class DouyuPlugin(MessagePluginInterface): continue async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str, is_loop: bool = False): + self._stop_danmu_record(room_id) groups = self.redis_manager.groups_for_room(room_id) text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}" if is_loop: @@ -466,3 +621,19 @@ class DouyuPlugin(MessagePluginInterface): except Exception as e: logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}") continue + + def _get_danmu_recorder(self, room_id: str) -> DouyuDanmuRecorder: + recorder = self._danmu_recorders.get(room_id) + if not recorder: + recorder = DouyuDanmuRecorder(room_id, self._user_agent) + self._danmu_recorders[room_id] = recorder + return recorder + + def _start_danmu_record(self, room_id: str): + recorder = self._get_danmu_recorder(room_id) + recorder.start() + + def _stop_danmu_record(self, room_id: str): + recorder = self._danmu_recorders.get(room_id) + if recorder: + recorder.stop() diff --git a/test/douyu_danmu.py b/test/douyu_danmu.py new file mode 100644 index 0000000..ae9f6d9 --- /dev/null +++ b/test/douyu_danmu.py @@ -0,0 +1,157 @@ +from datetime import datetime + +import websocket +import threading +import time +import zlib +import ssl # 新增 + +# --------------------- 配置 --------------------- +room_id = "52876" # 你的房间号,确认在开播 +ws_urls = [ + "wss://danmuproxy.douyu.com:8501/", + "wss://danmuproxy.douyu.com:8502/", + "wss://danmuproxy.douyu.com:8503/", + "wss://danmuproxy.douyu.com:8504/", + "wss://danmuproxy.douyu.com:8505/", + "wss://danmuproxy.douyu.com:8506/", +] + + +# -------------------------------------------------- + +def encode_douyu(msg: str) -> bytes: + content = msg.encode('utf-8') + b'\x00' + length = len(content) + 8 + head = length.to_bytes(4, 'little') * 2 + head += (689).to_bytes(2, 'little') + head += b'\x00\x00' + return head + content + + +def on_message(ws, message): + try: + decompressed = zlib.decompress(message, -zlib.MAX_WBITS) + data = decompressed.decode('utf-8', errors='ignore') + except: + data = message.decode('utf-8', errors='ignore') + + for line in data.split('\x00'): + line = line.strip() + if not line: + continue + + # 打印原始消息(调试用) + # print(f"原始消息: {line}") + + if 'type@=chatmsg' in line: + parts = {} + for pair in line.split('/'): + if '@=' in pair: + key, value = pair.split('@=', 1) + parts[key] = value + msg_rid = parts.get('rid', '0') + msg_brid = parts.get('brid', '0') + + # print(f" rid={msg_rid} brid={msg_brid} {line[:100]}...") + + # 提取关键字段 + nick = parts.get('nn', '未知') + txt = parts.get('txt', '') + uid = parts.get('uid', '未知') + level = parts.get('level', '0') + fan_group = parts.get('bnn', '') + fan_level = parts.get('bl', '0') + color_hash = parts.get('hc', '') + time_stamp = parts.get('cst', '') + avatar = parts.get('ic', '') + # 处理时间 + time_stamp = parts.get('cst', '') + + if time_stamp: + try: + # 如果是纯数字(毫秒时间戳) + if time_stamp.isdigit(): + ts = int(time_stamp) + + # 判断是毫秒还是秒 + if ts > 10 ** 12: + ts = ts / 1000 + + dt = datetime.fromtimestamp(ts) + time_str = dt.strftime("%Y-%m-%d %H:%M:%S") + else: + # 字符串格式时间 + dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S") + time_str = dt.strftime("%Y-%m-%d %H:%M:%S") + + except Exception: + time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + else: + time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + # 格式化输出 + output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}" + if fan_group: + output += f" / {fan_group} Lv{fan_level}" + output += f"):{txt}" + + # if color_hash: + # output += f" [彩虹弹幕]" + # if avatar: + # output += f" [头像: {avatar}]" + + file_name = f"{room_id}_{datetime.now().strftime('%Y%m%d')}.txt" + with open(file_name, 'a', encoding='utf-8') as f: + f.write(output + '\n') + + print(output) + + +def on_open(ws): + print("连接成功!发送登录和入组...") + ws.send(encode_douyu(f"type@=loginreq/roomid@={room_id}/dmbt@=chrome/dmbv@=0/")) + ws.send(encode_douyu(f"type@=joingroup/rid@={room_id}/gid@={room_id}/")) + + def heartbeat(): + while ws.sock and ws.sock.connected: + ws.send(encode_douyu("type@=mrkl/")) + # print("心跳发送...") + time.sleep(38) + + threading.Thread(target=heartbeat, daemon=True).start() + + +def on_error(ws, error): + print(f"错误: {error}") + + +def on_close(ws, code, msg): + print(f"连接关闭: {code} {msg}") + + +# 主程序 - 强制TLS 1.2/1.3兼容 + 无代理 +websocket.enableTrace(False) # 开启详细日志,便于调试 + +for url in ws_urls: + print(f"\n尝试连接: {url}") + try: + ws = websocket.WebSocketApp( + url, + on_open=on_open, + on_message=on_message, + on_error=on_error, + on_close=on_close, + header={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"} + ) + + # 关键:自定义SSL上下文,禁用旧协议,允许更多套件 + sslopt = { + "cert_reqs": ssl.CERT_NONE, # 临时跳过证书验证(生产慎用) + "ssl_version": ssl.PROTOCOL_TLS_CLIENT, + "ciphers": "DEFAULT@SECLEVEL=1" # 降低安全级别兼容旧套件(如果必要) + } + + ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10) + except Exception as e: + print(f"连接失败: {e}") + continue