加入斗鱼开播下播提醒

This commit is contained in:
liuwei
2026-01-22 17:15:19 +08:00
parent 78b23767c3
commit 9e81e7e035
6 changed files with 792 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from .main import DouyuPlugin
def get_plugin():
return DouyuPlugin()

View File

@@ -0,0 +1,6 @@
[Douyu]
enable = true
command = ["订阅斗鱼", "取消订阅斗鱼", "斗鱼订阅列表"]
check_interval_minutes = 5
api_url_template = "https://www.douyu.com/betard/{room_id}"
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"

254
plugins/douyu/main.py Normal file
View File

@@ -0,0 +1,254 @@
import asyncio
import json
from typing import Dict, Any, List, Optional, Tuple, Set
import aiohttp
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
class DouyuRedisManager:
def __init__(self, db_manager: DBConnectionManager):
self.redis = db_manager.get_redis_connection()
self.prefix = "bot:douyu:"
def add_group_room(self, group_id: str, room_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:rooms"
return self.redis.sadd(key, room_id) >= 0
def remove_group_room(self, group_id: str, room_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:rooms"
return self.redis.srem(key, room_id) >= 0
def list_group_rooms(self, group_id: str) -> List[str]:
key = f"{self.prefix}group:{group_id}:rooms"
rooms = self.redis.smembers(key) or set()
result = []
for r in rooms:
result.append(r.decode("utf-8") if isinstance(r, bytes) else r)
return sorted(result)
def all_subscribed_rooms(self) -> Set[str]:
groups = GroupBotManager.get_group_list()
rooms: Set[str] = set()
for gid in groups:
for r in self.list_group_rooms(gid):
rooms.add(r)
return rooms
def groups_for_room(self, room_id: str) -> List[str]:
groups = GroupBotManager.get_group_list()
res = []
for gid in groups:
if room_id in set(self.list_group_rooms(gid)):
res.append(gid)
return res
def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]:
key = f"{self.prefix}room_status:{room_id}"
data = self.redis.get(key)
if not data:
return None
if isinstance(data, bytes):
data = data.decode("utf-8")
try:
return json.loads(data)
except Exception:
return None
def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool:
key = f"{self.prefix}room_status:{room_id}"
return self.redis.set(key, json.dumps(status, ensure_ascii=False), ex=86400)
class DouyuPlugin(MessagePluginInterface):
FEATURE_KEY = "DOUYU_MONITOR"
FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]"
@property
def name(self) -> str:
return "斗鱼直播"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "斗鱼主播开播下播提醒与群订阅管理"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
self.feature = self.register_feature()
self.redis_manager: Optional[DouyuRedisManager] = None
self._commands = ["订阅斗鱼", "取消订阅斗鱼", "斗鱼订阅列表"]
self._api_template = "https://www.douyu.com/betard/{room_id}"
self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
self._check_interval = 5
def initialize(self, context: Dict[str, Any]) -> bool:
try:
dbm = DBConnectionManager.get_instance()
self.redis_manager = DouyuRedisManager(dbm)
cfg = self._config.get("Douyu", {})
self._commands = cfg.get("command", self._commands)
self._api_template = cfg.get("api_url_template", self._api_template)
self._user_agent = cfg.get("user_agent", self._user_agent)
self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval))
async_job.every_minutes(self._check_interval)(self._scheduled_check_job)
return True
except Exception as e:
logger.error(f"{self.name} 初始化失败: {e}")
return False
def start(self) -> bool:
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
content = str(message.get("content", "")).strip()
if not content:
return False
for cmd in self._commands:
if content.startswith(cmd):
return True
return False
@plugin_stats_decorator(plugin_name="斗鱼直播")
@plugin_points_cost(1, "斗鱼提醒消耗积分", FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
content = str(message.get("content", "")).strip()
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
self.bot: WechatAPIClient = message.get("bot")
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
if content.startswith("订阅斗鱼"):
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:订阅斗鱼 7718843", sender)
return True, "命令格式错误"
room_id = parts[1].strip()
ok = self.redis_manager.add_group_room(roomid or sender, room_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼房间 {room_id}", sender)
return True, "订阅成功" if ok else "订阅失败"
if content.startswith("取消订阅斗鱼"):
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:取消订阅斗鱼 7718843", sender)
return True, "命令格式错误"
room_id = parts[1].strip()
ok = self.redis_manager.remove_group_room(roomid or sender, room_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼房间 {room_id}", sender)
return True, "取消成功" if ok else "取消失败"
if content.startswith("斗鱼订阅列表"):
rooms = self.redis_manager.list_group_rooms(roomid or sender)
if not rooms:
await self.bot.send_text_message(roomid or sender, "暂无订阅", sender)
return True, "暂无订阅"
text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms)
await self.bot.send_text_message(roomid or sender, text, sender)
return True, "列表已发送"
return False, None
async def _scheduled_check_job(self):
try:
rooms = self.redis_manager.all_subscribed_rooms()
if not rooms:
return
async with aiohttp.ClientSession() as session:
for room_id in rooms:
try:
url = self._api_template.format(room_id=room_id)
headers = {
"User-Agent": self._user_agent,
"Referer": f"https://www.douyu.com/{room_id}"
}
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
data = await resp.json(content_type=None)
room_info = data.get("room", {}) if isinstance(data, dict) else {}
show_status = room_info.get("show_status")
nickname = room_info.get("nickname", "")
room_name = room_info.get("room_name", "")
prev = self.redis_manager.get_room_status(room_id) or {}
prev_live = prev.get("is_live")
curr_live = True if show_status == 1 else False
status_obj = {
"is_live": curr_live,
"nickname": nickname,
"room_name": room_name
}
self.redis_manager.set_room_status(room_id, status_obj)
if prev_live is None and curr_live is False:
continue
if prev_live is None and curr_live is True:
await self._notify_groups_live(room_id, nickname, room_name)
continue
if prev_live is False and curr_live is True:
await self._notify_groups_live(room_id, nickname, room_name)
continue
if prev_live is True and curr_live is False:
await self._notify_groups_offline(room_id, nickname, room_name)
continue
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"斗鱼检查失败: {e}")
continue
except Exception as e:
logger.error(f"斗鱼定时任务异常: {e}")
async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str):
groups = self.redis_manager.groups_for_room(room_id)
text = f"斗鱼直播提醒:{nickname} 开播了\n标题:{room_name}\n地址https://www.douyu.com/{room_id}"
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
await self.bot.send_text_message(gid, text)
except Exception as e:
logger.error(f"发送斗鱼开播提醒失败: {e}")
continue
async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str):
groups = self.redis_manager.groups_for_room(room_id)
text = f"斗鱼直播提醒:{nickname} 下播了\n标题:{room_name}"
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
await self.bot.send_text_message(gid, text)
except Exception as e:
logger.error(f"发送斗鱼下播提醒失败: {e}")
continue