From 931aba4dd03c3f9fcbcb23c4e511a154c5a699ba Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 15 May 2025 17:11:38 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=86=E9=A2=91=E5=8A=9F=E8=83=BD=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/video/main.py | 122 +++++++++++++++++++++++++++++++----------- 1 file changed, 92 insertions(+), 30 deletions(-) diff --git a/plugins/video/main.py b/plugins/video/main.py index e2058b6..e3ca526 100644 --- a/plugins/video/main.py +++ b/plugins/video/main.py @@ -1,7 +1,10 @@ +import time + from loguru import logger import os import requests from typing import Dict, Any, List, Optional, Tuple +from pathlib import Path import cv2 from plugin_common.message_plugin_interface import MessagePluginInterface @@ -42,8 +45,8 @@ class VideoPlugin(MessagePluginInterface): def __init__(self): super().__init__() self.bot: WechatAPIClient = None - # 修改为使用插件目录下的down_load_dir文件夹 - self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") + # 使用Path对象处理路径,自动适应不同操作系统 + self.download_dir = str(Path(Path(__file__).parent, "down_load_dir")) def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" @@ -60,10 +63,20 @@ class VideoPlugin(MessagePluginInterface): self.enable = self._config.get("Video", {}).get("enable", True) # 确保下载目录存在 - if not os.path.exists(self.download_dir): - os.makedirs(self.download_dir, exist_ok=True) + try: + if not os.path.exists(self.download_dir): + self.LOG.warning(f"下载目录不存在,正在创建: {self.download_dir}") + os.makedirs(self.download_dir, exist_ok=True) + + # 检查目录权限 + if not os.access(self.download_dir, os.W_OK): + self.LOG.error(f"下载目录没有写入权限: {self.download_dir}") + return False + except Exception as e: + self.LOG.error(f"创建下载目录失败: {e}") + return False - self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") + self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands},下载目录:{self.download_dir}") return True def start(self) -> bool: @@ -105,15 +118,20 @@ class VideoPlugin(MessagePluginInterface): try: # 下载视频 - save_path = os.path.join(self.download_dir, "video.mp4") + video_filename = f"video_{int(time.time())}.mp4" + save_path = os.path.join(self.download_dir, video_filename) + self.LOG.info(f"开始下载视频到: {save_path}") + file_abspath, first_frame = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path) - if not file_abspath or not file_abspath.endswith("mp4"): + if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"): + self.LOG.error(f"视频下载失败,文件路径: {file_abspath}") await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试", sender) return False, "视频下载失败" # 发送视频 + self.LOG.info(f"准备发送视频: {file_abspath}, 首帧: {first_frame}") result = await self.bot.send_video_message((roomid if roomid else sender), file_abspath, first_frame) self.LOG.info(f"发送视频结果: {result}") return True, "发送成功" @@ -132,52 +150,96 @@ class VideoPlugin(MessagePluginInterface): """ try: # 发送GET请求,启用流式传输 - response = requests.get(url, stream=True) + self.LOG.info(f"开始从 {url} 下载视频") + response = requests.get(url, stream=True, timeout=30) # 检查请求是否成功 response.raise_for_status() # 如果状态码不是200,将抛出异常 # 确保保存路径的目录存在 - os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True) + save_dir = os.path.dirname(save_path) + if save_dir and not os.path.exists(save_dir): + os.makedirs(save_dir, exist_ok=True) + self.LOG.info(f"创建目录: {save_dir}") # 检查是否是视频流(可选,根据Content-Type判断) content_type = response.headers.get("Content-Type", "").lower() + self.LOG.info(f"响应Content-Type: {content_type}") + if "video" not in content_type and "application/octet-stream" not in content_type: self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 return None, None # 以二进制写入模式保存流数据 - with open(save_path, "wb") as file: - for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB - if chunk: # 过滤空块 - file.write(chunk) - self.LOG.info(f"视频已下载到: {save_path}") + try: + with open(save_path, "wb") as file: + for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB + if chunk: # 过滤空块 + file.write(chunk) + self.LOG.info(f"视频已下载到: {save_path}") + except IOError as e: + self.LOG.error(f"文件写入失败: {e}") + return None, None + + # 检查文件是否存在且大小大于0 + if not os.path.exists(save_path) or os.path.getsize(save_path) == 0: + self.LOG.error(f"下载的文件不存在或为空: {save_path}") + return None, None + # 加入首帧下载 - first_frame = self._get_first_frame(save_path) + first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") + first_frame = self._get_first_frame(save_path, first_frame_path) + + if not first_frame or not os.path.exists(first_frame): + self.LOG.warning(f"无法提取首帧,使用默认图片") + # 可以在这里设置一个默认图片路径 + return os.path.abspath(save_path), first_frame + except requests.RequestException as e: self.LOG.error(f"请求失败: {e}") except IOError as e: self.LOG.error(f"文件写入失败: {e}") except Exception as e: - self.LOG.error(f"发生未知错误: {e}") + self.LOG.error(f"下载视频时发生未知错误: {e}") return None, None - def _get_first_frame(self, video_path, output_path="first_frame.jpg"): - # 打开视频文件 - cap = cv2.VideoCapture(video_path) - if not cap.isOpened(): - print("无法打开视频") - return + def _get_first_frame(self, video_path, output_path): + """ + 提取视频的第一帧并保存为图片 + :param video_path: 视频文件路径 + :param output_path: 输出图片路径 + :return: 输出图片的绝对路径,如果失败则返回None + """ + try: + self.LOG.info(f"开始提取视频首帧: {video_path}") + # 打开视频文件 + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + self.LOG.error(f"无法打开视频: {video_path}") + return None + + # 读取首帧 + ret, frame = cap.read() + if not ret: + self.LOG.error("无法读取视频帧") + cap.release() + return None - # 读取首帧 - ret, frame = cap.read() - if ret: # 保存首帧为图片 - cv2.imwrite(output_path, frame) - print(f"首帧已保存为 {output_path}") + try: + cv2.imwrite(output_path, frame) + self.LOG.info(f"首帧已保存为: {output_path}") + except Exception as e: + self.LOG.error(f"保存首帧图片失败: {e}") + cap.release() + return None - # 释放资源 - cap.release() - return os.path.abspath(output_path) + # 释放资源 + cap.release() + return os.path.abspath(output_path) + + except Exception as e: + self.LOG.error(f"提取视频首帧时出错: {e}") + return None