262 lines
9.8 KiB
Python
262 lines
9.8 KiB
Python
from loguru import logger
|
||
import os
|
||
import requests
|
||
from typing import Dict, Any, List, Optional, Tuple
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
|
||
from utils.wechat.contact_manager import ContactManager
|
||
from wechat_ipad import WechatAPIClient
|
||
from wechat_ipad.models.message import MessageType
|
||
import xml.etree.ElementTree as ET
|
||
|
||
from .bot_ai import InterventionBot
|
||
|
||
|
||
class AIAutoResponsePlugin(MessagePluginInterface):
|
||
"""AI自动对话插件"""
|
||
|
||
# 功能权限常量
|
||
FEATURE_KEY = "AI_AUTO_RESPONSE"
|
||
FEATURE_DESCRIPTION = "🤖 AI自动对话功能 [自动对话]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "AI自动对话"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "1.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "提供AI自动对话功能,可以在群聊中自动介入对话"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "liu.wei"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
return "" # 不需要前缀,直接匹配命令
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return self._commands
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.intervention_bot = None
|
||
self.group_messages = {} # 存储每个群的最近消息
|
||
self.max_messages = 100 # 每个群最多存储的消息数量
|
||
# 注册功能权限
|
||
self.feature = self.register_feature()
|
||
|
||
# DIFY API配置
|
||
self.dify_api_url = ""
|
||
self.dify_api_key = "" # 需要在配置文件中设置
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""初始化插件"""
|
||
self.LOG = logger
|
||
self.LOG.debug(f"正在初始化 {self.name} 插件...")
|
||
|
||
# 保存上下文对象
|
||
self.event_system = context.get("event_system")
|
||
|
||
# 加载配置
|
||
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
|
||
self.enable = self._config.get("enable", True)
|
||
|
||
# 从配置中获取DIFY API密钥
|
||
self.dify_api_key = self._config.get("dify_api_key", "")
|
||
self.dify_api_url = self._config.get("dify_api_url", "")
|
||
|
||
# 初始化介入机器人
|
||
self.intervention_bot = InterventionBot(config_path)
|
||
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
"""启动插件"""
|
||
self.LOG.debug(f"[{self.name}] 插件已启动")
|
||
self.status = PluginStatus.RUNNING
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
"""停止插件"""
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
self.status = PluginStatus.STOPPED
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""检查是否可以处理该消息"""
|
||
if not self.enable:
|
||
return False
|
||
|
||
content = str(message.get("content", "")).strip()
|
||
roomid = message.get("roomid", "")
|
||
|
||
if GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
|
||
return False
|
||
# 如果是群消息,且该群启用了自动回复,则处理
|
||
if roomid:
|
||
self.LOG.debug(f"[{roomid}] 进入AI自动回复逻辑")
|
||
# 存储消息
|
||
if roomid not in self.group_messages:
|
||
self.group_messages[roomid] = []
|
||
|
||
msg_type = message.get("type")
|
||
# 获取发送者昵称
|
||
sender_id = message.get("sender", "")
|
||
try:
|
||
members = ContactManager.get_instance().get_group_members(roomid)
|
||
sender_name = members.get(sender_id, sender_id)
|
||
except Exception:
|
||
sender_name = sender_id
|
||
|
||
# 仅追加文本(1)与应用消息(49),并对49提取标题
|
||
content_to_store = None
|
||
try:
|
||
if msg_type == MessageType.TEXT:
|
||
content_to_store = content
|
||
elif msg_type == MessageType.APP:
|
||
try:
|
||
root = ET.fromstring(content)
|
||
title_elem = root.find('.//title')
|
||
if title_elem is not None and title_elem.text:
|
||
content_to_store = title_elem.text
|
||
else:
|
||
content_to_store = "[应用消息]"
|
||
except Exception as e:
|
||
self.LOG.error(f"解析消息类型49出错: {e}")
|
||
content_to_store = "[应用消息]"
|
||
except Exception as e:
|
||
self.LOG.error(f"处理消息类型出错: {e}")
|
||
content_to_store = None
|
||
|
||
if content_to_store is not None:
|
||
# 添加新消息
|
||
current_message = {
|
||
"timestamp": message.get("timestamp", ""),
|
||
"message": content_to_store,
|
||
"sender": sender_id,
|
||
"sender_name": sender_name
|
||
}
|
||
|
||
# 添加新消息
|
||
if content_to_store is not None:
|
||
self.group_messages[roomid].append(current_message)
|
||
|
||
# 限制消息数量
|
||
if len(self.group_messages[roomid]) > self.max_messages:
|
||
self.group_messages[roomid] = self.group_messages[roomid][-self.max_messages:]
|
||
|
||
# 判断是否需要介入
|
||
messages = [msg["message"] for msg in self.group_messages[roomid]]
|
||
timestamp = message.get("timestamp", "")
|
||
# 传递完整的聊天记录给should_intervene方法
|
||
can = self.intervention_bot.should_intervene(roomid, timestamp, content, messages, self.group_messages[roomid])
|
||
if can:
|
||
self.LOG.debug(f"[{roomid}] 触发自动回复规则,准备生成回复")
|
||
return True
|
||
else:
|
||
self.LOG.debug(f"[{roomid}] 跳过聊天")
|
||
return False
|
||
return False
|
||
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""处理消息"""
|
||
content = str(message.get("content", "")).strip()
|
||
self.LOG.debug(f"插件执行: {self.name}:{content}")
|
||
sender = message.get("sender")
|
||
roomid = message.get("roomid", "")
|
||
bot: WechatAPIClient = message.get("bot")
|
||
# 检查权限
|
||
if roomid and GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
|
||
return False, "没有权限"
|
||
# 处理自动回复
|
||
try:
|
||
# 获取最近的消息 (完整的消息对象)
|
||
chat_history = self.group_messages[roomid]
|
||
timestamp = message.get("timestamp", "")
|
||
|
||
# 记录触发原因
|
||
if self.intervention_bot.rule_high_reply_rate(timestamp, self.group_messages[roomid]):
|
||
self.LOG.info(f"[{roomid}] 触发高频率回复规则,准备生成回复")
|
||
|
||
# 生成回复
|
||
response = self._generate_response_with_dify(content, chat_history)
|
||
if response:
|
||
# 发送回复
|
||
await bot.send_text_message(roomid, response, sender)
|
||
return False, "自动回复成功"
|
||
else:
|
||
return False, "生成回复失败"
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"处理AI自动对话出错: {e}")
|
||
return False, f"处理出错: {e}"
|
||
|
||
def _generate_response_with_dify(self, current_message: str, chat_history: List[Dict[str, Any]]) -> str:
|
||
"""使用DIFY API生成自动回复内容"""
|
||
try:
|
||
# 构建上下文消息
|
||
# 取更多上下文以帮助理解语境
|
||
recent_msgs = chat_history[-10:] if len(chat_history) > 10 else chat_history
|
||
|
||
context_str_list = []
|
||
for msg in recent_msgs:
|
||
# 优先使用昵称,如果没有则使用sender ID
|
||
sender = msg.get("sender_name") or msg.get("sender", "Unknown")
|
||
content = msg.get("message", "")
|
||
context_str_list.append(f"{sender}: {content}")
|
||
|
||
context = "\n".join(context_str_list)
|
||
|
||
# 构建提示词 - 增强拟人化指令
|
||
prompt = (
|
||
f"当前群聊上下文(格式为 '发言人: 内容',最后一句是最新消息):\n{context}\n\n"
|
||
f"指令:\n"
|
||
f"1. 参考上下文。\n"
|
||
f"2. 保持简短(1-2句话),口语化,不要长篇大论。\n"
|
||
f"3. 不要重复之前的回复。\n"
|
||
f"4. 如果最后一句不是对你说的,且你觉得没必要强行接话,可以回个表情或简短的语气词,或者委婉结束话题。\n"
|
||
f"请生成回复:"
|
||
)
|
||
|
||
# 调用DIFY API
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"Authorization": f"Bearer {self.dify_api_key}"
|
||
}
|
||
|
||
payload = {
|
||
"inputs": {},
|
||
"query": prompt,
|
||
"response_mode": "blocking",
|
||
"user": "ai_auto_response"
|
||
}
|
||
|
||
response = requests.post(self.dify_api_url, headers=headers, json=payload)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
return result.get("answer", "")
|
||
else:
|
||
self.LOG.error(f"DIFY API调用失败: {response.status_code} - {response.text}")
|
||
return ""
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"生成回复出错: {e}")
|
||
return ""
|