Files
2025-12-03 15:48:44 +08:00

411 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
音乐点歌插件
支持两种触发方式:
1. 指令触发:点歌 歌曲名
2. AI函数调用
"""
import asyncio
import hashlib
import time
import uuid
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import List, Optional
from xml.sax.saxutils import escape

import aiohttp
from loguru import logger

from utils.decorators import on_text_message
from utils.plugin_base import PluginBase
from WechatHook import WechatHookClient
class MusicPlugin(PluginBase):
    """Music-on-demand plugin that searches songs and sends them as music cards.

    Two entry points are exposed:

    1. Text command ``点歌 <song name> [3]``, handled by
       :meth:`handle_music_command`.
    2. LLM function call: the ``search_music`` tool declared by
       :meth:`get_llm_tools` and executed by :meth:`execute_llm_tool`.
    """

    description = "音乐点歌 - 搜索并播放歌曲"
    author = "ShiHao"
    version = "1.0.0"

    # Text prefix that triggers the command handler (note the trailing space).
    _COMMAND_PREFIX = "点歌 "
    # Trailing command argument that requests one card per platform.
    _SEND_ALL_FLAG = "3"
    # Platforms queried for every search, in result order.
    _SOURCES = ("netease", "qq", "kuwo")
    # ``appid`` attribute written into the music appmsg for each platform.
    # NOTE(review): presumably the WeChat app IDs of the respective music
    # clients; the values are carried over unchanged from the original code.
    _APPID_MAP = {
        "netease": "wx8dd6ecd81906fd84",
        "qq": "wx45116b30f23e0cc4",
        "kuwo": "wxc305711a2a7ad71c",
    }
    _DEFAULT_APPID = "wx8dd6ecd81906fd84"
    # First argument passed to ``bot._send_data_async`` for appmsg XML payloads.
    # Its meaning is defined by WechatHookClient, which is not visible here.
    _APPMSG_SEND_TYPE = 11214
    # Pause between consecutive cards (the original comments cite rate limiting).
    _SEND_INTERVAL_SECONDS = 2
    # Pause before re-sending a card whose send call returned a falsy result.
    _RETRY_DELAY_SECONDS = 1
    # Element-by-element template of the music appmsg.  Joined with "\n" for a
    # stand-alone card and with "" when embedded in a chat record.  The lyric
    # element is intentionally left empty to keep the XML small.
    _MUSIC_APPMSG_LINES = (
        '<appmsg appid="{appid}" sdkver="0">',
        "<title>{name}</title>",
        "<des>{artist}</des>",
        "<action>view</action>",
        "<type>3</type>",
        "<showtype>0</showtype>",
        "<content/>",
        "<url>{url}</url>",
        "<dataurl>{url}</dataurl>",
        "<lowurl>{url}</lowurl>",
        "<lowdataurl>{url}</lowdataurl>",
        "<recorditem/>",
        "<thumburl>{pic}</thumburl>",
        "<messageaction/>",
        "<laninfo/>",
        "<extinfo/>",
        "<sourceusername/>",
        "<sourcedisplayname/>",
        "<songlyric></songlyric>",
        "<commenturl/>",
        "<appattach>",
        "<totallen>0</totallen>",
        "<attachid/>",
        "<emoticonmd5/>",
        "<fileext/>",
        "<aeskey/>",
        "</appattach>",
        "<webviewshared>",
        "<publisherId/>",
        "<publisherReqId>0</publisherReqId>",
        "</webviewshared>",
        "<weappinfo>",
        "<pagepath/>",
        "<username/>",
        "<appid/>",
        "<appservicetype>0</appservicetype>",
        "</weappinfo>",
        "<websearch/>",
        "<songalbumurl>{pic}</songalbumurl>",
        "</appmsg>",
    )

    def __init__(self):
        super().__init__()
        self.api_url = "https://music-dl.sayqz.com/api/"

    async def async_init(self):
        """Asynchronous initialisation hook; only logs that the plugin is ready."""
        logger.success("音乐点歌插件初始化完成")

    async def search_music_from_source(self, keyword: str, source: str) -> Optional[dict]:
        """Search one music platform and return its first hit.

        Args:
            keyword: Song keyword to search for.
            source: Platform identifier (netease/qq/kuwo).

        Returns:
            The first result dict, tagged with a ``platform`` key when the API
            did not supply one, or ``None`` on any failure (non-200 HTTP
            status, non-200 API code, empty result list, or an exception).
        """
        params = {"source": source, "keyword": keyword, "type": "search"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.api_url,
                    params=params,
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as resp:
                    if resp.status != 200:
                        logger.warning(f"[{source}] 搜索失败: HTTP {resp.status}")
                        return None
                    result = await resp.json()

            if result.get("code") != 200:
                logger.warning(f"[{source}] 搜索失败: {result.get('message')}")
                return None

            # ``or`` also covers keys that are present but null in the JSON.
            data = result.get("data") or {}
            results = data.get("results") or []
            if not results:
                logger.warning(f"[{source}] 未找到歌曲: {keyword}")
                return None

            song = results[0]
            if isinstance(song, dict):
                # Later code selects the appid and the preferred song by
                # ``platform``; make sure it is set without overriding a value
                # supplied by the API.
                song.setdefault("platform", source)
            return song
        except Exception as e:
            # Best-effort lookup: one failing platform must not break the others.
            logger.warning(f"[{source}] 搜索异常: {e}")
            return None

    async def search_music(self, keyword: str) -> List[dict]:
        """Search all configured platforms concurrently.

        Args:
            keyword: Song keyword to search for.

        Returns:
            The non-empty result dicts, in platform order; failed or empty
            lookups are dropped.
        """
        results = await asyncio.gather(
            *(self.search_music_from_source(keyword, source) for source in self._SOURCES),
            return_exceptions=True,
        )
        return [result for result in results if isinstance(result, dict) and result]

    async def get_real_url(self, redirect_url: str) -> str:
        """Follow redirects and return the final URL.

        Args:
            redirect_url: URL that may redirect elsewhere.

        Returns:
            The URL of the final response, or ``redirect_url`` unchanged when
            the request fails.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    redirect_url,
                    allow_redirects=True,
                    timeout=aiohttp.ClientTimeout(total=5),
                ) as resp:
                    return str(resp.url)
        except Exception as e:
            logger.warning(f"获取真实URL失败: {e}")
            return redirect_url

    async def _resolve_song_fields(self, song: dict) -> dict:
        """Extract the display fields of *song* and resolve its redirect URLs."""

        async def resolve(link) -> str:
            # A missing/empty link stays empty instead of triggering a request.
            return await self.get_real_url(link) if link else ""

        # The two lookups are independent, so run them concurrently.
        url, pic = await asyncio.gather(
            resolve(song.get("url", "")),
            resolve(song.get("pic", "")),
        )
        return {
            "name": str(song.get("name") or "未知歌曲"),
            "artist": str(song.get("artist") or "未知歌手"),
            "platform": song.get("platform", "unknown"),
            "url": url,
            "pic": pic,
        }

    def _render_music_appmsg(self, fields: dict, separator: str) -> str:
        """Render the music appmsg XML for *fields*, XML-escaping every value."""
        appid = self._APPID_MAP.get(fields["platform"], self._DEFAULT_APPID)
        return separator.join(self._MUSIC_APPMSG_LINES).format(
            appid=appid,
            # Escape text content: names and especially URLs (query strings)
            # routinely contain "&", which would otherwise break the XML.
            name=escape(fields["name"]),
            artist=escape(fields["artist"]),
            url=escape(fields["url"]),
            pic=escape(fields["pic"]),
        )

    async def _send_cards_with_delay(self, bot, to_wxid: str, songs: list) -> None:
        """Send one music card per song, pausing between consecutive sends."""
        for index, song in enumerate(songs):
            if index:
                await asyncio.sleep(self._SEND_INTERVAL_SECONDS)
            await self.send_music_card(bot, to_wxid, song)

    async def send_music_chat_record(self, bot: WechatHookClient, to_wxid: str, keyword: str, songs: list):
        """Send the songs bundled as a single chat-record message.

        Errors are logged and swallowed; nothing is returned.

        Args:
            bot: Bot client used to send the message.
            to_wxid: Recipient wxid; a ``@chatroom`` suffix marks a group.
            keyword: Search keyword, used in the record title/description.
            songs: Song dicts as returned by :meth:`search_music`.
        """
        try:
            chatroom_flag = "1" if to_wxid.endswith("@chatroom") else "0"
            # Loop-invariant: the same hashed recipient goes into every item.
            hashed_wxid = hashlib.sha256(to_wxid.encode("utf-8")).hexdigest()
            resolved = await asyncio.gather(
                *(self._resolve_song_fields(song) for song in songs)
            )

            # Build the chat-record XML; ElementTree escapes element text itself.
            recordinfo = ET.Element("recordinfo")
            ET.SubElement(recordinfo, "info").text = f"🎵 {keyword}"
            ET.SubElement(recordinfo, "isChatRoom").text = chatroom_flag
            datalist = ET.SubElement(recordinfo, "datalist")
            datalist.set("count", str(len(songs)))
            ET.SubElement(recordinfo, "desc").text = f"{keyword} 音乐"
            ET.SubElement(recordinfo, "fromscene").text = "3"

            for fields in resolved:
                name = fields["name"]
                artist = fields["artist"]
                music_xml = self._render_music_appmsg(fields, separator="")

                di = ET.SubElement(datalist, "dataitem")
                # 49 = appmsg (application message), per the original author's note.
                di.set("datatype", "49")
                di.set("dataid", uuid.uuid4().hex)
                # Five-digit pseudo id derived from the current time.
                ET.SubElement(di, "srcMsgLocalid").text = str((int(time.time() * 1000) % 90000) + 10000)
                ET.SubElement(di, "sourcetime").text = time.strftime("%Y-%m-%d %H:%M")
                ET.SubElement(di, "fromnewmsgid").text = str(int(time.time() * 1000))
                ET.SubElement(di, "srcMsgCreateTime").text = str(int(time.time()))
                ET.SubElement(di, "sourcename").text = name
                ET.SubElement(di, "sourceheadurl").text = fields["pic"]
                ET.SubElement(di, "datatitle").text = f"{name} - {artist}"
                ET.SubElement(di, "datadesc").text = artist
                ET.SubElement(di, "datafmt").text = "appmsg"
                ET.SubElement(di, "ischatroom").text = chatroom_flag
                # The music XML is stored as element text wrapped in a CDATA
                # marker.  NOTE(review): ElementTree escapes this text on
                # serialisation, so the marker is not a real CDATA section;
                # kept exactly as in the original wire format.
                ET.SubElement(di, "appmsg").text = f"<![CDATA[{music_xml}]]>"
                dataitemsource = ET.SubElement(di, "dataitemsource")
                ET.SubElement(dataitemsource, "hashusername").text = hashed_wxid

            record_xml = ET.tostring(recordinfo, encoding="unicode")
            safe_keyword = escape(str(keyword))
            appmsg_xml = (
                f'<appmsg appid="" sdkver="0"><title>🎵 {safe_keyword}</title>'
                f"<des>{safe_keyword} 音乐</des><type>19</type>"
                "<url>https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport</url>"
                "<appattach><cdnthumbaeskey></cdnthumbaeskey><aeskey></aeskey></appattach>"
                f"<recorditem><![CDATA[{record_xml}]]></recorditem><percent>0</percent></appmsg>"
            )
            await bot._send_data_async(
                self._APPMSG_SEND_TYPE, {"to_wxid": to_wxid, "content": appmsg_xml}
            )
            logger.success(f"已发送音乐聊天记录: {len(songs)} 首")
        except Exception as e:
            logger.error(f"发送音乐聊天记录失败: {e}")

    async def send_music_card(self, bot: WechatHookClient, to_wxid: str, song: dict, retry: int = 2):
        """Send a single music card, re-sending it when the send call fails.

        Errors are logged and swallowed; nothing is returned.

        Args:
            bot: Bot client used to send the message.
            to_wxid: Recipient wxid.
            song: Song dict (``name``, ``artist``, ``url``, ``pic``, ``platform``).
            retry: Number of additional attempts after a falsy send result.
        """
        try:
            # get_real_url never raises for ordinary errors (it falls back to
            # the redirect URL), so no extra try/except is needed here.
            fields = await self._resolve_song_fields(song)
            name = fields["name"]
            artist = fields["artist"]
            logger.info(f"准备发送音乐卡片: {name} - {artist} (平台: {fields['platform']})")

            payload = {
                "to_wxid": to_wxid,
                "content": self._render_music_appmsg(fields, separator="\n"),
            }

            # Build the payload once and only repeat the send itself.
            attempts = max(retry, 0) + 1
            for attempt in range(attempts):
                if await bot._send_data_async(self._APPMSG_SEND_TYPE, payload):
                    logger.success(f"已发送音乐卡片: {name} - {artist}")
                    return
                remaining = attempts - attempt - 1
                if remaining > 0:
                    logger.warning(
                        f"发送失败,{self._RETRY_DELAY_SECONDS}秒后重试(剩余 {remaining} 次): {name}"
                    )
                    await asyncio.sleep(self._RETRY_DELAY_SECONDS)

            logger.error(f"发送音乐卡片失败(已重试): {name} - {artist}")
        except Exception as e:
            logger.error(f"发送音乐卡片异常: {e}")

    @on_text_message(priority=60)
    async def handle_music_command(self, bot: WechatHookClient, message: dict):
        """Handle the ``点歌 <song name> [3]`` text command.

        Args:
            bot: Bot client used to reply.
            message: Incoming message; ``Content`` and ``FromWxid`` are read.

        Returns:
            ``True`` when the message is not a music command, ``False`` once
            the command has been handled.  NOTE(review): presumably the
            framework uses this to decide whether later handlers run -
            confirm against ``on_text_message``.
        """
        # Only strip leading whitespace before the prefix check: stripping the
        # trailing space as well would make a bare "点歌 " fail the check and
        # the usage hint below unreachable.
        content = message.get("Content", "").lstrip()
        from_wxid = message.get("FromWxid", "")

        if not content.startswith(self._COMMAND_PREFIX):
            return True

        parts = content[len(self._COMMAND_PREFIX):].split()
        if not parts:
            await bot.send_text(from_wxid, "❌ 请输入歌曲名\n格式:点歌 歌曲名 [3]")
            return False

        # A trailing "3" after the song name asks for a card from every platform.
        send_all = len(parts) > 1 and parts[-1] == self._SEND_ALL_FLAG
        keyword = " ".join(parts[:-1] if send_all else parts)
        logger.info(f"点歌: {keyword}, 发送全部: {send_all}")

        songs = await self.search_music(keyword)
        if not songs:
            await bot.send_text(from_wxid, f"❌ 未找到歌曲:{keyword}")
            return False

        if send_all:
            await self._send_cards_with_delay(bot, from_wxid, songs)
        else:
            # Prefer the QQ Music hit; fall back to the first available one.
            preferred = next((s for s in songs if s.get("platform") == "qq"), songs[0])
            await self.send_music_card(bot, from_wxid, preferred)

        logger.success("已发送歌曲")
        return False

    def get_llm_tools(self) -> List[dict]:
        """Return the LLM tool (function-calling) definitions of this plugin."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "search_music",
                    "description": "搜索并播放音乐。当用户想听歌、点歌、播放音乐时调用此函数。",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "keyword": {
                                "type": "string",
                                "description": "歌曲名称或关键词,例如:告白气球、周杰伦 晴天",
                            }
                        },
                        "required": ["keyword"],
                    },
                },
            }
        ]

    async def execute_llm_tool(self, tool_name: str, arguments: dict, bot, from_wxid: str) -> Optional[dict]:
        """Execute an LLM tool call.

        Args:
            tool_name: Name of the requested tool.
            arguments: Tool arguments; ``keyword`` is required.
            bot: Bot client used to send messages.
            from_wxid: wxid that receives the progress text and the cards.

        Returns:
            ``None`` when *tool_name* is not ``search_music``; otherwise a dict
            with ``success`` and ``message`` keys.
        """
        try:
            if tool_name != "search_music":
                return None

            keyword = arguments.get("keyword")
            if not keyword:
                return {"success": False, "message": "缺少歌曲名称参数"}

            logger.info(f"AI 调用点歌: {keyword}")
            await bot.send_text(from_wxid, f"🔍 正在搜索:{keyword}")

            songs = await self.search_music(keyword)
            if not songs:
                await bot.send_text(from_wxid, f"❌ 未找到歌曲:{keyword}")
                return {"success": False, "message": f"未找到歌曲:{keyword}"}

            # Send every hit, spaced out to avoid rate limiting.
            await self._send_cards_with_delay(bot, from_wxid, songs)
            return {"success": True, "message": f"已找到 {len(songs)} 首歌曲"}
        except Exception as e:
            logger.error(f"LLM 工具执行失败: {e}")
            return {"success": False, "message": f"执行失败: {str(e)}"}