import asyncio import json from typing import Dict, Any, List, Optional, Tuple, Set import aiohttp from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from db.connection import DBConnectionManager from utils.decorator.async_job import async_job from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML class DouyuRedisManager: def __init__(self, db_manager: DBConnectionManager): self.redis = db_manager.get_redis_connection() self.prefix = "bot:douyu:" def add_group_room(self, group_id: str, room_id: str) -> bool: key = f"{self.prefix}group:{group_id}:rooms" return self.redis.sadd(key, room_id) >= 0 def remove_group_room(self, group_id: str, room_id: str) -> bool: key = f"{self.prefix}group:{group_id}:rooms" return self.redis.srem(key, room_id) >= 0 def list_group_rooms(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:rooms" rooms = self.redis.smembers(key) or set() result = [] for r in rooms: result.append(r.decode("utf-8") if isinstance(r, bytes) else r) return sorted(result) def all_subscribed_rooms(self) -> Set[str]: groups = GroupBotManager.get_group_list() rooms: Set[str] = set() for gid in groups: for r in self.list_group_rooms(gid): rooms.add(r) return rooms def groups_for_room(self, room_id: str) -> List[str]: groups = GroupBotManager.get_group_list() res = [] for gid in groups: if room_id in set(self.list_group_rooms(gid)): res.append(gid) return res # --- 鱼吧相关方法 --- def add_group_yuba(self, group_id: str, hash_id: str) -> bool: key = f"{self.prefix}group:{group_id}:yubas" return self.redis.sadd(key, hash_id) >= 0 def remove_group_yuba(self, group_id: str, hash_id: str) -> bool: key = f"{self.prefix}group:{group_id}:yubas" return self.redis.srem(key, hash_id) >= 0 def list_group_yubas(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:yubas" yubas = self.redis.smembers(key) or set() result = [] for y in yubas: result.append(y.decode("utf-8") if isinstance(y, bytes) else y) return sorted(result) def all_subscribed_yubas(self) -> Set[str]: groups = GroupBotManager.get_group_list() yubas: Set[str] = set() for gid in groups: for y in self.list_group_yubas(gid): yubas.add(y) return yubas def groups_for_yuba(self, hash_id: str) -> List[str]: groups = GroupBotManager.get_group_list() res = [] for gid in groups: if hash_id in set(self.list_group_yubas(gid)): res.append(gid) return res def get_yuba_last_id(self, hash_id: str) -> Optional[str]: key = f"{self.prefix}yuba_last_id:{hash_id}" data = self.redis.get(key) if not data: return None return data.decode("utf-8") if isinstance(data, bytes) else data def set_yuba_last_id(self, hash_id: str, feed_id: str) -> bool: key = f"{self.prefix}yuba_last_id:{hash_id}" return self.redis.set(key, feed_id) # --- 提醒名单方法 --- def add_group_subscriber(self, group_id: str, user_id: str) -> bool: key = f"{self.prefix}group:{group_id}:subscribers" return self.redis.sadd(key, user_id) >= 0 def remove_group_subscriber(self, group_id: str, user_id: str) -> bool: key = f"{self.prefix}group:{group_id}:subscribers" return self.redis.srem(key, user_id) >= 0 def list_group_subscribers(self, group_id: str) -> List[str]: key = f"{self.prefix}group:{group_id}:subscribers" subs = self.redis.smembers(key) or set() result = [] for s in subs: result.append(s.decode("utf-8") if isinstance(s, bytes) else s) return sorted(result) def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]: key = f"{self.prefix}room_status:{room_id}" data = self.redis.get(key) if not data: return None if isinstance(data, bytes): data = data.decode("utf-8") try: return json.loads(data) except Exception: return None def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool: key = f"{self.prefix}room_status:{room_id}" return self.redis.set(key, json.dumps(status, ensure_ascii=False)) class DouyuPlugin(MessagePluginInterface): FEATURE_KEY = "DOUYU_MONITOR" FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]" @property def name(self) -> str: return "斗鱼直播" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "斗鱼主播开播下播提醒与群订阅管理" @property def author(self) -> str: return "ABOT Team" @property def command_prefix(self) -> Optional[str]: return "" @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.bot: WechatAPIClient = None self.feature = self.register_feature() self.redis_manager: Optional[DouyuRedisManager] = None self._commands = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表", "斗鱼订阅提醒", "取消斗鱼订阅提醒", "订阅鱼吧", "取消订阅鱼吧", "鱼吧订阅列表"] self._api_template = "https://www.douyu.com/betard/{room_id}" self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList" self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" self._check_interval = 5 async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job) async def _scheduled_unified_check_job(self): """统一检查直播和鱼吧动态""" await self._scheduled_check_job() await self._scheduled_yuba_check_job() def initialize(self, context: Dict[str, Any]) -> bool: try: dbm = DBConnectionManager.get_instance() self.redis_manager = DouyuRedisManager(dbm) cfg = self._config.get("Douyu", {}) cfg_cmds = cfg.get("command", []) if isinstance(cfg_cmds, list) and cfg_cmds: self._commands = list(dict.fromkeys(cfg_cmds + self._commands)) self._api_template = cfg.get("api_url_template", self._api_template) self._user_agent = cfg.get("user_agent", self._user_agent) self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval)) return True except Exception as e: logger.error(f"{self.name} 初始化失败: {e}") return False def start(self) -> bool: self.status = PluginStatus.RUNNING return True def stop(self) -> bool: self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: content = str(message.get("content", "")).strip() if not content: return False first_token = content.split()[0] return first_token in self._commands @plugin_stats_decorator(plugin_name="斗鱼直播") async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: content = str(message.get("content", "")).strip() sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") self.bot: WechatAPIClient = message.get("bot") if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" first_token = content.split()[0] if content == "斗鱼订阅列表": rooms = self.redis_manager.list_group_rooms(roomid or sender) if not rooms: await self.bot.send_text_message(roomid or sender, "暂无订阅", sender) return True, "暂无订阅" text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms) await self.bot.send_text_message(roomid or sender, text, sender) return True, "列表已发送" if first_token == "斗鱼订阅提醒": if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" ok = self.redis_manager.add_group_subscriber(roomid, sender) await self.bot.send_at_message(roomid, "已加入斗鱼订阅提醒名单", [sender]) return True, "加入提醒名单成功" if ok else "加入提醒名单失败" if first_token == "取消斗鱼订阅提醒": if not roomid: await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender) return True, "仅支持群聊" ok = self.redis_manager.remove_group_subscriber(roomid, sender) await self.bot.send_at_message(roomid, "已取消斗鱼订阅提醒", [sender]) return True, "取消提醒成功" if ok else "取消提醒失败" if first_token == "斗鱼订阅": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:订阅斗鱼 7718843", sender) return True, "命令格式错误" room_id = parts[1].strip() if not room_id.isdigit(): await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:斗鱼订阅 52876", sender) return True, "命令格式错误" ok = self.redis_manager.add_group_room(roomid or sender, room_id) await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼房间 {room_id}", sender) return True, "订阅成功" if ok else "订阅失败" if first_token == "取消斗鱼订阅": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:取消订阅斗鱼 7718843", sender) return True, "命令格式错误" room_id = parts[1].strip() if not room_id.isdigit(): await self.bot.send_text_message(roomid or sender, "房间号必须是数字,例如:取消斗鱼订阅 52876", sender) return True, "命令格式错误" ok = self.redis_manager.remove_group_room(roomid or sender, room_id) await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼房间 {room_id}", sender) return True, "取消成功" if ok else "取消失败" if content == "鱼吧订阅列表": yubas = self.redis_manager.list_group_yubas(roomid or sender) if not yubas: await self.bot.send_text_message(roomid or sender, "暂无鱼吧订阅", sender) return True, "暂无鱼吧订阅" text = "当前订阅的斗鱼鱼吧:\n" + "\n".join(yubas) await self.bot.send_text_message(roomid or sender, text, sender) return True, "列表已发送" if first_token == "订阅鱼吧": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id,例如:订阅鱼吧 PDAP2zEk3nwx", sender) return True, "命令格式错误" hash_id = parts[1].strip() ok = self.redis_manager.add_group_yuba(roomid or sender, hash_id) await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼鱼吧 {hash_id}", sender) return True, "订阅成功" if ok else "订阅失败" if first_token == "取消订阅鱼吧": parts = content.split() if len(parts) < 2: await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id,例如:取消订阅鱼吧 PDAP2zEk3nwx", sender) return True, "命令格式错误" hash_id = parts[1].strip() ok = self.redis_manager.remove_group_yuba(roomid or sender, hash_id) await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼鱼吧 {hash_id}", sender) return True, "取消成功" if ok else "取消失败" return False, None async def _scheduled_check_job(self): try: rooms = self.redis_manager.all_subscribed_rooms() if not rooms: return async with aiohttp.ClientSession() as session: for room_id in rooms: try: url = self._api_template.format(room_id=room_id) headers = { "User-Agent": self._user_agent, "Referer": f"https://www.douyu.com/{room_id}" } async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp: data = await resp.json(content_type=None) room_info = data.get("room", {}) if isinstance(data, dict) else {} show_status = room_info.get("show_status") nickname = room_info.get("nickname", "") room_name = room_info.get("room_name", "") avatar = room_info.get("avatar", {}) or {} thumb_url = str(avatar.get("small", "") or "").strip().strip("`").strip() video_loop_raw = room_info.get("videoLoop", 0) try: video_loop = int(str(video_loop_raw)) except Exception: video_loop = 0 prev = self.redis_manager.get_room_status(room_id) or {} prev_live = prev.get("is_live") curr_live = True if show_status == 1 and video_loop == 0 else False status_obj = { "is_live": curr_live, "nickname": nickname, "room_name": room_name, "is_loop": True if video_loop == 1 else False } self.redis_manager.set_room_status(room_id, status_obj) if prev_live is None and curr_live is False: continue if prev_live is None and curr_live is True: await self._notify_groups_live(room_id, nickname, room_name, thumb_url) continue if prev_live is False and curr_live is True: await self._notify_groups_live(room_id, nickname, room_name, thumb_url) continue if prev_live is True and curr_live is False: await self._notify_groups_offline(room_id, nickname, room_name, video_loop == 1) continue await asyncio.sleep(0.1) except Exception as e: logger.error(f"斗鱼检查失败: {e}") continue except Exception as e: logger.error(f"斗鱼定时任务异常: {e}") async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str, thumb_url: str): groups = self.redis_manager.groups_for_room(room_id) text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看:https://www.douyu.com/{room_id}" xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url) for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: subs = self.redis_manager.list_group_subscribers(gid) if subs: await self.bot.send_at_message(gid, text, subs) else: await self.bot.send_text_message(gid, text) await self.bot.send_link_xml_message(xml_content, gid) except Exception as e: logger.error(f"发送斗鱼开播提醒失败: {e}") continue async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str, is_loop: bool = False): groups = self.redis_manager.groups_for_room(room_id) text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}" if is_loop: text += "(当前为轮播)" for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: await self.bot.send_text_message(gid, text) except Exception as e: logger.error(f"发送斗鱼下播提醒失败: {e}") continue async def _scheduled_yuba_check_job(self): try: yubas = self.redis_manager.all_subscribed_yubas() if not yubas: return async with aiohttp.ClientSession() as session: for hash_id in yubas: try: params = { "filter_type": 1, "hash_id": hash_id, "limit": 10, "offset": 0 } headers = { "User-Agent": self._user_agent, "Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news", } async with session.get(self._yuba_api, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp: data = await resp.json(content_type=None) if data.get("error") != 0: logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {data.get('msg')}") continue feed_list = data.get("data", {}).get("feed_list", []) # 查找第一条【非置顶】动态 target_feed = None for feed in feed_list: if feed.get("home_feed_top") == 1: continue target_feed = feed break if not target_feed: continue feed_id = str(target_feed.get("feed_id")) last_id = self.redis_manager.get_yuba_last_id(hash_id) if last_id and feed_id == last_id: continue # 发现新动态 nickname = target_feed.get("publisher", {}).get("nickname", "未知主播") content = target_feed.get("text", "") ctime = target_feed.get("ctime") from datetime import datetime publish_time = datetime.fromtimestamp(int(ctime)).strftime('%Y-%m-%d %H:%M:%S') if ctime else "未知时间" # 限制内容长度 if len(content) > 200: content = content[:200] + "..." full_url = f"https://yuba.douyu.com/feed/{feed_id}" await self._notify_groups_yuba(hash_id, nickname, content, full_url, publish_time) # 保存标记 self.redis_manager.set_yuba_last_id(hash_id, feed_id) await asyncio.sleep(0.5) except Exception as e: logger.error(f"检查斗鱼鱼吧 ({hash_id}) 失败: {e}") continue except Exception as e: logger.error(f"斗鱼鱼吧定时任务异常: {e}") async def _notify_groups_yuba(self, hash_id: str, nickname: str, content: str, url: str, publish_time: str = "未知时间"): groups = self.redis_manager.groups_for_yuba(hash_id) text = f"🌟 斗鱼鱼吧动态提醒 \n👤 主播:{nickname}\n⏰ 时间:{publish_time}\n📝 内容:{content}\n🔗 链接:{url}" for gid in groups: if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED: try: await self.bot.send_text_message(gid, text) except Exception as e: logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}") continue