加入插件:黑丝视频,美腿

This commit is contained in:
liuwei
2025-03-19 15:43:17 +08:00
parent 8665b418ba
commit 3bfa1f6c98
7 changed files with 342 additions and 18 deletions

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入BeautyLegPlugin类
from .main import BeautyLegPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return BeautyLegPlugin()

View File

@@ -0,0 +1,9 @@
[Beautyleg]
enable = true
command = ["美腿", "腿来"]
command-format = """
-----Bot-----
🦵美腿图片指令:
美腿
腿来
"""

150
plugins/beautyleg/main.py Normal file
View File

@@ -0,0 +1,150 @@
import logging
import os
import random
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class BeautyLegPlugin(MessagePluginInterface):
"""美腿图片插件"""
@property
def name(self) -> str:
return "美腿图片"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供随机美腿图片功能"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "beautyleg", "download_dir")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.gbm = context.get("gbm")
self._commands = self._config.get("Beautyleg", {}).get("command", ["美腿", "腿来"])
self.command_format = self._config.get("Beautyleg", {}).get("command-format", "美腿")
self.enable = self._config.get("Beautyleg", {}).get("enable", True)
# 检查图片文件夹是否存在
if not os.path.exists(self.image_folder):
self.LOG.warning(f"图片文件夹不存在: {self.image_folder}")
os.makedirs(self.image_folder, exist_ok=True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="美腿图片")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.BEAUTY_LEG) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 获取随机图片
random_file_path = self._get_random_file_from_dir(self.image_folder)
if not random_file_path:
wcf.send_text(f"\n❌未找到美腿图片资源",
(roomid if roomid else sender), sender)
return True, "未找到图片资源"
# 发送图片
random_file_path = os.path.abspath(random_file_path)
self.LOG.info(f"BeautyLeg.random_file_path: {random_file_path}")
result = wcf.send_file(random_file_path, (roomid if roomid else sender))
self.LOG.info(f"发送图片结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理美腿图片请求出错: {e}")
return True, f"处理出错: {e}"
def _get_random_file_from_dir(self, directory):
"""获取随机图片路径"""
image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
image_files = []
if not os.path.exists(directory):
self.LOG.error(f"Error: Directory '{directory}' does not exist.")
return None
if not os.access(directory, os.R_OK):
self.LOG.error(f"Error: No read access to directory '{directory}'.")
return None
self.LOG.info(f"Scanning directory: {directory} (including subdirectories)")
# 使用 os.walk() 递归遍历所有子目录
for root, _, files in os.walk(directory):
for file in files:
_, ext = os.path.splitext(file)
if ext.lower() in image_extensions:
full_path = os.path.join(root, file)
image_files.append(full_path)
if not image_files:
self.LOG.error("No image files found in the directory (including subdirectories).")
return None
return random.choice(image_files)

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入VideoPlugin类
from .main import VideoPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return VideoPlugin()

View File

@@ -0,0 +1,8 @@
[Video]
enable = true
command = ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"]
command-format = """
-----Bot-----
🎬视频指令:
黑丝
"""

161
plugins/video/main.py Normal file
View File

@@ -0,0 +1,161 @@
import logging
import os
import requests
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class VideoPlugin(MessagePluginInterface):
"""视频插件"""
@property
def name(self) -> str:
return "黑丝视频"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供视频点播功能"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
# 修改为使用插件目录下的down_load_dir文件夹
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.gbm = context.get("gbm")
self._commands = self._config.get("Video", {}).get("command", ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"])
self.command_format = self._config.get("Video", {}).get("command-format", "黑丝")
self.enable = self._config.get("Video", {}).get("enable", True)
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="视频插件")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.VIDEO) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 下载视频
save_path = os.path.join(self.download_dir, "video.mp4")
file_abspath = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path)
if not file_abspath or not file_abspath.endswith("mp4"):
wcf.send_text(f"\n❌视频下载失败,请稍后再试",
(roomid if roomid else sender), sender)
return True, "视频下载失败"
# 发送视频
result = wcf.send_file(file_abspath, (roomid if roomid else sender))
self.LOG.info(f"发送视频结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理视频请求出错: {e}")
wcf.send_text(f"\n❌请求出错:{e}",
(roomid if roomid else sender), sender)
return True, f"处理出错: {e}"
def _download_stream(self, url, save_path):
"""
从指定URL读取视频流并保存到本地
:param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
"""
try:
# 发送GET请求启用流式传输
response = requests.get(url, stream=True)
# 检查请求是否成功
response.raise_for_status() # 如果状态码不是200将抛出异常
# 确保保存路径的目录存在
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
# 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower()
if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看
return None
# 以二进制写入模式保存流数据
with open(save_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB
if chunk: # 过滤空块
file.write(chunk)
self.LOG.info(f"视频已下载到: {save_path}")
return os.path.abspath(save_path)
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None

View File

@@ -20,7 +20,6 @@ from base.func_news import News
from base.func_tigerbot import TigerBot
from base.func_xinghuo_web import XinghuoWeb
from base.func_claude import Claude
from beautyleg.beauty_leg import BeautyLeg
from configuration import Config
from constants import ChatType
from dify.dify_chat import DifyChat
@@ -29,7 +28,6 @@ from game_task.game_task_encyclopedia import game_process_message, get_group_ids
from group_add.main import GroupAdd
from group_auto.group_auto_invite import get_first_group_id, process_command
from group_auto.group_member_change import GroupMemberChange
from group_video.bot_video import BotVideo
from group_video_man.bot_video_man import BotVideoMan
from message_sign.main import SignInSystem
from message_storage.message_to_db import MessageStorage
@@ -125,12 +123,8 @@ class Robot(Job):
self.signin = SignInSystem(wcf, self.gbm, self.allContacts, self.db_pool, self.redis_pool, self.message_util)
# 积分赠送功能加载
self.trade = PointTrade(wcf, self.gbm, self.db_pool)
# 获取视频模块
self.video = BotVideo(wcf, self.gbm)
# 肌肉男视频
self.videoman = BotVideoMan(wcf, self.gbm)
# 美腿模块
self.beautyleg = BeautyLeg(wcf, self.gbm)
# 加群测试
self.group_add = GroupAdd(wcf, self.gbm)
# 抖音转视频
@@ -338,23 +332,11 @@ class Robot(Job):
self.trade.handle_text(message=msg)
except Exception as e:
self.LOG.error(f"point trade error: {e}")
# 加入黑丝视频功能
try:
self.video.get_video(message=msg)
except Exception as e:
self.LOG.error(f"video get_video error: {e}")
# 加入肌肉男视频功能
try:
self.videoman.get_video(message=msg)
except Exception as e:
self.LOG.error(f"videoman get_video error: {e}")
# 美腿功能
try:
self.beautyleg.handle_message(message=msg)
except Exception as e:
self.LOG.error(f"beautyleg.handle_text error: {e}")
# 抖音组件
try:
self.douyin.handle_douyin_links(message=msg)