1.删除原有music内容,使用插件music;
2.新建插件,秀人图功能
This commit is contained in:
@@ -1,168 +0,0 @@
|
|||||||
import logging
|
|
||||||
import tomllib
|
|
||||||
|
|
||||||
import requests
|
|
||||||
from wcferry import WxMsg, Wcf
|
|
||||||
|
|
||||||
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
|
||||||
import lz4.block as lb
|
|
||||||
|
|
||||||
|
|
||||||
class BotMusic:
|
|
||||||
def __init__(self, wcf: Wcf, gbm: GroupBotManager):
|
|
||||||
self.LOG = logging.getLogger(__name__)
|
|
||||||
self.wcf = wcf # 假设 wcf 对象在此类中初始化
|
|
||||||
self.gbm = gbm # 权限功能
|
|
||||||
with open("music/config.toml", "rb") as f:
|
|
||||||
plugin_config = tomllib.load(f)
|
|
||||||
|
|
||||||
config = plugin_config["Music"]
|
|
||||||
|
|
||||||
self.enable = config["enable"]
|
|
||||||
self.command = config["command"]
|
|
||||||
self.command_format = config["command-format"]
|
|
||||||
self.LOG.info(f"[点歌台] 组件初始化完成,指令: {self.command}")
|
|
||||||
|
|
||||||
def get_music(self, message: WxMsg):
|
|
||||||
if not self.enable:
|
|
||||||
return
|
|
||||||
|
|
||||||
content = str(message.content).strip()
|
|
||||||
command = content.split(" ")
|
|
||||||
|
|
||||||
if command[0] not in self.command:
|
|
||||||
return
|
|
||||||
|
|
||||||
if len(command) == 1:
|
|
||||||
self.wcf.send_text(f"-----Bot-----\n❌命令格式错误!\n{self.command_format}",
|
|
||||||
(message.roomid if message.from_group() else message.sender), message.sender)
|
|
||||||
return
|
|
||||||
|
|
||||||
# 如果触发了指令,但是没有权限,则返回权限不足
|
|
||||||
if self.gbm.get_group_permission(message.roomid, Feature.MUSIC) == PermissionStatus.DISABLED:
|
|
||||||
return
|
|
||||||
|
|
||||||
user_song_name = content[len(command[0]):].strip()
|
|
||||||
|
|
||||||
try:
|
|
||||||
short_play_api = f"https://qqmusic.qqovo.cn/getSearchByKey?key={user_song_name}&page=1&limit=1"
|
|
||||||
fallback_api = f"https://www.hhlqilongzhu.cn/api/dg_wyymusic.php?gm={user_song_name}&n=1&num=1&type=json"
|
|
||||||
|
|
||||||
response = requests.get(short_play_api)
|
|
||||||
if response.status_code == 400:
|
|
||||||
response = requests.get(fallback_api)
|
|
||||||
|
|
||||||
if response.status_code != 200:
|
|
||||||
print(f"API 请求失败,状态码: {response.status_code}")
|
|
||||||
return
|
|
||||||
|
|
||||||
json_data = response.json()
|
|
||||||
result = json_data.get('response', {}).get('data', {}).get('song', {}).get('list', [])
|
|
||||||
if not result:
|
|
||||||
print("未找到匹配的歌曲")
|
|
||||||
return
|
|
||||||
|
|
||||||
first_song = result[0]
|
|
||||||
song_name = first_song.get('songname', '')
|
|
||||||
song_mid = first_song.get('songmid', '')
|
|
||||||
first_singer_name = first_song.get('singer', [{}])[0].get('name', '')
|
|
||||||
|
|
||||||
zhida_singer = json_data.get('response', {}).get('data', {}).get('zhida', {}).get('zhida_singer', {})
|
|
||||||
singer_pic = zhida_singer.get('singerPic', '') if zhida_singer else None
|
|
||||||
|
|
||||||
music_play_api = f"https://qqmusic.qqovo.cn/getMusicPlay?songmid={song_mid}&quality=m4a"
|
|
||||||
music_response = requests.get(music_play_api)
|
|
||||||
|
|
||||||
if music_response.status_code == 400:
|
|
||||||
print("获取播放链接失败,状态码 400,尝试备用接口")
|
|
||||||
music_response = requests.get(fallback_api)
|
|
||||||
|
|
||||||
if music_response.status_code != 200:
|
|
||||||
print(f"获取播放链接失败,状态码: {music_response.status_code}")
|
|
||||||
return
|
|
||||||
|
|
||||||
music_data = music_response.json()
|
|
||||||
play_url = music_data.get('data', {}).get('playUrl', {}).get(song_mid, {}).get('url', '')
|
|
||||||
|
|
||||||
if not play_url:
|
|
||||||
print("主接口play_url为空,尝试备用接口")
|
|
||||||
music_response = requests.get(fallback_api).json()
|
|
||||||
song_name = music_response.get('title', song_name)
|
|
||||||
first_singer_name = music_response.get('singer', first_singer_name)
|
|
||||||
play_url = music_response.get('music_url', '')
|
|
||||||
singer_pic = music_response.get('cover', singer_pic)
|
|
||||||
dataurl = music_response.get('link', '')
|
|
||||||
else:
|
|
||||||
dataurl = f"https://y.qq.com/n/ryqq/songDetail/{song_mid}"
|
|
||||||
|
|
||||||
if not play_url:
|
|
||||||
print("未获取到音乐播放链接")
|
|
||||||
return
|
|
||||||
|
|
||||||
except requests.RequestException as e:
|
|
||||||
self.wcf.send_text(f"-----Bot-----\n❌请求出错:{e}",
|
|
||||||
(message.roomid if message.from_group() else message.sender), message.sender)
|
|
||||||
return
|
|
||||||
|
|
||||||
xml_message = f"""
|
|
||||||
<?xml version="1.0"?>
|
|
||||||
<msg>
|
|
||||||
<appmsg appid="wx8dd6ecd81906fd84" sdkver="0">
|
|
||||||
<title>{song_name}</title>
|
|
||||||
<des>{first_singer_name}\n❤Bot-祝您天天开心❤</des>
|
|
||||||
<action>view</action>
|
|
||||||
<type>3</type>
|
|
||||||
<showtype>0</showtype>
|
|
||||||
<content />
|
|
||||||
<url>{dataurl}</url>
|
|
||||||
<dataurl>{play_url}</dataurl>
|
|
||||||
<lowurl/>
|
|
||||||
<lowdataurl/>
|
|
||||||
<recorditem />
|
|
||||||
<thumburl />
|
|
||||||
<messageaction />
|
|
||||||
<laninfo />
|
|
||||||
<extinfo />
|
|
||||||
<sourceusername />
|
|
||||||
<sourcedisplayname />
|
|
||||||
<commenturl />
|
|
||||||
<appattach>
|
|
||||||
<totallen>0</totallen>
|
|
||||||
<attachid />
|
|
||||||
<emoticonmd5></emoticonmd5>
|
|
||||||
<fileext />
|
|
||||||
<aeskey></aeskey>
|
|
||||||
</appattach>
|
|
||||||
<webviewshared>
|
|
||||||
<publisherId />
|
|
||||||
<publisherReqId>0</publisherReqId>
|
|
||||||
</webviewshared>
|
|
||||||
<weappinfo>
|
|
||||||
<pagepath />
|
|
||||||
<username />
|
|
||||||
<appid />
|
|
||||||
<appservicetype>0</appservicetype>
|
|
||||||
</weappinfo>
|
|
||||||
<websearch />
|
|
||||||
<songalbumurl>{singer_pic}</songalbumurl>
|
|
||||||
</appmsg>
|
|
||||||
<scene>0</scene>
|
|
||||||
<appinfo>
|
|
||||||
<version>49</version>
|
|
||||||
<appname></appname>
|
|
||||||
</appinfo>
|
|
||||||
<commenturl />
|
|
||||||
</msg>"""
|
|
||||||
|
|
||||||
# 修改消息数据库里面的消息content 内容
|
|
||||||
text_bytes = xml_message.encode('utf-8')
|
|
||||||
compressed_data = lb.compress(text_bytes, store_size=False).hex()
|
|
||||||
|
|
||||||
data = self.wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1")
|
|
||||||
self.wcf.query_sql('MSG0.db',
|
|
||||||
f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3,
|
|
||||||
IsSender=0, TalkerId=2 WHERE MsgSvrID={data[0]['MsgSvrID']}"""
|
|
||||||
)
|
|
||||||
|
|
||||||
result = self.wcf.forward_msg(data[0]["MsgSvrID"], message.roomid)
|
|
||||||
print(f"点歌发送:{result}")
|
|
||||||
@@ -1,8 +0,0 @@
|
|||||||
[Music]
|
|
||||||
enable = true
|
|
||||||
command = ["点歌", "音乐", "音乐点播", "点播音乐", "音乐点歌"]
|
|
||||||
command-format = """
|
|
||||||
-----Bot-----
|
|
||||||
🎵点歌指令:
|
|
||||||
点歌 歌曲名
|
|
||||||
"""
|
|
||||||
@@ -103,30 +103,7 @@ class StatsDashboardPlugin(PluginInterface):
|
|||||||
return content.strip().startswith("/stats")
|
return content.strip().startswith("/stats")
|
||||||
|
|
||||||
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, str]:
|
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, str]:
|
||||||
"""处理消息"""
|
|
||||||
# 暂时不启用指令
|
|
||||||
return False, ""
|
return False, ""
|
||||||
# content = str(message.get("content", "")).strip()
|
|
||||||
# if content == "/stats start":
|
|
||||||
# if self.start_server():
|
|
||||||
# return True, "统计看板服务器已启动"
|
|
||||||
# else:
|
|
||||||
# return False, "启动统计看板服务器失败"
|
|
||||||
#
|
|
||||||
# elif content == "/stats stop":
|
|
||||||
# if self.stop_server():
|
|
||||||
# return True, "统计看板服务器已停止"
|
|
||||||
# else:
|
|
||||||
# return False, "停止统计看板服务器失败"
|
|
||||||
#
|
|
||||||
# elif content == "/stats status":
|
|
||||||
# if self.server_thread and self.server_thread.is_alive():
|
|
||||||
# return True, f"统计看板服务器正在运行,访问地址: http://{self.config['host']}:{self.config['port']}"
|
|
||||||
# else:
|
|
||||||
# return True, "统计看板服务器未运行"
|
|
||||||
#
|
|
||||||
# else:
|
|
||||||
# return True, f"统计看板命令格式错误,可用命令:\n/stats start - 启动服务器\n/stats stop - 停止服务器\n/stats status - 查看状态"
|
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
def shutdown(self) -> None:
|
||||||
"""关闭插件"""
|
"""关闭插件"""
|
||||||
|
|||||||
7
plugins/xiuren_image/__init__.py
Normal file
7
plugins/xiuren_image/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
# 从当前包的main模块导入XiurenImagePlugin类
|
||||||
|
from .main import XiurenImagePlugin
|
||||||
|
|
||||||
|
# 提供get_plugin函数,返回插件实例
|
||||||
|
def get_plugin():
|
||||||
|
"""获取插件实例"""
|
||||||
|
return XiurenImagePlugin()
|
||||||
9
plugins/xiuren_image/config.toml
Normal file
9
plugins/xiuren_image/config.toml
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
[XiurenImage]
|
||||||
|
enable = true
|
||||||
|
command = ["图来", "秀人", "美图", "随机图片"]
|
||||||
|
command-format = """
|
||||||
|
-----Bot-----
|
||||||
|
🖼️秀人图片指令:
|
||||||
|
图来
|
||||||
|
秀人
|
||||||
|
"""
|
||||||
143
plugins/xiuren_image/main.py
Normal file
143
plugins/xiuren_image/main.py
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
from typing import Dict, Any, List, Optional, Tuple
|
||||||
|
|
||||||
|
from wcferry import Wcf
|
||||||
|
|
||||||
|
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||||||
|
from plugin_common.plugin_interface import PluginStatus
|
||||||
|
from plugins.stats_collector.decorators import plugin_stats_decorator
|
||||||
|
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||||||
|
|
||||||
|
|
||||||
|
class XiurenImagePlugin(MessagePluginInterface):
|
||||||
|
"""秀人图片插件"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
return "秀人图片[xiuren_image]"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def version(self) -> str:
|
||||||
|
return "1.0.0"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def description(self) -> str:
|
||||||
|
return "提供随机秀人网图片功能"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def author(self) -> str:
|
||||||
|
return "Trae AI"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def command_prefix(self) -> Optional[str]:
|
||||||
|
return "" # 不需要前缀,直接匹配命令
|
||||||
|
|
||||||
|
@property
|
||||||
|
def commands(self) -> List[str]:
|
||||||
|
return self._commands
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "xiuren", "pics")
|
||||||
|
|
||||||
|
def initialize(self, context: Dict[str, Any]) -> bool:
|
||||||
|
"""初始化插件"""
|
||||||
|
self.LOG = logging.getLogger(f"Plugin.{self.name}")
|
||||||
|
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||||||
|
|
||||||
|
# 保存上下文对象
|
||||||
|
self.wcf = context.get("wcf")
|
||||||
|
self.event_system = context.get("event_system")
|
||||||
|
self.message_util = context.get("message_util")
|
||||||
|
|
||||||
|
self._commands = self._config.get("XiurenImage", {}).get("command", ["图来", "秀人"])
|
||||||
|
self.command_format = self._config.get("XiurenImage", {}).get("command-format", "图来")
|
||||||
|
self.enable = self._config.get("XiurenImage", {}).get("enable", True)
|
||||||
|
|
||||||
|
# 检查图片文件夹是否存在
|
||||||
|
if not os.path.exists(self.image_folder):
|
||||||
|
self.LOG.warning(f"图片文件夹不存在: {self.image_folder}")
|
||||||
|
os.makedirs(self.image_folder, exist_ok=True)
|
||||||
|
|
||||||
|
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def start(self) -> bool:
|
||||||
|
"""启动插件"""
|
||||||
|
self.LOG.info(f"[{self.name}] 插件已启动")
|
||||||
|
self.status = PluginStatus.RUNNING
|
||||||
|
return True
|
||||||
|
|
||||||
|
def stop(self) -> bool:
|
||||||
|
"""停止插件"""
|
||||||
|
self.LOG.info(f"[{self.name}] 插件已停止")
|
||||||
|
self.status = PluginStatus.STOPPED
|
||||||
|
return True
|
||||||
|
|
||||||
|
def can_process(self, message: Dict[str, Any]) -> bool:
|
||||||
|
"""检查是否可以处理该消息"""
|
||||||
|
if not self.enable:
|
||||||
|
return False
|
||||||
|
|
||||||
|
content = str(message.get("content", "")).strip()
|
||||||
|
command = content.split(" ")[0]
|
||||||
|
|
||||||
|
return command in self._commands
|
||||||
|
|
||||||
|
@plugin_stats_decorator(plugin_name="秀人图片")
|
||||||
|
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||||||
|
"""处理消息"""
|
||||||
|
content = str(message.get("content", "")).strip()
|
||||||
|
self.LOG.info(f"插件执行: {self.name}:{content}")
|
||||||
|
sender = message.get("sender")
|
||||||
|
roomid = message.get("roomid", "")
|
||||||
|
wcf: Wcf = message.get("wcf")
|
||||||
|
gbm: GroupBotManager = message.get("gbm")
|
||||||
|
|
||||||
|
# 检查权限
|
||||||
|
if roomid and gbm.get_group_permission(roomid, Feature.PIC) == PermissionStatus.DISABLED:
|
||||||
|
return False, "没有权限"
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 获取随机图片
|
||||||
|
pic_path = self._get_random_pic()
|
||||||
|
if not pic_path:
|
||||||
|
wcf.send_text(f"-----Bot-----\n❌未找到图片资源",
|
||||||
|
(roomid if roomid else sender), sender)
|
||||||
|
return True, "未找到图片资源"
|
||||||
|
|
||||||
|
# 发送图片
|
||||||
|
result = wcf.send_file(pic_path, (roomid if roomid else sender))
|
||||||
|
self.LOG.info(f"发送图片结果: {result}")
|
||||||
|
return True, "发送成功"
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.LOG.error(f"处理图片请求出错: {e}")
|
||||||
|
return True, f"处理出错: {e}"
|
||||||
|
|
||||||
|
def _get_random_pic(self) -> Optional[str]:
|
||||||
|
"""获取随机图片路径"""
|
||||||
|
try:
|
||||||
|
# 获取图片文件夹中的所有图片
|
||||||
|
if not os.path.exists(self.image_folder):
|
||||||
|
self.LOG.error(f"图片文件夹不存在: {self.image_folder}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
image_files = [f for f in os.listdir(self.image_folder)
|
||||||
|
if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp'))]
|
||||||
|
|
||||||
|
if not image_files:
|
||||||
|
self.LOG.error("图片文件夹中没有图片")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 随机选择一张图片
|
||||||
|
random_pic = random.choice(image_files)
|
||||||
|
pic_path = os.path.join(self.image_folder, random_pic)
|
||||||
|
|
||||||
|
return pic_path
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.LOG.error(f"获取随机图片出错: {e}")
|
||||||
|
return None
|
||||||
Reference in New Issue
Block a user