From 0b3dce8c23ce7b04933fc757a10284d84c1bcafd Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 19 Mar 2025 14:58:08 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=88=A0=E9=99=A4=E5=8E=9F=E6=9C=89music?= =?UTF-8?q?=E5=86=85=E5=AE=B9=EF=BC=8C=E4=BD=BF=E7=94=A8=E6=8F=92=E4=BB=B6?= =?UTF-8?q?music=EF=BC=9B=202.=E6=96=B0=E5=BB=BA=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=EF=BC=8C=E7=A7=80=E4=BA=BA=E5=9B=BE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- music/bot_music.py | 168 ------------------------------- music/config.toml | 8 -- plugins/stats_dashboard/main.py | 23 ----- plugins/xiuren_image/__init__.py | 7 ++ plugins/xiuren_image/config.toml | 9 ++ plugins/xiuren_image/main.py | 143 ++++++++++++++++++++++++++ 6 files changed, 159 insertions(+), 199 deletions(-) delete mode 100644 music/bot_music.py delete mode 100644 music/config.toml create mode 100644 plugins/xiuren_image/__init__.py create mode 100644 plugins/xiuren_image/config.toml create mode 100644 plugins/xiuren_image/main.py diff --git a/music/bot_music.py b/music/bot_music.py deleted file mode 100644 index ce1d54a..0000000 --- a/music/bot_music.py +++ /dev/null @@ -1,168 +0,0 @@ -import logging -import tomllib - -import requests -from wcferry import WxMsg, Wcf - -from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager -import lz4.block as lb - - -class BotMusic: - def __init__(self, wcf: Wcf, gbm: GroupBotManager): - self.LOG = logging.getLogger(__name__) - self.wcf = wcf # 假设 wcf 对象在此类中初始化 - self.gbm = gbm # 权限功能 - with open("music/config.toml", "rb") as f: - plugin_config = tomllib.load(f) - - config = plugin_config["Music"] - - self.enable = config["enable"] - self.command = config["command"] - self.command_format = config["command-format"] - self.LOG.info(f"[点歌台] 组件初始化完成,指令: {self.command}") - - def get_music(self, message: WxMsg): - if not self.enable: - return - - content = str(message.content).strip() - command = content.split(" ") - - if command[0] not in self.command: - return - - if len(command) == 1: - self.wcf.send_text(f"-----Bot-----\n❌命令格式错误!\n{self.command_format}", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - # 如果触发了指令,但是没有权限,则返回权限不足 - if self.gbm.get_group_permission(message.roomid, Feature.MUSIC) == PermissionStatus.DISABLED: - return - - user_song_name = content[len(command[0]):].strip() - - try: - short_play_api = f"https://qqmusic.qqovo.cn/getSearchByKey?key={user_song_name}&page=1&limit=1" - fallback_api = f"https://www.hhlqilongzhu.cn/api/dg_wyymusic.php?gm={user_song_name}&n=1&num=1&type=json" - - response = requests.get(short_play_api) - if response.status_code == 400: - response = requests.get(fallback_api) - - if response.status_code != 200: - print(f"API 请求失败,状态码: {response.status_code}") - return - - json_data = response.json() - result = json_data.get('response', {}).get('data', {}).get('song', {}).get('list', []) - if not result: - print("未找到匹配的歌曲") - return - - first_song = result[0] - song_name = first_song.get('songname', '') - song_mid = first_song.get('songmid', '') - first_singer_name = first_song.get('singer', [{}])[0].get('name', '') - - zhida_singer = json_data.get('response', {}).get('data', {}).get('zhida', {}).get('zhida_singer', {}) - singer_pic = zhida_singer.get('singerPic', '') if zhida_singer else None - - music_play_api = f"https://qqmusic.qqovo.cn/getMusicPlay?songmid={song_mid}&quality=m4a" - music_response = requests.get(music_play_api) - - if music_response.status_code == 400: - print("获取播放链接失败,状态码 400,尝试备用接口") - music_response = requests.get(fallback_api) - - if music_response.status_code != 200: - print(f"获取播放链接失败,状态码: {music_response.status_code}") - return - - music_data = music_response.json() - play_url = music_data.get('data', {}).get('playUrl', {}).get(song_mid, {}).get('url', '') - - if not play_url: - print("主接口play_url为空,尝试备用接口") - music_response = requests.get(fallback_api).json() - song_name = music_response.get('title', song_name) - first_singer_name = music_response.get('singer', first_singer_name) - play_url = music_response.get('music_url', '') - singer_pic = music_response.get('cover', singer_pic) - dataurl = music_response.get('link', '') - else: - dataurl = f"https://y.qq.com/n/ryqq/songDetail/{song_mid}" - - if not play_url: - print("未获取到音乐播放链接") - return - - except requests.RequestException as e: - self.wcf.send_text(f"-----Bot-----\n❌请求出错:{e}", - (message.roomid if message.from_group() else message.sender), message.sender) - return - - xml_message = f""" - - - - {song_name} - {first_singer_name}\n❤Bot-祝您天天开心❤ - view - 3 - 0 - - {dataurl} - {play_url} - - - - - - - - - - - - 0 - - - - - - - - 0 - - - - - - 0 - - - {singer_pic} - - 0 - - 49 - - - - """ - - # 修改消息数据库里面的消息content 内容 - text_bytes = xml_message.encode('utf-8') - compressed_data = lb.compress(text_bytes, store_size=False).hex() - - data = self.wcf.query_sql('MSG0.db', "SELECT * FROM MSG where type = 49 limit 1") - self.wcf.query_sql('MSG0.db', - f"""UPDATE MSG SET CompressContent = x'{compressed_data}', BytesExtra=x'', type=49, SubType=3, - IsSender=0, TalkerId=2 WHERE MsgSvrID={data[0]['MsgSvrID']}""" - ) - - result = self.wcf.forward_msg(data[0]["MsgSvrID"], message.roomid) - print(f"点歌发送:{result}") diff --git a/music/config.toml b/music/config.toml deleted file mode 100644 index 50adc7b..0000000 --- a/music/config.toml +++ /dev/null @@ -1,8 +0,0 @@ -[Music] -enable = true -command = ["点歌", "音乐", "音乐点播", "点播音乐", "音乐点歌"] -command-format = """ ------Bot----- -🎵点歌指令: -点歌 歌曲名 -""" \ No newline at end of file diff --git a/plugins/stats_dashboard/main.py b/plugins/stats_dashboard/main.py index 6e1563e..4324e6b 100644 --- a/plugins/stats_dashboard/main.py +++ b/plugins/stats_dashboard/main.py @@ -103,30 +103,7 @@ class StatsDashboardPlugin(PluginInterface): return content.strip().startswith("/stats") def process_message(self, message: Dict[str, Any]) -> Tuple[bool, str]: - """处理消息""" - # 暂时不启用指令 return False, "" - # content = str(message.get("content", "")).strip() - # if content == "/stats start": - # if self.start_server(): - # return True, "统计看板服务器已启动" - # else: - # return False, "启动统计看板服务器失败" - # - # elif content == "/stats stop": - # if self.stop_server(): - # return True, "统计看板服务器已停止" - # else: - # return False, "停止统计看板服务器失败" - # - # elif content == "/stats status": - # if self.server_thread and self.server_thread.is_alive(): - # return True, f"统计看板服务器正在运行,访问地址: http://{self.config['host']}:{self.config['port']}" - # else: - # return True, "统计看板服务器未运行" - # - # else: - # return True, f"统计看板命令格式错误,可用命令:\n/stats start - 启动服务器\n/stats stop - 停止服务器\n/stats status - 查看状态" def shutdown(self) -> None: """关闭插件""" diff --git a/plugins/xiuren_image/__init__.py b/plugins/xiuren_image/__init__.py new file mode 100644 index 0000000..0c5a27f --- /dev/null +++ b/plugins/xiuren_image/__init__.py @@ -0,0 +1,7 @@ +# 从当前包的main模块导入XiurenImagePlugin类 +from .main import XiurenImagePlugin + +# 提供get_plugin函数,返回插件实例 +def get_plugin(): + """获取插件实例""" + return XiurenImagePlugin() \ No newline at end of file diff --git a/plugins/xiuren_image/config.toml b/plugins/xiuren_image/config.toml new file mode 100644 index 0000000..fd7e9d2 --- /dev/null +++ b/plugins/xiuren_image/config.toml @@ -0,0 +1,9 @@ +[XiurenImage] +enable = true +command = ["图来", "秀人", "美图", "随机图片"] +command-format = """ +-----Bot----- +🖼️秀人图片指令: +图来 +秀人 +""" \ No newline at end of file diff --git a/plugins/xiuren_image/main.py b/plugins/xiuren_image/main.py new file mode 100644 index 0000000..1cb64e6 --- /dev/null +++ b/plugins/xiuren_image/main.py @@ -0,0 +1,143 @@ +import logging +import os +import random +from typing import Dict, Any, List, Optional, Tuple + +from wcferry import Wcf + +from plugin_common.message_plugin_interface import MessagePluginInterface +from plugin_common.plugin_interface import PluginStatus +from plugins.stats_collector.decorators import plugin_stats_decorator +from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager + + +class XiurenImagePlugin(MessagePluginInterface): + """秀人图片插件""" + + @property + def name(self) -> str: + return "秀人图片[xiuren_image]" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "提供随机秀人网图片功能" + + @property + def author(self) -> str: + return "Trae AI" + + @property + def command_prefix(self) -> Optional[str]: + return "" # 不需要前缀,直接匹配命令 + + @property + def commands(self) -> List[str]: + return self._commands + + def __init__(self): + super().__init__() + self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "xiuren", "pics") + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件""" + self.LOG = logging.getLogger(f"Plugin.{self.name}") + self.LOG.info(f"正在初始化 {self.name} 插件...") + + # 保存上下文对象 + self.wcf = context.get("wcf") + self.event_system = context.get("event_system") + self.message_util = context.get("message_util") + + self._commands = self._config.get("XiurenImage", {}).get("command", ["图来", "秀人"]) + self.command_format = self._config.get("XiurenImage", {}).get("command-format", "图来") + self.enable = self._config.get("XiurenImage", {}).get("enable", True) + + # 检查图片文件夹是否存在 + if not os.path.exists(self.image_folder): + self.LOG.warning(f"图片文件夹不存在: {self.image_folder}") + os.makedirs(self.image_folder, exist_ok=True) + + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + return True + + def start(self) -> bool: + """启动插件""" + self.LOG.info(f"[{self.name}] 插件已启动") + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + """停止插件""" + self.LOG.info(f"[{self.name}] 插件已停止") + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + """检查是否可以处理该消息""" + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + command = content.split(" ")[0] + + return command in self._commands + + @plugin_stats_decorator(plugin_name="秀人图片") + def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理消息""" + content = str(message.get("content", "")).strip() + self.LOG.info(f"插件执行: {self.name}:{content}") + sender = message.get("sender") + roomid = message.get("roomid", "") + wcf: Wcf = message.get("wcf") + gbm: GroupBotManager = message.get("gbm") + + # 检查权限 + if roomid and gbm.get_group_permission(roomid, Feature.PIC) == PermissionStatus.DISABLED: + return False, "没有权限" + + try: + # 获取随机图片 + pic_path = self._get_random_pic() + if not pic_path: + wcf.send_text(f"-----Bot-----\n❌未找到图片资源", + (roomid if roomid else sender), sender) + return True, "未找到图片资源" + + # 发送图片 + result = wcf.send_file(pic_path, (roomid if roomid else sender)) + self.LOG.info(f"发送图片结果: {result}") + return True, "发送成功" + + except Exception as e: + self.LOG.error(f"处理图片请求出错: {e}") + return True, f"处理出错: {e}" + + def _get_random_pic(self) -> Optional[str]: + """获取随机图片路径""" + try: + # 获取图片文件夹中的所有图片 + if not os.path.exists(self.image_folder): + self.LOG.error(f"图片文件夹不存在: {self.image_folder}") + return None + + image_files = [f for f in os.listdir(self.image_folder) + if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp'))] + + if not image_files: + self.LOG.error("图片文件夹中没有图片") + return None + + # 随机选择一张图片 + random_pic = random.choice(image_files) + pic_path = os.path.join(self.image_folder, random_pic) + + return pic_path + + except Exception as e: + self.LOG.error(f"获取随机图片出错: {e}") + return None \ No newline at end of file