斗鱼鱼吧订阅功能

This commit is contained in:
liuwei
2026-01-30 14:57:53 +08:00
parent 34230e60ab
commit 0c667d8ba3
2 changed files with 254 additions and 2 deletions

View File

@@ -53,6 +53,51 @@ class DouyuRedisManager:
res.append(gid)
return res
# --- 鱼吧相关方法 ---
def add_group_yuba(self, group_id: str, hash_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:yubas"
return self.redis.sadd(key, hash_id) >= 0
def remove_group_yuba(self, group_id: str, hash_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:yubas"
return self.redis.srem(key, hash_id) >= 0
def list_group_yubas(self, group_id: str) -> List[str]:
key = f"{self.prefix}group:{group_id}:yubas"
yubas = self.redis.smembers(key) or set()
result = []
for y in yubas:
result.append(y.decode("utf-8") if isinstance(y, bytes) else y)
return sorted(result)
def all_subscribed_yubas(self) -> Set[str]:
groups = GroupBotManager.get_group_list()
yubas: Set[str] = set()
for gid in groups:
for y in self.list_group_yubas(gid):
yubas.add(y)
return yubas
def groups_for_yuba(self, hash_id: str) -> List[str]:
groups = GroupBotManager.get_group_list()
res = []
for gid in groups:
if hash_id in set(self.list_group_yubas(gid)):
res.append(gid)
return res
def get_yuba_last_id(self, hash_id: str) -> Optional[str]:
key = f"{self.prefix}yuba_last_id:{hash_id}"
data = self.redis.get(key)
if not data:
return None
return data.decode("utf-8") if isinstance(data, bytes) else data
def set_yuba_last_id(self, hash_id: str, feed_id: str) -> bool:
key = f"{self.prefix}yuba_last_id:{hash_id}"
return self.redis.set(key, feed_id)
# --- 提醒名单方法 ---
def add_group_subscriber(self, group_id: str, user_id: str) -> bool:
key = f"{self.prefix}group:{group_id}:subscribers"
return self.redis.sadd(key, user_id) >= 0
@@ -127,11 +172,18 @@ class DouyuPlugin(MessagePluginInterface):
self.bot: WechatAPIClient = None
self.feature = self.register_feature()
self.redis_manager: Optional[DouyuRedisManager] = None
self._commands = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表", "斗鱼订阅提醒", "取消斗鱼订阅提醒"]
self._commands = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表", "斗鱼订阅提醒", "取消斗鱼订阅提醒",
"订阅鱼吧", "取消订阅鱼吧", "鱼吧订阅列表"]
self._api_template = "https://www.douyu.com/betard/{room_id}"
self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList"
self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
self._check_interval = 5
async_job.every_minutes(self._check_interval)(self._scheduled_check_job)
async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job)
async def _scheduled_unified_check_job(self):
"""统一检查直播和鱼吧动态"""
await self._scheduled_check_job()
await self._scheduled_yuba_check_job()
def initialize(self, context: Dict[str, Any]) -> bool:
try:
@@ -220,6 +272,36 @@ class DouyuPlugin(MessagePluginInterface):
ok = self.redis_manager.remove_group_room(roomid or sender, room_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼房间 {room_id}", sender)
return True, "取消成功" if ok else "取消失败"
if content == "鱼吧订阅列表":
yubas = self.redis_manager.list_group_yubas(roomid or sender)
if not yubas:
await self.bot.send_text_message(roomid or sender, "暂无鱼吧订阅", sender)
return True, "暂无鱼吧订阅"
text = "当前订阅的斗鱼鱼吧:\n" + "\n".join(yubas)
await self.bot.send_text_message(roomid or sender, text, sender)
return True, "列表已发送"
if first_token == "订阅鱼吧":
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id例如订阅鱼吧 PDAP2zEk3nwx", sender)
return True, "命令格式错误"
hash_id = parts[1].strip()
ok = self.redis_manager.add_group_yuba(roomid or sender, hash_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已订阅斗鱼鱼吧 {hash_id}", sender)
return True, "订阅成功" if ok else "订阅失败"
if first_token == "取消订阅鱼吧":
parts = content.split()
if len(parts) < 2:
await self.bot.send_text_message(roomid or sender, "请提供鱼吧 hash_id例如取消订阅鱼吧 PDAP2zEk3nwx", sender)
return True, "命令格式错误"
hash_id = parts[1].strip()
ok = self.redis_manager.remove_group_yuba(roomid or sender, hash_id)
await self.bot.send_text_message(roomid or sender, f"✅ 已取消订阅斗鱼鱼吧 {hash_id}", sender)
return True, "取消成功" if ok else "取消失败"
return False, None
async def _scheduled_check_job(self):
@@ -305,3 +387,82 @@ class DouyuPlugin(MessagePluginInterface):
except Exception as e:
logger.error(f"发送斗鱼下播提醒失败: {e}")
continue
async def _scheduled_yuba_check_job(self):
try:
yubas = self.redis_manager.all_subscribed_yubas()
if not yubas:
return
async with aiohttp.ClientSession() as session:
for hash_id in yubas:
try:
params = {
"filter_type": 1,
"hash_id": hash_id,
"limit": 10,
"offset": 0
}
headers = {
"User-Agent": self._user_agent,
"Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news",
}
async with session.get(self._yuba_api, headers=headers, params=params, timeout=aiohttp.ClientTimeout(total=10)) as resp:
data = await resp.json(content_type=None)
if data.get("error") != 0:
logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {data.get('msg')}")
continue
feed_list = data.get("data", {}).get("feed_list", [])
# 查找第一条【非置顶】动态
target_feed = None
for feed in feed_list:
if feed.get("home_feed_top") == 1:
continue
target_feed = feed
break
if not target_feed:
continue
feed_id = str(target_feed.get("feed_id"))
last_id = self.redis_manager.get_yuba_last_id(hash_id)
if last_id and feed_id == last_id:
continue
# 发现新动态
nickname = target_feed.get("publisher", {}).get("nickname", "未知主播")
content = target_feed.get("text", "")
# 限制内容长度
if len(content) > 200:
content = content[:200] + "..."
full_url = f"https://yuba.douyu.com/feed/{feed_id}"
await self._notify_groups_yuba(hash_id, nickname, content, full_url)
# 保存标记
self.redis_manager.set_yuba_last_id(hash_id, feed_id)
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"检查斗鱼鱼吧 ({hash_id}) 失败: {e}")
continue
except Exception as e:
logger.error(f"斗鱼鱼吧定时任务异常: {e}")
async def _notify_groups_yuba(self, hash_id: str, nickname: str, content: str, url: str):
groups = self.redis_manager.groups_for_yuba(hash_id)
text = f"🌟 斗鱼鱼吧动态提醒 \n👤 主播:{nickname}\n📝 内容:{content}\n🔗 链接:{url}"
for gid in groups:
if GroupBotManager.get_group_permission(gid, self.feature) == PermissionStatus.ENABLED:
try:
subs = self.redis_manager.list_group_subscribers(gid)
if subs:
await self.bot.send_at_message(gid, text, subs)
else:
await self.bot.send_text_message(gid, text)
except Exception as e:
logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}")
continue

