refactor ai_auto_response into xiaoniu group bot

This commit is contained in:
liuwei
2026-04-07 11:15:29 +08:00
parent 1c052a7d16
commit d616846098
13 changed files with 2150 additions and 445 deletions

View File

@@ -1,49 +1,56 @@
from __future__ import annotations
import re
import time
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional, Tuple
from loguru import logger
import os
import requests
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import MessageType
import xml.etree.ElementTree as ET
from .bot_ai import InterventionBot
from .context_builder import ContextBuilder
from .flow_manager import FlowManager
from .llm_client import LLMClient
from .memory_store import MemoryStore
from .persona_engine import PersonaEngine
from .response_planner import ResponsePlanner
from .triggers import TriggerRouter
from .vector_memory import VectorMemoryStore
class AIAutoResponsePlugin(MessagePluginInterface):
"""AI自动对话插件"""
# 功能权限常量
FEATURE_KEY = "AI_AUTO_RESPONSE"
FEATURE_DESCRIPTION = "🤖 AI自动对话功能 [自动对话]"
FEATURE_DESCRIPTION = "🐮 小牛拟人群聊BOT [群聊拟真、及时答疑、长期记忆]"
@property
def name(self) -> str:
return "AI自动对话"
return "小牛群聊BOT"
@property
def version(self) -> str:
return "1.0.0"
return "2.0.0"
@property
def description(self) -> str:
return "提供AI自动对话功能可以在群聊中自动介入对话"
return "拟人化群聊BOT支持心流、长期记忆和回归成员识别"
@property
def author(self) -> str:
return "liu.wei"
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return "" # 不需要前缀,直接匹配命令
return None
@property
def commands(self) -> List[str]:
return self._commands
return []
@property
def feature_key(self) -> Optional[str]:
@@ -55,207 +62,452 @@ class AIAutoResponsePlugin(MessagePluginInterface):
def __init__(self):
super().__init__()
self.intervention_bot = None
self.group_messages = {} # 存储每个群的最近消息
self.max_messages = 100 # 每个群最多存储的消息数量
# 注册功能权限
self.feature = self.register_feature()
# DIFY API配置
self.dify_api_url = ""
self.dify_api_key = "" # 需要在配置文件中设置
self.group_messages: Dict[str, List[Dict]] = {}
self.enable = True
self.last_reply_at: Dict[str, float] = {}
def initialize(self, context: Dict[str, Any]) -> bool:
"""初始化插件"""
self.LOG = logger
self.LOG.debug(f"正在初始化 {self.name} 插件...")
# 保存上下文对象
self.event_system = context.get("event_system")
# 加载配置
config_path = os.path.join(os.path.dirname(__file__), "config.toml")
self.enable = self._config.get("enable", True)
# 从配置中获取DIFY API密钥
self.dify_api_key = self._config.get("dify_api_key", "")
self.dify_api_url = self._config.get("dify_api_url", "")
# 初始化介入机器人
self.intervention_bot = InterventionBot(config_path)
self.db_manager = context.get("db_manager")
self.enable = bool(self._config.get("enable", True))
self.persona_engine = PersonaEngine(self.get_plugin_path(), self._config.get("persona", {}))
self.flow_manager = FlowManager({
**(self._config.get("flow", {}) or {}),
"night_silent_hours": (self._config.get("cooldown", {}) or {}).get("night_silent_hours", []),
})
merged_trigger_config = dict(self._config.get("priority", {}) or {})
merged_trigger_config.update(self._config.get("topics", {}) or {})
self.trigger_router = TriggerRouter(merged_trigger_config)
merged_memory_config = dict(self._config.get("mode", {}) or {})
merged_memory_config.update(self._config.get("memory", {}) or {})
self.memory_store = MemoryStore(self.db_manager, merged_memory_config)
self.vector_memory = VectorMemoryStore(self._config.get("memory", {}) or {})
self.context_builder = ContextBuilder()
self.response_planner = ResponsePlanner()
self.llm_client = LLMClient(self._config.get("api", {}) or {})
self.filters = self._config.get("filters", {}) or {}
self.mode_config = self._config.get("mode", {}) or {}
self.cooldown_config = self._config.get("cooldown", {}) or {}
self._synced_member_context_versions: Dict[str, str] = {}
self.log_debug = bool((self._config.get("logging", {}) or {}).get("debug", True))
self.LOG.debug(f"[{self.name}] 初始化完成")
return True
def start(self) -> bool:
"""启动插件"""
self.LOG.debug(f"[{self.name}] 插件已启动")
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
"""停止插件"""
self.LOG.info(f"[{self.name}] 插件已停止")
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
"""检查是否可以处理该消息"""
if not self.enable:
return False
content = str(message.get("content", "")).strip()
roomid = message.get("roomid", "")
if GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
room_id = message.get("roomid", "")
if not room_id:
return False
if GroupBotManager.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED:
return False
# 如果是群消息,且该群启用了自动回复,则处理
if roomid:
self.LOG.debug(f"[{roomid}] 进入AI自动回复逻辑")
# 存储消息
if roomid not in self.group_messages:
self.group_messages[roomid] = []
msg_type = message.get("type")
# 获取发送者昵称
sender_id = message.get("sender", "")
try:
members = ContactManager.get_instance().get_group_members(roomid)
sender_name = members.get(sender_id, sender_id)
except Exception:
sender_name = sender_id
msg_type = message.get("type")
if msg_type not in (MessageType.TEXT, MessageType.APP):
return False
# 仅追加文本(1)与应用消息(49)并对49提取标题
content_to_store = None
try:
if msg_type == MessageType.TEXT:
content_to_store = content
elif msg_type == MessageType.APP:
try:
root = ET.fromstring(content)
title_elem = root.find('.//title')
if title_elem is not None and title_elem.text:
content_to_store = title_elem.text
else:
content_to_store = "[应用消息]"
except Exception as e:
self.LOG.error(f"解析消息类型49出错: {e}")
content_to_store = "[应用消息]"
except Exception as e:
self.LOG.error(f"处理消息类型出错: {e}")
content_to_store = None
full_msg = message.get("full_wx_msg")
if full_msg and full_msg.from_self():
return False
if content_to_store is not None:
# 添加新消息
current_message = {
"timestamp": message.get("timestamp", ""),
"message": content_to_store,
"sender": sender_id,
"sender_name": sender_name
}
# 添加新消息
if content_to_store is not None:
self.group_messages[roomid].append(current_message)
# 限制消息数量
if len(self.group_messages[roomid]) > self.max_messages:
self.group_messages[roomid] = self.group_messages[roomid][-self.max_messages:]
# 判断是否需要介入
messages = [msg["message"] for msg in self.group_messages[roomid]]
timestamp = message.get("timestamp", "")
# 传递完整的聊天记录给should_intervene方法
can = self.intervention_bot.should_intervene(roomid, timestamp, content, messages, self.group_messages[roomid])
if can:
self.LOG.debug(f"[{roomid}] 触发自动回复规则,准备生成回复")
return True
else:
self.LOG.debug(f"[{roomid}] 跳过聊天")
return False
return False
content = self._normalize_content(message)
if not content:
return False
if self._should_ignore(content):
return False
return True
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""处理消息"""
content = str(message.get("content", "")).strip()
self.LOG.debug(f"插件执行: {self.name}{content}")
sender = message.get("sender")
roomid = message.get("roomid", "")
room_id = message.get("roomid", "")
sender = message.get("sender", "")
bot: WechatAPIClient = message.get("bot")
# 检查权限
if roomid and GroupBotManager.get_group_permission(roomid, self.feature) == PermissionStatus.DISABLED:
return False, "没有权限"
# 处理自动回复
try:
# 获取最近的消息 (完整的消息对象)
chat_history = self.group_messages[roomid]
timestamp = message.get("timestamp", "")
content = self._normalize_content(message)
sender_name = self._get_sender_name(room_id, sender)
self._log_event(
"recv",
room_id=room_id,
sender=sender,
sender_name=sender_name,
is_at=message.get("is_at", False),
content_preview=self._preview(content),
msg_type=str(message.get("type")),
)
# 记录触发原因
if self.intervention_bot.rule_high_reply_rate(timestamp, self.group_messages[roomid]):
self.LOG.info(f"[{roomid}] 触发高频率回复规则,准备生成回复")
normalized_message = {
"sender": sender,
"sender_name": sender_name,
"content": content,
"timestamp": message.get("timestamp"),
}
self._append_group_message(room_id, normalized_message)
# 生成回复
response = self._generate_response_with_dify(content, chat_history)
if response:
# 发送回复
await bot.send_text_message(roomid, response, sender)
return False, "自动回复成功"
else:
return False, "生成回复失败"
memory_hints = self.memory_store.build_memory_hints(room_id, sender)
self._sync_member_memory(room_id, sender, sender_name, memory_hints.get("member_context", {}))
self._log_event(
"memory",
room_id=room_id,
sender=sender,
returning_state=memory_hints.get("returning_member_state", "") or "none",
has_member_context=bool(memory_hints.get("member_context")),
is_followup=memory_hints.get("is_followup", False),
last_active_at=memory_hints.get("last_active_at", "") or "",
)
trigger = self.trigger_router.route(message | {"content": content}, memory_hints)
flow_state = self.flow_manager.apply_message_event(room_id, {
"is_at": message.get("is_at", False),
"is_question": trigger.is_question,
"is_followup": trigger.is_followup,
"topic_hit": bool(trigger.topic),
"topic": trigger.topic,
"is_returning_member": trigger.is_returning_member,
"message_after_bot": True,
})
self._log_event(
"decision",
room_id=room_id,
sender=sender,
trigger_type=trigger.trigger_type,
priority=trigger.priority,
reasons="|".join(trigger.reasons),
flow_state=flow_state.state,
flow_score=round(flow_state.score, 2),
topic=trigger.topic or "",
)
except Exception as e:
self.LOG.error(f"处理AI自动对话出错: {e}")
return False, f"处理出错: {e}"
def _generate_response_with_dify(self, current_message: str, chat_history: List[Dict[str, Any]]) -> str:
"""使用DIFY API生成自动回复内容"""
try:
# 构建上下文消息
# 取更多上下文以帮助理解语境
recent_msgs = chat_history[-10:] if len(chat_history) > 10 else chat_history
context_str_list = []
for msg in recent_msgs:
# 优先使用昵称如果没有则使用sender ID
sender = msg.get("sender_name") or msg.get("sender", "Unknown")
content = msg.get("message", "")
context_str_list.append(f"{sender}: {content}")
context = "\n".join(context_str_list)
# 构建提示词 - 增强拟人化指令
prompt = (
f"当前群聊上下文(格式为 '发言人: 内容',最后一句是最新消息):\n{context}\n\n"
f"指令:\n"
f"1. 参考上下文。\n"
f"2. 保持简短1-2句话口语化不要长篇大论。\n"
f"3. 不要重复之前的回复。\n"
f"4. 如果最后一句不是对你说的,且你觉得没必要强行接话,可以回个表情或简短的语气词,或者委婉结束话题。\n"
f"请生成回复:"
allow_proactive = bool(self.mode_config.get("allow_proactive_reply", True))
reply_mode = self.response_planner.choose_reply_mode(trigger.__dict__, flow_state.state)
should_reply = self.response_planner.should_reply(trigger.__dict__, flow_state.state, allow_proactive)
if not should_reply:
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="planner_skip",
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
flow_state=flow_state.state,
)
return False, "skip"
if not self._pass_cooldown(room_id, trigger.__dict__):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="cooldown",
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
)
return False, "cooldown"
# 调用DIFY API
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.dify_api_key}"
}
recent_messages = self.group_messages.get(room_id) or self.memory_store.get_recent_messages(room_id)
vector_memories = []
if self.vector_memory.should_search(reply_mode, trigger.trigger_type, memory_hints.get("returning_member_state", "")):
vector_memories = self.vector_memory.search(content, room_id, sender)
self._log_event(
"context",
room_id=room_id,
sender=sender,
reply_mode=reply_mode,
recent_message_count=len(recent_messages),
vector_hit_count=len(vector_memories),
)
payload = {
"inputs": {},
"query": prompt,
"response_mode": "blocking",
"user": "ai_auto_response"
}
context = self.context_builder.build(
room_id=room_id,
sender=sender,
sender_name=sender_name,
content=content,
recent_messages=recent_messages,
member_context=memory_hints.get("member_context", {}),
trigger=trigger.__dict__,
flow_state=flow_state.state,
reply_mode=reply_mode,
vector_memories=vector_memories,
)
response = requests.post(self.dify_api_url, headers=headers, json=payload)
system_prompt = self.persona_engine.build_system_prompt()
user_prompt = self._build_user_prompt(context, memory_hints)
response = self._sanitize_response(self.llm_client.chat(system_prompt, user_prompt, user_id=f"{room_id}:{sender}"))
if not response:
self._log_event(
"model_empty",
room_id=room_id,
sender=sender,
model=self.llm_client.model,
last_error=self.llm_client.last_error,
reply_mode=reply_mode,
)
return False, "empty_response"
if response.status_code == 200:
result = response.json()
return result.get("answer", "")
else:
self.LOG.error(f"DIFY API调用失败: {response.status_code} - {response.text}")
return ""
await bot.send_text_message(room_id, response, sender)
self.last_reply_at[room_id] = time.time()
self.flow_manager.note_bot_reply(room_id)
self.memory_store.note_bot_reply(room_id, sender, trigger.topic)
self._upsert_interaction_memory(room_id, sender, sender_name, content, response, trigger.trigger_type, trigger.topic)
self._log_event(
"sent",
room_id=room_id,
sender=sender,
sender_name=sender_name,
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
response_preview=self._preview(response),
response_len=len(response),
)
return False, "replied"
except Exception as e:
self.LOG.error(f"生成回复出错: {e}")
def _append_group_message(self, room_id: str, message: Dict) -> None:
items = self.group_messages.setdefault(room_id, [])
items.append(message)
size = int(self.mode_config.get("recent_context_size", 30))
if len(items) > size:
self.group_messages[room_id] = items[-size:]
def _normalize_content(self, message: Dict[str, Any]) -> str:
msg_type = message.get("type")
content = str(message.get("content", "")).strip()
if msg_type == MessageType.TEXT:
return self._strip_at_prefix(content)
if msg_type == MessageType.APP:
try:
root = ET.fromstring(content)
title = root.find(".//title")
return (title.text or "").strip() if title is not None else "[应用消息]"
except Exception:
return "[应用消息]"
return content
@staticmethod
def _strip_at_prefix(content: str) -> str:
return re.sub(r"@.*?[\u2005\s]+", "", content).strip()
def _should_ignore(self, content: str) -> bool:
if len(content) < int(self.filters.get("min_text_length", 1)):
return True
if content in set(self.filters.get("ignore_exact", [])):
return True
return any(content.startswith(prefix) for prefix in self.filters.get("ignore_prefixes", []))
def _get_sender_name(self, room_id: str, sender: str) -> str:
try:
members = ContactManager.get_instance().get_group_members(room_id)
return members.get(sender, sender)
except Exception:
return sender
def _pass_cooldown(self, room_id: str, trigger: Dict) -> bool:
current_ts = time.time()
room_cd = int(self.cooldown_config.get("group_reply_cooldown_sec", 45))
user_cd = int(self.cooldown_config.get("same_user_followup_cooldown_sec", 10))
last_room_reply = self.last_reply_at.get(room_id, 0.0)
if trigger.get("is_question") or trigger.get("is_followup") or trigger.get("trigger_type") == "at_trigger":
return (current_ts - last_room_reply) >= user_cd
return (current_ts - last_room_reply) >= room_cd
def _build_user_prompt(self, context: Dict, memory_hints: Dict) -> str:
recent_text = "\n".join(context.get("recent_messages", [])[-20:]) or "暂无"
reply_mode = context.get("reply_mode", "social_short")
length_rule = self._build_length_rule(reply_mode)
return (
f"当前群聊消息:\n{recent_text}\n\n"
f"当前发言:{context.get('current_message', '')}\n"
f"触发类型:{context.get('trigger_type', 'none')}\n"
f"回复模式:{context.get('reply_mode', 'social_short')}\n"
f"当前心流状态:{context.get('flow_state', 'idle')}\n"
f"成员稳定记忆:\n{context.get('memory_prompt', '暂无')}\n\n"
f"向量召回记忆:\n{context.get('vector_memory_prompt', '') or '暂无'}\n\n"
f"补充信息:回归状态={memory_hints.get('returning_member_state', '') or 'none'}\n"
f"要求:\n"
f"1. 如果是明确问题,先给清楚答案。\n"
f"2. 如果只是轻量接话,保持自然短句。\n"
f"3. 不要暴露系统记忆来源。\n"
f"4. 如果信息不足,不要硬编。\n"
f"5. 输出最终可直接发到群里的内容,不要解释你的思路。\n"
f"6. {length_rule}\n"
)
@staticmethod
def _sanitize_response(response: str) -> str:
if not response:
return ""
response = response.strip()
response = re.sub(r"\n{3,}", "\n\n", response)
return response[:500].strip()
@staticmethod
def _build_length_rule(reply_mode: str) -> str:
if reply_mode == "social_short":
return "默认只回一句短话最好控制在2到12个字除非非常不自然。"
if reply_mode == "qa_fast":
return "尽量只回1句话必要时最多2句先给结论不要展开成长教程。"
if reply_mode == "qa_with_context":
return "优先控制在1到2句除非对方明显在等详细步骤。"
return "尽量短,像群友临时接一句,不要长篇大论。"
def _sync_member_memory(self, room_id: str, sender: str, sender_name: str, member_context: Dict) -> None:
if not member_context:
return
version = str(member_context.get("last_profiled_at", ""))
cache_key = f"{room_id}:{sender}"
if version and self._synced_member_context_versions.get(cache_key) == version:
return
text = self.context_builder._build_member_memory_prompt(member_context)
if not text or text == "暂无稳定成员画像。":
return
payload = {
"chatroom_id": room_id,
"wxid": sender,
"display_name": sender_name,
"memory_type": "member_context_snapshot",
"source_id": cache_key,
"last_active_at": member_context.get("last_profiled_at", ""),
"topic_tags": member_context.get("topics_of_interest", [])[:5],
"summary_text": member_context.get("summary_text", ""),
}
ok = self.vector_memory.upsert_memory(f"member_context:{cache_key}:{version}", text, payload)
self._log_event(
"memory_upsert",
room_id=room_id,
sender=sender,
memory_type="member_context_snapshot",
ok=ok,
)
if ok and version:
self._synced_member_context_versions[cache_key] = version
def _upsert_interaction_memory(
self,
room_id: str,
sender: str,
sender_name: str,
content: str,
response: str,
trigger_type: str,
topic: str,
) -> None:
text = f"{sender_name}说:{content}\n小牛回复:{response}"
payload = {
"chatroom_id": room_id,
"wxid": sender,
"display_name": sender_name,
"memory_type": "interaction_memory",
"topic_tags": [item for item in [topic, trigger_type] if item],
"created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
"source_id": f"{room_id}:{sender}:{int(time.time())}",
"summary_text": text[:500],
}
ok = self.vector_memory.upsert_memory(payload["source_id"], text, payload)
self._log_event(
"memory_upsert",
room_id=room_id,
sender=sender,
memory_type="interaction_memory",
ok=ok,
trigger_type=trigger_type,
)
def _log_event(self, event: str, **kwargs: Any) -> None:
if not self.log_debug:
return
summary = self._build_log_summary(event, kwargs)
self.LOG.info(summary)
@staticmethod
def _preview(text: str, limit: int = 80) -> str:
text = (text or "").replace("\n", "\\n").strip()
if len(text) <= limit:
return text
return text[: limit - 3] + "..."
def _build_log_summary(self, event: str, data: Dict[str, Any]) -> str:
room = self._short_id(data.get("room_id", ""))
sender_name = data.get("sender_name", "") or self._short_id(data.get("sender", ""))
sender = self._short_id(data.get("sender", ""))
if event == "recv":
return (
f"[XIAONIU] RECV room={room} user={sender_name}/{sender} "
f"at={self._yn(data.get('is_at'))} msg={data.get('content_preview', '')}"
).strip()
if event == "memory":
return (
f"[XIAONIU] MEMORY room={room} user={sender} "
f"ctx={self._yn(data.get('has_member_context'))} "
f"follow={self._yn(data.get('is_followup'))} "
f"return={data.get('returning_state', 'none')}"
).strip()
if event == "decision":
return (
f"[XIAONIU] DECIDE room={room} user={sender} "
f"trigger={data.get('trigger_type', 'none')} "
f"flow={data.get('flow_state', '')}:{data.get('flow_score', '')} "
f"topic={data.get('topic', '-') or '-'} "
f"reasons={data.get('reasons', '-') or '-'}"
).strip()
if event == "skip":
return (
f"[XIAONIU] SKIP room={room} user={sender} "
f"reason={data.get('reason', '')} "
f"trigger={data.get('trigger_type', 'none')} "
f"mode={data.get('reply_mode', '')}"
).strip()
if event == "context":
return (
f"[XIAONIU] CTX room={room} user={sender} "
f"mode={data.get('reply_mode', '')} "
f"recent={data.get('recent_message_count', 0)} "
f"vector={data.get('vector_hit_count', 0)}"
).strip()
if event == "model_empty":
return (
f"[XIAONIU] MODEL_EMPTY room={room} user={sender} "
f"model={data.get('model', '')} "
f"mode={data.get('reply_mode', '')} "
f"err={data.get('last_error', '')}"
).strip()
if event == "sent":
return (
f"[XIAONIU] SENT room={room} user={sender_name}/{sender} "
f"trigger={data.get('trigger_type', 'none')} "
f"mode={data.get('reply_mode', '')} "
f"len={data.get('response_len', 0)} "
f"reply={data.get('response_preview', '')}"
).strip()
if event == "memory_upsert":
return (
f"[XIAONIU] MEM_UPSERT room={room} user={sender} "
f"type={data.get('memory_type', '')} "
f"ok={self._yn(data.get('ok'))} "
f"trigger={data.get('trigger_type', '-') or '-'}"
).strip()
compact = " ".join(f"{key}={data[key]}" for key in sorted(data) if data.get(key) not in (None, ""))
return f"[XIAONIU] {event.upper()} {compact}".strip()
@staticmethod
def _yn(value: Any) -> str:
return "Y" if bool(value) else "N"
@staticmethod
def _short_id(value: str) -> str:
value = str(value or "")
if len(value) <= 10:
return value
return value[:4] + "..." + value[-4:]