添加新的插件。

This commit is contained in:
liuwei
2025-05-21 16:03:18 +08:00
parent f208a51bbd
commit c1b9517990
5 changed files with 410 additions and 0 deletions

View File

@@ -0,0 +1,219 @@
from loguru import logger
import os
import json
import requests
from typing import Dict, Any, List, Optional, Tuple
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
from .bot_ai import InterventionBot
class AIAutoResponsePlugin(MessagePluginInterface):
"""AI自动对话插件"""
@property
def name(self) -> str:
return "AI自动对话"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供AI自动对话功能可以在群聊中自动介入对话"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.intervention_bot = None
self.group_messages = {} # 存储每个群的最近消息
self.max_messages = 20 # 每个群最多存储的消息数量
self.auto_response_enabled = {} # 存储每个群是否启用自动回复
# DIFY API配置
self.dify_api_url = "http://192.168.2.240/v1/chat-messages"
self.dify_api_key = "" # 需要在配置文件中设置
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
# 加载配置
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
self._commands = self._config.get("AIAutoResponse", {}).get("command", ["ai介入", "ai对话", "ai自动回复"])
self.command_format = self._config.get("AIAutoResponse", {}).get("command-format", "ai介入 开启/关闭")
self.enable = self._config.get("AIAutoResponse", {}).get("enable", True)
# 从配置中获取DIFY API密钥
self.dify_api_key = self._config.get("AIAutoResponse", {}).get("dify_api_key", "")
self.dify_api_url = self._config.get("AIAutoResponse", {}).get("dify_api_url",
"http://192.168.2.240/v1/chat-messages")
# 初始化介入机器人
self.intervention_bot = InterventionBot(config_path)
self.LOG.info(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
command = content.split(" ")[0]
roomid = message.get("roomid", "")
# 如果是命令,直接处理
if command in self._commands:
return True
# 如果是群消息,且该群启用了自动回复,则处理
if roomid and self.auto_response_enabled.get(roomid, False):
# 存储消息
if roomid not in self.group_messages:
self.group_messages[roomid] = []
# 添加新消息
self.group_messages[roomid].append({
"timestamp": message.get("timestamp", ""),
"message": content,
"sender": message.get("sender", "")
})
# 限制消息数量
if len(self.group_messages[roomid]) > self.max_messages:
self.group_messages[roomid] = self.group_messages[roomid][-self.max_messages:]
# 判断是否需要介入
messages = [msg["message"] for msg in self.group_messages[roomid]]
timestamp = message.get("timestamp", "")
return self.intervention_bot.should_intervene(timestamp, content, messages)
return False
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
command = content.split(" ")[0]
sender = message.get("sender")
roomid = message.get("roomid", "")
gbm: GroupBotManager = message.get("gbm")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and gbm.get_group_permission(roomid, Feature.AI_AUTO) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理命令
if command in self._commands:
# 检查命令格式
if len(content.split(" ")) == 1:
await bot.send_text_message((roomid if roomid else sender), f"❌命令格式错误!\n{self.command_format}",
sender)
return False, "命令格式错误"
# 提取参数
param = content[len(command):].strip()
if param == "开启":
self.auto_response_enabled[roomid] = True
await bot.send_text_message(roomid, "✅AI自动对话已开启", sender)
return True, "开启成功"
elif param == "关闭":
self.auto_response_enabled[roomid] = False
await bot.send_text_message(roomid, "❌AI自动对话已关闭", sender)
return True, "关闭成功"
else:
await bot.send_text_message(roomid, f"❌参数错误!\n{self.command_format}", sender)
return False, "参数错误"
# 处理自动回复
try:
# 获取最近的消息
messages = [msg["message"] for msg in self.group_messages[roomid]]
timestamp = message.get("timestamp", "")
# 生成回复
response = self._generate_response_with_dify(content, messages)
if response:
# 发送回复
await bot.send_text_message(roomid, response, sender)
return True, "自动回复成功"
except Exception as e:
self.LOG.error(f"处理AI自动对话出错: {e}")
return False, f"处理出错: {e}"
def _generate_response_with_dify(self, message: str, messages: List[str]) -> str:
"""使用DIFY API生成自动回复内容"""
try:
# 检测话题类型
topic_type = self.intervention_bot.detect_topic(message)
# 构建上下文消息
context_messages = messages[-5:] if len(messages) > 5 else messages
context = "\n".join(context_messages)
# 构建提示词
prompt = f"请根据以下群聊上下文,生成一个自然、友好的回复。\n\n上下文:\n{context}\n\n当前话题类型:{topic_type or '一般聊天'}\n\n请生成回复:"
# 调用DIFY API
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.dify_api_key}"
}
payload = {
"inputs": {},
"query": prompt,
"response_mode": "blocking",
"user": "ai_auto_response"
}
response = requests.post(self.dify_api_url, headers=headers, json=payload)
if response.status_code == 200:
result = response.json()
return result.get("answer", "")
else:
self.LOG.error(f"DIFY API调用失败: {response.status_code} - {response.text}")
return ""
except Exception as e:
self.LOG.error(f"生成回复出错: {e}")
return ""