Files
abot/plugins/ai_auto_response/main.py
2026-02-02 16:02:44 +08:00

262 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from loguru import logger
import os
import requests
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import MessageType
import xml.etree.ElementTree as ET
from .bot_ai import InterventionBot
class AIAutoResponsePlugin(MessagePluginInterface):
"""AI自动对话插件"""
# 功能权限常量
FEATURE_KEY = "AI_AUTO_RESPONSE"
FEATURE_DESCRIPTION = "🤖 AI自动对话功能 [自动对话]"
@property
def name(self) -> str:
return "AI自动对话"
@property
def version(self) -> str:
return "1.0.0"
@property
def description(self) -> str:
return "提供AI自动对话功能可以在群聊中自动介入对话"
@property
def author(self) -> str:
return "liu.wei"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
@property
def commands(self) -> List[str]:
return self._commands
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.intervention_bot = None
self.group_messages = {} # 存储每个群的最近消息
self.max_messages = 100 # 每个群最多存储的消息数量
# 注册功能权限
self.feature = self.register_feature()
# DIFY API配置
self.dify_api_url = ""
self.dify_api_key = "" # 需要在配置文件中设置
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
# 加载配置
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
self.enable = self._config.get("enable", True)
# 从配置中获取DIFY API密钥
self.dify_api_key = self._config.get("dify_api_key", "")
self.dify_api_url = self._config.get("dify_api_url", "")
# 初始化介入机器人
self.intervention_bot = InterventionBot(config_path)
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
roomid = message.get("roomid", "")
if GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False
# 如果是群消息,且该群启用了自动回复,则处理
if roomid:
self.LOG.debug(f"[{roomid}] 进入AI自动回复逻辑")
# 存储消息
if roomid not in self.group_messages:
self.group_messages[roomid] = []
msg_type = message.get("type")
# 获取发送者昵称
sender_id = message.get("sender", "")
try:
members = ContactManager.get_instance().get_group_members(roomid)
sender_name = members.get(sender_id, sender_id)
except Exception:
sender_name = sender_id
# 仅追加文本(1)与应用消息(49)并对49提取标题
content_to_store = None
try:
if msg_type == MessageType.TEXT:
content_to_store = content
elif msg_type == MessageType.APP:
try:
root = ET.fromstring(content)
title_elem = root.find('.//title')
if title_elem is not None and title_elem.text:
content_to_store = title_elem.text
else:
content_to_store = "[应用消息]"
except Exception as e:
self.LOG.error(f"解析消息类型49出错: {e}")
content_to_store = "[应用消息]"
except Exception as e:
self.LOG.error(f"处理消息类型出错: {e}")
content_to_store = None
if content_to_store is not None:
# 添加新消息
current_message = {
"timestamp": message.get("timestamp", ""),
"message": content_to_store,
"sender": sender_id,
"sender_name": sender_name
}
# 添加新消息
if content_to_store is not None:
self.group_messages[roomid].append(current_message)
# 限制消息数量
if len(self.group_messages[roomid]) > self.max_messages:
self.group_messages[roomid] = self.group_messages[roomid][-self.max_messages:]
# 判断是否需要介入
messages = [msg["message"] for msg in self.group_messages[roomid]]
timestamp = message.get("timestamp", "")
# 传递完整的聊天记录给should_intervene方法
can = self.intervention_bot.should_intervene(roomid, timestamp, content, messages, self.group_messages[roomid])
if can:
self.LOG.debug(f"[{roomid}] 触发自动回复规则,准备生成回复")
return True
else:
self.LOG.debug(f"[{roomid}] 跳过聊天")
return False
return False
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理自动回复
try:
# 获取最近的消息 (完整的消息对象)
chat_history = self.group_messages[roomid]
timestamp = message.get("timestamp", "")
# 记录触发原因
if self.intervention_bot.rule_high_reply_rate(timestamp, self.group_messages[roomid]):
self.LOG.info(f"[{roomid}] 触发高频率回复规则,准备生成回复")
# 生成回复
response = self._generate_response_with_dify(content, chat_history)
if response:
# 发送回复
await bot.send_text_message(roomid, response, sender)
return False, "自动回复成功"
else:
return False, "生成回复失败"
except Exception as e:
self.LOG.error(f"处理AI自动对话出错: {e}")
return False, f"处理出错: {e}"
def _generate_response_with_dify(self, current_message: str, chat_history: List[Dict[str, Any]]) -> str:
"""使用DIFY API生成自动回复内容"""
try:
# 构建上下文消息
# 取更多上下文以帮助理解语境
recent_msgs = chat_history[-10:] if len(chat_history) > 10 else chat_history
context_str_list = []
for msg in recent_msgs:
# 优先使用昵称如果没有则使用sender ID
sender = msg.get("sender_name") or msg.get("sender", "Unknown")
content = msg.get("message", "")
context_str_list.append(f"{sender}: {content}")
context = "\n".join(context_str_list)
# 构建提示词 - 增强拟人化指令
prompt = (
f"当前群聊上下文(格式为 '发言人: 内容',最后一句是最新消息):\n{context}\n\n"
f"指令:\n"
f"1. 参考上下文。\n"
f"2. 保持简短1-2句话口语化不要长篇大论。\n"
f"3. 不要重复之前的回复。\n"
f"4. 如果最后一句不是对你说的,且你觉得没必要强行接话,可以回个表情或简短的语气词,或者委婉结束话题。\n"
f"请生成回复:"
)
# 调用DIFY API
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.dify_api_key}"
}
payload = {
"inputs": {},
"query": prompt,
"response_mode": "blocking",
"user": "ai_auto_response"
}
response = requests.post(self.dify_api_url, headers=headers, json=payload)
if response.status_code == 200:
result = response.json()
return result.get("answer", "")
else:
self.LOG.error(f"DIFY API调用失败: {response.status_code} - {response.text}")
return ""
except Exception as e:
self.LOG.error(f"生成回复出错: {e}")
return ""