import os import time from typing import Dict, Any, List, Optional, Tuple import aiohttp import cv2 import requests from loguru import logger from pathlib import Path from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient class VideoManPlugin(MessagePluginInterface): """猛男视频插件""" # 功能权限常量 FEATURE_KEY = "VIDEO_MAN" FEATURE_DESCRIPTION = "💪 肌肉视频 [猛男, 肌肉, 帅哥]" @property def name(self) -> str: return "猛男视频" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供猛男视频点播功能" @property def author(self) -> str: return "Trae AI" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() # 使用插件目录下的down_load_dir文件夹 self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.info(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self.gbm = context.get("gbm") self._commands = self._config.get("VideoMan", {}).get("command", ["猛男", "肌肉", "帅哥"]) self.command_format = self._config.get("VideoMan", {}).get("command-format", "猛男") self.enable = self._config.get("VideoMan", {}).get("enable", True) # 确保下载目录存在 if not os.path.exists(self.download_dir): os.makedirs(self.download_dir, exist_ok=True) self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.info(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="猛男视频") @plugin_points_cost(2, "猛男视频消耗积分", FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" try: # 下载视频 file_abspath, first_frame = await self._download_video("https://api.52vmy.cn/api/video/boy?type=json") # FIXME 需要换成web容器地址。否则无法获取。 if not file_abspath: await bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试", sender) return False, "视频下载失败" # 发送视频 result = await bot.send_video_message((roomid if roomid else sender), Path(file_abspath), Path(first_frame)) self.LOG.info(f"发送视频结果: {result}") return True, "发送成功" except Exception as e: self.LOG.error(f"处理视频请求出错: {e}") return False, f"处理出错: {e}" async def _download_video(self, api_url): """ 从API异步获取视频URL并下载到本地 :param api_url: API的URL :return: 下载后的视频文件绝对路径和首帧路径,或 None """ # 确保下载目录存在 if not os.path.exists(self.download_dir): os.makedirs(self.download_dir, exist_ok=True) save_path = os.path.join(self.download_dir, "video.mp4") try: # 使用 aiohttp 异步获取视频URL async with aiohttp.ClientSession() as session: async with session.get(api_url, timeout=20) as response: if response.status != 200: self.LOG.error(f"API请求失败,HTTP状态码: {response.status}") return None data = await response.json() video_url = data.get("data").get("video") if not video_url: self.LOG.error("API响应中没有找到视频URL") return None # 异步下载视频 async with session.get(video_url, timeout=20) as video_response: if video_response.status != 200: self.LOG.error(f"无法下载视频,HTTP状态码: {video_response.status}") return None # 保存视频 with open(save_path, "wb") as file: async for chunk in video_response.content.iter_chunked(1024): if chunk: # 过滤空块 file.write(chunk) abs_path = os.path.abspath(save_path) self.LOG.info(f"视频已下载至: {abs_path}") first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") first_frame = self._get_first_frame(save_path, first_frame_path) return abs_path, first_frame except aiohttp.ClientError as e: self.LOG.error(f"请求失败: {e}") except IOError as e: self.LOG.error(f"文件写入失败: {e}") except Exception as e: self.LOG.error(f"发生未知错误: {e}") return None def _get_first_frame(self, video_path, output_path): """ 提取视频的第一帧并保存为图片 :param video_path: 视频文件路径 :param output_path: 输出图片路径 :return: 输出图片的绝对路径,如果失败则返回None """ try: self.LOG.info(f"开始提取视频首帧: {video_path}") # 打开视频文件 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): self.LOG.error(f"无法打开视频: {video_path}") return None # 读取首帧 ret, frame = cap.read() if not ret: self.LOG.error("无法读取视频帧") cap.release() return None # 保存首帧为图片 try: cv2.imwrite(output_path, frame) self.LOG.info(f"首帧已保存为: {output_path}") except Exception as e: self.LOG.error(f"保存首帧图片失败: {e}") cap.release() return None # 释放资源 cap.release() return os.path.abspath(output_path) except Exception as e: self.LOG.error(f"提取视频首帧时出错: {e}") return None