Files
abot/plugins/ai_auto_response/main.py
2025-05-21 17:27:09 +08:00

202 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import os
import json
import requests
from typing import Dict, Any, List, Optional, Tuple
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from wechat_ipad import WechatAPIClient
from .bot_ai import InterventionBot
class AIAutoResponsePlugin(MessagePluginInterface):
"""AI自动对话插件"""
@property
def name(self) -> str:
return "AI自动对话"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供AI自动对话功能可以在群聊中自动介入对话"
@property
def author(self) -> str:
return "Trae AI"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
def __init__(self):
super().__init__()
self.intervention_bot = None
self.group_messages = {} # 存储每个群的最近消息
self.max_messages = 20 # 每个群最多存储的消息数量
# DIFY API配置
self.dify_api_url = "http://192.168.2.240/v1/chat-messages"
self.dify_api_key = "" # 需要在配置文件中设置
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.info(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
# 加载配置
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
self.enable = self._config.get("enable", True)
# 从配置中获取DIFY API密钥
self.dify_api_key = self._config.get("dify_api_key", "")
self.dify_api_url = self._config.get("dify_api_url", "http://192.168.2.240/v1/chat-messages")
# 初始化介入机器人
self.intervention_bot = InterventionBot(config_path)
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.info(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
roomid = message.get("roomid", "")
if GroupBotManager.get_group_permission(roomid, Feature.AI_AUTO) == PermissionStatus.DISABLED:
return False
# 如果是群消息,且该群启用了自动回复,则处理
if roomid:
self.LOG.debug(f"[{roomid}] 进入AI自动回复逻辑")
# 存储消息
if roomid not in self.group_messages:
self.group_messages[roomid] = []
# 添加新消息
current_message = {
"timestamp": message.get("timestamp", ""),
"message": content,
"sender": message.get("sender", "")
}
# 添加新消息
self.group_messages[roomid].append(current_message)
# 限制消息数量
if len(self.group_messages[roomid]) > self.max_messages:
self.group_messages[roomid] = self.group_messages[roomid][-self.max_messages:]
# 判断是否需要介入
messages = [msg["message"] for msg in self.group_messages[roomid]]
timestamp = message.get("timestamp", "")
# 传递完整的聊天记录给should_intervene方法
can = self.intervention_bot.should_intervene(timestamp, content, messages, self.group_messages[roomid])
if can:
self.LOG.debug(f"[{roomid}] 触发自动回复规则,准备生成回复")
return True
else:
self.LOG.debug(f"[{roomid}] 跳过聊天")
return False
return False
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and GroupBotManager.get_group_permission(roomid, Feature.AI_AUTO) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理自动回复
try:
# 获取最近的消息
messages = [msg["message"] for msg in self.group_messages[roomid]]
timestamp = message.get("timestamp", "")
# 记录触发原因
if self.intervention_bot.rule_high_reply_rate(timestamp, self.group_messages[roomid]):
self.LOG.info(f"[{roomid}] 触发高频率回复规则,准备生成回复")
# 生成回复
response = self._generate_response_with_dify(content, messages)
if response:
# 发送回复
await bot.send_text_message(roomid, response, sender)
return True, "自动回复成功"
else:
return False, "生成回复失败"
except Exception as e:
self.LOG.error(f"处理AI自动对话出错: {e}")
return False, f"处理出错: {e}"
def _generate_response_with_dify(self, message: str, messages: List[str]) -> str:
"""使用DIFY API生成自动回复内容"""
try:
# 检测话题类型
topic_type = self.intervention_bot.detect_topic(message)
# 构建上下文消息
context_messages = messages[-5:] if len(messages) > 5 else messages
context = "\n".join(context_messages)
# 构建提示词
prompt = f"请根据以下群聊上下文,生成一个自然、友好的回复,主要关注最后一句消息,前面的作为参考信息。\n上下文:\n{context}\n\n当前话题类型:{topic_type or '一般聊天'}\n\n请生成回复:"
# 调用DIFY API
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.dify_api_key}"
}
payload = {
"inputs": {},
"query": prompt,
"response_mode": "blocking",
"user": "ai_auto_response"
}
response = requests.post(self.dify_api_url, headers=headers, json=payload)
if response.status_code == 200:
result = response.json()
return result.get("answer", "")
else:
self.LOG.error(f"DIFY API调用失败: {response.status_code} - {response.text}")
return ""
except Exception as e:
self.LOG.error(f"生成回复出错: {e}")
return ""