Files
abot/plugins/music/main.py
2025-03-19 11:55:31 +08:00

224 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import requests
import lz4.block as lb
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class MusicPlugin(MessagePluginInterface):
"""音乐点播插件"""
@property
def name(self) -> str:
return "音乐点播"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供音乐点播功能支持QQ音乐和网易云音乐"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
# 在初始化时动态应用装饰器
self.process_message = plugin_stats_decorator(plugin_name=self.name)(self.process_message)
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self._commands = self._config.get("Music", {}).get("command", ["点歌", "音乐"])
self.command_format = self._config.get("Music", {}).get("command-format", "点歌 歌曲名")
self.enable = self._config.get("Music", {}).get("enable", True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查命令格式
if len(content.split(" ")) == 1:
wcf.send_text(f"-----Bot-----\n❌命令格式错误!\n{self.command_format}",
(roomid if roomid else sender), sender)
return True, "命令格式错误"
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.MUSIC) == PermissionStatus.DISABLED:
return False, "没有权限"
# 提取歌曲名
user_song_name = content[len(command):].strip()
try:
# 搜索歌曲
song_info = self._search_song(user_song_name)
if not song_info or not song_info.get("play_url"):
wcf.send_text(f"-----Bot-----\n❌未找到歌曲:{user_song_name}",
(roomid if roomid else sender), sender)
return True, "未找到歌曲"
# 发送音乐
self._send_music_message(wcf, song_info, roomid or sender)
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理音乐请求出错: {e}")
return True, f"处理出错: {e}"
def _search_song(self, song_name: str) -> Dict[str, Any]:
"""搜索歌曲信息"""
try:
# 尝试QQ音乐API
fallback_api = f"https://www.hhlqilongzhu.cn/api/dg_wyymusic.php?gm={song_name}&n=1&num=1&type=json"
response = requests.get(fallback_api)
if response.status_code != 200:
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
return {}
json_data = response.json()
return {
"song_name": json_data.get('title', ''),
"singer_name": json_data.get('singer', ''),
"play_url": json_data.get('music_url', ''),
"singer_pic": json_data.get('cover', ''),
"data_url": json_data.get('link', '')
}
except Exception as e:
self.LOG.error(f"搜索歌曲出错: {e}")
return {}
def _send_music_message(self, wcf, song_info: Dict[str, Any], receiver: str) -> bool:
"""发送音乐消息"""
try:
song_name = song_info.get("song_name", "")
singer_name = song_info.get("singer_name", "")
play_url = song_info.get("play_url", "")
singer_pic = song_info.get("singer_pic", "")
data_url = song_info.get("data_url", "")
xml_message = f"""
<?xml version="1.0"?>
<msg>
<appmsg appid="wx8dd6ecd81906fd84" sdkver="0">
<title>{song_name}</title>
<des>{singer_name}\n❤Bot-祝您天天开心❤</des>
<action>view</action>
<type>3</type>
<showtype>0</showtype>
<content />
<url>{data_url}</url>
<dataurl>{play_url}</dataurl>
<lowurl/>
<lowdataurl/>
<recorditem />
<thumburl />
<messageaction />
<laninfo />
<extinfo />
<sourceusername />
<sourcedisplayname />
<commenturl />
<appattach>
<totallen>0</totallen>
<attachid />
<emoticonmd5></emoticonmd5>
<fileext />
<aeskey></aeskey>
</appattach>
<webviewshared>
<publisherId />
<publisherReqId>0</publisherReqId>
</webviewshared>
<weappinfo>
<pagepath />
<username />
<appid />
<appservicetype>0</appservicetype>
</weappinfo>
<websearch />
<songalbumurl>{singer_pic}</songalbumurl>
</appmsg>
<scene>0</scene>
<appinfo>
<version>49</version>
<appname></appname>
</appinfo>
<commenturl />
</msg>"""
# 修改消息数据库里面的消息content内容
text_bytes = xml_message.encode('utf-8')
compressed_data = lb.compress(text_bytes, store_size=False).hex()
data = wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
wcf.query_sql('MSG0.db',
f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3,
IsSender=0, TalkerId=2 WHERE MsgSvrID={data[0]['MsgSvrID']}"""
)
result = wcf.forward_msg(data[0]["MsgSvrID"], receiver)
self.LOG.info(f"插件化:点歌发送结果: {result}")
return True
except Exception as e:
self.LOG.error(f"发送音乐消息出错: {e}")
return False