import logging
from typing import Dict, Any, List, Optional, Tuple

import lz4.block as lb
import requests
from wcferry import Wcf

from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager


class MusicPlugin(MessagePluginInterface):
    """Music-on-demand plugin.

    Matches a configurable command word at the start of a message, looks the
    requested song up through a primary search API (with a fallback API when
    the primary yields nothing usable), and forwards the result to the chat
    as a type-49 message.
    """

    # Remote endpoints used by the song lookup.
    _QQ_SEARCH_API = "https://qqmusic.qqovo.cn/getSearchByKey"
    _QQ_PLAY_API = "https://qqmusic.qqovo.cn/getMusicPlay"
    _FALLBACK_API = "https://www.hhlqilongzhu.cn/api/dg_wyymusic.php"

    # Seconds to wait on any single HTTP call; without a timeout a stalled
    # API would block the message handler indefinitely.
    _REQUEST_TIMEOUT = 10

    @property
    def name(self) -> str:
        return "音乐点播"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "提供音乐点播功能,支持QQ音乐和网易云音乐"

    @property
    def author(self) -> str:
        return "Trae AI"

    @property
    def command_prefix(self) -> Optional[str]:
        return ""  # No prefix required; commands are matched directly.

    @property
    def commands(self) -> List[str]:
        return self._commands

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Initialize the plugin from *context* and the loaded configuration.

        Reads the ``wcf``, ``event_system`` and ``message_util`` entries from
        *context*, then pulls the command words, the usage hint and the
        enable flag from the ``Music`` section of the configuration.

        Returns:
            Always ``True``.
        """
        self.LOG = logging.getLogger(f"Plugin.{self.name}")
        self.LOG.info(f"正在初始化 {self.name} 插件...")

        # Keep the context objects handed over by the host.
        self.wcf = context.get("wcf")
        self.event_system = context.get("event_system")
        self.message_util = context.get("message_util")

        # Load configuration; every setting falls back to a built-in default.
        self.load_config()
        music_config = self._config.get("Music", {})
        self._commands = music_config.get("command", ["点歌", "音乐"])
        self.command_format = music_config.get("command-format", "点歌 歌曲名")
        self.enable = music_config.get("enable", True)

        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def start(self) -> bool:
        """Mark the plugin as running. Always returns ``True``."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped. Always returns ``True``."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return whether *message* starts with one of the plugin's commands.

        Returns ``False`` unconditionally when the plugin is disabled.
        """
        if not self.enable:
            return False

        content = str(message.get("content", "")).strip()
        command = content.split(" ")[0]
        return command in self._commands

    def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a song request.

        Args:
            message: Message dict; reads ``content``, ``sender``, ``roomid``,
                ``wcf`` and ``gbm``.

        Returns:
            ``(handled, status_text)``. ``handled`` is ``False`` only when the
            music feature is disabled for the group the message came from.
        """
        content = str(message.get("content", "")).strip()
        parts = content.split(" ")
        command = parts[0]
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        wcf: Wcf = message.get("wcf")
        gbm: GroupBotManager = message.get("gbm")
        # Replies go to the group when there is one, otherwise to the sender.
        receiver = roomid if roomid else sender

        # A bare command without a song name is a usage error.
        if len(parts) == 1:
            wcf.send_text(f"-----Bot-----\n❌命令格式错误!\n{self.command_format}", receiver, sender)
            return True, "命令格式错误"

        # Group-level permission check (skipped for private chats).
        if roomid and gbm.get_group_permission(roomid, Feature.MUSIC) == PermissionStatus.DISABLED:
            return False, "没有权限"

        # Everything after the command word is the song name.
        user_song_name = content[len(command):].strip()

        try:
            song_info = self._search_song(user_song_name)
            if not song_info or not song_info.get("play_url"):
                wcf.send_text(f"-----Bot-----\n❌未找到歌曲:{user_song_name}", receiver, sender)
                return True, "未找到歌曲"

            # _send_music_message reports failure through its return value
            # (it logs and swallows exceptions), so it must be checked here
            # instead of unconditionally reporting success.
            if not self._send_music_message(wcf, song_info, receiver):
                return True, "发送失败"
            return True, "发送成功"

        except Exception as e:
            self.LOG.error(f"处理音乐请求出错: {e}")
            wcf.send_text(f"-----Bot-----\n❌请求出错:{e}", receiver, sender)
            return True, f"处理出错: {e}"

    def _search_fallback(
        self,
        song_name: str,
        default_song_name: str = '',
        default_singer_name: str = '',
        default_singer_pic: str = '',
    ) -> Dict[str, Any]:
        """Query the fallback API and map its reply to the song-info dict.

        The ``default_*`` values fill in fields the fallback reply omits.
        Returns ``{}`` when the fallback does not answer with HTTP 200.
        """
        response = requests.get(
            self._FALLBACK_API,
            params={"gm": song_name, "n": 1, "num": 1, "type": "json"},
            timeout=self._REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
            return {}

        payload = response.json()
        return {
            "song_name": payload.get('title', default_song_name),
            "singer_name": payload.get('singer', default_singer_name),
            "play_url": payload.get('music_url', ''),
            "singer_pic": payload.get('cover', default_singer_pic),
            "data_url": payload.get('link', ''),
        }

    def _search_song(self, song_name: str) -> Dict[str, Any]:
        """Look up *song_name* and return its metadata and play URL.

        Tries the primary search API first and switches to the fallback API
        when the primary answers 400, returns no hits, or yields no play URL.

        Returns:
            A dict with ``song_name``, ``singer_name``, ``play_url``,
            ``singer_pic`` and ``data_url``; ``{}`` when nothing was found or
            any error occurred (errors are logged, never raised).
        """
        try:
            # Query values go through ``params`` so characters such as '&',
            # '#' or '=' in the song name are URL-encoded instead of
            # corrupting the query string.
            response = requests.get(
                self._QQ_SEARCH_API,
                params={"key": song_name, "page": 1, "limit": 1},
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code == 400:
                # Go straight to the fallback; its reply uses a different
                # schema and must not be parsed as a primary search result.
                return self._search_fallback(song_name)
            if response.status_code != 200:
                self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
                return {}

            search_data = response.json().get('response', {}).get('data', {})
            result = search_data.get('song', {}).get('list', [])
            if not result:
                return self._search_fallback(song_name)

            # Parse the first hit of the primary search.
            first_song = result[0]
            found_song_name = first_song.get('songname', '')
            song_mid = first_song.get('songmid', '')
            # ``or [{}]`` also covers an explicitly empty singer list.
            singers = first_song.get('singer') or [{}]
            first_singer_name = singers[0].get('name', '')
            zhida_singer = search_data.get('zhida', {}).get('zhida_singer', {})
            # Default to '' (not None) so a missing picture never renders as
            # the literal text "None" in the outgoing message.
            singer_pic = zhida_singer.get('singerPic', '') if zhida_singer else ''

            # Resolve the play URL for the hit.
            music_response = requests.get(
                self._QQ_PLAY_API,
                params={"songmid": song_mid, "quality": "m4a"},
                timeout=self._REQUEST_TIMEOUT,
            )
            if music_response.status_code == 400:
                return self._search_fallback(
                    song_name, found_song_name, first_singer_name, singer_pic
                )
            if music_response.status_code != 200:
                self.LOG.error(f"获取播放链接失败,状态码: {music_response.status_code}")
                return {}

            music_data = music_response.json()
            play_url = (
                music_data.get('data', {})
                .get('playUrl', {})
                .get(song_mid, {})
                .get('url', '')
            )
            if not play_url:
                return self._search_fallback(
                    song_name, found_song_name, first_singer_name, singer_pic
                )

            return {
                "song_name": found_song_name,
                "singer_name": first_singer_name,
                "play_url": play_url,
                "singer_pic": singer_pic,
                "data_url": f"https://y.qq.com/n/ryqq/songDetail/{song_mid}",
            }

        except Exception as e:
            self.LOG.error(f"搜索歌曲出错: {e}")
            return {}

    def _send_music_message(self, wcf, song_info: Dict[str, Any], receiver: str) -> bool:
        """Send *song_info* to *receiver* by rewriting and forwarding a message.

        Picks one existing type-49 row from ``MSG0.db``, overwrites its
        compressed content with the LZ4-compressed payload, then forwards
        that row to *receiver*.

        Returns:
            ``True`` when the forward call was issued, ``False`` when no
            type-49 row exists or any step raised (the error is logged).
        """
        try:
            song_name = song_info.get("song_name", "")
            singer_name = song_info.get("singer_name", "")
            play_url = song_info.get("play_url", "")
            # ``or ""`` keeps a None picture from rendering as "None".
            singer_pic = song_info.get("singer_pic", "") or ""
            data_url = song_info.get("data_url", "")

            # NOTE(review): this payload contains no XML markup in the file as
            # seen here; it looks like tags may have been lost -- confirm
            # against the upstream source before relying on it.
            xml_message = f""" {song_name} {singer_name}\n❤Bot-祝您天天开心❤ view 3 0 {data_url} {play_url} 0 0 0 {singer_pic} 0 49 """

            # Rewrite the content of a message already stored in the database.
            text_bytes = xml_message.encode('utf-8')
            compressed_data = lb.compress(text_bytes, store_size=False).hex()

            data = wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
            if not data:
                # Nothing to rewrite: report it explicitly instead of failing
                # with an opaque IndexError on data[0].
                self.LOG.error("发送音乐消息出错: 未找到可复用的 type=49 消息")
                return False

            msg_svr_id = data[0]['MsgSvrID']
            wcf.query_sql(
                'MSG0.db',
                f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3, IsSender=0, TalkerId=2 WHERE MsgSvrID={msg_svr_id}"""
            )

            result = wcf.forward_msg(msg_svr_id, receiver)
            self.LOG.info(f"点歌发送结果: {result}")
            return True

        except Exception as e:
            self.LOG.error(f"发送音乐消息出错: {e}")
            return False