Files
abot/plugins/ai_auto_response/main.py
2026-04-10 09:04:22 +08:00

638 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import time
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional, Tuple
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from utils.wechat.contact_manager import ContactManager
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import MessageType
from .context.context_builder import ContextBuilder
from .context.image_context import (
build_image_safety_hints,
build_local_image_data_url,
build_recent_image_context,
prepare_quote_image_inputs,
)
from .context.quote_context import parse_quote_context
from .core.llm_client import LLMClient
from .memory.memory_store import MemoryStore
from .memory.vector_memory import VectorMemoryStore
from .profile.persona_engine import PersonaEngine
from .runtime.flow_manager import FlowManager
from .runtime.cooldown import CooldownManager
from .runtime.logging import build_log_summary, yn
from .memory.group_memory import GroupMemoryCoordinator
from .memory.group_memory_profile import GroupMemoryService
from .memory.group_facts import GroupFactsService
from .memory.memory_ranker import MemoryRanker
from .memory.social_memory import SocialMemoryService
from .profile.group_profile import GroupProfileResolver
from .context.conversation_hints import build_conversation_hints
from .core.decision_flow import DecisionFlow
from .core.triggers import TriggerRouter
from .core.llm_result_parser import LLMResultParser
from .core.prompt_builder import build_user_prompt
from .core.reply_formatter import finalize_reply, preview_text
from .safety.dedup import DedupManager
from .safety.filters import (
is_coding_work_request,
is_prompt_attack,
is_targeting_other_user,
should_ignore,
strip_at_prefix,
)
class AIAutoResponsePlugin(MessagePluginInterface):
FEATURE_KEY = "AI_AUTO_RESPONSE"
FEATURE_DESCRIPTION = "🐮 小牛拟人群聊BOT [群聊拟真、及时答疑、长期记忆]"
@property
def name(self) -> str:
return "小牛群聊BOT"
@property
def version(self) -> str:
return "2.0.0"
@property
def description(self) -> str:
return "拟人化群聊BOT支持心流、长期记忆和回归成员识别"
@property
def author(self) -> str:
return "ABOT Team"
@property
def command_prefix(self) -> Optional[str]:
return None
@property
def commands(self) -> List[str]:
return []
@property
def feature_key(self) -> Optional[str]:
return self.FEATURE_KEY
@property
def feature_description(self) -> Optional[str]:
return self.FEATURE_DESCRIPTION
def __init__(self):
super().__init__()
self.feature = self.register_feature()
self.group_messages: Dict[str, List[Dict]] = {}
self.enable = True
self.dedup = DedupManager()
def initialize(self, context: Dict[str, Any]) -> bool:
self.LOG = logger
self.db_manager = context.get("db_manager")
self.enable = bool(self._config.get("enable", True))
self.persona_engine = PersonaEngine(self.get_plugin_path(), self._config.get("persona", {}))
self.group_memory_service = GroupMemoryService(self.db_manager, self._config.get("group_profiles", {}) or {})
self.group_profile_resolver = GroupProfileResolver(self._config.get("group_profiles", {}) or {})
self.flow_manager = FlowManager({
**(self._config.get("flow", {}) or {}),
"night_silent_hours": (self._config.get("cooldown", {}) or {}).get("night_silent_hours", []),
})
merged_trigger_config = dict(self._config.get("priority", {}) or {})
merged_trigger_config.update(self._config.get("topics", {}) or {})
self.trigger_router = TriggerRouter(merged_trigger_config)
merged_memory_config = dict(self._config.get("mode", {}) or {})
merged_memory_config.update(self._config.get("memory", {}) or {})
self.memory_store = MemoryStore(self.db_manager, merged_memory_config)
self.vector_memory = VectorMemoryStore(self._config.get("memory", {}) or {})
self.context_builder = ContextBuilder(int((self._config.get("mode", {}) or {}).get("recent_context_size", 30)))
self.decision_flow = DecisionFlow()
self.llm_client = LLMClient(self._config.get("api", {}) or {})
self.social_memory = SocialMemoryService(self.db_manager, self._config.get("memory", {}) or {})
self.group_facts = GroupFactsService(self._config.get("memory", {}) or {})
self.memory_ranker = MemoryRanker(self._config.get("memory", {}) or {})
self.group_memory = GroupMemoryCoordinator(
group_memory_service=self.group_memory_service,
group_profile_resolver=self.group_profile_resolver,
social_memory_service=self.social_memory,
group_facts_service=self.group_facts,
vector_memory=self.vector_memory,
memory_config=self._config.get("memory", {}) or {},
)
self.filters = self._config.get("filters", {}) or {}
self.mode_config = self._config.get("mode", {}) or {}
self.cooldown_config = self._config.get("cooldown", {}) or {}
self.cooldown = CooldownManager(self.cooldown_config)
self.image_config = self._config.get("image", {}) or {}
self._synced_member_context_versions: Dict[str, str] = {}
self.log_debug = bool((self._config.get("logging", {}) or {}).get("debug", True))
self.LOG.debug(f"[{self.name}] 初始化完成")
return True
def start(self) -> bool:
self.status = PluginStatus.RUNNING
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
return True
def can_process(self, message: Dict[str, Any]) -> bool:
if not self.enable:
return False
room_id = message.get("roomid", "")
if not room_id:
return False
if GroupBotManager.get_group_permission(room_id, self.feature) == PermissionStatus.DISABLED:
return False
msg_type = message.get("type")
if msg_type not in (MessageType.TEXT, MessageType.APP):
return False
full_msg = message.get("full_wx_msg")
if full_msg and full_msg.from_self():
return False
content = self._normalize_content(message)
if not content:
return False
if should_ignore(content, self.filters):
return False
if is_targeting_other_user(message):
return False
return True
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
room_id = message.get("roomid", "")
sender = message.get("sender", "")
bot: WechatAPIClient = message.get("bot")
is_at = bool(message.get("is_at", False))
content = self._normalize_content(message)
message_key = self._build_message_key(message, content)
dedup_expiry = int(self.cooldown_config.get("message_dedup_window_sec", 180))
if not self.dedup.begin_message_processing(message_key, dedup_expiry):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="duplicate_message",
message_key=message_key,
)
return False, "duplicate_message"
try:
if is_prompt_attack(content):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="prompt_attack_ignore",
trigger_type="prompt_attack_block",
reply_mode="defense",
)
return False, "ignored_prompt_attack"
coding_work_request = is_coding_work_request(content)
if coding_work_request and not is_at:
return False, "skip_coding_work"
quote_context = parse_quote_context(message.get("full_wx_msg"), room_id, self._get_sender_name)
sender_name = self._get_sender_name(room_id, sender)
group_name = self._get_group_name(room_id, message)
normalized_message = {
"sender": sender,
"sender_name": sender_name,
"content": content,
"is_at": is_at,
"timestamp": message.get("timestamp"),
}
self._append_group_message(room_id, normalized_message)
recent_messages = self.group_messages.get(room_id) or self.memory_store.get_recent_messages(room_id)
group_name_map = self._build_group_name_map(room_id)
group_memory_bundle = self.group_memory.build(
room_id=room_id,
group_name=group_name,
sender=sender,
current_content=content,
recent_messages=recent_messages,
name_map=group_name_map,
)
group_profile = group_memory_bundle.get("group_profile", {}) or {}
social_context = group_memory_bundle.get("social_context", {}) or {"items": [], "prompt": ""}
group_facts = group_memory_bundle.get("group_facts", {}) or {"items": [], "prompt": ""}
self._log_event(
"recv",
room_id=room_id,
sender=sender,
sender_name=sender_name,
group_mode=group_profile.get("mode", ""),
knowledge_domain=group_profile.get("knowledge_domain", ""),
memory_domain=group_profile.get("group_memory_domain", ""),
humor_style=group_profile.get("humor_style", ""),
sharpness_style=group_profile.get("sharpness_style", ""),
is_at=is_at,
content_preview=preview_text(content),
quote_type=quote_context.get("quote_type_label", ""),
msg_type=str(message.get("type")),
message_key=message_key,
coding_work=yn(coding_work_request),
)
conversation_hints = build_conversation_hints(
recent_messages,
sender,
content,
quote_context,
self.persona_engine.config.get("name", "小牛"),
)
memory_hints = self.memory_store.build_memory_hints(room_id, sender)
self._sync_member_memory(room_id, sender, sender_name, memory_hints.get("member_context", {}))
self.group_memory.sync_snapshots(
room_id=room_id,
social_context=social_context,
group_facts=group_facts,
log_event=self._log_event,
)
self._log_event(
"memory",
room_id=room_id,
sender=sender,
returning_state=memory_hints.get("returning_member_state", "") or "none",
has_member_context=bool(memory_hints.get("member_context")),
is_followup=memory_hints.get("is_followup", False),
last_active_at=memory_hints.get("last_active_at", "") or "",
social_links=len(social_context.get("items", [])),
group_facts=len(group_facts.get("items", [])),
)
trigger = self.trigger_router.route(message | {"content": content}, memory_hints, conversation_hints)
flow_state = self.flow_manager.apply_message_event(room_id, {
"is_at": is_at,
"is_question": trigger.is_question,
"is_followup": trigger.is_followup,
"topic_hit": bool(trigger.topic),
"topic": trigger.topic,
"is_returning_member": trigger.is_returning_member,
"message_after_bot": True,
})
self._log_event(
"decision",
room_id=room_id,
sender=sender,
trigger_type=trigger.trigger_type,
priority=trigger.priority,
reasons="|".join(trigger.reasons),
directed=yn(trigger.is_directed),
flow_state=flow_state.state,
flow_score=round(flow_state.score, 2),
topic=trigger.topic or "",
)
allow_proactive = bool(self.mode_config.get("allow_proactive_reply", True))
acceptance_state = self.flow_manager.get_acceptance_state(room_id)
decision = self.decision_flow.prepare(
trigger.__dict__,
flow_state.state,
allow_proactive,
acceptance_state,
conversation_hints,
)
reply_mode = str(decision.get("reply_mode", "social_short") or "social_short")
should_reply = bool(decision.get("should_consider_model"))
if not should_reply:
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="planner_skip",
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
topic=trigger.topic or "",
flow_state=flow_state.state,
acceptance_state=acceptance_state,
solver=yn(conversation_hints.get("has_recent_human_solver")),
)
return False, "skip"
if not self.cooldown.pass_cooldown(room_id, sender, trigger.__dict__):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason=trigger.__dict__.get("_cooldown_reason", "cooldown"),
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
topic=trigger.topic or "",
)
return False, "cooldown"
vector_memories = []
if self.vector_memory.should_search(reply_mode, trigger.trigger_type, memory_hints.get("returning_member_state", "")):
vector_memories = self.vector_memory.search(content, room_id, sender)
ranked_memory = self.memory_ranker.rank(
content=content,
quote_context=quote_context,
group_profile=group_profile,
member_context=memory_hints.get("member_context", {}) or {},
vector_memories=vector_memories,
social_context=social_context,
group_facts=group_facts,
trigger=trigger.__dict__,
)
vector_memories = ranked_memory.get("vector_memories", []) or []
social_context = ranked_memory.get("social_context", social_context) or {"items": [], "prompt": ""}
group_facts = ranked_memory.get("group_facts", group_facts) or {"items": [], "prompt": ""}
member_memory_focus = ranked_memory.get("member_memory_focus", []) or []
memory_rank_summary = self.group_memory.build_debug_summary(ranked_memory.get("debug", {}))
image_context = build_recent_image_context(
message=message,
room_id=room_id,
content=content,
quote_context=quote_context,
get_latest_image_message=self.memory_store.get_latest_image_message,
get_sender_name=self._get_sender_name,
image_config=self.image_config,
)
image_urls = await prepare_quote_image_inputs(
bot=bot,
quote_context=quote_context,
log_event=self._log_event,
)
if not image_urls and image_context:
recent_image_url = build_local_image_data_url(
str(image_context.get("image_path", "") or ""),
self.get_main_path(),
)
if recent_image_url:
image_urls = [recent_image_url]
image_safety = build_image_safety_hints(
message=message,
content=content,
quote_context=quote_context,
image_context=image_context,
image_urls=image_urls,
get_latest_image_message=self.memory_store.get_latest_image_message,
image_config=self.image_config,
)
self._log_event(
"context",
room_id=room_id,
sender=sender,
group_mode=group_profile.get("mode", ""),
knowledge_domain=group_profile.get("knowledge_domain", ""),
acceptance_state=acceptance_state,
reply_mode=reply_mode,
recent_message_count=len(recent_messages),
vector_hit_count=len(vector_memories),
member_focus_count=len(member_memory_focus),
social_hit_count=len((social_context or {}).get("items", []) or []),
group_fact_hit_count=len((group_facts or {}).get("items", []) or []),
image_input_count=len(image_urls),
image_risk=yn(image_safety.get("suspected")),
image_visible=yn(image_safety.get("has_visual_context")),
memory_rank_summary=memory_rank_summary,
)
context = self.context_builder.build(
room_id=room_id,
group_profile=group_profile,
sender=sender,
sender_name=sender_name,
content=content,
recent_messages=recent_messages,
member_context=memory_hints.get("member_context", {}),
member_memory_focus=member_memory_focus,
trigger=trigger.__dict__,
flow_state=flow_state.state,
reply_mode=reply_mode,
vector_memories=vector_memories,
social_memory=social_context,
group_facts=group_facts,
quote_context=quote_context | {
"has_image_attachment": bool(image_urls),
"image_safety": image_safety,
},
image_context=image_context,
)
context["coding_work_request"] = coding_work_request
system_prompt = self.persona_engine.build_system_prompt(group_profile, reply_mode)
user_prompt = build_user_prompt(context, memory_hints)
raw_response = self.llm_client.chat(
system_prompt,
user_prompt,
user_id=f"{room_id}:{sender}",
image_urls=image_urls,
)
response = LLMResultParser.sanitize_response(raw_response, content)
if not response:
self._log_event(
"model_empty",
room_id=room_id,
sender=sender,
model=self.llm_client.model,
last_error=self.llm_client.last_error,
reply_mode=reply_mode,
)
return False, "empty_response"
llm_result = LLMResultParser.parse_llm_result(
response,
current_content=content,
fallback_reply_mode=reply_mode,
fallback_topic=trigger.topic or "",
)
if not llm_result.get("should_reply", True):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="llm_no_reply",
trigger_type=trigger.trigger_type,
reply_mode=llm_result.get("reply_mode", reply_mode),
topic=llm_result.get("topic_summary", "") or llm_result.get("topic_id", ""),
)
return False, "llm_no_reply"
reply_mode = str(llm_result.get("reply_mode", reply_mode) or reply_mode)
reply_text = str(llm_result.get("reply", "") or "").strip()
selected_topic = str(llm_result.get("topic_summary", "") or llm_result.get("topic_id", "") or trigger.topic or "")
if not reply_text:
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="llm_empty_reply",
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
topic=selected_topic,
)
return False, "llm_empty_reply"
reply_chunks = finalize_reply(reply_text, reply_mode)
final_response_text = "\n".join(reply_chunks)
reply_dedup_expiry = int(self.cooldown_config.get("reply_dedup_window_sec", 90))
if not reply_chunks or self.dedup.should_skip_duplicate_reply(
room_id=room_id,
sender=sender,
reply_text=final_response_text,
expiry_sec=reply_dedup_expiry,
):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="duplicate_reply",
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
response_preview=preview_text(final_response_text),
)
return False, "duplicate_reply"
for chunk in reply_chunks:
await bot.send_text_message(room_id, chunk, sender)
self.cooldown.note_reply(room_id)
self.flow_manager.note_bot_reply(room_id)
self.memory_store.note_bot_reply(room_id, sender, selected_topic)
self._upsert_interaction_memory(room_id, sender, sender_name, content, final_response_text, trigger.trigger_type, selected_topic)
self._log_event(
"sent",
room_id=room_id,
sender=sender,
sender_name=sender_name,
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
topic=selected_topic,
response_preview=preview_text(final_response_text),
response_len=len(final_response_text),
chunk_count=len(reply_chunks),
)
return False, "replied"
finally:
self.dedup.finish_message_processing(message_key)
def _append_group_message(self, room_id: str, message: Dict) -> None:
items = self.group_messages.setdefault(room_id, [])
items.append(message)
size = int(self.mode_config.get("recent_context_size", 30))
if len(items) > size:
self.group_messages[room_id] = items[-size:]
def _build_message_key(self, message: Dict[str, Any], content: str) -> str:
full_msg = message.get("full_wx_msg")
if full_msg is not None:
msg_id = str(getattr(full_msg, "msg_id", "") or "")
create_time = str(getattr(full_msg, "create_time", "") or "")
if msg_id:
return f"{msg_id}:{create_time}"
room_id = str(message.get("roomid", "") or "")
sender = str(message.get("sender", "") or "")
timestamp = str(int(float(message.get("timestamp") or 0)))
return f"{room_id}:{sender}:{timestamp}:{preview_text(content, 48)}"
def _normalize_content(self, message: Dict[str, Any]) -> str:
msg_type = message.get("type")
content = str(message.get("content", "")).strip()
if msg_type == MessageType.TEXT:
return strip_at_prefix(content)
if msg_type == MessageType.APP:
try:
root = ET.fromstring(content)
title = root.find(".//title")
return (title.text or "").strip() if title is not None else "[应用消息]"
except Exception:
return "[应用消息]"
return content
def _get_sender_name(self, room_id: str, sender: str) -> str:
try:
members = ContactManager.get_instance().get_group_members(room_id)
return members.get(sender, sender)
except Exception:
return sender
def _build_group_name_map(self, room_id: str) -> Dict[str, str]:
try:
members = ContactManager.get_instance().get_group_members(room_id)
return {str(wxid): str(name) for wxid, name in (members or {}).items()}
except Exception:
return {}
@staticmethod
def _get_group_name(room_id: str, message: Dict[str, Any]) -> str:
all_contacts = message.get("all_contacts", {}) or {}
return str(all_contacts.get(room_id, room_id))
def _sync_member_memory(self, room_id: str, sender: str, sender_name: str, member_context: Dict) -> None:
if not member_context:
return
version = str(member_context.get("last_profiled_at", ""))
cache_key = f"{room_id}:{sender}"
if version and self._synced_member_context_versions.get(cache_key) == version:
return
text = self.context_builder._build_member_memory_prompt(member_context)
if not text or text == "暂无稳定成员画像。":
return
payload = {
"chatroom_id": room_id,
"wxid": sender,
"display_name": sender_name,
"memory_type": "member_context_snapshot",
"source_id": cache_key,
"last_active_at": member_context.get("last_profiled_at", ""),
"topic_tags": member_context.get("topics_of_interest", [])[:5],
"summary_text": member_context.get("summary_text", ""),
}
ok = self.vector_memory.upsert_memory(f"member_context:{cache_key}:{version}", text, payload)
self._log_event(
"memory_upsert",
room_id=room_id,
sender=sender,
memory_type="member_context_snapshot",
ok=ok,
error=self.vector_memory.last_error,
)
if ok and version:
self._synced_member_context_versions[cache_key] = version
def _upsert_interaction_memory(
self,
room_id: str,
sender: str,
sender_name: str,
content: str,
response: str,
trigger_type: str,
topic: str,
) -> None:
text = f"{sender_name}说:{content}\n小牛回复:{response}"
payload = {
"chatroom_id": room_id,
"wxid": sender,
"display_name": sender_name,
"memory_type": "interaction_memory",
"topic_tags": [item for item in [topic, trigger_type] if item],
"created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
"source_id": f"{room_id}:{sender}:{int(time.time())}",
"summary_text": text[:500],
}
ok = self.vector_memory.upsert_memory(payload["source_id"], text, payload)
self._log_event(
"memory_upsert",
room_id=room_id,
sender=sender,
memory_type="interaction_memory",
ok=ok,
trigger_type=trigger_type,
error=self.vector_memory.last_error,
)
def _log_event(self, event: str, **kwargs: Any) -> None:
if not self.log_debug:
return
summary = build_log_summary(event, kwargs)
self.LOG.debug(summary)