162 lines
5.8 KiB
Python
162 lines
5.8 KiB
Python
import logging
|
||
import os
|
||
import random
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from wcferry import Wcf
|
||
|
||
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
|
||
|
||
class XiurenImagePlugin(MessagePluginInterface):
|
||
"""秀人图片插件"""
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "秀人图片"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供随机秀人网图片功能"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "Trae AI"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.image_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "xiuren")
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logging.getLogger(f"Plugin.{self.name}")
|
||
self.LOG.info(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.wcf = context.get("wcf")
|
||
self.event_system = context.get("event_system")
|
||
self.message_util = context.get("message_util")
|
||
|
||
self._commands = self._config.get("XiurenImage", {}).get("command", ["图来", "秀人"])
|
||
self.command_format = self._config.get("XiurenImage", {}).get("command-format", "图来")
|
||
self.enable = self._config.get("XiurenImage", {}).get("enable", True)
|
||
|
||
# 检查图片文件夹是否存在
|
||
if not os.path.exists(self.image_folder):
|
||
self.LOG.warning(f"图片文件夹不存在: {self.image_folder}")
|
||
os.makedirs(self.image_folder, exist_ok=True)
|
||
|
||
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
command = content.split(" ")[0]
|
||
|
||
return command in self._commands
|
||
|
||
@plugin_stats_decorator(plugin_name="秀人图片")
|
||
@plugin_points_cost(2, "秀人图片消耗积分", Feature.PIC)
|
||
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.info(f"插件执行: {self.name}:{content}")
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
wcf: Wcf = message.get("wcf")
|
||
gbm: GroupBotManager = message.get("gbm")
|
||
|
||
# 检查权限
|
||
if roomid and gbm.get_group_permission(roomid, Feature.PIC) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
|
||
try:
|
||
# 获取随机图片
|
||
pic_path = self._get_random_pic()
|
||
if not pic_path:
|
||
wcf.send_text(f"❌未找到图片资源",
|
||
(roomid if roomid else sender), sender)
|
||
return True, "未找到图片资源"
|
||
|
||
# 发送图片
|
||
result = wcf.send_file(pic_path, (roomid if roomid else sender))
|
||
self.LOG.info(f"发送图片结果: {result}")
|
||
return True, "发送成功"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理图片请求出错: {e}")
|
||
return True, f"处理出错: {e}"
|
||
|
||
def _get_random_pic(self) -> Optional[str]:
|
||
"""获取随机图片路径"""
|
||
try:
|
||
# 获取图片文件夹中的所有图片
|
||
if not os.path.exists(self.image_folder):
|
||
self.LOG.error(f"图片文件夹不存在: {self.image_folder}")
|
||
return None
|
||
image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
|
||
image_files = []
|
||
if not os.path.exists(self.image_folder):
|
||
print(f"Error: Directory '{self.image_folder}' does not exist.")
|
||
return None
|
||
|
||
if not os.access(self.image_folder, os.R_OK):
|
||
print(f"Error: No read access to directory '{self.image_folder}'.")
|
||
return None
|
||
|
||
print(f"Scanning directory: {self.image_folder} (including subdirectories)")
|
||
|
||
# 使用 os.walk() 递归遍历所有子目录
|
||
for root, _, files in os.walk(self.image_folder):
|
||
for file in files:
|
||
_, ext = os.path.splitext(file)
|
||
if ext.lower() in image_extensions:
|
||
full_path = os.path.join(root, file)
|
||
image_files.append(full_path)
|
||
|
||
if not image_files:
|
||
print("No image files found in the directory (including subdirectories).")
|
||
return None
|
||
# 随机选择一张图片
|
||
random_pic = random.choice(image_files)
|
||
if random_pic:
|
||
return os.path.abspath(random_pic)
|
||
else:
|
||
return None
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"获取随机图片出错: {e}")
|
||
return None
|