Files
abot/plugins/video_man/main.py

174 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import requests
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
class VideoManPlugin(MessagePluginInterface):
    """Message plugin that fetches a video from a remote API and sends it.

    When an incoming message starts with one of the configured commands,
    the plugin asks the API for a video URL, downloads the video into the
    plugin's ``down_load_dir`` folder and sends the file to the chat room
    (or to the sender when the message has no room id).
    """

    # JSON endpoint whose response carries the video location under "url".
    _API_URL = "https://api.guiguiya.com/api/video/fuji?type=json"
    # (connect, read) timeouts in seconds; without them a stalled server
    # would block the message handler indefinitely.
    _REQUEST_TIMEOUT: Tuple[float, float] = (5, 30)
    # Number of bytes written to disk per streamed chunk.
    _CHUNK_SIZE = 64 * 1024

    @property
    def name(self) -> str:
        """Display name of the plugin."""
        return "猛男视频"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short description of what the plugin provides."""
        return "提供猛男视频点播功能"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "Trae AI"

    @property
    def command_prefix(self) -> Optional[str]:
        """Command prefix; empty because commands are matched directly."""
        return ""

    @property
    def commands(self) -> List[str]:
        """Commands that trigger this plugin (populated by ``initialize``)."""
        return self._commands

    def __init__(self):
        super().__init__()
        # Videos are stored in the "down_load_dir" folder next to this file.
        self.download_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "down_load_dir"
        )

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Store the shared context objects and load the plugin settings.

        :param context: mapping that may provide "wcf", "event_system",
            "message_util" and "gbm".
        :return: always ``True``.
        """
        self.LOG = logging.getLogger(f"Plugin.{self.name}")
        self.LOG.info(f"正在初始化 {self.name} 插件...")

        # Keep the shared context objects for later use.
        self.wcf = context.get("wcf")
        self.event_system = context.get("event_system")
        self.message_util: MessageUtil = context.get("message_util")
        self.gbm = context.get("gbm")

        # NOTE(review): self._config is not set in this class; presumably the
        # base class provides it - confirm.
        settings = self._config.get("VideoMan", {})
        self._commands = settings.get("command", ["猛男", "肌肉", "帅哥"])
        self.command_format = settings.get("command-format", "猛男")
        self.enable = settings.get("enable", True)

        # exist_ok=True already covers the "directory exists" case, so no
        # separate (and racy) os.path.exists check is needed.
        os.makedirs(self.download_dir, exist_ok=True)

        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return whether the message starts with one of the commands.

        Only the text before the first space of the stripped content is
        compared against the configured commands. Always ``False`` when the
        plugin is disabled.
        """
        if not self.enable:
            return False
        content = str(message.get("content", "")).strip()
        command = content.split(" ")[0]
        return command in self._commands

    @plugin_stats_decorator(plugin_name="猛男视频")
    @plugin_points_cost(2, "猛男视频消耗积分", Feature.VIDEO_MAN)
    def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Download a video and send it to the room or sender of the message.

        :param message: message mapping; "content", "sender", "roomid",
            "wcf" and "gbm" are read from it.
        :return: ``(True, "发送成功")`` once the file was handed to
            ``send_file``; otherwise ``(False, reason)``.
        """
        content = str(message.get("content", "")).strip()
        self.LOG.info(f"插件执行: {self.name}{content}")
        sender = message.get("sender")
        roomid = message.get("roomid", "")
        # Prefer the handles carried by the message, but fall back to the
        # ones saved in initialize() so a message without them still works.
        wcf: Wcf = message.get("wcf") or self.wcf
        gbm: GroupBotManager = message.get("gbm") or self.gbm
        receiver = roomid if roomid else sender

        # Group messages are refused when the feature is disabled there.
        if roomid and gbm.get_group_permission(roomid, Feature.VIDEO_MAN) == PermissionStatus.DISABLED:
            return False, "没有权限"

        try:
            file_abspath = self._download_video(self._API_URL)
            if not file_abspath:
                self.message_util.send_text(
                    "\n❌视频下载失败,请稍后再试", receiver, sender
                )
                return False, "视频下载失败"

            result = wcf.send_file(file_abspath, receiver)
            self.LOG.info(f"发送视频结果: {result}")
            return True, "发送成功"
        except Exception as e:
            # Boundary handler: report the failure to the caller instead of
            # letting it propagate; the traceback goes to the log.
            self.LOG.error(f"处理视频请求出错: {e}", exc_info=True)
            return False, f"处理出错: {e}"

    def _download_video(self, api_url: str) -> Optional[str]:
        """Resolve a video URL through the API and download it locally.

        The file is always written to ``video.mp4`` inside
        ``self.download_dir``, so each download overwrites the previous one.

        :param api_url: URL of the JSON API whose response holds "url".
        :return: absolute path of the downloaded file, or ``None`` when the
            API call, the download or the file write failed.
        """
        os.makedirs(self.download_dir, exist_ok=True)
        save_path = os.path.join(self.download_dir, "video.mp4")

        try:
            # Ask the API for the actual video location.
            response = requests.get(api_url, timeout=self._REQUEST_TIMEOUT)
            if response.status_code != 200:
                self.LOG.error(f"API请求失败HTTP状态码: {response.status_code}")
                return None

            data = response.json()
            video_url = data.get("url")
            if not video_url:
                self.LOG.error("API响应中没有找到视频URL")
                return None

            # The context manager releases the streamed connection on every
            # exit path, including the early return and exceptions.
            with requests.get(
                video_url, stream=True, timeout=self._REQUEST_TIMEOUT
            ) as video_response:
                if video_response.status_code != 200:
                    self.LOG.error(f"无法下载视频HTTP状态码: {video_response.status_code}")
                    return None

                with open(save_path, "wb") as file:
                    for chunk in video_response.iter_content(chunk_size=self._CHUNK_SIZE):
                        if chunk:  # skip empty chunks
                            file.write(chunk)

            abs_path = os.path.abspath(save_path)
            self.LOG.info(f"视频已下载至: {abs_path}")
            return abs_path
        except requests.RequestException as e:
            self.LOG.error(f"请求失败: {e}")
        except IOError as e:
            self.LOG.error(f"文件写入失败: {e}")
        except Exception as e:
            self.LOG.error(f"发生未知错误: {e}")
        return None