import base64
import json
import random
import time
from datetime import datetime, timedelta
from loguru import logger
import requests
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost, points_reward_decorator
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
import aiohttp
from pydub import AudioSegment
from io import BytesIO
class GuessSongRedisDB:
"""猜歌名游戏Redis相关操作"""
def __init__(self, db_manager: DBConnectionManager):
self.db_manager = db_manager
self.prefix = "group:guess_song:"
def get_redis_connection(self):
"""获取Redis连接"""
return self.db_manager.get_redis_connection()
def save_game_session(self, room_id: str, game_data: Dict[str, Any]) -> bool:
"""保存游戏会话数据"""
try:
with self.get_redis_connection() as redis_client:
logger.info(f"保存猜歌游戏数据{game_data}")
# 设置过期时间为10分钟
redis_client.setex(
f'{self.prefix}{room_id}',
600, # 10分钟过期
json.dumps(game_data, ensure_ascii=False)
)
return True
except Exception as e:
logger.error(f"保存猜歌游戏数据出错: {e}")
return False
def get_game_session(self, room_id: str) -> Optional[Dict[str, Any]]:
"""获取游戏会话数据"""
try:
with self.get_redis_connection() as redis_client:
data = redis_client.get(f'{self.prefix}{room_id}')
if data:
if isinstance(data, bytes):
data = data.decode('utf-8')
return json.loads(data)
return None
except Exception as e:
logger.error(f"获取猜歌游戏数据出错: {e}")
return None
def delete_game_session(self, room_id: str) -> bool:
"""删除游戏会话数据"""
try:
with self.get_redis_connection() as redis_client:
redis_client.delete(f'{self.prefix}{room_id}')
return True
except Exception as e:
logger.error(f"删除猜歌游戏数据出错: {e}")
return False
class GuessSongPlugin(MessagePluginInterface):
"""猜歌名游戏插件"""
@property
def name(self) -> str:
return "猜歌名游戏"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供猜歌名游戏功能,支持指定歌手和猜歌名"
@property
def author(self) -> str:
return "水牛&AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.redis_db = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.db_manager = DBConnectionManager.get_instance()
# 初始化Redis操作类
if self.db_manager:
self.redis_db = GuessSongRedisDB(self.db_manager)
self._commands = self._config.get("GuessSong", {}).get("command", ["猜歌名"])
self.command_format = self._config.get("GuessSong", {}).get("command-format", "猜歌名 [歌手名]/[歌名]")
self.enable = self._config.get("GuessSong", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="猜歌名游戏")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}:{content}")
# 简化命令处理,只检查开头是否包含命令前缀
command_found = False
for cmd in self._commands:
if content.startswith(cmd):
command_found = True
content = content[len(cmd):].strip()
break
if not command_found:
return False, "不匹配的命令"
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 使用roomid或sender作为游戏会话ID
session_id = roomid if roomid else sender
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.GUESS_MUSIC) == PermissionStatus.DISABLED:
return False, "没有权限"
# 获取当前游戏会话
current_game = None
if self.redis_db:
current_game = self.redis_db.get_game_session(session_id)
# 检查是否是"下一首"命令
if content == "下一首" and current_game and current_game.get("status") == "playing":
return await self._next_song(bot, session_id, sender)
# 如果游戏进行中,且有内容,则视为答案
if current_game and current_game.get("status") == "playing":
if content: # 有内容,视为答案
return await self._check_answer(message)
else: # 没有内容,提示已有游戏在进行中
await bot.send_text_message(session_id,
f"⚠️ 当前已有猜歌游戏在进行中,请直接回复 [猜歌名 歌名] 进行猜测\n或回复 [猜歌名 下一首] 跳过当前歌曲", sender)
return True, "已有游戏进行中"
# 否则开始新游戏(可以指定歌手或随机)
return await self._start_new_game(bot, session_id, sender, content if content else None)
async def _start_new_game(self, bot: WechatAPIClient, session_id: str, sender: str, singer_name: Optional[str]) -> \
Tuple[bool, str]:
"""开始新游戏"""
try:
# 搜索歌曲
song_info = await self._get_random_song(singer_name)
if not song_info or not song_info.get("play_url"):
await bot.send_text_message(session_id, f"❌未找到{singer_name or '随机'}歌曲,请重试", sender)
return False, "未找到歌曲"
# 保存游戏会话
game_data = {
"status": "playing",
"song_name": song_info.get("song_name", ""),
"singer_name": song_info.get("singer_name", ""),
"start_time": time.time(),
"hint_given": False,
"play_url": song_info.get("play_url"),
"singer_pic": song_info.get('singer_pic', '') or song_info.get('singer_pic', ''),
"data_url": song_info.get('data_url', '')
}
if self.redis_db:
self.redis_db.save_game_session(session_id, game_data)
# 发送游戏开始消息
await bot.send_text_message(session_id,
f"🎵 猜歌名游戏开始!\n请听10秒钟歌曲片段,然后回复[猜歌名 歌名]来猜测歌曲名称。\n回复[猜歌名 下一首]可跳过当前歌曲。\n歌手: {song_info.get('singer_name', '未知')}",
sender)
# 发送10秒音频片段
await self._send_song_clip(bot, song_info, session_id)
return True, "游戏开始"
except Exception as e:
self.LOG.error(f"开始猜歌游戏出错: {e}")
await bot.send_text_message(session_id, f"❌开始猜歌游戏出错,请稍后重试", sender)
return False, f"处理出错: {e}"
@points_reward_decorator(5, "game", "猜歌名游戏", Feature.GUESS_MUSIC)
async def _check_answer(self, message: Dict[str, Any]) -> Tuple[bool, str]:
"""检查答案"""
try:
# 从message中提取所需参数
sender = message.get("sender", "")
session_id = message.get("roomid", "") or sender # 如果没有roomid,使用sender作为session_id
content = message.get("content", "").strip() # 从消息内容中获取答案
bot = message.get("bot")
answer = None
for cmd in self._commands:
if content.startswith(cmd):
answer = content[len(cmd):].strip()
break
# 从Redis获取游戏数据
game_data = None
if self.redis_db:
game_data = self.redis_db.get_game_session(session_id)
if not game_data:
return False, "没有进行中的游戏"
correct_answer = game_data.get("song_name", "")
singer_name = game_data.get("singer_name", "")
# 检查答案是否正确(简单比较,可以改进为模糊匹配)
if answer.lower() in correct_answer.lower():
# 游戏结束,发送成功消息
await bot.send_text_message(session_id, f"🎉 恭喜你猜对了!\n歌曲:{correct_answer}\n歌手:{singer_name}",
sender)
# 删除游戏会话
if self.redis_db:
self.redis_db.delete_game_session(session_id)
await self._send_music_message(bot, game_data, session_id)
return True, "猜对了"
else:
# 答案错误
# 检查是否需要给提示(超过30秒且未给过提示)
current_time = time.time()
start_time = game_data.get("start_time", 0)
hint_given = game_data.get("hint_given", False)
if not hint_given and (current_time - start_time) > 30:
# 给出提示(显示歌名的第一个字)
hint = correct_answer[0] + "*" * (len(correct_answer) - 1)
await bot.send_text_message(session_id, f"💡 提示:歌名以 '{correct_answer[0]}' 开头")
# 更新游戏会话,标记已给出提示
game_data["hint_given"] = True
if self.redis_db:
self.redis_db.save_game_session(session_id, game_data)
# 告知用户答案错误
await bot.send_text_message(session_id, f"❌ 答案错误,请继续猜测!")
return False, "答案错误"
except Exception as e:
self.LOG.error(f"检查答案出错: {e}")
session_id = message.get("roomid", "") or message.get("sender", "")
sender = message.get("sender", "")
bot = message.get("bot")
await bot.send_text_message(session_id, f"❌检查答案出错,请稍后重试", sender)
return False, f"处理出错: {e}"
async def _get_random_song(self, singer_name: Optional[str]) -> Dict[str, Any]:
"""获取随机歌曲信息"""
try:
# 构建API请求URL - 使用抖音音乐API
if singer_name:
# 如果指定了歌手,搜索该歌手的歌曲
api_url = f"https://hhlqilongzhu.cn/api/dg_douyinmusic.php?msg={singer_name}&type=json"
else:
# 如果没有指定歌手,使用热门歌手列表
popular_singers = ["周杰伦", "林俊杰", "薛之谦", "陈奕迅", "邓紫棋", "Taylor Swift", "李荣浩"]
random_singer = random.choice(popular_singers)
api_url = f"https://hhlqilongzhu.cn/api/dg_douyinmusic.php?msg={random_singer}&type=json"
# 发送请求获取歌曲列表
response = requests.get(api_url)
if response.status_code != 200:
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
return {}
# 解析响应数据
json_data = response.json()
if not json_data.get("data") or json_data.get("code") != 200:
self.LOG.error(f"API 返回数据格式错误: {json_data}")
return {}
# 从歌曲列表中随机选择一首
song_list = json_data.get("data")
if not song_list or not isinstance(song_list, list) or len(song_list) == 0:
self.LOG.error(f"未找到歌曲列表或列表为空")
return {}
# 随机选择一首歌曲
random_song = random.choice(song_list)
# 根据随机选择的歌曲信息,再次调用API获取完整信息(包含播放链接)
song_name = random_song.get('title', '')
singer_name = random_song.get('singer', '')
search_query = f"{song_name} {singer_name}"
# 调用原有API获取播放链接
detail_api = f"https://www.hhlqilongzhu.cn/api/joox/juhe_music.php?msg={search_query}&type=json&n=1"
detail_response = requests.get(detail_api)
if detail_response.status_code != 200:
self.LOG.error(f"获取歌曲详情失败,状态码: {detail_response.status_code}")
return {}
detail_data = detail_response.json().get("data")
if not detail_data:
self.LOG.error(f"获取歌曲详情数据为空")
return {}
# 返回完整的歌曲信息
return {
"song_name": song_name,
"singer_name": singer_name,
"play_url": detail_data.get('url', ''),
"singer_pic": detail_data.get('cover', '') or detail_data.get('cover', ''),
"data_url": detail_data.get('link', '')
}
except Exception as e:
self.LOG.error(f"获取随机歌曲出错: {e}")
return {}
async def _send_song_clip(self, bot: WechatAPIClient, song_info: Dict[str, Any], session_id: str) -> bool:
"""发送歌曲片段"""
try:
play_url = song_info.get("play_url", "")
if not play_url:
return False
# 下载歌曲
async with aiohttp.ClientSession() as session:
async with session.get(play_url) as resp:
if resp.status != 200:
self.LOG.error(f"下载歌曲失败,状态码: {resp.status}")
return False
# 读取音频数据
audio_data = await resp.read()
try:
# 使用pydub处理音频
audio = AudioSegment.from_file(BytesIO(audio_data))
# 截取前10秒
clip_duration = 10 * 1000 # 10秒,单位毫秒
audio_clip = audio[:clip_duration]
# 将音频片段转换为字节
output = BytesIO()
audio_clip.export(output, format="mp3")
clip_bytes = output.getvalue()
await bot.send_voice_message(session_id, clip_bytes, "mp3")
return True
except Exception as audio_error:
self.LOG.error(f"处理音频出错: {audio_error}")
return False
except Exception as e:
self.LOG.error(f"发送歌曲片段出错: {e}")
return False
async def _send_music_message(self, bot: WechatAPIClient, song_info: Dict[str, Any], receiver: str) -> bool:
"""发送音乐消息"""
try:
song_name = song_info.get("song_name", "")
singer_name = song_info.get("singer_name", "")
play_url = song_info.get("play_url", "")
singer_pic = song_info.get("singer_pic", "")
data_url = song_info.get("data_url", "")
xml_message = f"""
{song_name}
{singer_name}-点击三角直接播放
view
3
0
{data_url}
{play_url}
0
0
0
{singer_pic}
0
49
汽水音乐
"""
self.LOG.info(f"发送音乐消息:{xml_message}")
res = await bot.send_app_message(wxid=receiver, xml=xml_message, type=0)
self.LOG.info(f"发送音乐消息 res:{res}")
return True
except Exception as e:
self.LOG.error(f"发送音乐消息出错: {e}")
return False
async def _next_song(self, bot: WechatAPIClient, session_id: str, sender: str) -> Tuple[bool, str]:
"""跳过当前歌曲,播放下一首"""
try:
# 获取当前游戏数据
current_game = None
if self.redis_db:
current_game = self.redis_db.get_game_session(session_id)
if not current_game or current_game.get("status") != "playing":
await bot.send_text_message(session_id, f"⚠️ 当前没有进行中的猜歌游戏", sender)
return False, "没有进行中的游戏"
# 显示当前歌曲答案
correct_answer = current_game.get("song_name", "")
singer_name = current_game.get("singer_name", "")
await bot.send_text_message(session_id, f"⏭️ 已跳过当前歌曲\n歌曲:{correct_answer}\n歌手:{singer_name}", sender)
# 发送完整音乐
await self._send_music_message(bot, current_game, session_id)
# 删除当前游戏会话
if self.redis_db:
self.redis_db.delete_game_session(session_id)
# 开始新游戏
return await self._start_new_game(bot, session_id, sender, None)
except Exception as e:
self.LOG.error(f"跳过当前歌曲出错: {e}")
await bot.send_text_message(session_id, f"❌跳过当前歌曲出错,请稍后重试", sender)
return False, f"处理出错: {e}"