696 lines
30 KiB
Python
696 lines
30 KiB
Python
import asyncio
|
||
import json
|
||
from datetime import datetime
|
||
import os
|
||
import threading
|
||
import time
|
||
from typing import Dict, Any, List, Optional, Tuple, Set
|
||
|
||
import aiohttp
|
||
from loguru import logger
|
||
import ssl
|
||
import zlib
|
||
|
||
try:
|
||
import websocket
|
||
except ImportError:
|
||
websocket = None
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from db.connection import DBConnectionManager
|
||
from utils.decorator.async_job import async_job
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from wechat_ipad import WechatAPIClient
|
||
from wechat_ipad.models.appmsg_xml import DOUYU_MESSAGE_XML
|
||
|
||
|
||
class DouyuDanmuRecorder:
    """Record the chat messages (danmu) of one Douyu room into daily text files.

    A background thread connects to the Douyu danmu WebSocket proxy, joins the
    room, and appends every ``chatmsg`` packet as one formatted line to
    ``temp/douyu_danmu/<YYYYMMDD>/<room_id>_<YYYYMMDD>.txt``. Lines are
    buffered in memory and written in batches of ``_buffer_limit``.
    """

    # Candidate endpoints; ``_run`` tries each of them once, in order.
    _WS_URLS = (
        "wss://danmuproxy.douyu.com:8501/",
        "wss://danmuproxy.douyu.com:8502/",
        "wss://danmuproxy.douyu.com:8503/",
        "wss://danmuproxy.douyu.com:8504/",
        "wss://danmuproxy.douyu.com:8505/",
        "wss://danmuproxy.douyu.com:8506/",
    )
    # Seconds between ``mrkl`` keep-alive packets.
    _HEARTBEAT_INTERVAL = 38
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, room_id: str, user_agent: str):
        """Create an idle recorder; call :meth:`start` to begin recording.

        Args:
            room_id: Douyu room number to join.
            user_agent: Value sent as the ``User-Agent`` handshake header.
        """
        self.room_id = room_id
        self.user_agent = user_agent
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._ws: Optional[websocket.WebSocketApp] = None
        self._buffer: List[str] = []
        self._buffer_limit = 10
        # Date (YYYYMMDD) the buffered lines belong to; None until the first line.
        self._buffer_date: Optional[str] = None
        self._lock = threading.Lock()
        # ``websocket`` is None when the optional websocket-client import failed.
        self._websocket_available = websocket is not None

    def _encode(self, msg: str) -> bytes:
        """Frame ``msg`` as a Douyu client packet.

        Layout: 4-byte little-endian length (written twice), 2-byte message
        type 689, two zero bytes, then the UTF-8 payload terminated by NUL.
        """
        content = msg.encode("utf-8") + b"\x00"
        length = len(content) + 8
        head = length.to_bytes(4, "little") * 2
        head += (689).to_bytes(2, "little")
        head += b"\x00\x00"
        return head + content

    @staticmethod
    def _unescape(value: str) -> str:
        """Undo Douyu STT escaping (``@S`` -> ``/``, ``@A`` -> ``@``).

        ``@S`` must be replaced first: an escaped literal ``@S`` arrives as
        ``@AS`` and would otherwise be turned into ``/``.
        """
        return value.replace("@S", "/").replace("@A", "@")

    @classmethod
    def _parse_fields(cls, line: str) -> Dict[str, Any]:
        """Split one ``key@=value/key@=value/`` packet into an unescaped dict."""
        fields: Dict[str, Any] = {}
        for pair in line.split("/"):
            if "@=" in pair:
                key, value = pair.split("@=", 1)
                fields[key] = cls._unescape(value)
        return fields

    @classmethod
    def _format_timestamp(cls, raw: str) -> str:
        """Render the packet's ``cst`` field as ``YYYY-MM-DD HH:MM:SS``.

        Accepts epoch seconds, epoch milliseconds (values above 10**12) or an
        already formatted string; falls back to the current local time when
        the field is empty or unparsable.
        """
        if raw:
            try:
                if raw.isdigit():
                    ts = int(raw)
                    if ts > 10 ** 12:
                        ts = ts / 1000
                    return datetime.fromtimestamp(ts).strftime(cls._TIME_FORMAT)
                return datetime.strptime(raw, cls._TIME_FORMAT).strftime(cls._TIME_FORMAT)
            except (ValueError, OverflowError, OSError):
                pass
        return datetime.now().strftime(cls._TIME_FORMAT)

    def _on_message(self, ws, message):
        """WebSocket callback: extract chat messages and buffer them as lines."""
        if isinstance(message, str):
            # Text frames are already decoded; only binary frames need decoding.
            data = message
        else:
            try:
                data = zlib.decompress(message, -zlib.MAX_WBITS).decode("utf-8", errors="ignore")
            except zlib.error:
                # Not raw-deflate data: treat the frame as plain packets.
                data = message.decode("utf-8", errors="ignore")
        for line in data.split("\x00"):
            line = line.strip()
            if not line or "type@=chatmsg" not in line:
                continue
            parts = self._parse_fields(line)
            nick = parts.get("nn", "未知")
            txt = parts.get("txt", "")
            uid = parts.get("uid", "未知")
            level = parts.get("level", "0")
            fan_group = parts.get("bnn", "")
            fan_level = parts.get("bl", "0")
            time_str = self._format_timestamp(parts.get("cst", ""))
            output = f"[{time_str}] {nick} (UID: {uid}, Lv{level}"
            if fan_group:
                output += f" / {fan_group} Lv{fan_level}"
            output += f"):{txt}"
            self._append_and_maybe_flush(output)

    def _flush_locked(self):
        """Append the buffered lines to today's file; caller must hold ``_lock``."""
        if not self._buffer or self._buffer_date is None:
            return
        dir_path = os.path.join("temp", "douyu_danmu", self._buffer_date)
        os.makedirs(dir_path, exist_ok=True)
        file_name = os.path.join(dir_path, f"{self.room_id}_{self._buffer_date}.txt")
        data = "\n".join(self._buffer) + "\n"
        with open(file_name, "a", encoding="utf-8") as f:
            f.write(data)
        # Cleared only after a successful write so a failed write is retried.
        self._buffer.clear()

    def _append_and_maybe_flush(self, line: str):
        """Buffer ``line``; flush on date rollover or when the buffer is full."""
        date_str = datetime.now().strftime("%Y%m%d")
        with self._lock:
            if self._buffer_date is None:
                self._buffer_date = date_str
            elif date_str != self._buffer_date:
                # Write yesterday's lines to yesterday's file before switching.
                self._flush_locked()
                self._buffer_date = date_str
            self._buffer.append(line)
            if len(self._buffer) >= self._buffer_limit:
                self._flush_locked()

    def _flush(self):
        """Thread-safe wrapper around :meth:`_flush_locked`."""
        with self._lock:
            self._flush_locked()

    def _on_open(self, ws):
        """WebSocket callback: log in, join the room and start the heartbeat."""
        ws.send(self._encode(f"type@=loginreq/roomid@={self.room_id}/dmbt@=chrome/dmbv@=0/"))
        ws.send(self._encode(f"type@=joingroup/rid@={self.room_id}/gid@={self.room_id}/"))

        def heartbeat():
            while ws.sock and ws.sock.connected and not self._stop_event.is_set():
                try:
                    ws.send(self._encode("type@=mrkl/"))
                except Exception:
                    break
                # Event.wait instead of sleep: the thread ends as soon as stop() is called.
                if self._stop_event.wait(self._HEARTBEAT_INTERVAL):
                    break

        threading.Thread(target=heartbeat, daemon=True).start()

    def _on_error(self, ws, error):
        """WebSocket callback: log connection errors."""
        logger.error(f"斗鱼弹幕错误({self.room_id}): {error}")

    def _on_close(self, ws, code, msg):
        """WebSocket callback: log the close event."""
        logger.info(f"斗鱼弹幕连接关闭({self.room_id}): {code} {msg}")

    def _run(self):
        """Thread body: connect to each endpoint in turn until stopped.

        Every endpoint is tried once; the thread ends after the last one
        closes or fails, or earlier when :meth:`stop` is called.
        """
        if not self._websocket_available:
            logger.error(f"websocket-client 未安装,无法记录弹幕({self.room_id})")
            return
        try:
            websocket.enableTrace(False)
            # NOTE(review): certificate verification is disabled and the cipher
            # security level is lowered here; confirm this is still required.
            sslopt = {
                "cert_reqs": ssl.CERT_NONE,
                "ssl_version": ssl.PROTOCOL_TLS_CLIENT,
                "ciphers": "DEFAULT@SECLEVEL=1",
            }
            headers = {"User-Agent": self.user_agent}
            for url in self._WS_URLS:
                if self._stop_event.is_set():
                    break
                try:
                    self._ws = websocket.WebSocketApp(
                        url,
                        on_open=self._on_open,
                        on_message=self._on_message,
                        on_error=self._on_error,
                        on_close=self._on_close,
                        header=headers,
                    )
                    self._ws.run_forever(sslopt=sslopt, ping_interval=30, ping_timeout=10)
                except Exception as e:
                    logger.error(f"斗鱼弹幕连接失败({self.room_id}): {e}")
                    continue
                finally:
                    self._ws = None
                if self._stop_event.is_set():
                    break
                time.sleep(1)
        finally:
            self._ws = None

    def start(self):
        """Start the recording thread unless one is already running."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the worker to stop, close the socket and flush the buffer.

        The stop flag and socket close come first so that a failing flush can
        no longer prevent shutdown, and so that lines received up to the close
        are included in the final flush.
        """
        self._stop_event.set()
        ws = self._ws
        if ws:
            try:
                ws.close()
            except Exception:
                # Best effort: the socket may already be closing in the worker.
                pass
        self._flush()
