Files
abot/plugins/video_man/main.py
2026-01-16 13:29:52 +08:00

245 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import time
import asyncio
import aiofiles
from typing import Dict, Any, List, Optional, Tuple
import aiohttp
import cv2
import requests
from loguru import logger
from pathlib import Path
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.points_decorator import plugin_points_cost
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from wechat_ipad import WechatAPIClient
class VideoManPlugin(MessagePluginInterface):
"""猛男视频插件"""
# 功能权限常量
FEATURE_KEY = "VIDEO_MAN"
FEATURE_DESCRIPTION = "💪 肌肉视频 [猛男, 肌肉, 帅哥]"
@property
def name(self) -> str:
return "猛男视频"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供猛男视频点播功能"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
# 使用插件目录下的down_load_dir文件夹
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
# 注册功能权限
self.feature = self.register_feature()
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.gbm = context.get("gbm")
self._commands = self._config.get("VideoMan", {}).get("command", ["猛男", "肌肉", "帅哥"])
self.command_format = self._config.get("VideoMan", {}).get("command-format", "猛男")
self.enable = self._config.get("VideoMan", {}).get("enable", True)
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="猛男视频")
@plugin_points_cost(2, "猛男视频消耗积分", FEATURE_KEY)
@group_feature_rate_limit(max_per_minute=3, feature_key=FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 下载视频
file_abspath, first_frame = await self._download_video("https://api.52vmy.cn/api/video/boy?type=json")
# FIXME 需要换成web容器地址。否则无法获取。
if not file_abspath:
await bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试",
sender)
return False, "视频下载失败"
# 发送视频
result = await bot.send_video_message((roomid if roomid else sender), Path(file_abspath),
Path(first_frame))
self.LOG.info(f"发送视频结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理视频请求出错: {e}")
return False, f"处理出错: {e}"
async def _download_video(self, api_url):
"""
从API异步获取视频URL并下载到本地
:param api_url: API的URL
:return: 下载后的视频文件绝对路径和首帧路径,或 None
"""
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
save_path = os.path.join(self.download_dir, "video.mp4")
try:
# 使用 aiohttp 异步获取视频URL
async with aiohttp.ClientSession() as session:
async with session.get(api_url, timeout=20) as response:
if response.status != 200:
self.LOG.error(f"API请求失败HTTP状态码: {response.status}")
return None
data = await response.json()
video_url = data.get("data").get("video")
if not video_url:
self.LOG.error("API响应中没有找到视频URL")
return None
# 异步下载视频
async with session.get(video_url, timeout=20) as video_response:
if video_response.status != 200:
self.LOG.error(f"无法下载视频HTTP状态码: {video_response.status}")
return None
# 使用 aiofiles 异步保存视频
async with aiofiles.open(save_path, "wb") as file:
async for chunk in video_response.content.iter_chunked(1024):
if chunk: # 过滤空块
await file.write(chunk)
abs_path = os.path.abspath(save_path)
self.LOG.info(f"视频已下载至: {abs_path}")
first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg")
first_frame = await self._get_first_frame(save_path, first_frame_path)
return abs_path, first_frame
except aiohttp.ClientError as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None
async def _get_first_frame(self, video_path, output_path):
"""
提取视频的第一帧并保存为图片(异步版本)
:param video_path: 视频文件路径
:param output_path: 输出图片路径
:return: 输出图片的绝对路径如果失败则返回None
"""
try:
self.LOG.info(f"开始提取视频首帧: {video_path}")
# 使用 asyncio.to_thread 包装 OpenCV 操作
def extract_frame():
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
self.LOG.error(f"无法打开视频: {video_path}")
return None
# 读取首帧
ret, frame = cap.read()
if not ret:
self.LOG.error("无法读取视频帧")
cap.release()
return None
# 保存首帧为图片
try:
cv2.imwrite(output_path, frame)
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源
cap.release()
return os.path.abspath(output_path)
# 在线程池中执行 OpenCV 操作
result = await asyncio.to_thread(extract_frame)
return result
except Exception as e:
self.LOG.error(f"提取视频首帧时出错: {e}")
return None