91
test/douyu_news.py Normal file
View File

@@ -0,0 +1,91 @@
import requests
import os
from datetime import datetime
class DouyuYubaMonitor:
def __init__(self, hash_id):
self.hash_id = hash_id
self.api_url = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList"
self.record_file = f"last_processed_{hash_id}.txt"
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news",
}
def get_last_id(self):
if os.path.exists(self.record_file):
with open(self.record_file, "r") as f:
return f.read().strip()
return None
def save_last_id(self, feed_id):
with open(self.record_file, "w") as f:
f.write(str(feed_id))
def fetch_and_process(self):
params = {
"filter_type": 1,
"hash_id": self.hash_id,
"limit": 10,
"offset": 0
}
try:
response = requests.get(self.api_url, headers=self.headers, params=params, timeout=10)
data = response.json()
if data.get("error") != 0:
print(f"API Error: {data.get('msg')}")
return
feed_list = data.get("data", {}).get("feed_list", [])
# 查找第一条【非置顶】动态
target_feed = None
for feed in feed_list:
if feed.get("home_feed_top") == 1:
continue
target_feed = feed
break
if not target_feed:
print("未发现有效动态(可能全是置顶)。")
return
feed_id = str(target_feed.get("feed_id"))
last_id = self.get_last_id()
# 标记检查
if feed_id == last_id:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 监控中,暂无最新非置顶消息。")
return
# 提取参数拼接精准链接
# 根据你提供的样本origin 固定为 9, scode 可以从 publisher 逻辑或固定获取(通常 scode 是动态的,这里采用 feed 中的核心参数)
group_id = target_feed.get("group", {}).get("group_id", "0")
# 斗鱼 Web 端scode通常在分享链接中生成这里优先匹配你给出的格式
# 如果接口没直接给 scode可以根据业务需求固定或留空这里保留你给出的示例参数
full_url = f"https://yuba.douyu.com/feed/{feed_id}"
# 输出结果
print("\n" + "" * 50)
print(f"【提取到最新动态】")
print(f"发布时间: {datetime.fromtimestamp(int(target_feed.get('ctime'))).strftime('%Y-%m-%d %H:%M:%S')}")
print(f"正文内容: {target_feed.get('text', '')[:200]}")
print(f"精准链接: {full_url}")
print("" * 50 + "\n")
# 保存标记
self.save_last_id(feed_id)
except Exception as e:
print(f"运行异常: {e}")
if __name__ == "__main__":
# 使用你提供的 hash_id
monitor = DouyuYubaMonitor("PDAP2zEk3nwx")
monitor.fetch_and_process()