From 3bfa1f6c9845d37775b610335e3e603a9e57f144 Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 19 Mar 2025 15:43:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E6=8F=92=E4=BB=B6=EF=BC=9A?= =?UTF-8?q?=E9=BB=91=E4=B8=9D=E8=A7=86=E9=A2=91=EF=BC=8C=E7=BE=8E=E8=85=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/beautyleg/__init__.py | 7 ++ plugins/beautyleg/config.toml | 9 ++ plugins/beautyleg/main.py | 150 +++++++++++++++++++++++++++++++ plugins/video/__init__.py | 7 ++ plugins/video/config.toml | 8 ++ plugins/video/main.py | 161 ++++++++++++++++++++++++++++++++++ robot.py | 18 ---- 7 files changed, 342 insertions(+), 18 deletions(-) create mode 100644 plugins/beautyleg/__init__.py create mode 100644 plugins/beautyleg/config.toml create mode 100644 plugins/beautyleg/main.py create mode 100644 plugins/video/__init__.py create mode 100644 plugins/video/config.toml create mode 100644 plugins/video/main.py diff --git a/plugins/beautyleg/__init__.py b/plugins/beautyleg/__init__.py new file mode 100644 index 0000000..9c4e619 --- /dev/null +++ b/plugins/beautyleg/__init__.py @@ -0,0 +1,7 @@ +# 从当前包的main模块导入BeautyLegPlugin类 +from .main import BeautyLegPlugin + +# 提供get_plugin函数,返回插件实例 +def get_plugin(): + """获取插件实例""" + return BeautyLegPlugin() \ No newline at end of file diff --git a/plugins/beautyleg/config.toml b/plugins/beautyleg/config.toml new file mode 100644 index 0000000..21403ac --- /dev/null +++ b/plugins/beautyleg/config.toml @@ -0,0 +1,9 @@ +[Beautyleg] +enable = true +command = ["美腿", "腿来"] +command-format = """ +-----Bot----- +🦵美腿图片指令: +美腿 +腿来 +""" \ No newline at end of file diff --git a/plugins/beautyleg/main.py b/plugins/beautyleg/main.py new file mode 100644 index 0000000..4b63560 --- /dev/null +++ b/plugins/beautyleg/main.py @@ -0,0 +1,150 @@ +import logging +import os +import random +from typing import Dict, Any, List, Optional, Tuple + +from wcferry import Wcf + +from plugin_common.message_plugin_interface import MessagePluginInterface +from plugin_common.plugin_interface import PluginStatus +from plugins.stats_collector.decorators import plugin_stats_decorator +from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager + + +class BeautyLegPlugin(MessagePluginInterface): + """美腿图片插件""" + + @property + def name(self) -> str: + return "美腿图片" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "提供随机美腿图片功能" + + @property + def author(self) -> str: + return "Trae AI" + + @property + def command_prefix(self) -> Optional[str]: + return "" # 不需要前缀,直接匹配命令 + + @property + def commands(self) -> List[str]: + return self._commands + + def __init__(self): + super().__init__() + self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "beautyleg", "download_dir") + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件""" + self.LOG = logging.getLogger(f"Plugin.{self.name}") + self.LOG.info(f"正在初始化 {self.name} 插件...") + + # 保存上下文对象 + self.wcf = context.get("wcf") + self.event_system = context.get("event_system") + self.message_util = context.get("message_util") + self.gbm = context.get("gbm") + + self._commands = self._config.get("Beautyleg", {}).get("command", ["美腿", "腿来"]) + self.command_format = self._config.get("Beautyleg", {}).get("command-format", "美腿") + self.enable = self._config.get("Beautyleg", {}).get("enable", True) + + # 检查图片文件夹是否存在 + if not os.path.exists(self.image_folder): + self.LOG.warning(f"图片文件夹不存在: {self.image_folder}") + os.makedirs(self.image_folder, exist_ok=True) + + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + return True + + def start(self) -> bool: + """启动插件""" + self.LOG.info(f"[{self.name}] 插件已启动") + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + """停止插件""" + self.LOG.info(f"[{self.name}] 插件已停止") + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + """检查是否可以处理该消息""" + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + command = content.split(" ")[0] + + return command in self._commands + + @plugin_stats_decorator(plugin_name="美腿图片") + def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理消息""" + content = str(message.get("content", "")).strip() + self.LOG.info(f"插件执行: {self.name}:{content}") + sender = message.get("sender") + roomid = message.get("roomid", "") + wcf: Wcf = message.get("wcf") + gbm: GroupBotManager = message.get("gbm") + + # 检查权限 + if roomid and gbm.get_group_permission(roomid, Feature.BEAUTY_LEG) == PermissionStatus.DISABLED: + return False, "没有权限" + + try: + # 获取随机图片 + random_file_path = self._get_random_file_from_dir(self.image_folder) + if not random_file_path: + wcf.send_text(f"\n❌未找到美腿图片资源", + (roomid if roomid else sender), sender) + return True, "未找到图片资源" + + # 发送图片 + random_file_path = os.path.abspath(random_file_path) + self.LOG.info(f"BeautyLeg.random_file_path: {random_file_path}") + result = wcf.send_file(random_file_path, (roomid if roomid else sender)) + self.LOG.info(f"发送图片结果: {result}") + return True, "发送成功" + + except Exception as e: + self.LOG.error(f"处理美腿图片请求出错: {e}") + return True, f"处理出错: {e}" + + def _get_random_file_from_dir(self, directory): + """获取随机图片路径""" + image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'} + image_files = [] + + if not os.path.exists(directory): + self.LOG.error(f"Error: Directory '{directory}' does not exist.") + return None + + if not os.access(directory, os.R_OK): + self.LOG.error(f"Error: No read access to directory '{directory}'.") + return None + + self.LOG.info(f"Scanning directory: {directory} (including subdirectories)") + + # 使用 os.walk() 递归遍历所有子目录 + for root, _, files in os.walk(directory): + for file in files: + _, ext = os.path.splitext(file) + if ext.lower() in image_extensions: + full_path = os.path.join(root, file) + image_files.append(full_path) + + if not image_files: + self.LOG.error("No image files found in the directory (including subdirectories).") + return None + + return random.choice(image_files) \ No newline at end of file diff --git a/plugins/video/__init__.py b/plugins/video/__init__.py new file mode 100644 index 0000000..687a708 --- /dev/null +++ b/plugins/video/__init__.py @@ -0,0 +1,7 @@ +# 从当前包的main模块导入VideoPlugin类 +from .main import VideoPlugin + +# 提供get_plugin函数,返回插件实例 +def get_plugin(): + """获取插件实例""" + return VideoPlugin() \ No newline at end of file diff --git a/plugins/video/config.toml b/plugins/video/config.toml new file mode 100644 index 0000000..bab9f5a --- /dev/null +++ b/plugins/video/config.toml @@ -0,0 +1,8 @@ +[Video] +enable = true +command = ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"] +command-format = """ +-----Bot----- +🎬视频指令: +黑丝 +""" \ No newline at end of file diff --git a/plugins/video/main.py b/plugins/video/main.py new file mode 100644 index 0000000..e0c2a0c --- /dev/null +++ b/plugins/video/main.py @@ -0,0 +1,161 @@ +import logging +import os +import requests +from typing import Dict, Any, List, Optional, Tuple + +from wcferry import Wcf + +from plugin_common.message_plugin_interface import MessagePluginInterface +from plugin_common.plugin_interface import PluginStatus +from plugins.stats_collector.decorators import plugin_stats_decorator +from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager + + +class VideoPlugin(MessagePluginInterface): + """视频插件""" + + @property + def name(self) -> str: + return "黑丝视频" + + @property + def version(self) -> str: + return "1.0.0" + + @property + def description(self) -> str: + return "提供视频点播功能" + + @property + def author(self) -> str: + return "Trae AI" + + @property + def command_prefix(self) -> Optional[str]: + return "" # 不需要前缀,直接匹配命令 + + @property + def commands(self) -> List[str]: + return self._commands + + def __init__(self): + super().__init__() + # 修改为使用插件目录下的down_load_dir文件夹 + self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") + + def initialize(self, context: Dict[str, Any]) -> bool: + """初始化插件""" + self.LOG = logging.getLogger(f"Plugin.{self.name}") + self.LOG.info(f"正在初始化 {self.name} 插件...") + + # 保存上下文对象 + self.wcf = context.get("wcf") + self.event_system = context.get("event_system") + self.message_util = context.get("message_util") + self.gbm = context.get("gbm") + + self._commands = self._config.get("Video", {}).get("command", ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"]) + self.command_format = self._config.get("Video", {}).get("command-format", "黑丝") + self.enable = self._config.get("Video", {}).get("enable", True) + + # 确保下载目录存在 + if not os.path.exists(self.download_dir): + os.makedirs(self.download_dir, exist_ok=True) + + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + return True + + def start(self) -> bool: + """启动插件""" + self.LOG.info(f"[{self.name}] 插件已启动") + self.status = PluginStatus.RUNNING + return True + + def stop(self) -> bool: + """停止插件""" + self.LOG.info(f"[{self.name}] 插件已停止") + self.status = PluginStatus.STOPPED + return True + + def can_process(self, message: Dict[str, Any]) -> bool: + """检查是否可以处理该消息""" + if not self.enable: + return False + + content = str(message.get("content", "")).strip() + command = content.split(" ")[0] + + return command in self._commands + + @plugin_stats_decorator(plugin_name="视频插件") + def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + """处理消息""" + content = str(message.get("content", "")).strip() + self.LOG.info(f"插件执行: {self.name}:{content}") + sender = message.get("sender") + roomid = message.get("roomid", "") + wcf: Wcf = message.get("wcf") + gbm: GroupBotManager = message.get("gbm") + + # 检查权限 + if roomid and gbm.get_group_permission(roomid, Feature.VIDEO) == PermissionStatus.DISABLED: + return False, "没有权限" + + try: + # 下载视频 + save_path = os.path.join(self.download_dir, "video.mp4") + file_abspath = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path) + + if not file_abspath or not file_abspath.endswith("mp4"): + wcf.send_text(f"\n❌视频下载失败,请稍后再试", + (roomid if roomid else sender), sender) + return True, "视频下载失败" + + # 发送视频 + result = wcf.send_file(file_abspath, (roomid if roomid else sender)) + self.LOG.info(f"发送视频结果: {result}") + return True, "发送成功" + + except Exception as e: + self.LOG.error(f"处理视频请求出错: {e}") + wcf.send_text(f"\n❌请求出错:{e}", + (roomid if roomid else sender), sender) + return True, f"处理出错: {e}" + + def _download_stream(self, url, save_path): + """ + 从指定URL读取视频流并保存到本地 + :param url: 视频流的URL + :param save_path: 本地保存路径(包含文件名,例如 "video.mp4") + """ + try: + # 发送GET请求,启用流式传输 + response = requests.get(url, stream=True) + + # 检查请求是否成功 + response.raise_for_status() # 如果状态码不是200,将抛出异常 + + # 确保保存路径的目录存在 + os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True) + + # 检查是否是视频流(可选,根据Content-Type判断) + content_type = response.headers.get("Content-Type", "").lower() + if "video" not in content_type and "application/octet-stream" not in content_type: + self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") + self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 + return None + + # 以二进制写入模式保存流数据 + with open(save_path, "wb") as file: + for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB + if chunk: # 过滤空块 + file.write(chunk) + self.LOG.info(f"视频已下载到: {save_path}") + return os.path.abspath(save_path) + except requests.RequestException as e: + self.LOG.error(f"请求失败: {e}") + except IOError as e: + self.LOG.error(f"文件写入失败: {e}") + except Exception as e: + self.LOG.error(f"发生未知错误: {e}") + return None \ No newline at end of file diff --git a/robot.py b/robot.py index cbcc563..33b3094 100644 --- a/robot.py +++ b/robot.py @@ -20,7 +20,6 @@ from base.func_news import News from base.func_tigerbot import TigerBot from base.func_xinghuo_web import XinghuoWeb from base.func_claude import Claude -from beautyleg.beauty_leg import BeautyLeg from configuration import Config from constants import ChatType from dify.dify_chat import DifyChat @@ -29,7 +28,6 @@ from game_task.game_task_encyclopedia import game_process_message, get_group_ids from group_add.main import GroupAdd from group_auto.group_auto_invite import get_first_group_id, process_command from group_auto.group_member_change import GroupMemberChange -from group_video.bot_video import BotVideo from group_video_man.bot_video_man import BotVideoMan from message_sign.main import SignInSystem from message_storage.message_to_db import MessageStorage @@ -125,12 +123,8 @@ class Robot(Job): self.signin = SignInSystem(wcf, self.gbm, self.allContacts, self.db_pool, self.redis_pool, self.message_util) # 积分赠送功能加载 self.trade = PointTrade(wcf, self.gbm, self.db_pool) - # 获取视频模块 - self.video = BotVideo(wcf, self.gbm) # 肌肉男视频 self.videoman = BotVideoMan(wcf, self.gbm) - # 美腿模块 - self.beautyleg = BeautyLeg(wcf, self.gbm) # 加群测试 self.group_add = GroupAdd(wcf, self.gbm) # 抖音转视频 @@ -338,23 +332,11 @@ class Robot(Job): self.trade.handle_text(message=msg) except Exception as e: self.LOG.error(f"point trade error: {e}") - - # 加入黑丝视频功能 - try: - self.video.get_video(message=msg) - except Exception as e: - self.LOG.error(f"video get_video error: {e}") - # 加入肌肉男视频功能 try: self.videoman.get_video(message=msg) except Exception as e: self.LOG.error(f"videoman get_video error: {e}") - # 美腿功能 - try: - self.beautyleg.handle_message(message=msg) - except Exception as e: - self.LOG.error(f"beautyleg.handle_text error: {e}") # 抖音组件 try: self.douyin.handle_douyin_links(message=msg)