import os import time import asyncio import aiofiles from typing import Dict, Any, List, Optional, Tuple import aiohttp import cv2 import requests from loguru import logger from pathlib import Path from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.decorator.points_decorator import plugin_points_cost from utils.decorator.rate_limit_decorator import group_feature_rate_limit from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient class VideoManPlugin(MessagePluginInterface): """猛男视频插件""" # 功能权限常量 FEATURE_KEY = "VIDEO_MAN" FEATURE_DESCRIPTION = "💪 肌肉视频 [猛男, 肌肉, 帅哥]" @property def name(self) -> str: return "猛男视频" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供猛男视频点播功能" @property def author(self) -> str: return "liu.wei" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() # 使用插件目录下的down_load_dir文件夹 self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir") # 注册功能权限 self.feature = self.register_feature() def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") self.gbm = context.get("gbm") self._commands = self._config.get("VideoMan", {}).get("command", ["猛男", "肌肉", "帅哥"]) self.command_format = self._config.get("VideoMan", {}).get("command-format", "猛男") self.enable = self._config.get("VideoMan", {}).get("enable", True) # 确保下载目录存在 if not os.path.exists(self.download_dir): os.makedirs(self.download_dir, exist_ok=True) self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() command = content.split(" ")[0] return command in self._commands @plugin_stats_decorator(plugin_name="猛男视频") @plugin_points_cost(2, "猛男视频消耗积分", FEATURE_KEY) @group_feature_rate_limit(max_per_minute=3, feature_key=FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") gbm: GroupBotManager = message.get("gbm") bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" try: # 下载视频 file_abspath, first_frame = await self._download_video("https://api.52vmy.cn/api/video/boy?type=json") # FIXME 需要换成web容器地址。否则无法获取。 if not file_abspath: await bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试", sender) return False, "视频下载失败" # 发送视频 result = await bot.send_video_message((roomid if roomid else sender), Path(file_abspath), Path(first_frame)) self.LOG.info(f"发送视频结果: {result}") return True, "发送成功" except Exception as e: self.LOG.error(f"处理视频请求出错: {e}") return False, f"处理出错: {e}" async def _download_video(self, api_url): """ 从API异步获取视频URL并下载到本地 :param api_url: API的URL :return: 下载后的视频文件绝对路径和首帧路径,或 None """ # 确保下载目录存在 if not os.path.exists(self.download_dir): os.makedirs(self.download_dir, exist_ok=True) save_path = os.path.join(self.download_dir, "video.mp4") try: # 使用 aiohttp 异步获取视频URL async with aiohttp.ClientSession() as session: async with session.get(api_url, timeout=20) as response: if response.status != 200: self.LOG.error(f"API请求失败,HTTP状态码: {response.status}") return None data = await response.json() video_url = data.get("data").get("video") if not video_url: self.LOG.error("API响应中没有找到视频URL") return None # 异步下载视频 async with session.get(video_url, timeout=20) as video_response: if video_response.status != 200: self.LOG.error(f"无法下载视频,HTTP状态码: {video_response.status}") return None # 使用 aiofiles 异步保存视频 async with aiofiles.open(save_path, "wb") as file: async for chunk in video_response.content.iter_chunked(1024): if chunk: # 过滤空块 await file.write(chunk) abs_path = os.path.abspath(save_path) self.LOG.info(f"视频已下载至: {abs_path}") first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg") first_frame = await self._get_first_frame(save_path, first_frame_path) return abs_path, first_frame except aiohttp.ClientError as e: self.LOG.error(f"请求失败: {e}") except IOError as e: self.LOG.error(f"文件写入失败: {e}") except Exception as e: self.LOG.error(f"发生未知错误: {e}") return None async def _get_first_frame(self, video_path, output_path): """ 提取视频的第一帧并保存为图片(异步版本) :param video_path: 视频文件路径 :param output_path: 输出图片路径 :return: 输出图片的绝对路径,如果失败则返回None """ try: self.LOG.info(f"开始提取视频首帧: {video_path}") # 使用 asyncio.to_thread 包装 OpenCV 操作 def extract_frame(): # 打开视频文件 cap = cv2.VideoCapture(video_path) if not cap.isOpened(): self.LOG.error(f"无法打开视频: {video_path}") return None # 读取首帧 ret, frame = cap.read() if not ret: self.LOG.error("无法读取视频帧") cap.release() return None # 保存首帧为图片 try: cv2.imwrite(output_path, frame) self.LOG.info(f"首帧已保存为: {output_path}") except Exception as e: self.LOG.error(f"保存首帧图片失败: {e}") cap.release() return None # 释放资源 cap.release() return os.path.abspath(output_path) # 在线程池中执行 OpenCV 操作 result = await asyncio.to_thread(extract_frame) return result except Exception as e: self.LOG.error(f"提取视频首帧时出错: {e}") return None