import asyncio
import base64
from typing import Dict, Any, List, Optional, Tuple
from urllib.parse import quote

import aiohttp
import requests
from loguru import logger

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.appmsg_xml import MUSIC_XML


class MusicPlugin(MessagePluginInterface):
    """Song-request plugin.

    Matches chat messages whose first space-separated token is one of the
    configured commands, looks the requested song up through an HTTP search
    API, and sends the result to the chat as a music app message.
    """

    # Feature-permission constants.
    FEATURE_KEY = "MUSIC"
    FEATURE_DESCRIPTION = "🎵 点歌功能 [点歌, 音乐, 音乐点播, 点播音乐, 音乐点歌]"

    # Final status codes for which ``_get_real_url`` trusts ``response.url``.
    _RESOLVED_STATUS_CODES = (200, 301, 302, 303, 307, 308)

    @property
    def name(self) -> str:
        return "音乐点播"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "提供音乐点播功能,支持QQ音乐和网易云音乐"

    @property
    def author(self) -> str:
        return "liu.wei"

    @property
    def command_prefix(self) -> Optional[str]:
        # No prefix: the command word itself is matched directly.
        return ""

    @property
    def commands(self) -> List[str]:
        # Populated by ``initialize``; not available before it has run.
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        self.feature = self.register_feature()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load the plugin settings from the ``Music`` config section.

        Args:
            context: Plugin context; only its ``event_system`` entry is read.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        self.LOG.debug(f"正在初始化 {self.name} 插件...")

        # Keep a handle on the shared context object.
        self.event_system = context.get("event_system")

        # NOTE(review): ``self._config`` is not defined in this file;
        # presumably the base class provides it - confirm.
        music_cfg = self._config.get("Music", {})
        self._commands = music_cfg.get("command", ["点歌", "音乐"])
        self.command_format = music_cfg.get("command-format", "点歌 歌曲名")
        self.enable = music_cfg.get("enable", True)
        self.music_api_url = music_cfg.get(
            "music_api_url",
            "http://192.168.2.170:5000?keywords={keywords}&limit=10",
        )
        self.song_api_url = music_cfg.get("song_api_url", "")

        self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def start(self) -> bool:
        """Mark the plugin as running. Always returns ``True``."""
        self.LOG.debug(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped. Always returns ``True``."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return whether the message starts with one of the commands.

        Args:
            message: Incoming message; only its ``content`` entry is read.

        Returns:
            ``False`` when the plugin is disabled; otherwise whether the
            first space-separated token of the stripped content is a
            configured command.
        """
        if not self.enable:
            return False
        content = str(message.get("content", "")).strip()
        command = content.partition(" ")[0]
        return command in self._commands

    @plugin_stats_decorator(plugin_name="音乐点播")
    @plugin_points_cost(2, "音乐点播消耗积分", FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle one song-request message.

        Args:
            message: Incoming message. The entries ``content``, ``sender``,
                ``roomid``, ``gbm`` and ``bot`` are read.

        Returns:
            ``(success, detail)``. ``success`` is ``True`` only when the
            music message was actually sent.
        """
        content = str(message.get("content", "")).strip()
        self.LOG.debug(f"插件执行: {self.name}:{content}")

        command = content.partition(" ")[0]
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")
        # Reply in the group when there is one, otherwise to the sender.
        receiver = roomid or sender

        # Command format check: content is stripped, so a missing space
        # means no song name followed the command word.
        if " " not in content:
            await bot.send_text_message(receiver, f"❌命令格式错误!\n{self.command_format}", sender)
            return False, "命令格式错误"

        # Permission check (group chats only).
        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # Everything after the command word is the song name.
        user_song_name = content[len(command):].strip()

        try:
            # ``_search_song`` performs blocking ``requests`` calls, so it
            # is run on the default executor to keep the event loop free.
            loop = asyncio.get_running_loop()
            song_info = await loop.run_in_executor(None, self._search_song, user_song_name)
            if not song_info or not song_info.get("play_url"):
                await bot.send_text_message(receiver, f"❌未找到歌曲:{user_song_name}", sender)
                return False, "未找到歌曲"

            # Report success only if the send helper says it succeeded.
            sent = await self._send_music_message(bot, song_info, receiver)
            if not sent:
                return False, "发送失败"
            return True, "发送成功"
        except Exception as e:
            self.LOG.error(f"处理音乐请求出错: {e}")
            return False, f"处理出错: {e}"

    def _search_song(self, song_name: str) -> Dict[str, Any]:
        """Search for a song and fetch the details of the first hit.

        This method blocks on HTTP requests; call it from a worker thread
        when inside a coroutine.

        Args:
            song_name: Free-text search keywords entered by the user.

        Returns:
            A dict with ``song_name``, ``singer_name``, ``play_url``,
            ``pic_url`` and ``data_url``; an empty dict on any failure.
        """
        try:
            # The keywords are user input: percent-encode them so that
            # characters such as ``&`` or ``#`` cannot alter the query.
            encoded_keywords = quote(song_name, safe="")
            if "{keywords}" in self.music_api_url:
                search_url = self.music_api_url.format(keywords=encoded_keywords)
            else:
                base = self.music_api_url.split("?")[0].rstrip("/")
                search_url = f"{base}/Search?keyword={encoded_keywords}&limit=10"

            self.LOG.info(f"请求歌曲API: {search_url}")
            resp = requests.get(search_url, timeout=10)
            if resp.status_code != 200:
                self.LOG.error(f"API 请求失败,状态码: {resp.status_code}")
                return {}

            search_data = resp.json()
            if not search_data.get("success") or search_data.get("status") != 200:
                self.LOG.error(f"API 返回错误: {search_data.get('message')}")
                return {}

            songs = search_data.get("data", [])
            if not songs:
                self.LOG.error(f"未找到歌曲: {song_name}")
                return {}

            # Only the first search hit is used.
            first_song = songs[0]
            song_id = str(first_song.get("id", ""))

            if self.song_api_url:
                detail_url = self.song_api_url.format(url=song_id, level="standard", type="json")
            else:
                base = self.music_api_url.split("?")[0].rstrip("/")
                detail_url = f"{base}/Song_V1?url={song_id}&level=exhigh&type=json"

            detail_resp = requests.get(detail_url, timeout=15)
            if detail_resp.status_code != 200:
                self.LOG.error(f"详情请求失败,状态码: {detail_resp.status_code}")
                return {}

            detail_data = detail_resp.json()
            if not detail_data.get("success") or detail_data.get("status") != 200:
                self.LOG.error(f"获取歌曲详情失败: {detail_data.get('message')}")
                return {}

            # Detail fields win; search-hit fields are the fallback.
            song_info = detail_data.get("data", {})
            song_name_res = song_info.get("name", first_song.get("name", ""))
            singer_name_res = song_info.get(
                "ar_name",
                first_song.get("artist_string", first_song.get("artists", "")),
            )
            stream_url = song_info.get("url", "")
            pic_url = song_info.get("pic", first_song.get("picUrl", ""))

            # ``play_url`` and ``data_url`` both come from the same ``url``
            # field, so it is resolved once instead of with two requests.
            real_stream_url = self._get_real_url(stream_url)
            real_pic_url = self._get_real_url(pic_url)

            self.LOG.info(f"成功获取歌曲: {singer_name_res} - {song_name_res}")
            return {
                "song_name": song_name_res,
                "singer_name": singer_name_res,
                "play_url": real_stream_url,
                "pic_url": real_pic_url,
                "data_url": real_stream_url,
            }
        except Exception as e:
            self.LOG.error(f"搜索歌曲出错: {e}")
            return {}

    def _get_real_url(self, redirect_url: str) -> str:
        """Follow redirects with a HEAD request and return the final URL.

        Args:
            redirect_url: URL to resolve; may be empty.

        Returns:
            ``""`` for an empty input, the final URL when the last response
            has an accepted status code, and the unchanged input otherwise
            (including when the request raises).
        """
        try:
            if not redirect_url:
                return ""

            # NOTE(review): ``verify=False`` disables TLS certificate
            # verification for this request. Kept as-is to preserve
            # behaviour; confirm whether it is really needed.
            response = requests.head(
                redirect_url,
                allow_redirects=True,
                verify=False,
                timeout=10,
            )

            # ``response.url`` is the address after all redirects.
            if response.status_code in self._RESOLVED_STATUS_CODES:
                real_url = response.url
                self.LOG.debug(f"URL重定向: {redirect_url} -> {real_url}")
                return real_url

            self.LOG.warning(f"获取真实URL失败,状态码: {response.status_code}")
            return redirect_url
        except Exception as e:
            self.LOG.error(f"获取真实URL出错: {e}")
            return redirect_url

    async def url_to_base64(self, play_url: str) -> str:
        """Download ``play_url`` and return its body as a base64 string.

        Raises:
            aiohttp.ClientResponseError: If the response status is an error
                (via ``raise_for_status``).
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(play_url) as resp:
                resp.raise_for_status()
                data = await resp.read()
        return base64.b64encode(data).decode('utf-8')

    async def _send_music_message(self, bot: WechatAPIClient, song_info: Dict[str, Any], receiver: str) -> bool:
        """Render the music XML template and send it as an app message.

        Args:
            bot: Client used to send the message.
            song_info: Dict as returned by ``_search_song``.
            receiver: Id of the chat that should receive the message.

        Returns:
            ``True`` if the send call completed without raising, ``False``
            otherwise.
        """
        try:
            # NOTE(review): values are inserted into the XML template
            # unescaped; whether that is safe depends on ``MUSIC_XML``,
            # which is defined outside this file - confirm.
            xml_message = MUSIC_XML.format(
                song_name=song_info.get("song_name", ""),
                singer_name=song_info.get("singer_name", ""),
                play_url=song_info.get("play_url", ""),
                data_url=song_info.get("data_url", ""),
                singer_pic=song_info.get("pic_url", ""),
            )
            self.LOG.info(f"发送音乐消息:{xml_message}")
            res = await bot.send_app_message(wxid=receiver, xml=xml_message, type=0)
            self.LOG.info(f"发送音乐消息 res:{res}")
            return True
        except Exception as e:
            self.LOG.error(f"发送音乐消息出错: {e}")
            return False