221 lines
8.1 KiB
Python
221 lines
8.1 KiB
Python
import logging
|
||
import requests
|
||
import lz4.block as lb
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from wcferry import Wcf
|
||
|
||
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from plugin_common.plugin_interface import PluginStatus
|
||
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
|
||
|
||
class MusicPlugin(MessagePluginInterface):
|
||
"""音乐点播插件"""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "音乐点播"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供音乐点播功能,支持QQ音乐和网易云音乐"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "Trae AI"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logging.getLogger(f"Plugin.{self.name}")
|
||
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.wcf = context.get("wcf")
|
||
self.event_system = context.get("event_system")
|
||
self.message_util = context.get("message_util")
|
||
|
||
self._commands = self._config.get("Music", {}).get("command", ["点歌", "音乐"])
|
||
self.command_format = self._config.get("Music", {}).get("command-format", "点歌 歌曲名")
|
||
self.enable = self._config.get("Music", {}).get("enable", True)
|
||
|
||
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
command = content.split(" ")[0]
|
||
|
||
return command in self._commands
|
||
|
||
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.info(f"插件执行: {self.name}:{content}")
|
||
command = content.split(" ")[0]
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
wcf: Wcf = message.get("wcf")
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
|
||
# 检查命令格式
|
||
if len(content.split(" ")) == 1:
|
||
wcf.send_text(f"-----Bot-----\n❌命令格式错误!\n{self.command_format}",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "命令格式错误"
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, Feature.MUSIC) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
# 提取歌曲名
|
||
user_song_name = content[len(command):].strip()
|
||
|
||
try:
|
||
# 搜索歌曲
|
||
song_info = self._search_song(user_song_name)
|
||
if not song_info or not song_info.get("play_url"):
|
||
wcf.send_text(f"-----Bot-----\n❌未找到歌曲:{user_song_name}",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "未找到歌曲"
|
||
|
||
# 发送音乐
|
||
self._send_music_message(wcf, song_info, roomid or sender)
|
||
return True, "发送成功"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理音乐请求出错: {e}")
|
||
return True, f"处理出错: {e}"
|
||
|
||
def _search_song(self, song_name: str) -> Dict[str, Any]:
|
||
"""搜索歌曲信息"""
|
||
try:
|
||
# 尝试QQ音乐API
|
||
fallback_api = f"https://www.hhlqilongzhu.cn/api/dg_wyymusic.php?gm={song_name}&n=1&num=1&type=json"
|
||
response = requests.get(fallback_api)
|
||
|
||
if response.status_code != 200:
|
||
self.LOG.error(f"API 请求失败,状态码: {response.status_code}")
|
||
return {}
|
||
|
||
json_data = response.json()
|
||
return {
|
||
"song_name": json_data.get('title', ''),
|
||
"singer_name": json_data.get('singer', ''),
|
||
"play_url": json_data.get('music_url', ''),
|
||
"singer_pic": json_data.get('cover', ''),
|
||
"data_url": json_data.get('link', '')
|
||
}
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"搜索歌曲出错: {e}")
|
||
return {}
|
||
|
||
def _send_music_message(self, wcf, song_info: Dict[str, Any], receiver: str) -> bool:
|
||
"""发送音乐消息"""
|
||
try:
|
||
song_name = song_info.get("song_name", "")
|
||
singer_name = song_info.get("singer_name", "")
|
||
play_url = song_info.get("play_url", "")
|
||
singer_pic = song_info.get("singer_pic", "")
|
||
data_url = song_info.get("data_url", "")
|
||
|
||
xml_message = f"""
|
||
<?xml version="1.0"?>
|
||
<msg>
|
||
<appmsg appid="wx8dd6ecd81906fd84" sdkver="0">
|
||
<title>{song_name}</title>
|
||
<des>{singer_name}\n❤Bot-祝您天天开心❤</des>
|
||
<action>view</action>
|
||
<type>3</type>
|
||
<showtype>0</showtype>
|
||
<content />
|
||
<url>{data_url}</url>
|
||
<dataurl>{play_url}</dataurl>
|
||
<lowurl/>
|
||
<lowdataurl/>
|
||
<recorditem />
|
||
<thumburl />
|
||
<messageaction />
|
||
<laninfo />
|
||
<extinfo />
|
||
<sourceusername />
|
||
<sourcedisplayname />
|
||
<commenturl />
|
||
<appattach>
|
||
<totallen>0</totallen>
|
||
<attachid />
|
||
<emoticonmd5></emoticonmd5>
|
||
<fileext />
|
||
<aeskey></aeskey>
|
||
</appattach>
|
||
<webviewshared>
|
||
<publisherId />
|
||
<publisherReqId>0</publisherReqId>
|
||
</webviewshared>
|
||
<weappinfo>
|
||
<pagepath />
|
||
<username />
|
||
<appid />
|
||
<appservicetype>0</appservicetype>
|
||
</weappinfo>
|
||
<websearch />
|
||
<songalbumurl>{singer_pic}</songalbumurl>
|
||
</appmsg>
|
||
<scene>0</scene>
|
||
<appinfo>
|
||
<version>49</version>
|
||
<appname></appname>
|
||
</appinfo>
|
||
<commenturl />
|
||
</msg>"""
|
||
|
||
# 修改消息数据库里面的消息content内容
|
||
text_bytes = xml_message.encode('utf-8')
|
||
compressed_data = lb.compress(text_bytes, store_size=False).hex()
|
||
|
||
data = wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
|
||
wcf.query_sql('MSG0.db',
|
||
f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3,
|
||
IsSender=0, TalkerId=2 WHERE MsgSvrID={data[0]['MsgSvrID']}"""
|
||
)
|
||
|
||
result = wcf.forward_msg(data[0]["MsgSvrID"], receiver)
|
||
self.LOG.info(f"插件化:点歌发送结果: {result}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"发送音乐消息出错: {e}")
|
||
return False
|