|
||
|
||
|
||
class DouyuRedisManager:
    """Redis-backed storage for Douyu subscriptions and last-seen state.

    All keys live under the ``bot:douyu:`` prefix. Per-group data is stored in
    Redis sets (``rooms``, ``yubas``, ``subscribers``); per-room status and the
    last announced Yuba feed id are stored as plain string values.
    """

    def __init__(self, db_manager: DBConnectionManager):
        self.prefix = "bot:douyu:"
        self.redis = db_manager.get_redis_connection()

    # --- internal helpers ---
    def _group_key(self, group_id: str, kind: str) -> str:
        """Build the key of one per-group set (``kind`` is the set name)."""
        return f"{self.prefix}group:{group_id}:{kind}"

    @staticmethod
    def _as_text(value):
        """Decode ``bytes`` as UTF-8; return any other value unchanged."""
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return value

    def _sorted_members(self, key: str) -> List[str]:
        """Return the members of the set at ``key`` as a sorted list of text."""
        members = self.redis.smembers(key) or set()
        return sorted([self._as_text(m) for m in members])

    # --- live room subscriptions ---
    def add_group_room(self, group_id: str, room_id: str) -> bool:
        """Add ``room_id`` to the group's room set."""
        added = self.redis.sadd(self._group_key(group_id, "rooms"), room_id)
        return added >= 0

    def remove_group_room(self, group_id: str, room_id: str) -> bool:
        """Remove ``room_id`` from the group's room set."""
        removed = self.redis.srem(self._group_key(group_id, "rooms"), room_id)
        return removed >= 0

    def list_group_rooms(self, group_id: str) -> List[str]:
        """Return the group's subscribed room ids, sorted."""
        return self._sorted_members(self._group_key(group_id, "rooms"))

    def all_subscribed_rooms(self) -> Set[str]:
        """Return the union of subscribed rooms over all known groups."""
        return {
            room
            for gid in GroupBotManager.get_group_list()
            for room in self.list_group_rooms(gid)
        }

    def groups_for_room(self, room_id: str) -> List[str]:
        """Return the groups (in group-list order) subscribed to ``room_id``."""
        return [
            gid
            for gid in GroupBotManager.get_group_list()
            if room_id in self.list_group_rooms(gid)
        ]

    # --- Yuba (fan community) subscriptions ---
    def add_group_yuba(self, group_id: str, hash_id: str) -> bool:
        """Add ``hash_id`` to the group's Yuba set."""
        added = self.redis.sadd(self._group_key(group_id, "yubas"), hash_id)
        return added >= 0

    def remove_group_yuba(self, group_id: str, hash_id: str) -> bool:
        """Remove ``hash_id`` from the group's Yuba set."""
        removed = self.redis.srem(self._group_key(group_id, "yubas"), hash_id)
        return removed >= 0

    def list_group_yubas(self, group_id: str) -> List[str]:
        """Return the group's subscribed Yuba hash ids, sorted."""
        return self._sorted_members(self._group_key(group_id, "yubas"))

    def all_subscribed_yubas(self) -> Set[str]:
        """Return the union of subscribed Yubas over all known groups."""
        return {
            yuba
            for gid in GroupBotManager.get_group_list()
            for yuba in self.list_group_yubas(gid)
        }

    def groups_for_yuba(self, hash_id: str) -> List[str]:
        """Return the groups (in group-list order) subscribed to ``hash_id``."""
        return [
            gid
            for gid in GroupBotManager.get_group_list()
            if hash_id in self.list_group_yubas(gid)
        ]

    def get_yuba_last_id(self, hash_id: str) -> Optional[str]:
        """Return the last stored feed id for ``hash_id``, or None if unset."""
        stored = self.redis.get(f"{self.prefix}yuba_last_id:{hash_id}")
        return self._as_text(stored) if stored else None

    def set_yuba_last_id(self, hash_id: str, feed_id: str) -> bool:
        """Store ``feed_id`` as the last seen feed of ``hash_id``."""
        return self.redis.set(f"{self.prefix}yuba_last_id:{hash_id}", feed_id)

    # --- reminder (mention) list ---
    def add_group_subscriber(self, group_id: str, user_id: str) -> bool:
        """Add ``user_id`` to the group's reminder set."""
        added = self.redis.sadd(self._group_key(group_id, "subscribers"), user_id)
        return added >= 0

    def remove_group_subscriber(self, group_id: str, user_id: str) -> bool:
        """Remove ``user_id`` from the group's reminder set."""
        removed = self.redis.srem(self._group_key(group_id, "subscribers"), user_id)
        return removed >= 0

    def list_group_subscribers(self, group_id: str) -> List[str]:
        """Return the user ids on the group's reminder list, sorted."""
        return self._sorted_members(self._group_key(group_id, "subscribers"))

    # --- room status ---
    def get_room_status(self, room_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored status of ``room_id``; None if unset or not valid JSON."""
        raw = self.redis.get(f"{self.prefix}room_status:{room_id}")
        if not raw:
            return None
        try:
            return json.loads(self._as_text(raw))
        except Exception:
            return None

    def set_room_status(self, room_id: str, status: Dict[str, Any]) -> bool:
        """Store ``status`` for ``room_id`` as JSON (non-ASCII kept verbatim)."""
        payload = json.dumps(status, ensure_ascii=False)
        return self.redis.set(f"{self.prefix}room_status:{room_id}", payload)
|
||
|
||
|
||
class DouyuPlugin(MessagePluginInterface):
    """Douyu live-stream plugin.

    Handles the chat commands that manage per-group room / Yuba subscriptions
    and reminder lists, and runs a periodic job that polls Douyu, announces
    online/offline transitions and new Yuba feeds, and starts or stops danmu
    recording for the rooms that are live.
    """

    FEATURE_KEY = "DOUYU_MONITOR"
    # NOTE(review): the commands named here (订阅斗鱼 / 取消订阅斗鱼) are not the
    # ones this plugin handles (斗鱼订阅 / 取消斗鱼订阅); left unchanged because the
    # use of this text by register_feature() is not visible here.
    FEATURE_DESCRIPTION = "🎮 斗鱼开播提醒 [订阅斗鱼 房间号, 取消订阅斗鱼 房间号]"

    @property
    def name(self) -> str:
        return "斗鱼直播"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "斗鱼主播开播下播提醒与群订阅管理"

    @property
    def author(self) -> str:
        return "ABOT Team"

    @property
    def command_prefix(self) -> Optional[str]:
        return ""

    @property
    def commands(self) -> List[str]:
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        # Set from the first processed message; the scheduled jobs wait for it.
        self.bot: WechatAPIClient = None
        self.feature = self.register_feature()
        self.redis_manager: Optional[DouyuRedisManager] = None
        self._commands = ["斗鱼订阅", "取消斗鱼订阅", "斗鱼订阅列表", "斗鱼订阅提醒", "取消斗鱼订阅提醒",
                          "订阅鱼吧", "取消订阅鱼吧", "鱼吧订阅列表"]
        self._api_template = "https://www.douyu.com/betard/{room_id}"
        self._yuba_api = "https://yuba.douyu.com/wgapi/yubanc/api/feed/getUserFeedList"
        self._user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        self._check_interval = 5
        self._danmu_recorders: Dict[str, DouyuDanmuRecorder] = {}
        # NOTE(review): the job is registered with the default interval; the
        # value read from the config in initialize() does not re-register it.
        async_job.every_minutes(self._check_interval)(self._scheduled_unified_check_job)

    async def _scheduled_unified_check_job(self):
        """Run the live-status check followed by the Yuba feed check."""
        await self._scheduled_check_job()
        await self._scheduled_yuba_check_job()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Create the Redis manager and apply the optional ``Douyu`` config section.

        Returns:
            True on success, False when any step raised (the error is logged).
        """
        try:
            dbm = DBConnectionManager.get_instance()
            self.redis_manager = DouyuRedisManager(dbm)
            cfg = self._config.get("Douyu", {})
            cfg_cmds = cfg.get("command", [])
            if isinstance(cfg_cmds, list) and cfg_cmds:
                # dict.fromkeys de-duplicates while preserving order.
                self._commands = list(dict.fromkeys(cfg_cmds + self._commands))
            self._api_template = cfg.get("api_url_template", self._api_template)
            self._user_agent = cfg.get("user_agent", self._user_agent)
            self._check_interval = int(cfg.get("check_interval_minutes", self._check_interval))
            return True
        except Exception as e:
            logger.error(f"{self.name} 初始化失败: {e}")
            return False

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Stop every danmu recorder and mark the plugin as stopped."""
        for recorder in getattr(self, "_danmu_recorders", {}).values():
            recorder.stop()
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return True when the first token of the message is a known command."""
        content = str(message.get("content", "")).strip()
        if not content:
            return False
        return content.split()[0] in self._commands

    @plugin_stats_decorator(plugin_name="斗鱼直播")
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Execute one subscription command.

        Args:
            message: Incoming message dict; reads ``content``, ``sender``,
                ``roomid``, ``gbm`` and ``bot``.

        Returns:
            ``(handled, summary)``; ``(False, None)`` when no command matched.
        """
        content = str(message.get("content", "")).strip()
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        # Fall back to the class, which the notification code already calls directly.
        gbm: GroupBotManager = message.get("gbm") or GroupBotManager
        bot = message.get("bot")
        if bot is not None:
            # Never replace a working client with None.
            self.bot = bot
        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"
        parts = content.split()
        if not parts:
            return False, None
        first_token = parts[0]
        target = roomid or sender

        if content == "斗鱼订阅列表":
            rooms = self.redis_manager.list_group_rooms(target)
            if not rooms:
                await self.bot.send_text_message(target, "暂无订阅", sender)
                return True, "暂无订阅"
            text = "当前订阅的斗鱼房间:\n" + "\n".join(rooms)
            await self.bot.send_text_message(target, text, sender)
            return True, "列表已发送"

        if first_token == "斗鱼订阅提醒":
            if not roomid:
                await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender)
                return True, "仅支持群聊"
            ok = self.redis_manager.add_group_subscriber(roomid, sender)
            await self.bot.send_at_message(roomid, "已加入斗鱼订阅提醒名单", [sender])
            return True, "加入提醒名单成功" if ok else "加入提醒名单失败"

        if first_token == "取消斗鱼订阅提醒":
            if not roomid:
                await self.bot.send_text_message(sender, "请在群聊中使用该命令", sender)
                return True, "仅支持群聊"
            ok = self.redis_manager.remove_group_subscriber(roomid, sender)
            await self.bot.send_at_message(roomid, "已取消斗鱼订阅提醒", [sender])
            return True, "取消提醒成功" if ok else "取消提醒失败"

        if first_token == "斗鱼订阅":
            if len(parts) < 2:
                # The example names the command that is actually handled.
                await self.bot.send_text_message(target, "请提供房间号,例如:斗鱼订阅 7718843", sender)
                return True, "命令格式错误"
            room_id = parts[1].strip()
            if not room_id.isdigit():
                await self.bot.send_text_message(target, "房间号必须是数字,例如:斗鱼订阅 52876", sender)
                return True, "命令格式错误"
            ok = self.redis_manager.add_group_room(target, room_id)
            await self.bot.send_text_message(target, f"✅ 已订阅斗鱼房间 {room_id}", sender)
            return True, "订阅成功" if ok else "订阅失败"

        if first_token == "取消斗鱼订阅":
            if len(parts) < 2:
                # The example names the command that is actually handled.
                await self.bot.send_text_message(target, "请提供房间号,例如:取消斗鱼订阅 7718843", sender)
                return True, "命令格式错误"
            room_id = parts[1].strip()
            if not room_id.isdigit():
                await self.bot.send_text_message(target, "房间号必须是数字,例如:取消斗鱼订阅 52876", sender)
                return True, "命令格式错误"
            ok = self.redis_manager.remove_group_room(target, room_id)
            await self.bot.send_text_message(target, f"✅ 已取消订阅斗鱼房间 {room_id}", sender)
            return True, "取消成功" if ok else "取消失败"

        if content == "鱼吧订阅列表":
            yubas = self.redis_manager.list_group_yubas(target)
            if not yubas:
                await self.bot.send_text_message(target, "暂无鱼吧订阅", sender)
                return True, "暂无鱼吧订阅"
            text = "当前订阅的斗鱼鱼吧:\n" + "\n".join(yubas)
            await self.bot.send_text_message(target, text, sender)
            return True, "列表已发送"

        if first_token == "订阅鱼吧":
            if len(parts) < 2:
                await self.bot.send_text_message(target, "请提供鱼吧 hash_id,例如:订阅鱼吧 PDAP2zEk3nwx",
                                                 sender)
                return True, "命令格式错误"
            hash_id = parts[1].strip()
            ok = self.redis_manager.add_group_yuba(target, hash_id)
            await self.bot.send_text_message(target, f"✅ 已订阅斗鱼鱼吧 {hash_id}", sender)
            return True, "订阅成功" if ok else "订阅失败"

        if first_token == "取消订阅鱼吧":
            if len(parts) < 2:
                await self.bot.send_text_message(target, "请提供鱼吧 hash_id,例如:取消订阅鱼吧 PDAP2zEk3nwx",
                                                 sender)
                return True, "命令格式错误"
            hash_id = parts[1].strip()
            ok = self.redis_manager.remove_group_yuba(target, hash_id)
            await self.bot.send_text_message(target, f"✅ 已取消订阅斗鱼鱼吧 {hash_id}", sender)
            return True, "取消成功" if ok else "取消失败"

        return False, None

    async def _scheduled_check_job(self):
        """Poll every subscribed room and react to live-status transitions."""
        try:
            if self.bot is None:
                # Without a client every send would fail while the new state is
                # still persisted, losing the notification for good.
                logger.warning("斗鱼定时检查跳过: bot 尚未就绪")
                return
            rooms = self.redis_manager.all_subscribed_rooms()
            if not rooms:
                return
            async with aiohttp.ClientSession() as session:
                for room_id in rooms:
                    try:
                        await self._check_room(session, room_id)
                    except Exception as e:
                        logger.error(f"斗鱼检查失败: {e}")
                        continue
        except Exception as e:
            logger.error(f"斗鱼定时任务异常: {e}")

    async def _check_room(self, session, room_id: str) -> None:
        """Fetch one room's status, persist it and handle the transition."""
        url = self._api_template.format(room_id=room_id)
        headers = {
            "User-Agent": self._user_agent,
            "Referer": f"https://www.douyu.com/{room_id}",
        }
        async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            data = await resp.json(content_type=None)

        room_info = data.get("room") if isinstance(data, dict) else None
        if not isinstance(room_info, dict) or not room_info:
            # A response without room data says nothing about the stream;
            # treating it as "offline" would send a false offline notice.
            logger.warning(f"斗鱼房间信息缺失,跳过本次检查({room_id})")
            return

        show_status = room_info.get("show_status")
        nickname = room_info.get("nickname", "")
        room_name = room_info.get("room_name", "")
        avatar = room_info.get("avatar", {}) or {}
        if not isinstance(avatar, dict):
            avatar = {}
        # The URL is stripped of whitespace and stray backticks.
        thumb_url = str(avatar.get("small", "") or "").strip().strip("`").strip()
        try:
            video_loop = int(str(room_info.get("videoLoop", 0)))
        except Exception:
            video_loop = 0

        prev = self.redis_manager.get_room_status(room_id) or {}
        prev_live = prev.get("is_live")
        # A looping replay (videoLoop == 1) does not count as a live broadcast.
        curr_live = show_status == 1 and video_loop == 0
        self.redis_manager.set_room_status(room_id, {
            "is_live": curr_live,
            "nickname": nickname,
            "room_name": room_name,
            "is_loop": video_loop == 1,
        })

        if prev_live is None and not curr_live:
            # First observation of an offline room: nothing to announce.
            return
        if prev_live is not True and curr_live:
            await self._notify_groups_live(room_id, nickname, room_name, thumb_url)
            return
        if prev_live is True and not curr_live:
            await self._notify_groups_offline(room_id, nickname, room_name, video_loop == 1)
            return
        if prev_live is True and curr_live and not self._is_danmu_recording(room_id):
            # Still live but no recorder thread is running (never started, or
            # it ended after exhausting its endpoints): start it again.
            try:
                logger.info(f"检测到持续直播状态,补偿启动斗鱼弹幕记录({room_id})")
                self._start_danmu_record(room_id)
            except Exception as e:
                logger.error(f"补偿启动斗鱼弹幕记录失败({room_id}): {e}")
            return
        await asyncio.sleep(0.1)

    async def _notify_groups_live(self, room_id: str, nickname: str, room_name: str, thumb_url: str):
        """Announce that a room went live and start recording its danmu."""
        groups = self.redis_manager.groups_for_room(room_id)
        text = f"🚀 斗鱼开播通知 \n🎤 {nickname} 正在直播中!\n 📌 房间标题:{room_name} \n 👉 点击观看:https://www.douyu.com/{room_id}"
        xml_content = DOUYU_MESSAGE_XML.format(title=room_name, liver=nickname, roomid=room_id, thumburl=thumb_url)
        for gid in groups:
            if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
                continue
            try:
                subs = self.redis_manager.list_group_subscribers(gid)
                if subs:
                    await self.bot.send_at_message(gid, text, subs)
                else:
                    await self.bot.send_text_message(gid, text)
                await self.bot.send_link_xml_message(xml_content, gid)
            except Exception as e:
                logger.error(f"发送斗鱼开播提醒失败: {e}")
                continue
        try:
            logger.info(f"启动斗鱼弹幕记录({room_id})")
            self._start_danmu_record(room_id)
        except Exception as e:
            logger.error(f"启动斗鱼弹幕记录失败({room_id}): {e}")

    async def _notify_groups_offline(self, room_id: str, nickname: str, room_name: str, is_loop: bool = False):
        """Announce that a room went offline and stop recording its danmu."""
        groups = self.redis_manager.groups_for_room(room_id)
        text = f"🔔 斗鱼提醒:{nickname} 下播啦~\n 🏷️ {room_name}"
        if is_loop:
            text += "(当前为轮播)"
        for gid in groups:
            if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
                continue
            try:
                await self.bot.send_text_message(gid, text)
            except Exception as e:
                logger.error(f"发送斗鱼下播提醒失败: {e}")
                continue
        try:
            logger.info(f"停止斗鱼弹幕记录({room_id})")
            self._stop_danmu_record(room_id)
        except Exception as e:
            logger.error(f"停止斗鱼弹幕记录失败({room_id}): {e}")

    async def _scheduled_yuba_check_job(self):
        """Poll every subscribed Yuba and announce its newest non-pinned feed."""
        try:
            if self.bot is None:
                # Same reasoning as the live check: do not advance the stored
                # feed id while nothing can be delivered.
                logger.warning("斗鱼鱼吧定时检查跳过: bot 尚未就绪")
                return
            yubas = self.redis_manager.all_subscribed_yubas()
            if not yubas:
                return
            async with aiohttp.ClientSession() as session:
                for hash_id in yubas:
                    try:
                        await self._check_yuba(session, hash_id)
                    except Exception as e:
                        logger.error(f"检查斗鱼鱼吧 ({hash_id}) 失败: {e}")
                        continue
        except Exception as e:
            logger.error(f"斗鱼鱼吧定时任务异常: {e}")

    async def _check_yuba(self, session, hash_id: str) -> None:
        """Fetch one Yuba's feed list and announce the newest non-pinned feed once."""
        params = {
            "filter_type": 1,
            "hash_id": hash_id,
            "limit": 10,
            "offset": 0,
        }
        headers = {
            "User-Agent": self._user_agent,
            "Referer": f"https://yuba.douyu.com/member/{hash_id}/main/news",
        }
        async with session.get(self._yuba_api, headers=headers, params=params,
                               timeout=aiohttp.ClientTimeout(total=10)) as resp:
            data = await resp.json(content_type=None)

        if not isinstance(data, dict) or data.get("error") != 0:
            msg = data.get("msg") if isinstance(data, dict) else data
            logger.error(f"斗鱼鱼吧 API 错误 ({hash_id}): {msg}")
            return

        feed_list = (data.get("data") or {}).get("feed_list") or []
        # Pick the first feed that is not pinned to the top of the page.
        target_feed = next((feed for feed in feed_list if feed.get("home_feed_top") != 1), None)
        if not target_feed:
            return

        raw_feed_id = target_feed.get("feed_id")
        if raw_feed_id is None:
            # Without an id there is neither a link nor a de-duplication key.
            return
        feed_id = str(raw_feed_id)
        last_id = self.redis_manager.get_yuba_last_id(hash_id)
        if last_id and feed_id == last_id:
            return

        # A new feed was found.
        nickname = (target_feed.get("publisher") or {}).get("nickname", "未知主播")
        content = target_feed.get("text", "")
        ctime = target_feed.get("ctime")
        publish_time = datetime.fromtimestamp(int(ctime)).strftime(
            '%Y-%m-%d %H:%M:%S') if ctime else "未知时间"

        # Limit the length of the quoted content.
        if len(content) > 200:
            content = content[:200] + "..."

        full_url = f"https://yuba.douyu.com/feed/{feed_id}"
        await self._notify_groups_yuba(hash_id, nickname, content, full_url, publish_time)

        # Remember the feed so it is announced only once.
        self.redis_manager.set_yuba_last_id(hash_id, feed_id)
        await asyncio.sleep(0.5)

    async def _notify_groups_yuba(self, hash_id: str, nickname: str, content: str, url: str,
                                  publish_time: str = "未知时间"):
        """Send the new-feed notice to every enabled group subscribed to ``hash_id``."""
        groups = self.redis_manager.groups_for_yuba(hash_id)
        text = f"🌟 斗鱼鱼吧动态提醒 \n👤 主播:{nickname}\n⏰ 时间:{publish_time}\n📝 内容:{content}\n🔗 链接:{url}"
        for gid in groups:
            if GroupBotManager.get_group_permission(gid, self.feature) != PermissionStatus.ENABLED:
                continue
            try:
                await self.bot.send_text_message(gid, text)
            except Exception as e:
                logger.error(f"发送斗鱼鱼吧动态提醒失败: {e}")
                continue

    def _get_danmu_recorder(self, room_id: str) -> DouyuDanmuRecorder:
        """Return the recorder for ``room_id``, creating it on first use."""
        recorder = self._danmu_recorders.get(room_id)
        if not recorder:
            recorder = DouyuDanmuRecorder(room_id, self._user_agent)
            self._danmu_recorders[room_id] = recorder
        return recorder

    def _is_danmu_recording(self, room_id: str) -> bool:
        """Return True when the room's recorder exists and its thread is alive."""
        recorder = self._danmu_recorders.get(room_id)
        thread = getattr(recorder, "_thread", None)
        return thread is not None and thread.is_alive()

    def _start_danmu_record(self, room_id: str):
        """Start danmu recording for ``room_id`` (no-op if already running)."""
        self._get_danmu_recorder(room_id).start()

    def _stop_danmu_record(self, room_id: str):
        """Stop danmu recording for ``room_id`` if a recorder exists."""
        recorder = self._danmu_recorders.get(room_id)
        if recorder:
            recorder.stop()
|