Files
abot/plugins/video/main.py
2026-01-16 13:34:37 +08:00

301 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import os
import aiohttp
import aiofiles
import asyncio
from loguru import logger
import requests
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
import cv2
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class VideoPlugin(MessagePluginInterface):
"""视频插件"""
# 功能权限常量
FEATURE_KEY = "VIDEO"
FEATURE_DESCRIPTION = "🎥 黑丝视频功能 [黑丝视频, 黑丝, 来个黑丝, 搞个黑丝]"
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
@property
def name(self) -> str:
return "黑丝视频"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供视频点播功能"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
self.feature = self.register_feature()
# 使用Path对象处理路径自动适应不同操作系统
self.download_dir = str(Path(Path(__file__).parent, "down_load_dir"))
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.gbm = context.get("gbm")
self._commands = self._config.get("Video", {}).get("command", ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"])
self.command_format = self._config.get("Video", {}).get("command-format", "黑丝")
self.enable = self._config.get("Video", {}).get("enable", True)
# 确保下载目录存在
try:
if not os.path.exists(self.download_dir):
self.LOG.warning(f"下载目录不存在,正在创建: {self.download_dir}")
os.makedirs(self.download_dir, exist_ok=True)
# 检查目录权限
if not os.access(self.download_dir, os.W_OK):
self.LOG.error(f"下载目录没有写入权限: {self.download_dir}")
return False
except Exception as e:
self.LOG.error(f"创建下载目录失败: {e}")
return False
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands},下载目录:{self.download_dir}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="视频插件")
@plugin_points_cost(8, "视频插件消耗积分", FEATURE_KEY)
@group_feature_rate_limit(max_per_minute=3, feature_key=FEATURE_KEY)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
self.bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 下载视频
video_filename = f"video_{int(time.time())}.mp4"
save_path = os.path.join(self.download_dir, video_filename)
self.LOG.info(f"开始下载视频到: {save_path}")
file_abspath, first_frame = await self._download_stream("https://api.jkyai.top/API/jxhssp.php", save_path)
if not file_abspath or not os.path.exists(file_abspath) or not file_abspath.endswith("mp4"):
self.LOG.error(f"视频下载失败,文件路径: {file_abspath}")
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试",
sender)
return False, "视频下载失败"
# 发送视频
self.LOG.info(f"准备发送视频: {file_abspath}, 首帧: {first_frame}")
result = await self.bot.send_video_message((roomid if roomid else sender), Path(file_abspath),
Path(first_frame))
self.LOG.info(f"发送视频结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理视频请求出错: {e}")
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌请求出错:{e}",
sender)
return False, f"处理出错: {e}"
async def _download_stream(self, url, save_path):
"""
从指定URL读取视频流并保存到本地异步版本
:param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
"""
try:
# 发送GET请求启用流式传输
self.LOG.info(f"开始从 {url} 下载视频")
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Referer": "https://api.guiguiya.com/"
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, allow_redirects=True) as response:
# 检查请求是否成功
response.raise_for_status()
# 确保保存路径的目录存在
save_dir = os.path.dirname(save_path)
if save_dir and not os.path.exists(save_dir):
os.makedirs(save_dir, exist_ok=True)
self.LOG.info(f"创建目录: {save_dir}")
# 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower()
self.LOG.info(f"响应Content-Type: {content_type}")
if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
text = await response.text()
self.LOG.warning(f"响应内容预览: {text[:100]}") # 打印前100字符查看
return None, None
# 以二进制写入模式保存流数据
try:
total_size = int(response.headers.get('Content-Length', 0))
self.LOG.info(f"预期下载大小: {total_size} 字节")
downloaded_size = 0
async with aiofiles.open(save_path, "wb") as file:
async for chunk in response.content.iter_chunked(1024): # 分块读取每块1KB
if chunk: # 过滤空块
await file.write(chunk)
downloaded_size += len(chunk)
self.LOG.info(f"视频已下载到: {save_path}, 大小: {downloaded_size} 字节")
# 验证下载是否完整
if total_size > 0 and downloaded_size < total_size * 0.9: # 如果下载不到预期大小的90%
self.LOG.error(f"下载不完整: 预期 {total_size} 字节, 实际 {downloaded_size} 字节")
return None, None
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
return None, None
# 检查文件是否存在且大小合理
if not os.path.exists(save_path):
self.LOG.error(f"下载的文件不存在: {save_path}")
return None, None
file_size = os.path.getsize(save_path)
if file_size < 10000: # 小于10KB的文件可能不是有效视频
self.LOG.error(f"下载的文件太小,可能不是有效视频: {file_size} 字节")
# 尝试读取文件内容以诊断问题
try:
async with aiofiles.open(save_path, 'rb') as f:
content_preview = await f.read(200)
self.LOG.warning(f"文件内容预览(十六进制): {content_preview.hex()[:100]}")
except Exception as e:
self.LOG.error(f"读取文件内容失败: {e}")
return None, None
# 加入首帧下载
first_frame_path = os.path.join(self.download_dir, f"frame_{int(time.time())}.jpg")
first_frame = await self._get_first_frame(save_path, first_frame_path)
if not first_frame or not os.path.exists(first_frame):
self.LOG.warning(f"无法提取首帧,使用默认图片")
# 可以在这里设置一个默认图片路径
return os.path.abspath(save_path), first_frame
except aiohttp.ClientError as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"下载视频时发生未知错误: {e}")
return None, None
async def _get_first_frame(self, video_path, output_path):
"""
提取视频的第一帧并保存为图片(异步版本)
:param video_path: 视频文件路径
:param output_path: 输出图片路径
:return: 输出图片的绝对路径如果失败则返回None
"""
try:
self.LOG.info(f"开始提取视频首帧: {video_path}")
# 使用 asyncio.to_thread 包装 OpenCV 操作
def extract_frame():
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
self.LOG.error(f"无法打开视频: {video_path}")
return None
# 读取首帧
ret, frame = cap.read()
if not ret:
self.LOG.error("无法读取视频帧")
cap.release()
return None
# 保存首帧为图片
try:
cv2.imwrite(output_path, frame)
self.LOG.info(f"首帧已保存为: {output_path}")
except Exception as e:
self.LOG.error(f"保存首帧图片失败: {e}")
cap.release()
return None
# 释放资源
cap.release()
return os.path.abspath(output_path)
# 在线程池中执行 OpenCV 操作
result = await asyncio.to_thread(extract_frame)
return result
except Exception as e:
self.LOG.error(f"提取视频首帧时出错: {e}")
return None