192 lines
7.1 KiB
Python
192 lines
7.1 KiB
Python
import os
|
||
import time
|
||
import urllib.parse
|
||
import uuid
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
import requests
|
||
from loguru import logger
|
||
from pathlib import Path
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.decorator.plugin_decorators import plugin_stats_decorator
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from utils.decorator.points_decorator import plugin_points_cost
|
||
from wechat_ipad import WechatAPIClient
|
||
|
||
|
||
class AIGenImagePlugin(MessagePluginInterface):
    """AI drawing plugin.

    Reacts to chat messages of the form ``<command> <description>``, asks a
    remote image endpoint (pollinations.ai by default) to render the
    description, stores the result in a local temp directory and sends it
    back to the chat the message came from.
    """

    # Feature-permission constants.
    FEATURE_KEY = "AI_GEN_IMAGE"
    FEATURE_DESCRIPTION = "🎨 AI绘图功能 [AI绘图, 绘图, 画图, 生成图片]"

    @property
    def name(self) -> str:
        """Display name of the plugin."""
        return "AI绘图"

    @property
    def version(self) -> str:
        """Plugin version string."""
        return "1.0.0"

    @property
    def description(self) -> str:
        """Short human-readable description of the plugin."""
        return "提供AI绘图功能,基于pollinations.ai生成图片"

    @property
    def author(self) -> str:
        """Plugin author."""
        return "liu.wei"

    @property
    def command_prefix(self) -> Optional[str]:
        """Command prefix; empty because commands are matched directly."""
        return ""

    @property
    def commands(self) -> List[str]:
        """Trigger commands; only available after ``initialize`` has run."""
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        """Key under which this plugin's feature permission is stored."""
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        """Description shown for this plugin's feature permission."""
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        super().__init__()
        # NOTE(review): register_feature() is not defined in this class;
        # presumably inherited from MessagePluginInterface -- confirm.
        self.feature = self.register_feature()

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load configuration and prepare the temp directory.

        Args:
            context: Plugin context; only the ``"event_system"`` entry is read.

        Returns:
            Always ``True``.
        """
        self.LOG = logger
        self.LOG.info(f"正在初始化 {self.name} 插件...")

        # Keep a handle on the context object.
        self.event_system = context.get("event_system")

        # Read the plugin's config section once. ``or {}`` also covers a
        # section that is present but empty (None) in the config file.
        cfg = self._config.get("AIGenImage", {}) or {}

        self._commands = cfg.get("command", ["AI绘图", "绘图", "画图", "生成图片"])
        self.command_format = cfg.get("command-format", "AI绘图 描述文字")
        self.enable = cfg.get("enable", True)

        # API settings.
        self.image_api_url = cfg.get(
            "image_api_url", "https://image.pollinations.ai/prompt/{prompt}"
        )
        self.default_width = cfg.get("default_width", 1024)
        self.default_height = cfg.get("default_height", 1024)
        self.default_model = cfg.get("default_model", "turbo")
        self.default_timeout = cfg.get("default_timeout", 300)

        # Make sure the temp directory (three levels above this file) exists.
        self.temp_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            'temp',
        )
        os.makedirs(self.temp_dir, exist_ok=True)

        self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
        return True

    def start(self) -> bool:
        """Mark the plugin as running. Always returns ``True``."""
        self.LOG.info(f"[{self.name}] 插件已启动")
        self.status = PluginStatus.RUNNING
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped. Always returns ``True``."""
        self.LOG.info(f"[{self.name}] 插件已停止")
        self.status = PluginStatus.STOPPED
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Return whether the message starts with one of the trigger commands.

        The command is the text before the first space of the stripped
        ``"content"`` entry. Always ``False`` when the plugin is disabled.
        """
        if not self.enable:
            return False

        content = str(message.get("content", "")).strip()
        command = content.partition(" ")[0]
        return command in self._commands

    @plugin_stats_decorator(plugin_name="AI绘图")
    @plugin_points_cost(20, "AI绘图消耗积分", FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle one drawing request.

        Args:
            message: Incoming message dict. Reads ``"content"``, ``"sender"``,
                ``"roomid"``, ``"gbm"`` and ``"bot"``.

        Returns:
            ``(success, detail)`` where ``detail`` is a short status text.
        """
        # Function-scope import keeps this block self-contained.
        import asyncio

        content = str(message.get("content", "")).strip()
        self.LOG.debug(f"插件执行: {self.name}:{content}")

        # Text before the first space is the command, the rest is the prompt.
        _command, _, remainder = content.partition(" ")
        prompt = remainder.strip()

        sender = message.get("sender")
        roomid = message.get("roomid", "")
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")

        # Reply to the room when there is one, otherwise to the sender.
        target = roomid or sender

        # Validate the command format: a description is required. Because
        # ``content`` is stripped, "no space" and "empty prompt" coincide.
        if not prompt:
            await bot.send_text_message(
                target, f"❌命令格式错误!\n{self.command_format}", sender
            )
            return False, "命令格式错误"

        # Permission check (group chats only).
        if roomid and gbm.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
            return False, "没有权限"

        try:
            # Tell the user that work has started.
            await bot.send_text_message(target, "🎨正在生成图片,请稍候...", sender)

            # _generate_image does a blocking HTTP download that may take up
            # to ``default_timeout`` seconds; run it in the default executor
            # so the event loop keeps serving other messages meanwhile.
            loop = asyncio.get_running_loop()
            image_path = await loop.run_in_executor(None, self._generate_image, prompt)

            if not image_path or not os.path.exists(image_path):
                await bot.send_text_message(target, "❌生成图片失败,请重试", sender)
                return False, "生成图片失败"

            # Deliver the image.
            # NOTE(review): the saved file is never removed from temp_dir
            # here -- confirm some other component cleans that directory.
            await bot.send_image_message(target, Path(image_path))
            return True, "发送成功"

        except Exception as e:
            # Boundary handler: log with traceback and report to the user.
            self.LOG.exception(f"处理AI绘图请求出错: {e}")
            await bot.send_text_message(target, f"❌生成图片出错: {str(e)}", sender)
            return False, f"处理出错: {e}"

    def _generate_image(self, prompt: str) -> str:
        """Download a generated image for ``prompt`` and return its file path.

        This method blocks on network and disk I/O.

        Args:
            prompt: Description text to render.

        Returns:
            Path of the saved image inside ``self.temp_dir``, or ``""`` when
            the request, the response or the file write fails.
        """
        try:
            # Query parameters for the image endpoint.
            params = {
                "width": self.default_width,
                "height": self.default_height,
                "model": self.default_model,
                # Seed derived from the current Unix time (whole seconds).
                "seed": int(time.time()) % 1000000,
                "nologo": "true"  # Optional, set to "true" for registered referrers/tokens
            }

            # The prompt is substituted into the URL template, so every
            # reserved character must be escaped. ``safe=""`` also escapes
            # "/", which quote() would otherwise leave as a path separator.
            encoded_prompt = urllib.parse.quote(prompt, safe="")
            url = self.image_api_url.format(prompt=encoded_prompt)

            self.LOG.info(f"正在生成图片,提示词: {prompt[:30]}...")

            # Fetch the image.
            response = requests.get(url, params=params, timeout=self.default_timeout)
            response.raise_for_status()

            image_bytes = response.content
            if not image_bytes:
                # An empty body cannot be a valid image; report failure
                # instead of saving and sending a zero-byte file.
                raise ValueError("empty response body")

            # Save under a short random name to avoid collisions.
            image_filename = f"ai_image_{uuid.uuid4().hex[:8]}.jpg"
            image_path = os.path.join(self.temp_dir, image_filename)
            with open(image_path, 'wb') as f:
                f.write(image_bytes)

            self.LOG.info(f"图片生成成功,保存至: {image_path}")
            return image_path

        except Exception as e:
            # Best effort by design: callers treat "" as "generation failed".
            self.LOG.exception(f"生成图片出错: {e}")
            return ""
|