Files

689 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
音乐点歌插件
支持两种触发方式:
1. 指令触发:点歌 歌曲名
2. AI函数调用
"""
import aiohttp
from pathlib import Path
import tomllib
from typing import List, Optional
from loguru import logger
from utils.plugin_base import PluginBase
from utils.decorators import on_text_message
from WechatHook import WechatHookClient
class MusicPlugin(PluginBase):
"""音乐点歌插件"""
description = "音乐点歌 - 搜索并播放歌曲"
author = "ShiHao"
version = "1.0.0"
def __init__(self):
super().__init__()
self.api_url = "https://tunehub.sayqz.com/api"
self.api_key = ""
self.fallback_api_url = "https://www.hhlqilongzhu.cn/api/dg_wyymusic.php"
self.tool_max_results = 3
config_path = Path("plugins/Music/config.toml")
if config_path.exists():
try:
with open(config_path, "rb") as f:
plugin_config = tomllib.load(f)
music_config = plugin_config.get("Music", {})
self.api_url = music_config.get("api_url", self.api_url)
self.api_key = music_config.get("api_key", self.api_key)
self.fallback_api_url = music_config.get("fallback_api_url", self.fallback_api_url)
tool_max_results = music_config.get("tool_max_results", self.tool_max_results)
try:
self.tool_max_results = int(tool_max_results)
except (TypeError, ValueError):
logger.warning(f"Music tool_max_results 配置无效: {tool_max_results}")
except Exception as e:
logger.warning(f"读取 Music 配置失败: {e}")
self.api_base = self.api_url.rstrip("/")
self.api_headers = {"X-API-Key": self.api_key} if self.api_key else {}
async def _request_json(
self,
session: aiohttp.ClientSession,
url: str,
method: str = "GET",
params: Optional[dict] = None,
json_body: Optional[dict] = None,
timeout: int = 10,
headers: Optional[dict] = None,
) -> Optional[dict]:
"""请求 JSON带简易重试处理 502/503/504"""
import asyncio
retry_statuses = {502, 503, 504}
for attempt in range(3):
try:
if method.upper() == "POST":
req = session.post
else:
req = session.get
async with req(
url,
params=params,
json=json_body,
headers=headers,
timeout=aiohttp.ClientTimeout(total=timeout),
) as resp:
if resp.status in retry_statuses and attempt < 2:
await asyncio.sleep(0.5 * (attempt + 1))
continue
if resp.status != 200:
logger.warning(f"HTTP 请求失败: {url} -> {resp.status}")
return None
try:
return await resp.json(content_type=None)
except Exception:
text = await resp.text()
try:
import json
return json.loads(text)
except Exception:
logger.warning(f"HTTP 响应不是 JSON: {url}")
return None
except Exception as e:
if attempt < 2:
await asyncio.sleep(0.5 * (attempt + 1))
continue
logger.warning(f"HTTP 请求异常: {url} -> {e}")
return None
async def async_init(self):
"""异步初始化"""
logger.success("音乐点歌插件初始化完成")
async def _get_method_config(
self,
session: aiohttp.ClientSession,
platform: str,
function: str
) -> Optional[dict]:
"""获取平台方法配置(方法下发)"""
url = f"{self.api_base}/v1/methods/{platform}/{function}"
result = await self._request_json(
session,
url,
method="GET",
headers=self.api_headers,
timeout=10
)
if not result or result.get("code") != 0:
logger.warning(f"[{platform}] 获取方法失败: {result}")
return None
return result.get("data")
async def _search_platform(
self,
session: aiohttp.ClientSession,
keyword: str,
platform: str,
page: int = 1,
limit: int = 20
) -> List[dict]:
"""使用方法下发配置搜索歌曲"""
config = await self._get_method_config(session, platform, "search")
if not config:
return []
headers = config.get("headers", {})
url = config.get("url")
method = config.get("method", "GET")
if platform == "netease":
params = {
"s": keyword,
"type": "1",
"offset": max(page - 1, 0) * limit,
"limit": limit,
}
response = await self._request_json(
session,
url,
method=method,
params=params,
headers=headers,
timeout=10
)
songs = (response or {}).get("result", {}).get("songs") or []
results = []
for item in songs:
results.append({
"id": str(item.get("id")),
"name": item.get("name"),
"artist": ", ".join([a.get("name") for a in item.get("artists", [])]),
"album": (item.get("album") or {}).get("name", ""),
"platform": platform,
})
return results
if platform == "qq":
import copy
body = copy.deepcopy(config.get("body", {}))
body.setdefault("req", {}).setdefault("param", {})
body["req"]["param"]["query"] = keyword
body["req"]["param"]["page_num"] = page
body["req"]["param"]["num_per_page"] = limit
response = await self._request_json(
session,
url,
method="POST",
json_body=body,
headers=headers,
timeout=10
)
song_list = (((response or {}).get("req") or {}).get("data") or {}).get("body", {})
song_list = song_list.get("song", {}).get("list", []) or []
results = []
for item in song_list:
results.append({
"id": item.get("mid"),
"name": item.get("name"),
"artist": ", ".join([s.get("name") for s in item.get("singer", [])]),
"album": (item.get("album") or {}).get("name", ""),
"platform": platform,
})
return results
if platform == "kuwo":
params = config.get("params", {}).copy()
params["all"] = keyword
params["pn"] = max(page - 1, 0)
params["rn"] = limit
response = await self._request_json(
session,
url,
method=method,
params=params,
headers=headers,
timeout=10
)
song_list = (response or {}).get("abslist") or []
results = []
for item in song_list:
results.append({
"id": str(item.get("MUSICRID", "")).replace("MUSIC_", ""),
"name": item.get("SONGNAME"),
"artist": (item.get("ARTIST") or "").replace("&", ", "),
"album": item.get("ALBUM") or "",
"platform": platform,
})
return results
logger.warning(f"未知平台: {platform}")
return []
async def search_music_fallback(
self,
session: aiohttp.ClientSession,
keyword: str
) -> Optional[dict]:
"""备用搜索(仅网易云),用于主源不可用时兜底"""
params = {
"gm": keyword,
"n": 1,
"br": 2,
"type": "json",
}
result = await self._request_json(session, self.fallback_api_url, params=params, timeout=10)
if not result:
return None
if result.get("code") != 200:
logger.warning(f"[fallback] 搜索失败: {result}")
return None
name = result.get("title") or keyword
artist = result.get("singer") or "未知歌手"
music_url = (result.get("music_url") or "").split("?")[0]
link_url = result.get("link") or ""
cover_url = result.get("cover") or ""
if not music_url and not link_url:
return None
return {
"name": name,
"artist": artist,
"url": music_url or link_url,
"pic": cover_url,
"platform": "netease",
}
async def search_music(self, keyword: str) -> List[dict]:
"""
从三个平台搜索音乐
Args:
keyword: 歌曲关键词
Returns:
歌曲信息列表
"""
import asyncio
sources = ["netease", "qq", "kuwo"]
if not self.api_key:
logger.warning("Music API Key 未配置,无法使用 TuneHub")
async with aiohttp.ClientSession() as session:
fallback = await self.search_music_fallback(session, keyword)
return [fallback] if fallback else []
async with aiohttp.ClientSession() as session:
tasks = [self._search_platform(session, keyword, source) for source in sources]
results = await asyncio.gather(*tasks, return_exceptions=True)
songs = []
for result in results:
if isinstance(result, list):
songs.extend([item for item in result if isinstance(item, dict) and item.get("id")])
if songs:
return songs
fallback = await self.search_music_fallback(session, keyword)
if fallback:
logger.info("主源不可用,已使用备用网易云搜索")
return [fallback]
return []
async def get_real_url(self, redirect_url: str) -> str:
"""
获取重定向后的真实 URL
Args:
redirect_url: 重定向链接
Returns:
真实 URL
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
redirect_url,
allow_redirects=True,
timeout=aiohttp.ClientTimeout(total=5)
) as resp:
return str(resp.url)
except Exception as e:
logger.warning(f"获取真实URL失败: {e}")
return redirect_url
async def parse_song(
self,
session: aiohttp.ClientSession,
platform: str,
song_id: str,
quality: str = "320k"
) -> Optional[dict]:
"""调用解析接口获取真实播放链接与歌词"""
if not self.api_key:
logger.warning("Music API Key 未配置,无法解析歌曲")
return None
body = {
"platform": platform,
"ids": str(song_id),
"quality": quality,
}
url = f"{self.api_base}/v1/parse"
result = await self._request_json(
session,
url,
method="POST",
json_body=body,
headers=self.api_headers,
timeout=20
)
if not result or result.get("code") != 0:
logger.warning(f"[{platform}] 解析失败: {result}")
return None
data = (result.get("data") or {}).get("data") or []
if not data:
return None
item = data[0]
if not item.get("success", True):
logger.warning(f"[{platform}] 解析失败: {item}")
return None
return item
async def send_music_chat_record(self, bot: WechatHookClient, to_wxid: str, keyword: str, songs: list):
"""
以聊天记录格式发送音乐卡片
Args:
bot: 机器人客户端
to_wxid: 接收者 wxid
keyword: 搜索关键词
songs: 歌曲列表
"""
try:
import uuid
import time
import hashlib
import xml.etree.ElementTree as ET
is_group = to_wxid.endswith("@chatroom")
# 构造聊天记录 XML
recordinfo = ET.Element("recordinfo")
ET.SubElement(recordinfo, "info").text = f"🎵 {keyword}"
ET.SubElement(recordinfo, "isChatRoom").text = "1" if is_group else "0"
datalist = ET.SubElement(recordinfo, "datalist")
datalist.set("count", str(len(songs)))
ET.SubElement(recordinfo, "desc").text = f"{keyword} 音乐"
ET.SubElement(recordinfo, "fromscene").text = "3"
appid_map = {
"netease": "wx8dd6ecd81906fd84",
"qq": "wx45116b30f23e0cc4",
"kuwo": "wxc305711a2a7ad71c"
}
async with aiohttp.ClientSession() as session:
for song in songs:
name = song.get("name", "未知歌曲")
artist = song.get("artist", "未知歌手")
platform = song.get("platform", "unknown")
url_redirect = song.get("url", "")
pic_redirect = song.get("pic", "")
if not url_redirect and song.get("id"):
parsed = await self.parse_song(session, platform, song.get("id"))
if parsed:
info = parsed.get("info") or {}
name = info.get("name") or name
artist = info.get("artist") or artist
url_redirect = parsed.get("url") or url_redirect
pic_redirect = parsed.get("cover") or pic_redirect
# 获取真实 URL
url = await self.get_real_url(url_redirect) if url_redirect else ""
pic = await self.get_real_url(pic_redirect) if pic_redirect else ""
appid = appid_map.get(platform, "wx8dd6ecd81906fd84")
# 构造音乐卡片 XML
music_xml = f"""<appmsg appid="{appid}" sdkver="0"><title>{name}</title><des>{artist}</des><action>view</action><type>3</type><showtype>0</showtype><content/><url>{url}</url><dataurl>{url}</dataurl><lowurl>{url}</lowurl><lowdataurl>{url}</lowdataurl><recorditem/><thumburl>{pic}</thumburl><messageaction/><laninfo/><extinfo/><sourceusername/><sourcedisplayname/><songlyric></songlyric><commenturl/><appattach><totallen>0</totallen><attachid/><emoticonmd5/><fileext/><aeskey/></appattach><webviewshared><publisherId/><publisherReqId>0</publisherReqId></webviewshared><weappinfo><pagepath/><username/><appid/><appservicetype>0</appservicetype></weappinfo><websearch/><songalbumurl>{pic}</songalbumurl></appmsg>"""
di = ET.SubElement(datalist, "dataitem")
di.set("datatype", "49") # 49=appmsg应用消息
di.set("dataid", uuid.uuid4().hex)
ET.SubElement(di, "srcMsgLocalid").text = str((int(time.time() * 1000) % 90000) + 10000)
ET.SubElement(di, "sourcetime").text = time.strftime("%Y-%m-%d %H:%M")
ET.SubElement(di, "fromnewmsgid").text = str(int(time.time() * 1000))
ET.SubElement(di, "srcMsgCreateTime").text = str(int(time.time()))
ET.SubElement(di, "sourcename").text = f"{name}"
ET.SubElement(di, "sourceheadurl").text = pic
ET.SubElement(di, "datatitle").text = f"{name} - {artist}"
ET.SubElement(di, "datadesc").text = artist
ET.SubElement(di, "datafmt").text = "appmsg"
ET.SubElement(di, "ischatroom").text = "1" if is_group else "0"
# 使用 CDATA 包裹音乐 XML
appmsg_elem = ET.SubElement(di, "appmsg")
appmsg_elem.text = f"<![CDATA[{music_xml}]]>"
dataitemsource = ET.SubElement(di, "dataitemsource")
ET.SubElement(dataitemsource, "hashusername").text = hashlib.sha256(to_wxid.encode("utf-8")).hexdigest()
record_xml = ET.tostring(recordinfo, encoding="unicode")
appmsg_xml = f"""<appmsg appid="" sdkver="0"><title>🎵 {keyword}</title><des>{keyword} 音乐</des><type>19</type><url>https://support.weixin.qq.com/cgi-bin/mmsupport-bin/readtemplate?t=page/favorite_record__w_unsupport</url><appattach><cdnthumbaeskey></cdnthumbaeskey><aeskey></aeskey></appattach><recorditem><![CDATA[{record_xml}]]></recorditem><percent>0</percent></appmsg>"""
await bot.send_xml(to_wxid, appmsg_xml)
logger.success(f"已发送音乐聊天记录: {len(songs)}")
except Exception as e:
logger.error(f"发送音乐聊天记录失败: {e}")
async def send_music_card(self, bot: WechatHookClient, to_wxid: str, song: dict, retry: int = 2):
"""
发送音乐卡片
Args:
bot: 机器人客户端
to_wxid: 接收者 wxid
song: 歌曲信息
retry: 重试次数
"""
try:
name = song.get("name", "未知歌曲")
artist = song.get("artist", "未知歌手")
platform = song.get("platform", "unknown")
song_id = song.get("id")
logger.info(f"准备发送音乐卡片: {name} - {artist} (平台: {platform})")
url = ""
pic = ""
if song_id:
async with aiohttp.ClientSession() as session:
parsed = await self.parse_song(session, platform, song_id)
if parsed:
info = parsed.get("info") or {}
name = info.get("name") or name
artist = info.get("artist") or artist
url = parsed.get("url") or ""
pic = parsed.get("cover") or ""
# 解析失败时尝试使用搜索结果的直链
if not url:
url_redirect = song.get("url", "")
if url_redirect:
try:
url = await self.get_real_url(url_redirect)
except Exception as e:
logger.warning(f"获取播放链接失败,使用原链接: {e}")
if not pic:
pic_redirect = song.get("pic", "")
if pic_redirect:
try:
pic = await self.get_real_url(pic_redirect)
except Exception as e:
logger.warning(f"获取封面失败,使用空封面: {e}")
if not url:
logger.warning(f"未获取到播放链接: {name} - {artist}")
return False
# 歌词字段留空避免XML过大
lrc = ""
# 根据平台选择 appid
appid_map = {
"netease": "wx8dd6ecd81906fd84",
"qq": "wx45116b30f23e0cc4",
"kuwo": "wxc305711a2a7ad71c"
}
appid = appid_map.get(platform, "wx8dd6ecd81906fd84")
# 构造音乐卡片 XML
xml = f"""<appmsg appid="{appid}" sdkver="0">
<title>{name}</title>
<des>{artist}</des>
<action>view</action>
<type>3</type>
<showtype>0</showtype>
<content/>
<url>{url}</url>
<dataurl>{url}</dataurl>
<lowurl>{url}</lowurl>
<lowdataurl>{url}</lowdataurl>
<recorditem/>
<thumburl>{pic}</thumburl>
<messageaction/>
<laninfo/>
<extinfo/>
<sourceusername/>
<sourcedisplayname/>
<songlyric>{lrc}</songlyric>
<commenturl/>
<appattach>
<totallen>0</totallen>
<attachid/>
<emoticonmd5/>
<fileext/>
<aeskey/>
</appattach>
<webviewshared>
<publisherId/>
<publisherReqId>0</publisherReqId>
</webviewshared>
<weappinfo>
<pagepath/>
<username/>
<appid/>
<appservicetype>0</appservicetype>
</weappinfo>
<websearch/>
<songalbumurl>{pic}</songalbumurl>
</appmsg>"""
result = await bot.send_xml(to_wxid, xml)
if result:
logger.success(f"已发送音乐卡片: {name} - {artist}")
else:
# 发送失败,尝试重试
if retry > 0:
logger.warning(f"发送失败,{retry}秒后重试: {name}")
import asyncio
await asyncio.sleep(1)
await self.send_music_card(bot, to_wxid, song, retry - 1)
else:
logger.error(f"发送音乐卡片失败(已重试): {name} - {artist}")
except Exception as e:
logger.error(f"发送音乐卡片异常: {e}")
def _select_tool_songs(self, songs: List[dict]) -> List[dict]:
"""为工具调用挑选少量歌曲,避免刷屏。"""
if not songs:
return []
max_results = max(1, int(self.tool_max_results))
return list(songs[:max_results])
@on_text_message(priority=60)
async def handle_music_command(self, bot: WechatHookClient, message: dict):
"""处理点歌指令"""
content = message.get("Content", "").strip()
from_wxid = message.get("FromWxid", "")
# 精确匹配 "点歌 " 开头
if not content.startswith("点歌 "):
return True
# 提取歌曲名和参数
parts = content[3:].strip().split()
if not parts:
await bot.send_text(from_wxid, "❌ 请输入歌曲名\n格式:点歌 歌曲名 [3]")
return False
# 检查是否要发送全部平台
send_all = len(parts) > 1 and parts[-1] == "3"
keyword = " ".join(parts[:-1]) if send_all else " ".join(parts)
logger.info(f"点歌: {keyword}, 发送全部: {send_all}")
# 从三个平台搜索歌曲
songs = await self.search_music(keyword)
if not songs:
await bot.send_text(from_wxid, f"❌ 未找到歌曲:{keyword}")
return False
# 根据参数决定发送哪些
if send_all:
# 发送所有平台(添加延迟避免限流)
import asyncio
for i, song in enumerate(songs):
await self.send_music_card(bot, from_wxid, song)
if i < len(songs) - 1:
await asyncio.sleep(2)
else:
# 只发送 QQ 音乐
qq_song = next((s for s in songs if s.get("platform") == "qq"), None)
if qq_song:
await self.send_music_card(bot, from_wxid, qq_song)
else:
# 如果没有 QQ 音乐,发送第一个
await self.send_music_card(bot, from_wxid, songs[0])
logger.success(f"已发送歌曲")
return False
def get_llm_tools(self) -> List[dict]:
"""返回 LLM 工具定义"""
return [
{
"type": "function",
"function": {
"name": "search_music",
"description": (
"根据歌曲名/歌手名检索并发送音乐卡片。"
"仅当用户明确提出点歌、听歌、播放某首歌时调用;"
"如果只是询问歌词出处,优先先确认歌名再决定是否点歌。"
),
"parameters": {
"type": "object",
"properties": {
"keyword": {
"type": "string",
"description": "歌曲检索词,可为歌名、歌手名或二者组合,如“周杰伦 晴天”。"
}
},
"required": ["keyword"],
"additionalProperties": False
}
}
}
]
async def execute_llm_tool(self, tool_name: str, arguments: dict, bot, from_wxid: str) -> dict:
"""执行 LLM 工具调用"""
try:
if tool_name != "search_music":
return None
keyword = arguments.get("keyword")
if not keyword:
return {"success": False, "message": "缺少歌曲名称参数"}
logger.info(f"AI 调用点歌: {keyword}")
await bot.send_text(from_wxid, f"🔍 正在搜索:{keyword}")
# 从三个平台搜索歌曲
songs = await self.search_music(keyword)
if not songs:
await bot.send_text(from_wxid, f"❌ 未找到歌曲:{keyword}")
return {"success": False, "message": f"未找到歌曲:{keyword}"}
# 工具调用默认只发送少量结果,避免刷屏
import asyncio
selected_songs = self._select_tool_songs(songs)
for i, song in enumerate(selected_songs):
await self.send_music_card(bot, from_wxid, song)
if i < len(selected_songs) - 1: # 最后一个不需要延迟
await asyncio.sleep(2) # 每条消息间隔2秒
return {"success": True, "message": f"已发送 {len(selected_songs)} 首歌曲"}
except Exception as e:
logger.error(f"LLM 工具执行失败: {e}")
return {"success": False, "message": f"执行失败: {str(e)}"}