Files
abot/plugins/video/main.py
2025-05-09 14:28:18 +08:00

184 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import os
import requests
from typing import Dict, Any, List, Optional, Tuple
import cv2
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
class VideoPlugin(MessagePluginInterface):
"""视频插件"""
@property
def name(self) -> str:
return "黑丝视频"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供视频点播功能"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.bot: WechatAPIClient = None
# 修改为使用插件目录下的down_load_dir文件夹
self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "down_load_dir")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
self.message_util = context.get("message_util")
self.gbm = context.get("gbm")
self._commands = self._config.get("Video", {}).get("command", ["黑丝视频", "黑丝", "来个黑丝", "搞个黑丝"])
self.command_format = self._config.get("Video", {}).get("command-format", "黑丝")
self.enable = self._config.get("Video", {}).get("enable", True)
# 确保下载目录存在
if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir, exist_ok=True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="视频插件")
@plugin_points_cost(2, "视频插件消耗积分", Feature.VIDEO)
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
self.bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.VIDEO) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 下载视频
save_path = os.path.join(self.download_dir, "video.mp4")
file_abspath, first_frame = self._download_stream("https://api.guiguiya.com/api/hook/heisis", save_path)
if not file_abspath or not file_abspath.endswith("mp4"):
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌视频下载失败,请稍后再试",
sender)
return False, "视频下载失败"
# 发送视频
result = await self.bot.send_video_message((roomid if roomid else sender), file_abspath, first_frame)
self.LOG.info(f"发送视频结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理视频请求出错: {e}")
await self.bot.send_text_message((roomid if roomid else sender), f"\n❌请求出错:{e}",
sender)
return False, f"处理出错: {e}"
def _download_stream(self, url, save_path):
"""
从指定URL读取视频流并保存到本地
:param url: 视频流的URL
:param save_path: 本地保存路径(包含文件名,例如 "video.mp4"
"""
try:
# 发送GET请求启用流式传输
response = requests.get(url, stream=True)
# 检查请求是否成功
response.raise_for_status() # 如果状态码不是200将抛出异常
# 确保保存路径的目录存在
os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
# 检查是否是视频流可选根据Content-Type判断
content_type = response.headers.get("Content-Type", "").lower()
if "video" not in content_type and "application/octet-stream" not in content_type:
self.LOG.warning(f"警告: 返回的可能不是视频流Content-Type: {content_type}")
self.LOG.warning(f"响应内容预览: {response.text[:100]}") # 打印前100字符查看
return None, None
# 以二进制写入模式保存流数据
with open(save_path, "wb") as file:
for chunk in response.iter_content(chunk_size=1024): # 分块读取每块1KB
if chunk: # 过滤空块
file.write(chunk)
self.LOG.info(f"视频已下载到: {save_path}")
# 加入首帧下载
first_frame = self._get_first_frame(save_path)
return os.path.abspath(save_path), first_frame
except requests.RequestException as e:
self.LOG.error(f"请求失败: {e}")
except IOError as e:
self.LOG.error(f"文件写入失败: {e}")
except Exception as e:
self.LOG.error(f"发生未知错误: {e}")
return None, None
def _get_first_frame(self, video_path, output_path="first_frame.jpg"):
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("无法打开视频")
return
# 读取首帧
ret, frame = cap.read()
if ret:
# 保存首帧为图片
cv2.imwrite(output_path, frame)
print(f"首帧已保存为 {output_path}")
# 释放资源
cap.release()
return os.path.abspath(output_path)