Files
abot/plugins/douyu/main.py
2026-01-22 17:22:22 +08:00

254 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
from typing import Dict, Any, List, Optional, Tuple, Set
import aiohttp
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from db.connection import DBConnectionManager
from utils.decorator.async_job import async_job
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
class DouyuRedisManager:
def __init__(self, db_manager: DBConnectionManager):
self.redis = db_manager.get_redis_connection()
self.prefix = "bot:douyu:"
def add_group_room(self, group_id: str, room_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:rooms"
return self.redis.sadd(key, room_id) >= 0
def remove_group_room(self, group_id: str, room_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:rooms"
return self.redis.srem(key, room_id) >= 0
def list_group_rooms(self, group_id: str) -> List[str]:
key = f"{self.prefix}group:{group_id}:rooms"
rooms = self.redis.smembers(key) or set()
result = []
for r in rooms:
result.append(r.decode("utf-8") if isinstance(r, bytes) else r)
return sorted(result)
def all_subscribed_rooms(self) -> Set[str]:
groups = GroupBotManager.get_group_list()
rooms: Set[str] = set()
for gid in groups:
for r in self.list_group_rooms(gid):
rooms.add(r)
return rooms
def groups_for_room(self, room_id: str) -> List[str]:
groups = GroupBotManager.get_group_list()
res = []
for gid in groups:
if room_id in set(self.list_group_rooms(gid)):
res.append(gid)
return res
def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]:
key = f"{self.prefix}room_status:{room_id}"
data = self.redis.get(key)
if not data:
return None
if isinstance(data, bytes):
data = data.decode("utf-8")
try:
return json.loads(data)
except Exception:
return None
def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool:
key = f"{self.prefix}room_status:{room_id}"
return self.redis.set(key, json.dumps(status, ensure_ascii=False), ex=86400)
class DouyuPlugin(MessagePluginInterface):
FEATURE_KEY = "DOUYU_MONITOR"
FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]"
@property
def name(self) -> str:
return "斗鱼直播"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "斗鱼主播开播下播提醒与群订阅管理"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return ""
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
self.feature = self.register_feature()
self.redis_manager: Optional[DouyuRedisManager] = None
self._commands = ["订阅斗鱼", "取消订阅斗鱼", "斗鱼订阅列表"]
self._api_template = "https://www.douyu.com/betard/{room_id}"
self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
self._check_interval = 5
async_job.every_minutes(self._check_interval)(self._scheduled_check_job)
def initialize(self, context: Dict[str, Any]) -> bool:
try:
dbm = DBConnectionManager.get_instance()
self.redis_manager = DouyuRedisManager(dbm)
cfg = self._config.get("Douyu", {})
self._commands = cfg.get("command", self._commands)
self._api_template = cfg.get("api_url_template", self._api_template)
self._user_agent = cfg.get("user_agent", self._user_agent)
self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval))
return True
except Exception as e:
logger.error(f"{self.name} 初始化失败: {e}")
return False
def start(self) -> bool:
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
content = str(message.get("content", "")).strip()
if not content:
return False
for cmd in self._commands:
if content.startswith(cmd):
return True
return False
@plugin_stats_decorator(plugin_name="斗鱼直播")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
content = str(message.get("content", "")).strip()
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
self.bot: WechatAPIClient = message.get("bot")
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
if content.startswith("订阅斗鱼"):
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:订阅斗鱼 7718843", sender)
return True, "命令格式错误"
room_id = parts[1].strip()
ok = self.redis_manager.add_group_room(roomid or sender, room_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼房间 {room_id}", sender)
return True, "订阅成功" if ok else "订阅失败"
if content.startswith("取消订阅斗鱼"):
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供房间号,例如:取消订阅斗鱼 7718843", sender)
return True, "命令格式错误"
room_id = parts[1].strip()
ok = self.redis_manager.remove_group_room(roomid or sender, room_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼房间 {room_id}", sender)
return True, "取消成功" if ok else "取消失败"
if content.startswith("斗鱼订阅列表"):
rooms = self.redis_manager.list_group_rooms(roomid or sender)
if not rooms:
await self.bot.send_text_message(roomid or sender, "暂无订阅", sender)
return True, "暂无订阅"
text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms)
await self.bot.send_text_message(roomid or sender, text, sender)
return True, "列表已发送"
return False, None
async def _scheduled_check_job(self):
try:
rooms = self.redis_manager.all_subscribed_rooms()
if not rooms:
return
async with aiohttp.ClientSession() as session:
for room_id in rooms:
try:
url = self._api_template.format(room_id=room_id)
headers = {
"User-Agent": self._user_agent,
"Referer": f"https://www.douyu.com/{room_id}"
}
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
data = await resp.json(content_type=None)
room_info = data.get("room", {}) if isinstance(data, dict) else {}
show_status = room_info.get("show_status")
nickname = room_info.get("nickname", "")
room_name = room_info.get("room_name", "")
prev = self.redis_manager.get_room_status(room_id) or {}
prev_live = prev.get("is_live")
curr_live = True if show_status == 1 else False
status_obj = {
"is_live": curr_live,
"nickname": nickname,
"room_name": room_name
}
self.redis_manager.set_room_status(room_id, status_obj)
if prev_live is None and curr_live is False:
continue
if prev_live is None and curr_live is True:
await self._notify_groups_live(room_id, nickname, room_name)
continue
if prev_live is False and curr_live is True:
await self._notify_groups_live(room_id, nickname, room_name)
continue
if prev_live is True and curr_live is False:
await self._notify_groups_offline(room_id, nickname, room_name)
continue
await asyncio.sleep(0.1)
except Exception as e:
logger.error(f"斗鱼检查失败: {e}")
continue
except Exception as e:
logger.error(f"斗鱼定时任务异常: {e}")
async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str):
groups = self.redis_manager.groups_for_room(room_id)
text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看https://www.douyu.com/{room_id}"
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
await self.bot.send_text_message(gid, text)
except Exception as e:
logger.error(f"发送斗鱼开播提醒失败: {e}")
continue
async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str):
groups = self.redis_manager.groups_for_room(room_id)
text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}"
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
await self.bot.send_text_message(gid, text)
except Exception as e:
logger.error(f"发送斗鱼下播提醒失败: {e}")
continue