import asyncio import json from datetime import datetime import os import threading import time from typing import Dict, Any, List, Optional, Tuple, Set import aiohttp from loguru import logger import ssl import zlib try: import websocket except ImportError: websocket = None from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager from utils.decorator.async_job import async_job from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML class DouyuDanmuRecorder: def __init__(self, room_id: str, user_agent: str): self.room_id = room_id self.user_agent = user_agent self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._ws: Optional[websocket.WebSocketApp] = None self._buffer: List[str] = [] self._buffer_limit = 10 self._buffer_date: Optional[str] = None self._lock = threading.Lock() self._websocket_available = websocket is not None def _encode(self, msg: str) -> bytes: content = msg.encode("utf-8") + b"\x00" length = len(content) + 8 head = length.to_bytes(4, "little") * 2 head += (689).to_bytes(2, "little") head += b"\x00\x00" return head + content def _on_message(self, ws, message): try: decompressed = zlib.decompress(message, -zlib.MAX_WBITS) data = decompressed.decode("utf-8", errors="ignore") except Exception: data = message.decode("utf-8", errors="ignore") for line in data.split("\x00"): line = line.strip() if not line: continue if "type@=chatmsg" not in line: continue parts: Dict[str, Any] = {} for pair in line.split("/"): if "@=" in pair: key, value = pair.split("@=", 1) parts[key] = value nick = parts.get("nn", "未知") txt = parts.get("txt", "") uid = parts.get("uid", "未知") level = parts.get("level", "0") fan_group = parts.get("bnn", "") fan_level = parts.get("bl", "0") time_stamp = parts.get("cst", "") if time_stamp: try: if time_stamp.isdigit(): ts = int(time_stamp) if ts > 10 ** 12: ts = ts / 1000 dt = datetime.fromtimestamp(ts) time_str = dt.strftime("%Y-%m-%d %H:%M:%S") else: dt = datetime.strptime(time_stamp, "%Y-%m-%d %H:%M:%S") time_str = dt.strftime("%Y-%m-%d %H:%M:%S") except Exception: time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") else: time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}" if fan_group: output += f" / {fan_group} Lv{fan_level}" output += f"):{txt}" self._append_and_maybe_flush(output) def _flush_locked(self): if not self._buffer or self._buffer_date is None: return dir_path = os.path.join("temp", "douyu_danmu", self._buffer_date) os.makedirs(dir_path, exist_ok=True) file_name = os.path.join(dir_path, f"{self.room_id}_{self._buffer_date}.txt") data = "\n".join(self._buffer) + "\n" with open(file_name, "a", encoding="utf-8") as f: f.write(data) self._buffer.clear() def _append_and_maybe_flush(self, line: str): now = datetime.now() date_str = now.strftime("%Y%m%d") with self._lock: if self._buffer_date is None: self._buffer_date = date_str elif date_str != self._buffer_date: self._flush_locked() self._buffer_date = date_str self._buffer.append(line) if len(self._buffer) >= self._buffer_limit: self._flush_locked() def _flush(self): with self._lock: self._flush_locked() def _on_open(self, ws): ws.send(self._encode(f"type@=loginreq/roomid@={self.room_id}/dmbt@=chrome/dmbv@=0/")) ws.send(self._encode(f"type@=joingroup/rid@={self.room_id}/gid@={self.room_id}/")) def heartbeat(): while ws.sock and ws.sock.connected and not self._stop_event.is_set(): try: ws.send(self._encode("type@=mrkl/")) except Exception: break time.sleep(38) threading.Thread(target=heartbeat, daemon=True).start() def _on_error(self, ws, error): logger.error(f"斗鱼弹幕错误({self.room_id}): {error}") def _on_close(self, ws, code, msg): logger.info(f"斗鱼弹幕连接关闭({self.room_id}): {code} {msg}") def _run(self): if not self._websocket_available: logger.error(f"websocket-client 未安装,无法记录弹幕({self.room_id})") return try: websocket.enableTrace(False) ws_urls = [ "wss://danmuproxy.douyu.com:8501/", "wss://danmuproxy.douyu.com:8502/", "wss://danmuproxy.douyu.com:8503/", "wss://danmuproxy.douyu.com:8504/", "wss://danmuproxy.douyu.com:8505/", "wss://danmuproxy.douyu.com:8506/", ] sslopt = { "cert_reqs": ssl.CERT_NONE, "ssl_version": ssl.PROTOCOL_TLS_CLIENT, "ciphers": "DEFAULT@SECLEVEL=1", } headers = {"User-Agent": self.user_agent} for url in ws_urls: if self._stop_event.is_set(): break try: self._ws = websocket.WebSocketApp( url, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close, header=headers, ) self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10) except Exception as e: logger.error(f"斗鱼弹幕连接失败({self.room_id}): {e}") continue finally: self._ws = None if self._stop_event.is_set(): break time.sleep(1) finally: self._ws = None def start(self): if self._thread and self._thread.is_alive(): return self._stop_event.clear() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def stop(self): self._flush() self._stop_event.set() if self._ws: try: self._ws.close() except Exception: pass class DouyuRedisManager: def __init__(self, db_manager: DBConnectionManager): self.redis = db_manager.get_redis_connection() self.prefix = "bot:douyu:" def add_group_room(self, group_id: str, room_id: str) -> bool: key = f"{self.prefix}group:{group_id}:rooms" return self.redis.sadd(key, room_id) >= 0 def remove_group_room(self, group_id: str, room_id: str) -> bool: key = f"{self.prefix}group:{group_id}:rooms" return self.redis.srem(key, room_id) >= 0 def list_group_rooms(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:rooms" rooms = self.redis.smembers(key) or set() result = [] for r in rooms: result.append(r.decode("utf-8") if isinstance(r, bytes) else r) return sorted(result) def all_subscribed_rooms(self) -> Set[str]: groups = GroupBotManager.get_group_list() rooms: Set[str] = set() for gid in groups: for r in self.list_group_rooms(gid): rooms.add(r) return rooms def groups_for_room(self, room_id: str) -> List[str]: groups = GroupBotManager.get_group_list() res = [] for gid in groups: if room_id in set(self.list_group_rooms(gid)): res.append(gid) return res # --- 鱼吧相关方法 --- def add_group_yuba(self, group_id: str, hash_id: str) -> bool: key = f"{self.prefix}group:{group_id}:yubas" return self.redis.sadd(key, hash_id) >= 0 def remove_group_yuba(self, group_id: str, hash_id: str) -> bool: key = f"{self.prefix}group:{group_id}:yubas" return self.redis.srem(key, hash_id) >= 0 def list_group_yubas(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:yubas" yubas = self.redis.smembers(key) or set() result = [] for y in yubas: result.append(y.decode("utf-8") if isinstance(y, bytes) else y) return sorted(result) def all_subscribed_yubas(self) -> Set[str]: groups = GroupBotManager.get_group_list() yubas: Set[str] = set() for gid in groups: for y in self.list_group_yubas(gid): yubas.add(y) return yubas def groups_for_yuba(self, hash_id: str) -> List[str]: groups = GroupBotManager.get_group_list() res = [] for gid in groups: if hash_id in set(self.list_group_yubas(gid)): res.append(gid) return res def get_yuba_last_id(self, hash_id: str) -> Optional[str]: key = f"{self.prefix}yuba_last_id:{hash_id}" data = self.redis.get(key) if not data: return None return data.decode("utf-8") if isinstance(data, bytes) else data def set_yuba_last_id(self, hash_id: str, feed_id: str) -> bool: key = f"{self.prefix}yuba_last_id:{hash_id}" return self.redis.set(key, feed_id) # --- 提醒名单方法 --- def add_group_subscriber(self, group_id: str, user_id: str) -> bool: key = f"{self.prefix}group:{group_id}:subscribers" return self.redis.sadd(key, user_id) >= 0 def remove_group_subscriber(self, group_id: str, user_id: str) -> bool: key = f"{self.prefix}group:{group_id}:subscribers" return self.redis.srem(key, user_id) >= 0 def list_group_subscribers(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:subscribers" subs = self.redis.smembers(key) or set() result = [] for s in subs: result.append(s.decode("utf-8") if isinstance(s, bytes) else s) return sorted(result) def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]: key = f"{self.prefix}room_status:{room_id}" data = self.redis.get(key) if not data: return None if isinstance(data, bytes): data = data.decode("utf-8") try: return json.loads(data) except Exception: return None def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool: key = f"{self.prefix}room_status:{room_id}" return self.redis.set(key, json.dumps(status, ensure_ascii=False)) class DouyuPlugin(MessagePluginInterface): FEATURE_KEY = "DOUYU_MONITOR" FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]" @property def name(self) -> str: return "斗鱼直播" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "斗鱼主播开播下播提醒与群订阅管理" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.bot: WechatAPIClient = None self.feature = self.register_feature() self.redis_manager: Optional[DouyuRedisManager] = None self._commands = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表", "斗鱼订阅提醒", "取消斗鱼订阅提醒", "订阅鱼吧", "取消订阅鱼吧", "鱼吧订阅列表"] self._api_template = "https://www.douyu.com/betard/{room_id}" self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList" self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" self._check_interval = 5 self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {} async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job) async def _scheduled_unified_check_job(self): """统一检查直播和鱼吧动态""" await self._scheduled_check_job() await self._scheduled_yuba_check_job() def initialize(self, context: Dict[str, Any]) -> bool: try: dbm = DBConnectionManager.get_instance() self.redis_manager = DouyuRedisManager(dbm) cfg = self._config.get("Douyu", {}) cfg_cmds = cfg.get("command", []) if isinstance(cfg_cmds, list) and cfg_cmds: self._commands = list(dict.fromkeys(cfg_cmds + self._commands)) self._api_template = cfg.get("api_url_template", self._api_template) self._user_agent = cfg.get("user_agent", self._user_agent) self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval)) return True except Exception as e: logger.error(f"{self.name} 初始化失败: {e}") return False def start(self) -> bool: self.status = PluginStatus.RUNNING return True def stop(self) -> bool: for recorder in getattr(self, "_danmu_recorders", {}).values(): recorder.stop() self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: content = str(message.get("content", "")).strip() if not content: return False first_token = content.split()[0] return first_token in self._commands @plugin_stats_decorator(plugin_name="斗鱼直播") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: content = str(message.get("content", "")).strip() sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") self.bot: WechatAPIClient = message.get("bot") if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" first_token = content.split()[0] if content == "斗鱼订阅列表": rooms = self.redis_manager.list_group_rooms(roomid or sender) if not rooms: await self.bot.send_text_message(roomid or sender, "暂无订阅", sender) return True, "暂无订阅" text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms) await self.bot.send_text_message(roomid or sender, text, sender) return True, "列表已发送" if first_token == "斗鱼订阅提醒": if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" ok = self.redis_manager.add_group_subscriber(roomid, sender) await self.bot.send_at_message(roomid, "已加入斗鱼订阅提醒名单", [sender]) return True, "加入提醒名单成功" if ok else "加入提醒名单失败" if first_token == "取消斗鱼订阅提醒": if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" ok = self.redis_manager.remove_group_subscriber(roomid, sender) await self.bot.send_at_message(roomid, "已取消斗鱼订阅提醒", [sender]) return True, "取消提醒成功" if ok else "取消提醒失败" if first_token == "斗鱼订阅": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:订阅斗鱼 7718843", sender) return True, "命令格式错误" room_id = parts[1].strip() if not room_id.isdigit(): await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:斗鱼订阅 52876", sender) return True, "命令格式错误" ok = self.redis_manager.add_group_room(roomid or sender, room_id) await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼房间 {room_id}", sender) return True, "订阅成功" if ok else "订阅失败" if first_token == "取消斗鱼订阅": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:取消订阅斗鱼 7718843", sender) return True, "命令格式错误" room_id = parts[1].strip() if not room_id.isdigit(): await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:取消斗鱼订阅 52876", sender) return True, "命令格式错误" ok = self.redis_manager.remove_group_room(roomid or sender, room_id) await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼房间 {room_id}", sender) return True, "取消成功" if ok else "取消失败" if content == "鱼吧订阅列表": yubas = self.redis_manager.list_group_yubas(roomid or sender) if not yubas: await self.bot.send_text_message(roomid or sender, "暂无鱼吧订阅", sender) return True, "暂无鱼吧订阅" text = "当前订阅的斗鱼鱼吧:\n" + "\n".join(yubas) await self.bot.send_text_message(roomid or sender, text, sender) return True, "列表已发送" if first_token == "订阅鱼吧": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id,例如:订阅鱼吧 PDAP2zEk3nwx", sender) return True, "命令格式错误" hash_id = parts[1].strip() ok = self.redis_manager.add_group_yuba(roomid or sender, hash_id) await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼鱼吧 {hash_id}", sender) return True, "订阅成功" if ok else "订阅失败" if first_token == "取消订阅鱼吧": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id,例如:取消订阅鱼吧 PDAP2zEk3nwx", sender) return True, "命令格式错误" hash_id = parts[1].strip() ok = self.redis_manager.remove_group_yuba(roomid or sender, hash_id) await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼鱼吧 {hash_id}", sender) return True, "取消成功" if ok else "取消失败" return False, None async def _scheduled_check_job(self): try: rooms = self.redis_manager.all_subscribed_rooms() if not rooms: return async with aiohttp.ClientSession() as session: for room_id in rooms: try: url = self._api_template.format(room_id=room_id) headers = { "User-Agent": self._user_agent, "Referer": f"https://www.douyu.com/{room_id}" } async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp: data = await resp.json(content_type=None) room_info = data.get("room", {}) if isinstance(data, dict) else {} show_status = room_info.get("show_status") nickname = room_info.get("nickname", "") room_name = room_info.get("room_name", "") avatar = room_info.get("avatar", {}) or {} thumb_url = str(avatar.get("small", "") or "").strip().strip("`").strip() video_loop_raw = room_info.get("videoLoop", 0) try: video_loop = int(str(video_loop_raw)) except Exception: video_loop = 0 prev = self.redis_manager.get_room_status(room_id) or {} prev_live = prev.get("is_live") curr_live = True if show_status == 1 and video_loop == 0 else False status_obj = { "is_live": curr_live, "nickname": nickname, "room_name": room_name, "is_loop": True if video_loop == 1 else False } self.redis_manager.set_room_status(room_id, status_obj) if prev_live is None and curr_live is False: continue if prev_live is None and curr_live is True: await self._notify_groups_live(room_id, nickname, room_name, thumb_url) continue if prev_live is False and curr_live is True: await self._notify_groups_live(room_id, nickname, room_name, thumb_url) continue if prev_live is True and curr_live is False: await self._notify_groups_offline(room_id, nickname, room_name, video_loop == 1) continue if prev_live is True and curr_live is True and room_id not in self._danmu_recorders: try: logger.info(f"检测到持续直播状态,补偿启动斗鱼弹幕记录({room_id})") self._start_danmu_record(room_id) except Exception as e: logger.error(f"补偿启动斗鱼弹幕记录失败({room_id}): {e}") continue await asyncio.sleep(0.1) except Exception as e: logger.error(f"斗鱼检查失败: {e}") continue except Exception as e: logger.error(f"斗鱼定时任务异常: {e}") async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str, thumb_url: str): groups = self.redis_manager.groups_for_room(room_id) text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看:https://www.douyu.com/{room_id}" xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url) for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: subs = self.redis_manager.list_group_subscribers(gid) if subs: await self.bot.send_at_message(gid, text, subs) else: await self.bot.send_text_message(gid, text) await self.bot.send_link_xml_message(xml_content, gid) except Exception as e: logger.error(f"发送斗鱼开播提醒失败: {e}") continue try: logger.info(f"启动斗鱼弹幕记录({room_id})") self._start_danmu_record(room_id) except Exception as e: logger.error(f"启动斗鱼弹幕记录失败({room_id}): {e}") async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str, is_loop: bool = False): groups = self.redis_manager.groups_for_room(room_id) text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}" if is_loop: text += "(当前为轮播)" for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: await self.bot.send_text_message(gid, text) except Exception as e: logger.error(f"发送斗鱼下播提醒失败: {e}") continue try: logger.info(f"停止斗鱼弹幕记录({room_id})") self._stop_danmu_record(room_id) except Exception as e: logger.error(f"停止斗鱼弹幕记录失败({room_id}): {e}") async def _scheduled_yuba_check_job(self): try: yubas = self.redis_manager.all_subscribed_yubas() if not yubas: return async with aiohttp.ClientSession() as session: for hash_id in yubas: try: params = { "filter_type": 1, "hash_id": hash_id, "limit": 10, "offset": 0 } headers = { "User-Agent": self._user_agent, "Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news", } async with session.get(self._yuba_api, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp: data = await resp.json(content_type=None) if data.get("error") != 0: logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {data.get('msg')}") continue feed_list = data.get("data", {}).get("feed_list", []) # 查找第一条【非置顶】动态 target_feed = None for feed in feed_list: if feed.get("home_feed_top") == 1: continue target_feed = feed break if not target_feed: continue feed_id = str(target_feed.get("feed_id")) last_id = self.redis_manager.get_yuba_last_id(hash_id) if last_id and feed_id == last_id: continue # 发现新动态 nickname = target_feed.get("publisher", {}).get("nickname", "未知主播") content = target_feed.get("text", "") ctime = target_feed.get("ctime") from datetime import datetime publish_time = datetime.fromtimestamp(int(ctime)).strftime( '%Y-%m-%d %H:%M:%S') if ctime else "未知时间" # 限制内容长度 if len(content) > 200: content = content[:200] + "..." full_url = f"https://yuba.douyu.com/feed/{feed_id}" await self._notify_groups_yuba(hash_id, nickname, content, full_url, publish_time) # 保存标记 self.redis_manager.set_yuba_last_id(hash_id, feed_id) await asyncio.sleep(0.5) except Exception as e: logger.error(f"检查斗鱼鱼吧 ({hash_id}) 失败: {e}") continue except Exception as e: logger.error(f"斗鱼鱼吧定时任务异常: {e}") async def _notify_groups_yuba(self, hash_id: str, nickname: str, content: str, url: str, publish_time: str = "未知时间"): groups = self.redis_manager.groups_for_yuba(hash_id) text = f"🌟 斗鱼鱼吧动态提醒 \n👤 主播:{nickname}\n⏰ 时间:{publish_time}\n📝 内容:{content}\n🔗 链接:{url}" for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: await self.bot.send_text_message(gid, text) except Exception as e: logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}") continue def _get_danmu_recorder(self, room_id: str) -> DouyuDanmuRecorder: recorder = self._danmu_recorders.get(room_id) if not recorder: recorder = DouyuDanmuRecorder(room_id, self._user_agent) self._danmu_recorders[room_id] = recorder return recorder def _start_danmu_record(self, room_id: str): recorder = self._get_danmu_recorder(room_id) recorder.start() def _stop_danmu_record(self, room_id: str): recorder = self._danmu_recorders.get(room_id) if recorder: recorder.stop()