import time from loguru import logger import os import requests from typing import Dict, Any, List, Optional, Tuple from pathlib import Path import cv2 from plugin_common.message_plugin_interface import MessagePluginInterface from plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.decorator.points_decorator import plugin_points_cost from wechat_ipad import WechatAPIClient class VideoPlugin(MessagePluginInterface): """视频插件""" @property def name(self) -> str: return "黑丝视频" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供视频点播功能" @property def author(self) -> str: return "Trae AI" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands def __init__(self): super().__init__() self.bot: WechatAPIClient = None # 使用Path对象处理路径,自动适应不同操作系统 self.download_dir = str(Path(Path(__file__).parent, "down_load_dir")) def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.gbm = context.get("gbm") self._commands = self._config.get("Video", {}).get("command", ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"]) self.command_format = self._config.get("Video", {}).get("command-format", "黑丝") self.enable = self._config.get("Video", {}).get("enable", True) # 确保下载目录存在 try: if not os.path.exists(self.download_dir): self.LOG.warning(f"下载目录不存在,正在创建: {self.download_dir}") os.makedirs(self.download_dir, exist_ok=True) # 检查目录权限 if not os.access(self.download_dir, os.W_OK): self.LOG.error(f"下载目录没有写入权限: {self.download_dir}") return False except Exception as e: self.LOG.error(f"创建下载目录失败: {e}") return False self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands},下载目录:{self.download_dir}") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="视频插件") @plugin_points_cost(2, "视频插件消耗积分", Feature.VIDEO) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") self.bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and gbm.get_group_permission(roomid, Feature.VIDEO) == PermissionStatus.DISABLED: return False, "没有权限" try: # 下载视频 video_filename = f"video_{int(time.time())}.mp4" save_path = os.path.join(self.download_dir, video_filename) self.LOG.info(f"开始下载视频到: {save_path}") file_abspath, first_frame = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path) if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"): self.LOG.error(f"视频下载失败,文件路径: {file_abspath}") await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试", sender) return False, "视频下载失败" # 发送视频 self.LOG.info(f"准备发送视频: {file_abspath}, 首帧: {first_frame}") result = await self.bot.send_video_message((roomid if roomid else sender), file_abspath, first_frame) self.LOG.info(f"发送视频结果: {result}") return True, "发送成功" except Exception as e: self.LOG.error(f"处理视频请求出错: {e}") await self.bot.send_text_message((roomid if roomid else sender), f"\n❌请求出错:{e}", sender) return False, f"处理出错: {e}" def _download_stream(self, url, save_path): """ 从指定URL读取视频流并保存到本地 :param url: 视频流的URL :param save_path: 本地保存路径(包含文件名,例如 "video.mp4") """ try: # 发送GET请求,启用流式传输 self.LOG.info(f"开始从 {url} 下载视频") headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", "Accept": "*/*", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Referer": "https://api.guiguiya.com/" } response = requests.get(url, stream=True, timeout=30, headers=headers, allow_redirects=True) # 检查请求是否成功 response.raise_for_status() # 如果状态码不是200,将抛出异常 # 确保保存路径的目录存在 save_dir = os.path.dirname(save_path) if save_dir and not os.path.exists(save_dir): os.makedirs(save_dir, exist_ok=True) self.LOG.info(f"创建目录: {save_dir}") # 检查是否是视频流(可选,根据Content-Type判断) content_type = response.headers.get("Content-Type", "").lower() self.LOG.info(f"响应Content-Type: {content_type}") if "video" not in content_type and "application/octet-stream" not in content_type: self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}") self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看 return None, None # 以二进制写入模式保存流数据 try: total_size = int(response.headers.get('Content-Length', 0)) self.LOG.info(f"预期下载大小: {total_size} 字节") downloaded_size = 0 with open(save_path, "wb") as file: for chunk in response.iter_content(chunk_size=1024): # 分块读取,每块1KB if chunk: # 过滤空块 file.write(chunk) downloaded_size += len(chunk) self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节") # 验证下载是否完整 if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90% self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节") return None, None except IOError as e: self.LOG.error(f"文件写入失败: {e}") return None, None # 检查文件是否存在且大小合理 if not os.path.exists(save_path): self.LOG.error(f"下载的文件不存在: {save_path}") return None, None file_size = os.path.getsize(save_path) if file_size < 10000: # 小于10KB的文件可能不是有效视频 self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节") # 尝试读取文件内容以诊断问题 try: with open(save_path, 'rb') as f: content_preview = f.read(200) self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}") except Exception as e: self.LOG.error(f"读取文件内容失败: {e}") return None, None # 加入首帧下载 first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") first_frame = self._get_first_frame(save_path, first_frame_path) if not first_frame or not os.path.exists(first_frame): self.LOG.warning(f"无法提取首帧,使用默认图片") # 可以在这里设置一个默认图片路径 return os.path.abspath(save_path), first_frame except requests.RequestException as e: self.LOG.error(f"请求失败: {e}") except IOError as e: self.LOG.error(f"文件写入失败: {e}") except Exception as e: self.LOG.error(f"下载视频时发生未知错误: {e}") return None, None def _get_first_frame(self, video_path, output_path): """ 提取视频的第一帧并保存为图片 :param video_path: 视频文件路径 :param output_path: 输出图片路径 :return: 输出图片的绝对路径,如果失败则返回None """ try: self.LOG.info(f"开始提取视频首帧: {video_path}") # 打开视频文件 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): self.LOG.error(f"无法打开视频: {video_path}") return None # 读取首帧 ret, frame = cap.read() if not ret: self.LOG.error("无法读取视频帧") cap.release() return None # 保存首帧为图片 try: cv2.imwrite(output_path, frame) self.LOG.info(f"首帧已保存为: {output_path}") except Exception as e: self.LOG.error(f"保存首帧图片失败: {e}") cap.release() return None # 释放资源 cap.release() return os.path.abspath(output_path) except Exception as e: self.LOG.error(f"提取视频首帧时出错: {e}") return None