""" 随机视频插件 用户发送关键词获取随机小姐姐视频 """ import tomllib import httpx import uuid from pathlib import Path from loguru import logger from typing import List from utils.plugin_base import PluginBase from utils.decorators import on_text_message class RandomVideo(PluginBase): """随机视频插件""" description = "随机小姐姐视频" author = "ShiHao" version = "1.0.0" def __init__(self): super().__init__() self.config = None async def async_init(self): """异步初始化""" config_path = Path(__file__).parent / "config.toml" with open(config_path, "rb") as f: self.config = tomllib.load(f) logger.success("随机视频插件已加载") @on_text_message(priority=65) async def handle_message(self, bot, message: dict): """处理文本消息""" content = message.get("Content", "").strip() from_wxid = message.get("FromWxid", "") is_group = message.get("IsGroup", False) # 精确匹配关键词 if content not in self.config["behavior"]["keywords"]: return True if not self.config["behavior"]["enabled"]: return True # 检查群聊过滤 if is_group: enabled_groups = self.config["behavior"]["enabled_groups"] disabled_groups = self.config["behavior"]["disabled_groups"] if from_wxid in disabled_groups: return True if enabled_groups and from_wxid not in enabled_groups: return True logger.info(f"收到随机视频请求: {from_wxid}") try: # 获取视频URL video_url = await self._fetch_video_url() if not video_url: await bot.send_text(from_wxid, self.config["messages"]["api_error"]) return False # 下载视频 video_path = await self._download_video(video_url) if not video_path: await bot.send_text(from_wxid, self.config["messages"]["download_error"]) return False # 发送视频 success = await bot.send_media(from_wxid, video_path, media_type="video") if success: logger.success(f"随机视频发送成功: {from_wxid}") # 延迟删除,等待微信上传完成 import asyncio asyncio.create_task(self._delayed_cleanup(video_path, 120)) else: await bot.send_text(from_wxid, self.config["messages"]["send_error"]) logger.error(f"随机视频发送失败: {from_wxid}") # 立即删除失败的文件 try: Path(video_path).unlink() except: pass except Exception as e: logger.error(f"随机视频处理失败: {e}") 
await bot.send_text(from_wxid, self.config["messages"]["api_error"]) return False async def _delayed_cleanup(self, file_path: str, delay: int): """延迟删除文件""" import asyncio await asyncio.sleep(delay) try: Path(file_path).unlink() logger.info(f"已清理临时文件: {file_path}") except Exception as e: logger.warning(f"清理临时文件失败: {e}") async def _fetch_video_url(self) -> str: """获取视频URL""" try: timeout = httpx.Timeout(self.config["api"]["timeout"]) async with httpx.AsyncClient(timeout=timeout) as client: response = await client.get(self.config["api"]["url"]) if response.status_code != 200: logger.error(f"API返回错误: {response.status_code}") return "" result = response.json() if result.get("code") != 200: logger.error(f"API错误: {result.get('msg')}") return "" video_url = result.get("data", "") logger.info(f"获取到视频URL: {video_url}") return video_url except Exception as e: logger.error(f"获取视频URL失败: {e}") return "" async def _download_video(self, video_url: str) -> str: """下载视频到本地""" try: videos_dir = Path(__file__).parent / "videos" videos_dir.mkdir(exist_ok=True) filename = f"random_{uuid.uuid4().hex[:8]}.mp4" file_path = videos_dir / filename timeout = httpx.Timeout(connect=10.0, read=60.0, write=10.0, pool=10.0) async with httpx.AsyncClient(timeout=timeout) as client: response = await client.get(video_url) response.raise_for_status() with open(file_path, "wb") as f: f.write(response.content) logger.info(f"视频下载成功: {file_path}") return str(file_path.resolve()) except Exception as e: logger.error(f"下载视频失败: {e}") return ""
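

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original plugin.
#
# The plugin reads several keys from config.toml but never checks that they
# exist, so a missing key would only surface as a KeyError while a message is
# being handled. The hypothetical helper `find_missing_config_keys` below
# shows one possible up-front check. The section and key names are taken from
# the lookups in the plugin code; the helper name, `_REQUIRED_KEYS`, and every
# value in `_SAMPLE_CONFIG_TOML` (keyword, URL, message texts) are assumptions
# made for illustration, not the author's real configuration.
# ---------------------------------------------------------------------------
import tomllib  # repeated here so the sketch stands alone

# Keys the plugin code looks up, grouped by TOML table.
_REQUIRED_KEYS = {
    "api": ("url", "timeout"),
    "behavior": ("enabled", "keywords", "enabled_groups", "disabled_groups"),
    "messages": ("api_error", "download_error", "send_error"),
}

# Placeholder configuration with the expected layout (all values assumed).
_SAMPLE_CONFIG_TOML = """
[api]
url = "https://example.invalid/random-video"
timeout = 30

[behavior]
enabled = true
keywords = ["random video"]
enabled_groups = []
disabled_groups = []

[messages]
api_error = "Could not fetch a video, please try again later."
download_error = "Could not download the video."
send_error = "Could not send the video."
"""


def find_missing_config_keys(config: dict) -> list[str]:
    """Return dotted names ("section.key") of required keys that are absent."""
    missing = []
    for section, keys in _REQUIRED_KEYS.items():
        table = config.get(section, {})
        for key in keys:
            if key not in table:
                missing.append(f"{section}.{key}")
    return missing


# Possible usage, e.g. at the end of async_init():
#     missing = find_missing_config_keys(self.config)
#     if missing:
#         logger.warning(f"config.toml is missing keys: {missing}")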