Files
abot/plugins/video/main.py
2025-05-21 09:03:41 +08:00

275 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from loguru import logger
import os
import requests
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
import cv2
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class VideoPlugin(MessagePluginInterface):
"""视频插件"""
@property
def name(self) -> str:
return "黑丝视频"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供视频点播功能"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
# 使用Path对象处理路径自动适应不同操作系统
self.download_dir = str(Path(Path(__file__).parent, "down_load_dir"))
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.gbm = context.get("gbm")
self._commands = self._config.get("Video", {}).get("command", ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"])
self.command_format = self._config.get("Video", {}).get("command-format", "黑丝")
self.enable = self._config.get("Video", {}).get("enable", True)
# 确保下载目录存在
try:
if not os.path.exists(self.download_dir):
self.LOG.warning(f"下载目录不存在,正在创建: {self.download_dir}")
os.makedirs(self.download_dir, exist_ok=True)
# 检查目录权限
if not os.access(self.download_dir, os.W_OK):
self.LOG.error(f"下载目录没有写入权限: {self.download_dir}")
return False
except Exception as e:
self.LOG.error(f"创建下载目录失败: {e}")
return False
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands},下载目录:{self.download_dir}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="视频插件")
@plugin_points_cost(2, "视频插件消耗积分", Feature.VIDEO)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
self.bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.VIDEO) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 下载视频
video_filename = f"video_{int(time.time())}.mp4"
save_path = os.path.join(self.download_dir, video_filename)
self.LOG.info(f"开始下载视频到: {save_path}")
file_abspath, first_frame = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path)
if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"):
self.LOG.error(f"视频下载失败,文件路径: {file_abspath}")
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试",
sender)
return False, "视频下载失败"
# 发送视频
self.LOG.info(f"准备发送视频: {file_abspath}, 首帧: {first_frame}")
result = await self.bot.send_video_message((roomid if roomid else sender), Path(file_abspath),
Path(first_frame))
self.LOG.info(f"发送视频结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理视频请求出错: {e}")
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌请求出错:{e}",
sender)
return False, f"处理出错: {e}"
def _download_stream(self, url, save_path):
"""
从指定URL读取视频流并保存到本地
:param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
"""
try:
# 发送GET请求启用流式传输
self.LOG.info(f"开始从 {url} 下载视频")
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Referer": "https://api.guiguiya.com/"
}
response = requests.get(url, stream=True, timeout=30, headers=headers, allow_redirects=True)
# 检查请求是否成功
response.raise_for_status() # 如果状态码不是200将抛出异常
# 确保保存路径的目录存在
save_dir = os.path.dirname(save_path)
if save_dir and not os.path.exists(save_dir):
os.makedirs(save_dir, exist_ok=True)
self.LOG.info(f"创建目录: {save_dir}")
# 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower()
self.LOG.info(f"响应Content-Type: {content_type}")
if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看
return None, None
# 以二进制写入模式保存流数据
try:
total_size = int(response.headers.get('Content-Length', 0))
self.LOG.info(f"预期下载大小: {total_size} 字节")
downloaded_size = 0
with open(save_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB
if chunk: # 过滤空块
file.write(chunk)
downloaded_size += len(chunk)
self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节")
# 验证下载是否完整
if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90%
self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节")
return None, None
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
return None, None
# 检查文件是否存在且大小合理
if not os.path.exists(save_path):
self.LOG.error(f"下载的文件不存在: {save_path}")
return None, None
file_size = os.path.getsize(save_path)
if file_size < 10000: # 小于10KB的文件可能不是有效视频
self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节")
# 尝试读取文件内容以诊断问题
try:
with open(save_path, 'rb') as f:
content_preview = f.read(200)
self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}")
except Exception as e:
self.LOG.error(f"读取文件内容失败: {e}")
return None, None
# 加入首帧下载
first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg")
first_frame = self._get_first_frame(save_path, first_frame_path)
if not first_frame or not os.path.exists(first_frame):
self.LOG.warning(f"无法提取首帧,使用默认图片")
# 可以在这里设置一个默认图片路径
return os.path.abspath(save_path), first_frame
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"下载视频时发生未知错误: {e}")
return None, None
def _get_first_frame(self, video_path, output_path):
"""
提取视频的第一帧并保存为图片
:param video_path: 视频文件路径
:param output_path: 输出图片路径
:return: 输出图片的绝对路径如果失败则返回None
"""
try:
self.LOG.info(f"开始提取视频首帧: {video_path}")
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
self.LOG.error(f"无法打开视频: {video_path}")
return None
# 读取首帧
ret, frame = cap.read()
if not ret:
self.LOG.error("无法读取视频帧")
cap.release()
return None
# 保存首帧为图片
try:
cv2.imwrite(output_path, frame)
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源
cap.release()
return os.path.abspath(output_path)
except Exception as e:
self.LOG.error(f"提取视频首帧时出错: {e}")
return None