301 lines
13 KiB
Python
301 lines
13 KiB
Python
import time
|
||
import os
|
||
import aiohttp
|
||
import aiofiles
|
||
import asyncio
|
||
from loguru import logger
|
||
import requests
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
from pathlib import Path
|
||
|
||
import cv2
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class VideoPlugin(MessagePluginInterface):
|
||
"""视频插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "VIDEO"
|
||
FEATURE_DESCRIPTION = "🎥 黑丝视频功能 [黑丝视频, 黑丝, 来个黑丝, 搞个黑丝]"
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "黑丝视频"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供视频点播功能"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "liu.wei"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.bot: WechatAPIClient = None
|
||
self.feature = self.register_feature()
|
||
# 使用Path对象处理路径,自动适应不同操作系统
|
||
self.download_dir = str(Path(Path(__file__).parent, "down_load_dir"))
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logger
|
||
self.LOG.debug(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.gbm = context.get("gbm")
|
||
|
||
self._commands = self._config.get("Video", {}).get("command", ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"])
|
||
self.command_format = self._config.get("Video", {}).get("command-format", "黑丝")
|
||
self.enable = self._config.get("Video", {}).get("enable", True)
|
||
|
||
# 确保下载目录存在
|
||
try:
|
||
if not os.path.exists(self.download_dir):
|
||
self.LOG.warning(f"下载目录不存在,正在创建: {self.download_dir}")
|
||
os.makedirs(self.download_dir, exist_ok=True)
|
||
|
||
# 检查目录权限
|
||
if not os.access(self.download_dir, os.W_OK):
|
||
self.LOG.error(f"下载目录没有写入权限: {self.download_dir}")
|
||
return False
|
||
except Exception as e:
|
||
self.LOG.error(f"创建下载目录失败: {e}")
|
||
return False
|
||
|
||
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands},下载目录:{self.download_dir}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.debug(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
command = content.split(" ")[0]
|
||
|
||
return command in self._commands
|
||
|
||
@plugin_stats_decorator(plugin_name="视频插件")
|
||
@plugin_points_cost(8, "视频插件消耗积分", FEATURE_KEY)
|
||
@group_feature_rate_limit(max_per_minute=3, feature_key=FEATURE_KEY)
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.debug(f"插件执行: {self.name}:{content}")
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
self.bot: WechatAPIClient = message.get("bot")
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
try:
|
||
# 下载视频
|
||
video_filename = f"video_{int(time.time())}.mp4"
|
||
save_path = os.path.join(self.download_dir, video_filename)
|
||
self.LOG.info(f"开始下载视频到: {save_path}")
|
||
|
||
file_abspath, first_frame = await self._download_stream("http://api.yujn.cn/api/heisis.php?type=video", save_path)
|
||
|
||
if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"):
|
||
self.LOG.error(f"视频下载失败,文件路径: {file_abspath}")
|
||
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试",
|
||
sender)
|
||
return False, "视频下载失败"
|
||
|
||
# 发送视频
|
||
self.LOG.info(f"准备发送视频: {file_abspath}, 首帧: {first_frame}")
|
||
result = await self.bot.send_video_message((roomid if roomid else sender), Path(file_abspath),
|
||
Path(first_frame))
|
||
self.LOG.info(f"发送视频结果: {result}")
|
||
return True, "发送成功"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理视频请求出错: {e}")
|
||
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌请求出错:{e}",
|
||
sender)
|
||
return False, f"处理出错: {e}"
|
||
|
||
async def _download_stream(self, url, save_path):
|
||
"""
|
||
从指定URL读取视频流并保存到本地(异步版本)
|
||
:param url: 视频流的URL
|
||
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4")
|
||
"""
|
||
try:
|
||
# 发送GET请求,启用流式传输
|
||
self.LOG.info(f"开始从 {url} 下载视频")
|
||
headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
||
"Accept": "*/*",
|
||
"Accept-Encoding": "gzip, deflate, br",
|
||
"Connection": "keep-alive",
|
||
"Referer": "https://api.guiguiya.com/"
|
||
}
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(url, headers=headers, allow_redirects=True) as response:
|
||
# 检查请求是否成功
|
||
response.raise_for_status()
|
||
|
||
# 确保保存路径的目录存在
|
||
save_dir = os.path.dirname(save_path)
|
||
if save_dir and not os.path.exists(save_dir):
|
||
os.makedirs(save_dir, exist_ok=True)
|
||
self.LOG.info(f"创建目录: {save_dir}")
|
||
|
||
# 检查是否是视频流(可选,根据Content-Type判断)
|
||
content_type = response.headers.get("Content-Type", "").lower()
|
||
self.LOG.info(f"响应Content-Type: {content_type}")
|
||
|
||
if "video" not in content_type and "application/octet-stream" not in content_type:
|
||
self.LOG.warning(f"警告: 返回的可能不是视频流,Content-Type: {content_type}")
|
||
text = await response.text()
|
||
self.LOG.warning(f"响应内容预览: {text[:100]}") # 打印前100字符查看
|
||
return None, None
|
||
|
||
# 以二进制写入模式保存流数据
|
||
try:
|
||
total_size = int(response.headers.get('Content-Length', 0))
|
||
self.LOG.info(f"预期下载大小: {total_size} 字节")
|
||
downloaded_size = 0
|
||
|
||
async with aiofiles.open(save_path, "wb") as file:
|
||
async for chunk in response.content.iter_chunked(1024): # 分块读取,每块1KB
|
||
if chunk: # 过滤空块
|
||
await file.write(chunk)
|
||
downloaded_size += len(chunk)
|
||
|
||
self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节")
|
||
|
||
# 验证下载是否完整
|
||
if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90%
|
||
self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节")
|
||
return None, None
|
||
except IOError as e:
|
||
self.LOG.error(f"文件写入失败: {e}")
|
||
return None, None
|
||
|
||
# 检查文件是否存在且大小合理
|
||
if not os.path.exists(save_path):
|
||
self.LOG.error(f"下载的文件不存在: {save_path}")
|
||
return None, None
|
||
|
||
file_size = os.path.getsize(save_path)
|
||
if file_size < 10000: # 小于10KB的文件可能不是有效视频
|
||
self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节")
|
||
# 尝试读取文件内容以诊断问题
|
||
try:
|
||
async with aiofiles.open(save_path, 'rb') as f:
|
||
content_preview = await f.read(200)
|
||
self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}")
|
||
except Exception as e:
|
||
self.LOG.error(f"读取文件内容失败: {e}")
|
||
return None, None
|
||
|
||
# 加入首帧下载
|
||
first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg")
|
||
first_frame = await self._get_first_frame(save_path, first_frame_path)
|
||
|
||
if not first_frame or not os.path.exists(first_frame):
|
||
self.LOG.warning(f"无法提取首帧,使用默认图片")
|
||
# 可以在这里设置一个默认图片路径
|
||
|
||
return os.path.abspath(save_path), first_frame
|
||
|
||
except aiohttp.ClientError as e:
|
||
self.LOG.error(f"请求失败: {e}")
|
||
except IOError as e:
|
||
self.LOG.error(f"文件写入失败: {e}")
|
||
except Exception as e:
|
||
self.LOG.error(f"下载视频时发生未知错误: {e}")
|
||
return None, None
|
||
|
||
async def _get_first_frame(self, video_path, output_path):
|
||
"""
|
||
提取视频的第一帧并保存为图片(异步版本)
|
||
:param video_path: 视频文件路径
|
||
:param output_path: 输出图片路径
|
||
:return: 输出图片的绝对路径,如果失败则返回None
|
||
"""
|
||
try:
|
||
self.LOG.info(f"开始提取视频首帧: {video_path}")
|
||
|
||
# 使用 asyncio.to_thread 包装 OpenCV 操作
|
||
def extract_frame():
|
||
# 打开视频文件
|
||
cap = cv2.VideoCapture(video_path)
|
||
if not cap.isOpened():
|
||
self.LOG.error(f"无法打开视频: {video_path}")
|
||
return None
|
||
|
||
# 读取首帧
|
||
ret, frame = cap.read()
|
||
if not ret:
|
||
self.LOG.error("无法读取视频帧")
|
||
cap.release()
|
||
return None
|
||
|
||
# 保存首帧为图片
|
||
try:
|
||
cv2.imwrite(output_path, frame)
|
||
self.LOG.info(f"首帧已保存为: {output_path}")
|
||
except Exception as e:
|
||
self.LOG.error(f"保存首帧图片失败: {e}")
|
||
cap.release()
|
||
return None
|
||
|
||
# 释放资源
|
||
cap.release()
|
||
return os.path.abspath(output_path)
|
||
|
||
# 在线程池中执行 OpenCV 操作
|
||
result = await asyncio.to_thread(extract_frame)
|
||
return result
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"提取视频首帧时出错: {e}")
|
||
return None
|