Files
abot/plugins/ai_auto_response/safety/dedup.py
2026-04-09 17:46:30 +08:00

54 lines
1.8 KiB
Python

from __future__ import annotations
import time
from typing import Dict, Set
class DedupManager:
    """Suppress duplicate message handling and duplicate replies.

    All state lives in three in-memory containers on the instance.
    Timestamps are ``time.time()`` values and entries are pruned lazily,
    using the ``expiry_sec`` supplied by whichever call triggers the prune.
    """

    def __init__(self):
        # Keys for which begin_message_processing() succeeded and
        # finish_message_processing() has not yet been called.
        self.inflight_message_keys: Set[str] = set()
        # Message key -> time.time() at which processing finished.
        self.recent_message_keys: Dict[str, float] = {}
        # Reply signature -> time.time() at which the reply was first recorded.
        self.recent_reply_signatures: Dict[str, float] = {}

    @staticmethod
    def _prune_expired(entries: Dict[str, float], now: float, expiry_sec: int) -> None:
        """Remove entries whose timestamp is more than *expiry_sec* before *now*."""
        # Collect the keys first: a dict must not change size while iterated.
        stale_keys = [key for key, ts in entries.items() if now - ts > expiry_sec]
        for key in stale_keys:
            entries.pop(key, None)

    @staticmethod
    def _reply_signature(room_id: str, sender: str, text: str, scope: str) -> str:
        """Build an unambiguous dedup key for a reply.

        ``repr()`` of a tuple quotes and escapes every field, so a ``:`` (or
        any other character) inside the room id, sender or text cannot shift
        a field boundary, and the leading tag keeps room-scoped keys from
        ever matching sender-scoped ones.
        """
        if scope == "room":
            return repr(("room", str(room_id), text))
        # Any scope other than "room" is treated as per-sender.
        return repr(("sender", str(room_id), str(sender), text))

    def begin_message_processing(self, message_key: str, expiry_sec: int) -> bool:
        """Claim *message_key* for processing.

        Args:
            message_key: Identifier of the incoming message. A falsy key is
                not tracked at all and is always allowed through.
            expiry_sec: Age in seconds beyond which finished keys are
                forgotten.

        Returns:
            True if the caller should process the message; False if the same
            key is currently in flight or finished within the expiry window.
        """
        if not message_key:
            return True
        self._prune_expired(self.recent_message_keys, time.time(), expiry_sec)
        if message_key in self.inflight_message_keys:
            return False
        if message_key in self.recent_message_keys:
            return False
        self.inflight_message_keys.add(message_key)
        return True

    def finish_message_processing(self, message_key: str) -> None:
        """Mark *message_key* as done and remember it as recently handled.

        A falsy key is ignored, mirroring begin_message_processing().
        """
        if not message_key:
            return
        self.inflight_message_keys.discard(message_key)
        self.recent_message_keys[message_key] = time.time()

    def should_skip_duplicate_reply(
        self,
        *,
        room_id: str,
        sender: str,
        reply_text: str,
        expiry_sec: int,
        scope: str = "sender",
    ) -> bool:
        """Report whether an identical reply was already recorded recently.

        The reply text is compared after stripping surrounding whitespace.
        When the reply is not a duplicate it is recorded, so a later
        identical call within the window returns True.

        Args:
            room_id: Room the reply is destined for.
            sender: User the reply is addressed to; ignored when *scope* is
                ``"room"``.
            reply_text: Reply body. Empty or whitespace-only text is never
                treated as a duplicate and is not recorded.
            expiry_sec: Age in seconds beyond which recorded replies are
                forgotten.
            scope: ``"room"`` deduplicates per room; any other value
                deduplicates per (room, sender).

        Returns:
            True if the reply should be skipped as a duplicate, else False.
        """
        text = str(reply_text or "").strip()
        if not text:
            return False
        now = time.time()
        self._prune_expired(self.recent_reply_signatures, now, expiry_sec)
        signature = self._reply_signature(room_id, sender, text, scope)
        if signature in self.recent_reply_signatures:
            return True
        self.recent_reply_signatures[signature] = now
        return False