Files
abot/plugins/guess_song/main.py

376 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import json
import random
import time
from datetime import datetime, timedelta
from loguru import logger
import requests
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost, points_reward_decorator
from wechat_ipad import WechatAPIClient
from db.connection import DBConnectionManager
import aiohttp
from pydub import AudioSegment
from io import BytesIO
class GuessSongRedisDB:
"""猜歌名游戏Redis相关操作"""
def __init__(self, db_manager: DBConnectionManager):
self.db_manager = db_manager
self.prefix = "group:guess_song:"
def get_redis_connection(self):
"""获取Redis连接"""
return self.db_manager.get_redis_connection()
def save_game_session(self, room_id: str, game_data: Dict[str, Any]) -> bool:
"""保存游戏会话数据"""
try:
with self.get_redis_connection() as redis_client:
# 设置过期时间为10分钟
redis_client.setex(
f'{self.prefix}{room_id}',
600, # 10分钟过期
json.dumps(game_data, ensure_ascii=False)
)
return True
except Exception as e:
logger.error(f"保存猜歌游戏数据出错: {e}")
return False
def get_game_session(self, room_id: str) -> Optional[Dict[str, Any]]:
"""获取游戏会话数据"""
try:
with self.get_redis_connection() as redis_client:
data = redis_client.get(f'{self.prefix}{room_id}')
if data:
if isinstance(data, bytes):
data = data.decode('utf-8')
return json.loads(data)
return None
except Exception as e:
logger.error(f"获取猜歌游戏数据出错: {e}")
return None
def delete_game_session(self, room_id: str) -> bool:
"""删除游戏会话数据"""
try:
with self.get_redis_connection() as redis_client:
redis_client.delete(f'{self.prefix}{room_id}')
return True
except Exception as e:
logger.error(f"删除猜歌游戏数据出错: {e}")
return False
class GuessSongPlugin(MessagePluginInterface):
"""猜歌名游戏插件"""
@property
def name(self) -> str:
return "猜歌名游戏"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供猜歌名游戏功能,支持指定歌手和猜歌名"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.redis_db = None
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.db_manager = context.get("db_manager")
# 初始化Redis操作类
if self.db_manager:
self.redis_db = GuessSongRedisDB(self.db_manager)
self._commands = self._config.get("GuessSong", {}).get("command", ["猜歌名"])
self.command_format = self._config.get("GuessSong", {}).get("command-format", "猜歌名 [歌手名]/[歌名]")
self.enable = self._config.get("GuessSong", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="猜歌名游戏")
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
# 简化命令处理,只检查开头是否包含命令前缀
command_found = False
for cmd in self._commands:
if content.startswith(cmd):
command_found = True
content = content[len(cmd):].strip()
break
if not command_found:
return False, "不匹配的命令"
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 使用roomid或sender作为游戏会话ID
session_id = roomid if roomid else sender
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.GUESS_MUSIC) == PermissionStatus.DISABLED:
return False, "没有权限"
# 获取当前游戏会话
current_game = None
if self.redis_db:
current_game = self.redis_db.get_game_session(session_id)
# 简化逻辑:如果有内容,且游戏进行中,则视为答案
if current_game and current_game.get("status") == "playing" and content:
return await self._check_answer(bot, session_id, sender, content, current_game)
# 否则开始新游戏(可以指定歌手或随机)
return await self._start_new_game(bot, session_id, sender, content if content else None)
async def _start_new_game(self, bot: WechatAPIClient, session_id: str, sender: str, singer_name: Optional[str]) -> \
Tuple[bool, str]:
"""开始新游戏"""
try:
# 搜索歌曲
song_info = await self._get_random_song(singer_name)
if not song_info or not song_info.get("play_url"):
await bot.send_text_message(session_id, f"❌未找到{singer_name or '随机'}歌曲,请重试", sender)
return False, "未找到歌曲"
# 保存游戏会话
game_data = {
"status": "playing",
"song_name": song_info.get("song_name", ""),
"singer_name": song_info.get("singer_name", ""),
"start_time": time.time(),
"hint_given": False
}
if self.redis_db:
self.redis_db.save_game_session(session_id, game_data)
# 发送游戏开始消息
await bot.send_text_message(session_id,
f"🎵 猜歌名游戏开始!\n请听10秒钟歌曲片段然后回复[猜歌名 歌名]来猜测歌曲名称。\n歌手: {song_info.get('singer_name', '未知')}",
sender)
# 发送10秒音频片段
await self._send_song_clip(bot, song_info, session_id)
return True, "游戏开始"
except Exception as e:
self.LOG.error(f"开始猜歌游戏出错: {e}")
await bot.send_text_message(session_id, f"❌开始猜歌游戏出错,请稍后重试", sender)
return False, f"处理出错: {e}"
@points_reward_decorator(5, "game", "猜歌名游戏", Feature.GUESS_MUSIC)
async def _check_answer(self, bot: WechatAPIClient, session_id: str, sender: str, answer: str,
game_data: Dict[str, Any]) -> Tuple[bool, str]:
"""检查答案"""
try:
correct_answer = game_data.get("song_name", "")
singer_name = game_data.get("singer_name", "")
# 检查答案是否正确(简单比较,可以改进为模糊匹配)
if answer.lower() == correct_answer.lower():
# 游戏结束,发送成功消息
await bot.send_text_message(session_id, f"🎉 恭喜你猜对了!\n歌曲:{correct_answer}\n歌手:{singer_name}",
sender)
# 删除游戏会话
if self.redis_db:
self.redis_db.delete_game_session(session_id)
return True, "猜对了"
else:
# 答案错误
# 检查是否需要给提示超过30秒且未给过提示
current_time = time.time()
start_time = game_data.get("start_time", 0)
hint_given = game_data.get("hint_given", False)
if not hint_given and (current_time - start_time) > 30:
# 给出提示(显示歌名的第一个字)
hint = correct_answer[0] + "*" * (len(correct_answer) - 1)
await bot.send_text_message(session_id, f"💡 提示:歌名以 '{correct_answer[0]}' 开头")
# 更新游戏会话,标记已给出提示
game_data["hint_given"] = True
if self.redis_db:
self.redis_db.save_game_session(session_id, game_data)
# 告知用户答案错误
await bot.send_text_message(session_id, f"❌ 答案错误,请继续猜测!")
return False, "答案错误"
except Exception as e:
self.LOG.error(f"检查答案出错: {e}")
await bot.send_text_message(session_id, f"❌检查答案出错,请稍后重试", sender)
return False, f"处理出错: {e}"
async def _get_random_song(self, singer_name: Optional[str]) -> Dict[str, Any]:
"""获取随机歌曲信息"""
try:
# 构建API请求URL - 使用抖音音乐API
if singer_name:
# 如果指定了歌手,搜索该歌手的歌曲
api_url = f"https://hhlqilongzhu.cn/api/dg_douyinmusic.php?msg={singer_name}&type=json"
else:
# 如果没有指定歌手,使用热门歌手列表
popular_singers = ["周杰伦", "林俊杰", "薛之谦", "陈奕迅", "邓紫棋", "Taylor Swift", "李荣浩"]
random_singer = random.choice(popular_singers)
api_url = f"https://hhlqilongzhu.cn/api/dg_douyinmusic.php?msg={random_singer}&type=json"
# 发送请求获取歌曲列表
response = requests.get(api_url)
if response.status_code != 200:
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
return {}
# 解析响应数据
json_data = response.json()
if not json_data.get("data") or json_data.get("code") != 200:
self.LOG.error(f"API 返回数据格式错误: {json_data}")
return {}
# 从歌曲列表中随机选择一首
song_list = json_data.get("data")
if not song_list or not isinstance(song_list, list) or len(song_list) == 0:
self.LOG.error(f"未找到歌曲列表或列表为空")
return {}
# 随机选择一首歌曲
random_song = random.choice(song_list)
# 根据随机选择的歌曲信息再次调用API获取完整信息包含播放链接
song_name = random_song.get('title', '')
singer_name = random_song.get('singer', '')
search_query = f"{song_name} {singer_name}"
# 调用原有API获取播放链接
detail_api = f"https://www.hhlqilongzhu.cn/api/joox/juhe_music.php?msg={search_query}&type=json&n=1"
detail_response = requests.get(detail_api)
if detail_response.status_code != 200:
self.LOG.error(f"获取歌曲详情失败,状态码: {detail_response.status_code}")
return {}
detail_data = detail_response.json().get("data")
if not detail_data:
self.LOG.error(f"获取歌曲详情数据为空")
return {}
# 返回完整的歌曲信息
return {
"song_name": song_name,
"singer_name": singer_name,
"play_url": detail_data.get('url', ''),
"singer_pic": random_song.get('cover', '') or detail_data.get('cover', ''),
"data_url": detail_data.get('link', '')
}
except Exception as e:
self.LOG.error(f"获取随机歌曲出错: {e}")
return {}
async def _send_song_clip(self, bot: WechatAPIClient, song_info: Dict[str, Any], session_id: str) -> bool:
"""发送歌曲片段"""
try:
play_url = song_info.get("play_url", "")
if not play_url:
return False
# 下载歌曲
async with aiohttp.ClientSession() as session:
async with session.get(play_url) as resp:
if resp.status != 200:
self.LOG.error(f"下载歌曲失败,状态码: {resp.status}")
return False
# 读取音频数据
audio_data = await resp.read()
try:
# 使用pydub处理音频
audio = AudioSegment.from_file(BytesIO(audio_data))
# 截取前10秒
clip_duration = 10 * 1000 # 10秒单位毫秒
audio_clip = audio[:clip_duration]
# 将音频片段转换为字节
output = BytesIO()
audio_clip.export(output, format="mp3")
clip_bytes = output.getvalue()
await bot.send_voice_message(session_id, clip_bytes, "mp3")
return True
except Exception as audio_error:
self.LOG.error(f"处理音频出错: {audio_error}")
return False
except Exception as e:
self.LOG.error(f"发送歌曲片段出错: {e}")
return False