from loguru import logger import os import requests from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from wechat_ipad import WechatAPIClient from .bot_ai import InterventionBot class AIAutoResponsePlugin(MessagePluginInterface): """AI自动对话插件""" # 功能权限常量 FEATURE_KEY = "AI_AUTO_RESPONSE" FEATURE_DESCRIPTION = "🤖 AI自动对话功能 [自动对话]" @property def name(self) -> str: return "AI自动对话" @property def version(self) -> str: return "1.0.0" @property def description(self) -> str: return "提供AI自动对话功能,可以在群聊中自动介入对话" @property def author(self) -> str: return "liu.wei" @property def command_prefix(self) -> Optional[str]: return "" # 不需要前缀,直接匹配命令 @property def commands(self) -> List[str]: return self._commands @property def feature_key(self) -> Optional[str]: return self.FEATURE_KEY @property def feature_description(self) -> Optional[str]: return self.FEATURE_DESCRIPTION def __init__(self): super().__init__() self.intervention_bot = None self.group_messages = {} # 存储每个群的最近消息 self.max_messages = 100 # 每个群最多存储的消息数量 # 注册功能权限 self.feature = self.register_feature() # DIFY API配置 self.dify_api_url = "" self.dify_api_key = "" # 需要在配置文件中设置 def initialize(self, context: Dict[str, Any]) -> bool: """初始化插件""" self.LOG = logger self.LOG.debug(f"正在初始化 {self.name} 插件...") # 保存上下文对象 self.event_system = context.get("event_system") # 加载配置 config_path = os.path.join(os.path.dirname(__file__), "config.toml") self.enable = self._config.get("enable", True) # 从配置中获取DIFY API密钥 self.dify_api_key = self._config.get("dify_api_key", "") self.dify_api_url = self._config.get("dify_api_url", "") # 初始化介入机器人 self.intervention_bot = InterventionBot(config_path) return True def start(self) -> bool: """启动插件""" self.LOG.debug(f"[{self.name}] 插件已启动") self.status = PluginStatus.RUNNING return True def stop(self) -> bool: """停止插件""" self.LOG.info(f"[{self.name}] 插件已停止") self.status = PluginStatus.STOPPED return True def can_process(self, message: Dict[str, Any]) -> bool: """检查是否可以处理该消息""" if not self.enable: return False content = str(message.get("content", "")).strip() roomid = message.get("roomid", "") if GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False # 如果是群消息,且该群启用了自动回复,则处理 if roomid: self.LOG.debug(f"[{roomid}] 进入AI自动回复逻辑") # 存储消息 if roomid not in self.group_messages: self.group_messages[roomid] = [] # 添加新消息 current_message = { "timestamp": message.get("timestamp", ""), "message": content, "sender": message.get("sender", "") } # 添加新消息 self.group_messages[roomid].append(current_message) # 限制消息数量 if len(self.group_messages[roomid]) > self.max_messages: self.group_messages[roomid] = self.group_messages[roomid][-self.max_messages:] # 判断是否需要介入 messages = [msg["message"] for msg in self.group_messages[roomid]] timestamp = message.get("timestamp", "") # 传递完整的聊天记录给should_intervene方法 can = self.intervention_bot.should_intervene(roomid, timestamp, content, messages, self.group_messages[roomid]) if can: self.LOG.debug(f"[{roomid}] 触发自动回复规则,准备生成回复") return True else: self.LOG.debug(f"[{roomid}] 跳过聊天") return False return False async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """处理消息""" content = str(message.get("content", "")).strip() self.LOG.debug(f"插件执行: {self.name}:{content}") sender = message.get("sender") roomid = message.get("roomid", "") bot: WechatAPIClient = message.get("bot") # 检查权限 if roomid and GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED: return False, "没有权限" # 处理自动回复 try: # 获取最近的消息 (完整的消息对象) chat_history = self.group_messages[roomid] timestamp = message.get("timestamp", "") # 记录触发原因 if self.intervention_bot.rule_high_reply_rate(timestamp, self.group_messages[roomid]): self.LOG.info(f"[{roomid}] 触发高频率回复规则,准备生成回复") # 生成回复 response = self._generate_response_with_dify(content, chat_history) if response: # 发送回复 await bot.send_text_message(roomid, response, sender) return False, "自动回复成功" else: return False, "生成回复失败" except Exception as e: self.LOG.error(f"处理AI自动对话出错: {e}") return False, f"处理出错: {e}" def _generate_response_with_dify(self, current_message: str, chat_history: List[Dict[str, Any]]) -> str: """使用DIFY API生成自动回复内容""" try: # 构建上下文消息 # 取更多上下文以帮助理解语境 recent_msgs = chat_history[-10:] if len(chat_history) > 10 else chat_history context_str_list = [] for msg in recent_msgs: sender = msg.get("sender", "Unknown") content = msg.get("message", "") context_str_list.append(f"{sender}: {content}") context = "\n".join(context_str_list) # 构建提示词 - 增强拟人化指令 prompt = ( f"当前群聊上下文(格式为 '发言人: 内容',最后一句是最新消息):\n{context}\n\n" f"指令:\n" f"1. 参考上下文。\n" f"2. 保持简短(1-2句话),口语化,不要长篇大论。\n" f"3. 不要重复之前的回复。\n" f"4. 如果最后一句不是对你说的,且你觉得没必要强行接话,可以回个表情或简短的语气词,或者委婉结束话题。\n" f"请生成回复:" ) # 调用DIFY API headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.dify_api_key}" } payload = { "inputs": {}, "query": prompt, "response_mode": "blocking", "user": "ai_auto_response" } response = requests.post(self.dify_api_url, headers=headers, json=payload) if response.status_code == 200: result = response.json() return result.get("answer", "") else: self.LOG.error(f"DIFY API调用失败: {response.status_code} - {response.text}") return "" except Exception as e: self.LOG.error(f"生成回复出错: {e}") return ""