Files
abot/plugins/beautyleg/main.py

153 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import os
import random
from typing import Dict, Any, List, Optional, Tuple
from wcferry import Wcf
from message_util import MessageUtil
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from plugins.stats_collector.decorators import plugin_stats_decorator
from robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
class BeautyLegPlugin(MessagePluginInterface):
"""美腿图片插件"""
@property
def name(self) -> str:
return "美腿图片"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供随机美腿图片功能"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"beautyleg", "download_dir")
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logging.getLogger(f"Plugin.{self.name}")
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.wcf = context.get("wcf")
self.event_system = context.get("event_system")
self.message_util: MessageUtil = context.get("message_util")
self.gbm = context.get("gbm")
self._commands = self._config.get("Beautyleg", {}).get("command", ["美腿", "腿来"])
self.command_format = self._config.get("Beautyleg", {}).get("command-format", "美腿")
self.enable = self._config.get("Beautyleg", {}).get("enable", True)
# 检查图片文件夹是否存在
if not os.path.exists(self.image_folder):
self.LOG.warning(f"图片文件夹不存在: {self.image_folder}")
os.makedirs(self.image_folder, exist_ok=True)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
return command in self._commands
@plugin_stats_decorator(plugin_name="美腿图片")
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.info(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
wcf: Wcf = message.get("wcf")
gbm: GroupBotManager = message.get("gbm")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.BEAUTY_LEG) == PermissionStatus.DISABLED:
return False, "没有权限"
try:
# 获取随机图片
random_file_path = self._get_random_file_from_dir(self.image_folder)
if not random_file_path:
self.message_util.send_text_msg(f"\n❌未找到美腿图片资源",
(roomid if roomid else sender), sender)
return True, "未找到图片资源"
# 发送图片
random_file_path = os.path.abspath(random_file_path)
self.LOG.info(f"BeautyLeg.random_file_path: {random_file_path}")
result = wcf.send_file(random_file_path, (roomid if roomid else sender))
self.LOG.info(f"发送图片结果: {result}")
return True, "发送成功"
except Exception as e:
self.LOG.error(f"处理美腿图片请求出错: {e}")
return True, f"处理出错: {e}"
def _get_random_file_from_dir(self, directory):
"""获取随机图片路径"""
image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
image_files = []
if not os.path.exists(directory):
self.LOG.error(f"Error: Directory '{directory}' does not exist.")
return None
if not os.access(directory, os.R_OK):
self.LOG.error(f"Error: No read access to directory '{directory}'.")
return None
self.LOG.info(f"Scanning directory: {directory} (including subdirectories)")
# 使用 os.walk() 递归遍历所有子目录
for root, _, files in os.walk(directory):
for file in files:
_, ext = os.path.splitext(file)
if ext.lower() in image_extensions:
full_path = os.path.join(root, file)
image_files.append(full_path)
if not image_files:
self.LOG.error("No image files found in the directory (including subdirectories).")
return None
return random.choice(image_files)