优化 ai_auto_response 队列过期丢弃与超时策略

- 为 ai_auto_response 队列消息记录入队时刻,并增加消息过期丢弃机制
- 在出队前与发送前各检查一次消息是否过期,避免补发过时群聊回复
- 为 skip 日志补充 age 字段,便于观察排队老化情况
- 将 ai_auto_response 外层总超时收紧到 20 秒,并新增 message_expire_sec=12 配置
- 将 Dify auto_reply.group 请求超时收紧到 15 秒,并关闭重试以提升群聊时效性
This commit is contained in:
liuwei
2026-04-24 15:03:35 +08:00
parent ed7af06555
commit 7cee7a99e2
4 changed files with 74 additions and 4 deletions

View File

@@ -115,8 +115,11 @@ llm:
endpoint: "workflows/run"
workflow_output_key: "result_json"
response_mode: "blocking"
request_timeout: 60
max_retries: 3
# 群聊自动回复强调时效性:
# 1. Dify 请求不能等太久,否则容易出现“过了场子再补回”的违和感;
# 2. 这里把单次请求超时收紧,并关闭重试,让过期消息尽快放弃。
request_timeout: 15
max_retries: 1
retry_delay_seconds: 1.0
# 场景路由层:插件建议优先使用 scene而不是直接绑定 backend。
# 这样当模型或供应商切换时,只需要改这里,不需要逐个改插件配置。

View File

@@ -39,7 +39,8 @@ scene = "auto_reply.group"
[runtime]
llm_max_concurrency = 3
llm_call_timeout_sec = 120
llm_call_timeout_sec = 20
message_expire_sec = 12
queue_worker_count = 2
queue_maxsize = 500

View File

@@ -101,6 +101,7 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.queue_workers: List[asyncio.Task] = []
self.reply_limits: Dict[str, Any] = {}
self.prompt_compact_config: Dict[str, Any] = {}
self.message_expire_sec = 0.0
def initialize(self, context: Dict[str, Any]) -> bool:
self.LOG = logger
@@ -148,6 +149,20 @@ class AIAutoResponsePlugin(MessagePluginInterface):
timeout_base = int((self._config.get("api", {}) or {}).get("timeout_seconds", 60) or 60)
timeout_fallback = max(timeout_base * 2, 90)
self.llm_call_timeout_sec = max(int(runtime_config.get("llm_call_timeout_sec", timeout_fallback) or timeout_fallback), 10)
# 群聊是强时效场景:
# 1. 如果一条消息已经在队列里放太久,再回往往比“不回”更奇怪;
# 2. 因此这里引入消息过期时间,后续会在“出队前”和“发送前”各检查一次;
# 3. 默认沿用 question_reply_timeout_sec 的时效感,再允许 runtime 单独覆盖。
self.message_expire_sec = max(
float(
runtime_config.get(
"message_expire_sec",
(self._config.get("mode", {}) or {}).get("question_reply_timeout_sec", 12),
)
or 12
),
1.0,
)
self.queue_worker_count = max(int(runtime_config.get("queue_worker_count", 2) or 2), 1)
self.queue_maxsize = max(int(runtime_config.get("queue_maxsize", 500) or 500), 10)
self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize)
@@ -159,7 +174,7 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.log_debug = bool((self._config.get("logging", {}) or {}).get("debug", True))
self.LOG.debug(
f"[{self.name}] 初始化完成 llm_max_concurrency={llm_max_concurrency} llm_call_timeout_sec={self.llm_call_timeout_sec} "
f"queue_worker_count={self.queue_worker_count} queue_maxsize={self.queue_maxsize}"
f"message_expire_sec={self.message_expire_sec} queue_worker_count={self.queue_worker_count} queue_maxsize={self.queue_maxsize}"
)
return True
@@ -213,6 +228,9 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize)
self._ensure_workers_started()
queued_message = dict(message)
# 记录入队时刻,供后续判断这条消息是否已经“聊过时”。
# 使用 monotonic 避免系统时间调整影响队列老化判断。
queued_message["_queued_at_mono"] = time.monotonic()
try:
self.message_queue.put_nowait(queued_message)
self._log_event(
@@ -240,6 +258,18 @@ class AIAutoResponsePlugin(MessagePluginInterface):
bot: WechatAPIClient = message.get("bot")
is_at = bool(message.get("is_at", False))
content = self._normalize_content(message)
stale_age_sec = self._get_message_queue_age_sec(message)
if self._is_message_stale(message):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="stale_queued_message",
trigger_type="stale_guard",
reply_mode="drop",
age_sec=round(stale_age_sec, 2),
)
return False, "stale_queued_message"
message_key = self._build_message_key(message, content)
dedup_expiry = int(self.cooldown_config.get("message_dedup_window_sec", 180))
if not self.dedup.begin_message_processing(message_key, dedup_expiry):
@@ -584,6 +614,22 @@ class AIAutoResponsePlugin(MessagePluginInterface):
reply_chunks = finalize_reply(reply_text, reply_mode, self.reply_limits)
final_response_text = "\n".join(reply_chunks)
# 第二次过期判断:
# 1. 这一步专门防止“LLM 慢返回后补发过时回复”;
# 2. 即使消息进模型时还新鲜,等模型回完也可能已经跟不上群聊了;
# 3. 这种情况下直接放弃发送,比突然补回旧话更自然。
if self._is_message_stale(message):
self._log_event(
"skip",
room_id=room_id,
sender=sender,
reason="stale_before_send",
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
topic=selected_topic,
age_sec=round(self._get_message_queue_age_sec(message), 2),
)
return False, "stale_before_send"
reply_dedup_expiry = int(self.cooldown_config.get("reply_dedup_window_sec", 90))
if not reply_chunks or self.dedup.should_skip_duplicate_reply(
room_id=room_id,
@@ -1218,6 +1264,22 @@ class AIAutoResponsePlugin(MessagePluginInterface):
timestamp = str(int(float(message.get("timestamp") or 0)))
return f"{room_id}:{sender}:{timestamp}:{preview_text(content, 48)}"
def _get_message_queue_age_sec(self, message: Dict[str, Any]) -> float:
queued_at = message.get("_queued_at_mono")
if queued_at in (None, ""):
return 0.0
try:
return max(time.monotonic() - float(queued_at), 0.0)
except (TypeError, ValueError):
return 0.0
def _is_message_stale(self, message: Dict[str, Any]) -> bool:
# 这里只看“排队/等待总时长”,不依赖消息业务时间戳:
# 1. 队列老化才是补发过时回复的直接原因;
# 2. 不同上游消息时间字段格式不统一,而入队时间一定可控;
# 3. 这样实现最稳定,也最符合“超过多久就别回了”的产品语义。
return self._get_message_queue_age_sec(message) >= float(self.message_expire_sec)
def _normalize_content(self, message: Dict[str, Any]) -> str:
msg_type = message.get("type")
content = str(message.get("content", "")).strip()

View File

@@ -40,6 +40,9 @@ def build_log_summary(event: str, data: Dict[str, Any]) -> str:
).strip()
if event == "skip":
age_text = ""
if data.get("age_sec") not in (None, ""):
age_text = f" age={data.get('age_sec')}"
return (
f"[XIAONIU] SKIP room={room} user={sender} "
f"reason={data.get('reason', '')} "
@@ -48,6 +51,7 @@ def build_log_summary(event: str, data: Dict[str, Any]) -> str:
f"topic={data.get('topic', '-') or '-'} "
f"acc={data.get('acceptance_state', '-') or '-'} "
f"solver={data.get('solver', '-') or '-'}"
f"{age_text}"
).strip()
if event == "context":