551 lines
24 KiB
Python
551 lines
24 KiB
Python
import base64
|
||
import json
|
||
import random
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
|
||
from loguru import logger
|
||
import requests
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
|
||
from utils.revoke.message_auto_revoke import MessageAutoRevoke
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from utils.decorator.points_decorator import plugin_points_cost, points_reward_decorator
|
||
from wechat_ipad import WechatAPIClient
|
||
from db.connection import DBConnectionManager
|
||
|
||
import aiohttp
|
||
from pydub import AudioSegment
|
||
from io import BytesIO
|
||
|
||
from wechat_ipad.models.appmsg_xml import MUSIC_XML
|
||
|
||
|
||
class GuessSongRedisDB:
|
||
"""猜歌名游戏Redis相关操作"""
|
||
|
||
def __init__(self, db_manager: DBConnectionManager):
|
||
self.db_manager = db_manager
|
||
self.prefix = "group:guess_song:"
|
||
|
||
def get_redis_connection(self):
|
||
"""获取Redis连接"""
|
||
return self.db_manager.get_redis_connection()
|
||
|
||
def save_game_session(self, room_id: str, game_data: Dict[str, Any]) -> bool:
|
||
"""保存游戏会话数据"""
|
||
try:
|
||
with self.get_redis_connection() as redis_client:
|
||
logger.info(f"保存猜歌游戏数据{game_data}")
|
||
# 设置过期时间为10分钟
|
||
redis_client.setex(
|
||
f'{self.prefix}{room_id}',
|
||
600, # 10分钟过期
|
||
json.dumps(game_data, ensure_ascii=False)
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"保存猜歌游戏数据出错: {e}")
|
||
return False
|
||
|
||
def get_game_session(self, room_id: str) -> Optional[Dict[str, Any]]:
|
||
"""获取游戏会话数据"""
|
||
try:
|
||
with self.get_redis_connection() as redis_client:
|
||
data = redis_client.get(f'{self.prefix}{room_id}')
|
||
if data:
|
||
if isinstance(data, bytes):
|
||
data = data.decode('utf-8')
|
||
return json.loads(data)
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取猜歌游戏数据出错: {e}")
|
||
return None
|
||
|
||
def delete_game_session(self, room_id: str) -> bool:
|
||
"""删除游戏会话数据"""
|
||
try:
|
||
with self.get_redis_connection() as redis_client:
|
||
redis_client.delete(f'{self.prefix}{room_id}')
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"删除猜歌游戏数据出错: {e}")
|
||
return False
|
||
|
||
|
||
class GuessSongPlugin(MessagePluginInterface):
|
||
"""猜歌名游戏插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "GUESS_MUSIC"
|
||
FEATURE_DESCRIPTION = "🎤 猜歌名游戏 [猜歌名 - 开始 | 猜歌名 歌手名 - 指定歌手 | 猜歌名 歌名 - 提交答案]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "猜歌名游戏"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供猜歌名游戏功能,支持指定歌手和猜歌名"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "水牛&AI"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.revoke = None
|
||
self.feature = self.register_feature()
|
||
self.redis_db = None
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logger
|
||
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.event_system = context.get("event_system")
|
||
self.db_manager = DBConnectionManager.get_instance()
|
||
|
||
# 初始化Redis操作类
|
||
if self.db_manager:
|
||
self.redis_db = GuessSongRedisDB(self.db_manager)
|
||
|
||
# 从配置文件加载设置
|
||
guess_song_config = self._config.get("GuessSong", {})
|
||
self._commands = guess_song_config.get("command", ["猜歌名"])
|
||
self.command_format = guess_song_config.get("command-format", "猜歌名 [歌手名]/[歌名]")
|
||
self.enable = guess_song_config.get("enable", True)
|
||
|
||
# 加载API配置
|
||
api_config = guess_song_config.get("api", {})
|
||
self.music_api = api_config.get("music_api", "")
|
||
|
||
# 加载游戏配置
|
||
game_config = guess_song_config.get("game", {})
|
||
self.clip_duration = game_config.get("clip_duration", 10)
|
||
self.hint_timeout = game_config.get("hint_timeout", 30)
|
||
|
||
# 加载热门歌手列表
|
||
singers_config = guess_song_config.get("popular_singers", {})
|
||
self.popular_singers = singers_config.get("list", [
|
||
"周杰伦", "林俊杰", "薛之谦", "陈奕迅", "邓紫棋", "李荣浩", "张学友", "刘德华",
|
||
"张国荣", "梅艳芳", "Beyond", "容祖儿", "谢霆锋",
|
||
"王菲", "张惠妹", "孙燕姿", "蔡依林", "五月天"
|
||
])
|
||
|
||
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
command = content.split(" ")[0]
|
||
|
||
return command in self._commands
|
||
|
||
@plugin_stats_decorator(plugin_name="猜歌名游戏")
|
||
@group_feature_rate_limit(max_per_minute=5, feature_key=FEATURE_KEY)
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.debug(f"插件执行: {self.name}:{content}")
|
||
|
||
# 简化命令处理,只检查开头是否包含命令前缀
|
||
command_found = False
|
||
for cmd in self._commands:
|
||
if content.startswith(cmd):
|
||
command_found = True
|
||
content = content[len(cmd):].strip()
|
||
break
|
||
|
||
if not command_found:
|
||
return False, "不匹配的命令"
|
||
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
bot: WechatAPIClient = message.get("bot")
|
||
self.revoke: MessageAutoRevoke = message.get("revoke")
|
||
|
||
# 使用roomid或sender作为游戏会话ID
|
||
session_id = roomid if roomid else sender
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
# 获取当前游戏会话
|
||
current_game = None
|
||
if self.redis_db:
|
||
current_game = self.redis_db.get_game_session(session_id)
|
||
|
||
# 检查是否是"下一首"命令
|
||
if content == "下一首" and current_game and current_game.get("status") == "playing":
|
||
return await self._next_song(bot, session_id, sender)
|
||
|
||
# 如果游戏进行中,且有内容,则视为答案
|
||
if current_game and current_game.get("status") == "playing":
|
||
if content: # 有内容,视为答案
|
||
return await self._check_answer(message)
|
||
else: # 没有内容,提示已有游戏在进行中
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"⚠️ 当前已有猜歌游戏在进行中,请直接回复 [猜歌名 歌名] 进行猜测\n或回复 [猜歌名 下一首] 跳过当前歌曲",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 4)
|
||
return True, "已有游戏进行中"
|
||
|
||
# 否则开始新游戏(可以指定歌手或随机)
|
||
return await self._start_new_game(bot, session_id, sender, content if content else None)
|
||
|
||
async def _start_new_game(self, bot: WechatAPIClient, session_id: str, sender: str, singer_name: Optional[str]) -> \
|
||
Tuple[bool, str]:
|
||
"""开始新游戏"""
|
||
try:
|
||
# 搜索歌曲
|
||
song_info = await self._get_random_song(singer_name)
|
||
if not song_info or not song_info.get("play_url"):
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"❌未找到{singer_name or '随机'}歌曲,请重试",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 4)
|
||
return False, "未找到歌曲"
|
||
|
||
# 保存游戏会话
|
||
game_data = {
|
||
"status": "playing",
|
||
"song_name": song_info.get("song_name", ""),
|
||
"singer_name": song_info.get("singer_name", ""),
|
||
"start_time": time.time(),
|
||
"hint_given": False,
|
||
"play_url": song_info.get("play_url"),
|
||
"singer_pic": song_info.get('singer_pic', '') or song_info.get('singer_pic', ''),
|
||
"data_url": song_info.get('data_url', '')
|
||
}
|
||
|
||
if self.redis_db:
|
||
self.redis_db.save_game_session(session_id, game_data)
|
||
|
||
# 发送游戏开始消息
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"🎵 猜歌名游戏开始!\n请听10秒钟歌曲片段,然后回复[猜歌名 歌名]来猜测歌曲名称。\n回复[猜歌名 下一首]可跳过当前歌曲。\n歌手: {song_info.get('singer_name', '未知')}",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 30)
|
||
|
||
# 发送10秒音频片段
|
||
await self._send_song_clip(bot, song_info, session_id)
|
||
|
||
return True, "游戏开始"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"开始猜歌游戏出错: {e}")
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"❌开始猜歌游戏出错,请稍后重试",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 5)
|
||
return False, f"处理出错: {e}"
|
||
|
||
@points_reward_decorator(5, "game", "猜歌名游戏", FEATURE_KEY)
|
||
async def _check_answer(self, message: Dict[str, Any]) -> Tuple[bool, str]:
|
||
"""检查答案"""
|
||
try:
|
||
# 从message中提取所需参数
|
||
sender = message.get("sender", "")
|
||
session_id = message.get("roomid", "") or sender # 如果没有roomid,使用sender作为session_id
|
||
content = message.get("content", "").strip() # 从消息内容中获取答案
|
||
bot = message.get("bot")
|
||
answer = None
|
||
for cmd in self._commands:
|
||
if content.startswith(cmd):
|
||
answer = content[len(cmd):].strip()
|
||
break
|
||
|
||
# 从Redis获取游戏数据
|
||
game_data = None
|
||
if self.redis_db:
|
||
game_data = self.redis_db.get_game_session(session_id)
|
||
|
||
if not game_data:
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id, "没有进行中的游戏",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 5)
|
||
return False, "没有进行中的游戏"
|
||
|
||
correct_answer = game_data.get("song_name", "")
|
||
singer_name = game_data.get("singer_name", "")
|
||
# 检查答案是否正确(简单比较,可以改进为模糊匹配)
|
||
if answer.lower() in correct_answer.lower():
|
||
# 游戏结束,发送成功消息
|
||
await bot.send_text_message(session_id, f"🎉 恭喜你猜对了!\n歌曲:{correct_answer}\n歌手:{singer_name}",
|
||
sender)
|
||
|
||
# 删除游戏会话
|
||
if self.redis_db:
|
||
self.redis_db.delete_game_session(session_id)
|
||
await self._send_music_message(bot, game_data, session_id)
|
||
return True, "猜对了"
|
||
else:
|
||
# 答案错误
|
||
# 检查是否需要给提示(超过指定时间且未给过提示)
|
||
current_time = time.time()
|
||
start_time = game_data.get("start_time", 0)
|
||
hint_given = game_data.get("hint_given", False)
|
||
|
||
if not hint_given and (current_time - start_time) > self.hint_timeout:
|
||
# 给出提示(显示歌名的第一个字)
|
||
hint = correct_answer[0] + "*" * (len(correct_answer) - 1)
|
||
await bot.send_text_message(session_id, f"💡 提示:歌名以 '{correct_answer[0]}' 开头")
|
||
|
||
# 更新游戏会话,标记已给出提示
|
||
game_data["hint_given"] = True
|
||
if self.redis_db:
|
||
self.redis_db.save_game_session(session_id, game_data)
|
||
|
||
# 告知用户答案错误
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"❌ 答案错误,请继续猜测!", sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 4)
|
||
|
||
return False, "答案错误"
|
||
except Exception as e:
|
||
self.LOG.error(f"检查答案出错: {e}")
|
||
session_id = message.get("roomid", "") or message.get("sender", "")
|
||
sender = message.get("sender", "")
|
||
bot = message.get("bot")
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"❌检查答案出错,请稍后重试", sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 4)
|
||
return False, f"处理出错: {e}"
|
||
|
||
async def _get_random_song(self, singer_name: Optional[str]) -> Dict[str, Any]:
|
||
"""获取随机歌曲信息"""
|
||
try:
|
||
# 构建API请求URL - 使用新的聚合搜索API
|
||
if singer_name:
|
||
# 如果指定了歌手,搜索该歌手的歌曲
|
||
api_url = f"{self.music_api}".format(singer_name=singer_name)
|
||
else:
|
||
# 如果没有指定歌手,使用热门歌手列表
|
||
random_singer = random.choice(self.popular_singers)
|
||
api_url = f"{self.music_api}".format(singer_name=random_singer)
|
||
|
||
self.LOG.info(f"请求歌曲API: {api_url}")
|
||
|
||
# 发送请求获取歌曲列表
|
||
response = requests.get(api_url, verify=False)
|
||
|
||
if response.status_code != 200:
|
||
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
|
||
return {}
|
||
|
||
# 解析响应数据
|
||
json_data = response.json()
|
||
if json_data.get("code") != 200:
|
||
self.LOG.error(f"API 返回错误: {json_data.get('message')}")
|
||
return {}
|
||
|
||
# 从歌曲列表中随机选择一首
|
||
song_list = json_data.get("data", {}).get("results", [])
|
||
if not song_list or not isinstance(song_list, list) or len(song_list) == 0:
|
||
self.LOG.error(f"未找到歌曲列表或列表为空")
|
||
return {}
|
||
|
||
# 随机选择一首歌曲
|
||
random_song = random.choice(song_list)
|
||
|
||
# 从API响应中提取所需字段
|
||
song_name = random_song.get('name', '')
|
||
singer_name = random_song.get('artist', '')
|
||
play_url = random_song.get('url', '')
|
||
singer_pic = random_song.get('pic', '')
|
||
data_url = random_song.get('url', '') # 使用相同的URL作为数据链接
|
||
|
||
if not play_url:
|
||
self.LOG.error(f"歌曲播放链接为空")
|
||
return {}
|
||
|
||
# 获取真实的播放链接(跟随重定向)
|
||
real_play_url = self._get_real_url(play_url)
|
||
real_data_url = self._get_real_url(data_url)
|
||
real_singer_pic = self._get_real_url(singer_pic)
|
||
|
||
# 返回完整的歌曲信息
|
||
self.LOG.info(f"成功获取歌曲: {singer_name} - {song_name}")
|
||
self.LOG.debug(f"原始URL: {play_url}")
|
||
self.LOG.debug(f"真实URL: {real_play_url}")
|
||
|
||
return {
|
||
"song_name": song_name,
|
||
"singer_name": singer_name,
|
||
"play_url": real_play_url,
|
||
"singer_pic": real_singer_pic,
|
||
"data_url": real_data_url
|
||
}
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"获取随机歌曲出错: {e}")
|
||
return {}
|
||
|
||
def _get_real_url(self, redirect_url: str) -> str:
|
||
"""跟随重定向获取真实URL"""
|
||
try:
|
||
if not redirect_url:
|
||
return ""
|
||
|
||
# 发送HEAD请求,不跟随重定向,获取真实URL
|
||
response = requests.head(redirect_url, allow_redirects=True, verify=False, timeout=10)
|
||
|
||
# 返回最终的URL(重定向后的真实地址)
|
||
if response.status_code in [200, 301, 302, 303, 307, 308]:
|
||
real_url = response.url
|
||
self.LOG.debug(f"URL重定向: {redirect_url} -> {real_url}")
|
||
return real_url
|
||
else:
|
||
self.LOG.warning(f"获取真实URL失败,状态码: {response.status_code}")
|
||
return redirect_url
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"获取真实URL出错: {e}")
|
||
return redirect_url
|
||
|
||
async def _send_song_clip(self, bot: WechatAPIClient, song_info: Dict[str, Any], session_id: str) -> bool:
|
||
"""发送歌曲片段"""
|
||
try:
|
||
play_url = song_info.get("play_url", "")
|
||
if not play_url:
|
||
return False
|
||
|
||
# 下载歌曲
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(play_url) as resp:
|
||
if resp.status != 200:
|
||
self.LOG.error(f"下载歌曲失败,状态码: {resp.status}")
|
||
return False
|
||
|
||
# 读取音频数据
|
||
audio_data = await resp.read()
|
||
|
||
try:
|
||
# 使用pydub处理音频
|
||
audio = AudioSegment.from_file(BytesIO(audio_data))
|
||
|
||
# 截取前N秒
|
||
clip_duration = self.clip_duration * 1000 # 转换为毫秒
|
||
audio_clip = audio[:clip_duration]
|
||
|
||
# 将音频片段转换为字节
|
||
output = BytesIO()
|
||
audio_clip.export(output, format="mp3")
|
||
clip_bytes = output.getvalue()
|
||
|
||
await bot.send_voice_message(session_id, clip_bytes, "mp3")
|
||
return True
|
||
|
||
except Exception as audio_error:
|
||
self.LOG.error(f"处理音频出错: {audio_error}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"发送歌曲片段出错: {e}")
|
||
return False
|
||
|
||
async def _send_music_message(self, bot: WechatAPIClient, song_info: Dict[str, Any], receiver: str) -> bool:
|
||
"""发送音乐消息"""
|
||
try:
|
||
song_name = song_info.get("song_name", "")
|
||
singer_name = song_info.get("singer_name", "")
|
||
play_url = song_info.get("play_url", "")
|
||
singer_pic = song_info.get("singer_pic", "")
|
||
data_url = song_info.get("data_url", "")
|
||
|
||
xml_message = f"{MUSIC_XML}".format(song_name=song_name, singer_name=singer_name, play_url=play_url,
|
||
data_url=data_url, singer_pic=singer_pic)
|
||
self.LOG.debug(f"发送音乐消息:{xml_message}")
|
||
res = await bot.send_app_message(wxid=receiver, xml=xml_message, type=0)
|
||
self.LOG.debug(f"发送音乐消息 res:{res}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"发送音乐消息出错: {e}")
|
||
return False
|
||
|
||
async def _next_song(self, bot: WechatAPIClient, session_id: str, sender: str) -> Tuple[bool, str]:
|
||
"""跳过当前歌曲,播放下一首"""
|
||
try:
|
||
# 获取当前游戏数据
|
||
current_game = None
|
||
if self.redis_db:
|
||
current_game = self.redis_db.get_game_session(session_id)
|
||
|
||
if not current_game or current_game.get("status") != "playing":
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"⚠️ 当前没有进行中的猜歌游戏",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 4)
|
||
return False, "没有进行中的游戏"
|
||
|
||
# 显示当前歌曲答案
|
||
correct_answer = current_game.get("song_name", "")
|
||
singer_name = current_game.get("singer_name", "")
|
||
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"⏭️ 已跳过当前歌曲\n歌曲:{correct_answer}\n歌手:{singer_name}",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 10)
|
||
# 发送完整音乐
|
||
await self._send_music_message(bot, current_game, session_id)
|
||
|
||
# 删除当前游戏会话
|
||
if self.redis_db:
|
||
self.redis_db.delete_game_session(session_id)
|
||
|
||
# 开始新游戏
|
||
return await self._start_new_game(bot, session_id, sender, None)
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"跳过当前歌曲出错: {e}")
|
||
client_msg_id, create_time, new_msg_id = await bot.send_text_message(session_id,
|
||
f"❌跳过当前歌曲出错,请稍后重试",
|
||
sender)
|
||
self.revoke.add_message_to_revoke(session_id, client_msg_id, create_time, new_msg_id, 4)
|
||
return False, f"处理出错: {e}"
|