优化 ai_auto_response 仅回复同群最新消息

- 为同群消息引入递增序号,记录当前群的最新消息版本
- 在出队前跳过已被同群更新消息覆盖的旧消息
- 在发送前再次检查消息是否已被覆盖,避免 LLM 慢返回后补发旧回复
- 保持实现仅影响同一群内的消息时效,不干扰其他群的并行处理
This commit is contained in:
liuwei
2026-04-24 15:06:05 +08:00
parent 7cee7a99e2
commit cd2024dfb5

View File

@@ -102,6 +102,8 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.reply_limits: Dict[str, Any] = {}
self.prompt_compact_config: Dict[str, Any] = {}
self.message_expire_sec = 0.0
self.room_message_seq_counter = 0
self.latest_room_message_seq: Dict[str, int] = {}
def initialize(self, context: Dict[str, Any]) -> bool:
self.LOG = logger
@@ -231,6 +233,11 @@ class AIAutoResponsePlugin(MessagePluginInterface):
# 记录入队时刻,供后续判断这条消息是否已经“聊过时”。
# 使用 monotonic 避免系统时间调整影响队列老化判断。
queued_message["_queued_at_mono"] = time.monotonic()
# 记录“同群最新消息版本号”:
# 1. 每来一条新消息,就给当前群分配一个更大的序号;
# 2. 后续旧消息即使已经排队甚至已经进模型,只要序号落后,就视为过时;
# 3. 这样可以保证群里只会优先回应最新现场,避免补发旧话。
queued_message["_room_message_seq"] = self._next_room_message_seq(room_id)
try:
self.message_queue.put_nowait(queued_message)
self._log_event(
@@ -270,6 +277,16 @@ class AIAutoResponsePlugin(MessagePluginInterface):
age_sec=round(stale_age_sec, 2),
)
return False, "stale_queued_message"
if self._is_message_superseded(message):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="superseded_by_newer_message",
trigger_type="latest_only_guard",
reply_mode="drop",
)
return False, "superseded_by_newer_message"
message_key = self._build_message_key(message, content)
dedup_expiry = int(self.cooldown_config.get("message_dedup_window_sec", 180))
if not self.dedup.begin_message_processing(message_key, dedup_expiry):
@@ -630,6 +647,21 @@ class AIAutoResponsePlugin(MessagePluginInterface):
age_sec=round(self._get_message_queue_age_sec(message), 2),
)
return False, "stale_before_send"
# 第二次“只回最新消息”判断:
# 1. 旧消息可能已经进了 LLM但这期间同群又来了更新内容
# 2. 这时即使模型产出了结果,也不应该再把旧回复补发出去;
# 3. 直接丢弃旧结果,让群里只看到贴着最新现场的回复。
if self._is_message_superseded(message):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="superseded_before_send",
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
topic=selected_topic,
)
return False, "superseded_before_send"
reply_dedup_expiry = int(self.cooldown_config.get("reply_dedup_window_sec", 90))
if not reply_chunks or self.dedup.should_skip_duplicate_reply(
room_id=room_id,
@@ -1280,6 +1312,24 @@ class AIAutoResponsePlugin(MessagePluginInterface):
# 3. 这样实现最稳定,也最符合“超过多久就别回了”的产品语义。
return self._get_message_queue_age_sec(message) >= float(self.message_expire_sec)
def _next_room_message_seq(self, room_id: str) -> int:
self.room_message_seq_counter += 1
seq = self.room_message_seq_counter
if room_id:
self.latest_room_message_seq[room_id] = seq
return seq
def _is_message_superseded(self, message: Dict[str, Any]) -> bool:
room_id = str(message.get("roomid", "") or "")
if not room_id:
return False
current_seq = message.get("_room_message_seq")
latest_seq = self.latest_room_message_seq.get(room_id)
try:
return int(current_seq or 0) < int(latest_seq or 0)
except (TypeError, ValueError):
return False
def _normalize_content(self, message: Dict[str, Any]) -> str:
msg_type = message.get("type")
content = str(message.get("content", "")).strip()