视频功能优化

This commit is contained in:
liuwei
2025-05-15 17:11:38 +08:00
parent 05bb0aff28
commit 931aba4dd0

View File

@@ -1,7 +1,10 @@
import time
from loguru import logger from loguru import logger
import os import os
import requests import requests
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
import cv2 import cv2
from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.message_plugin_interface import MessagePluginInterface
@@ -42,8 +45,8 @@ class VideoPlugin(MessagePluginInterface):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.bot: WechatAPIClient = None self.bot: WechatAPIClient = None
# 修改为使用插件目录下的down_load_dir文件夹 # 使用Path对象处理路径自动适应不同操作系统
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") self.download_dir = str(Path(Path(__file__).parent, "down_load_dir"))
def initialize(self, context: Dict[str, Any]) -> bool: def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件""" """初始化插件"""
@@ -60,10 +63,20 @@ class VideoPlugin(MessagePluginInterface):
self.enable = self._config.get("Video", {}).get("enable", True) self.enable = self._config.get("Video", {}).get("enable", True)
# 确保下载目录存在 # 确保下载目录存在
if not os.path.exists(self.download_dir): try:
os.makedirs(self.download_dir, exist_ok=True) if not os.path.exists(self.download_dir):
self.LOG.warning(f"下载目录不存在,正在创建: {self.download_dir}")
os.makedirs(self.download_dir, exist_ok=True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") # 检查目录权限
if not os.access(self.download_dir, os.W_OK):
self.LOG.error(f"下载目录没有写入权限: {self.download_dir}")
return False
except Exception as e:
self.LOG.error(f"创建下载目录失败: {e}")
return False
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands},下载目录:{self.download_dir}")
return True return True
def start(self) -> bool: def start(self) -> bool:
@@ -105,15 +118,20 @@ class VideoPlugin(MessagePluginInterface):
try: try:
# 下载视频 # 下载视频
save_path = os.path.join(self.download_dir, "video.mp4") video_filename = f"video_{int(time.time())}.mp4"
save_path = os.path.join(self.download_dir, video_filename)
self.LOG.info(f"开始下载视频到: {save_path}")
file_abspath, first_frame = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path) file_abspath, first_frame = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path)
if not file_abspath or not file_abspath.endswith("mp4"): if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"):
self.LOG.error(f"视频下载失败,文件路径: {file_abspath}")
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试", await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试",
sender) sender)
return False, "视频下载失败" return False, "视频下载失败"
# 发送视频 # 发送视频
self.LOG.info(f"准备发送视频: {file_abspath}, 首帧: {first_frame}")
result = await self.bot.send_video_message((roomid if roomid else sender), file_abspath, first_frame) result = await self.bot.send_video_message((roomid if roomid else sender), file_abspath, first_frame)
self.LOG.info(f"发送视频结果: {result}") self.LOG.info(f"发送视频结果: {result}")
return True, "发送成功" return True, "发送成功"
@@ -132,52 +150,96 @@ class VideoPlugin(MessagePluginInterface):
""" """
try: try:
# 发送GET请求启用流式传输 # 发送GET请求启用流式传输
response = requests.get(url, stream=True) self.LOG.info(f"开始从 {url} 下载视频")
response = requests.get(url, stream=True, timeout=30)
# 检查请求是否成功 # 检查请求是否成功
response.raise_for_status() # 如果状态码不是200将抛出异常 response.raise_for_status() # 如果状态码不是200将抛出异常
# 确保保存路径的目录存在 # 确保保存路径的目录存在
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True) save_dir = os.path.dirname(save_path)
if save_dir and not os.path.exists(save_dir):
os.makedirs(save_dir, exist_ok=True)
self.LOG.info(f"创建目录: {save_dir}")
# 检查是否是视频流可选根据Content-Type判断 # 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower() content_type = response.headers.get("Content-Type", "").lower()
self.LOG.info(f"响应Content-Type: {content_type}")
if "video" not in content_type and "application/octet-stream" not in content_type: if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}") self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看
return None, None return None, None
# 以二进制写入模式保存流数据 # 以二进制写入模式保存流数据
with open(save_path, "wb") as file: try:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB with open(save_path, "wb") as file:
if chunk: # 过滤空块 for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB
file.write(chunk) if chunk: # 过滤空块
self.LOG.info(f"视频已下载到: {save_path}") file.write(chunk)
self.LOG.info(f"视频已下载到: {save_path}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
return None, None
# 检查文件是否存在且大小大于0
if not os.path.exists(save_path) or os.path.getsize(save_path) == 0:
self.LOG.error(f"下载的文件不存在或为空: {save_path}")
return None, None
# 加入首帧下载 # 加入首帧下载
first_frame = self._get_first_frame(save_path) first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg")
first_frame = self._get_first_frame(save_path, first_frame_path)
if not first_frame or not os.path.exists(first_frame):
self.LOG.warning(f"无法提取首帧,使用默认图片")
# 可以在这里设置一个默认图片路径
return os.path.abspath(save_path), first_frame return os.path.abspath(save_path), first_frame
except requests.RequestException as e: except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}") self.LOG.error(f"请求失败: {e}")
except IOError as e: except IOError as e:
self.LOG.error(f"文件写入失败: {e}") self.LOG.error(f"文件写入失败: {e}")
except Exception as e: except Exception as e:
self.LOG.error(f"发生未知错误: {e}") self.LOG.error(f"下载视频时发生未知错误: {e}")
return None, None return None, None
def _get_first_frame(self, video_path, output_path="first_frame.jpg"): def _get_first_frame(self, video_path, output_path):
# 打开视频文件 """
cap = cv2.VideoCapture(video_path) 提取视频的第一帧并保存为图片
if not cap.isOpened(): :param video_path: 视频文件路径
print("无法打开视频") :param output_path: 输出图片路径
return :return: 输出图片的绝对路径如果失败则返回None
"""
try:
self.LOG.info(f"开始提取视频首帧: {video_path}")
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
self.LOG.error(f"无法打开视频: {video_path}")
return None
# 读取首帧
ret, frame = cap.read()
if not ret:
self.LOG.error("无法读取视频帧")
cap.release()
return None
# 读取首帧
ret, frame = cap.read()
if ret:
# 保存首帧为图片 # 保存首帧为图片
cv2.imwrite(output_path, frame) try:
print(f"首帧已保存为 {output_path}") cv2.imwrite(output_path, frame)
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源 # 释放资源
cap.release() cap.release()
return os.path.abspath(output_path) return os.path.abspath(output_path)
except Exception as e:
self.LOG.error(f"提取视频首帧时出错: {e}")
return None