猛男视频迁移成插件。删除原有逻辑

This commit is contained in:
liuwei
2025-03-19 16:30:33 +08:00
parent 15fc732acb
commit bb98d6b802
3 changed files with 188 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
# 从当前包的main模块导入VideoManPlugin类
from .main import VideoManPlugin
# 提供get_plugin函数返回插件实例
def get_plugin():
"""获取插件实例"""
return VideoManPlugin()

View File

@@ -0,0 +1,9 @@
[VideoMan]
enable = true
command = ["猛男", "肌肉", "帅哥"]
command-format = """
🎬猛男视频指令:
猛男
肌肉
帅哥
"""

172
plugins/video_man/main.py Normal file
View File

@@ -0,0 +1,172 @@
import logging
import os
import requests
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class VideoManPlugin(MessagePluginInterface):
"""猛男视频插件"""
@property
def name(self) -> str:
return "猛男视频"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供猛男视频点播功能"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
# 使用插件目录下的down_load_dir文件夹
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.gbm = context.get("gbm")
self._commands = self._config.get("VideoMan", {}).get("command", ["猛男", "肌肉", "帅哥"])
self.command_format = self._config.get("VideoMan", {}).get("command-format", "猛男")
self.enable = self._config.get("VideoMan", {}).get("enable", True)
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="猛男视频")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.VIDEO_MAN) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 下载视频
file_abspath = self._download_video("https://api.guiguiya.com/api/video/fuji?type=json")
if not file_abspath:
wcf.send_text(f"\n❌视频下载失败,请稍后再试",
(roomid if roomid else sender), sender)
return True, "视频下载失败"
# 发送视频
result = wcf.send_file(file_abspath, (roomid if roomid else sender))
self.LOG.info(f"发送视频结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理视频请求出错: {e}")
wcf.send_text(f"\n❌请求出错:{e}",
(roomid if roomid else sender), sender)
return True, f"处理出错: {e}"
def _download_video(self, api_url):
"""
从API获取视频URL并下载到本地
:param api_url: API的URL
:return: 下载后的视频文件绝对路径
"""
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
save_path = os.path.join(self.download_dir, "video.mp4")
try:
# 获取视频URL
response = requests.get(api_url)
if response.status_code != 200:
self.LOG.error(f"API请求失败HTTP状态码: {response.status_code}")
return None
data = response.json()
video_url = data.get("url")
if not video_url:
self.LOG.error("API响应中没有找到视频URL")
return None
# 下载视频
video_response = requests.get(video_url, stream=True)
if video_response.status_code != 200:
self.LOG.error(f"无法下载视频HTTP状态码: {video_response.status_code}")
return None
# 保存视频
with open(save_path, "wb") as file:
for chunk in video_response.iter_content(chunk_size=1024):
if chunk: # 过滤空块
file.write(chunk)
abs_path = os.path.abspath(save_path)
self.LOG.info(f"视频已下载至: {abs_path}")
return abs_path
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None