From 7cee7a99e27cb081405cd3d5f6143555bc772c82 Mon Sep 17 00:00:00 2001 From: liuwei Date: Fri, 24 Apr 2026 15:03:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20ai=5Fauto=5Fresponse=20?= =?UTF-8?q?=E9=98=9F=E5=88=97=E8=BF=87=E6=9C=9F=E4=B8=A2=E5=BC=83=E4=B8=8E?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 为 ai_auto_response 队列消息记录入队时刻,并增加消息过期丢弃机制 - 在出队前与发送前各检查一次消息是否过期,避免补发过时群聊回复 - 为 skip 日志补充 age 字段,便于观察排队老化情况 - 将 ai_auto_response 外层总超时收紧到 20 秒,并新增 message_expire_sec=12 配置 - 将 Dify auto_reply.group 请求超时收紧到 15 秒,并关闭重试以提升群聊时效性 --- config.yaml | 7 ++- plugins/ai_auto_response/config.toml | 3 +- plugins/ai_auto_response/main.py | 64 ++++++++++++++++++++- plugins/ai_auto_response/runtime/logging.py | 4 ++ 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/config.yaml b/config.yaml index 2cffee8..eb42905 100644 --- a/config.yaml +++ b/config.yaml @@ -115,8 +115,11 @@ llm: endpoint: "workflows/run" workflow_output_key: "result_json" response_mode: "blocking" - request_timeout: 60 - max_retries: 3 + # 群聊自动回复强调时效性: + # 1. Dify 请求不能等太久,否则容易出现“过了场子再补回”的违和感; + # 2. 这里把单次请求超时收紧,并关闭重试,让过期消息尽快放弃。 + request_timeout: 15 + max_retries: 1 retry_delay_seconds: 1.0 # 场景路由层:插件建议优先使用 scene,而不是直接绑定 backend。 # 这样当模型或供应商切换时,只需要改这里,不需要逐个改插件配置。 diff --git a/plugins/ai_auto_response/config.toml b/plugins/ai_auto_response/config.toml index 5ad3106..a9144ea 100644 --- a/plugins/ai_auto_response/config.toml +++ b/plugins/ai_auto_response/config.toml @@ -39,7 +39,8 @@ scene = "auto_reply.group" [runtime] llm_max_concurrency = 3 -llm_call_timeout_sec = 120 +llm_call_timeout_sec = 20 +message_expire_sec = 12 queue_worker_count = 2 queue_maxsize = 500 diff --git a/plugins/ai_auto_response/main.py b/plugins/ai_auto_response/main.py index 964e70f..6b7a6b5 100644 --- a/plugins/ai_auto_response/main.py +++ b/plugins/ai_auto_response/main.py @@ -101,6 +101,7 @@ class AIAutoResponsePlugin(MessagePluginInterface): self.queue_workers: List[asyncio.Task] = [] self.reply_limits: Dict[str, Any] = {} self.prompt_compact_config: Dict[str, Any] = {} + self.message_expire_sec = 0.0 def initialize(self, context: Dict[str, Any]) -> bool: self.LOG = logger @@ -148,6 +149,20 @@ class AIAutoResponsePlugin(MessagePluginInterface): timeout_base = int((self._config.get("api", {}) or {}).get("timeout_seconds", 60) or 60) timeout_fallback = max(timeout_base * 2, 90) self.llm_call_timeout_sec = max(int(runtime_config.get("llm_call_timeout_sec", timeout_fallback) or timeout_fallback), 10) + # 群聊是强时效场景: + # 1. 如果一条消息已经在队列里放太久,再回往往比“不回”更奇怪; + # 2. 因此这里引入消息过期时间,后续会在“出队前”和“发送前”各检查一次; + # 3. 默认沿用 question_reply_timeout_sec 的时效感,再允许 runtime 单独覆盖。 + self.message_expire_sec = max( + float( + runtime_config.get( + "message_expire_sec", + (self._config.get("mode", {}) or {}).get("question_reply_timeout_sec", 12), + ) + or 12 + ), + 1.0, + ) self.queue_worker_count = max(int(runtime_config.get("queue_worker_count", 2) or 2), 1) self.queue_maxsize = max(int(runtime_config.get("queue_maxsize", 500) or 500), 10) self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize) @@ -159,7 +174,7 @@ class AIAutoResponsePlugin(MessagePluginInterface): self.log_debug = bool((self._config.get("logging", {}) or {}).get("debug", True)) self.LOG.debug( f"[{self.name}] 初始化完成 llm_max_concurrency={llm_max_concurrency} llm_call_timeout_sec={self.llm_call_timeout_sec} " - f"queue_worker_count={self.queue_worker_count} queue_maxsize={self.queue_maxsize}" + f"message_expire_sec={self.message_expire_sec} queue_worker_count={self.queue_worker_count} queue_maxsize={self.queue_maxsize}" ) return True @@ -213,6 +228,9 @@ class AIAutoResponsePlugin(MessagePluginInterface): self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize) self._ensure_workers_started() queued_message = dict(message) + # 记录入队时刻,供后续判断这条消息是否已经“聊过时”。 + # 使用 monotonic 避免系统时间调整影响队列老化判断。 + queued_message["_queued_at_mono"] = time.monotonic() try: self.message_queue.put_nowait(queued_message) self._log_event( @@ -240,6 +258,18 @@ class AIAutoResponsePlugin(MessagePluginInterface): bot: WechatAPIClient = message.get("bot") is_at = bool(message.get("is_at", False)) content = self._normalize_content(message) + stale_age_sec = self._get_message_queue_age_sec(message) + if self._is_message_stale(message): + self._log_event( + "skip", + room_id=room_id, + sender=sender, + reason="stale_queued_message", + trigger_type="stale_guard", + reply_mode="drop", + age_sec=round(stale_age_sec, 2), + ) + return False, "stale_queued_message" message_key = self._build_message_key(message, content) dedup_expiry = int(self.cooldown_config.get("message_dedup_window_sec", 180)) if not self.dedup.begin_message_processing(message_key, dedup_expiry): @@ -584,6 +614,22 @@ class AIAutoResponsePlugin(MessagePluginInterface): reply_chunks = finalize_reply(reply_text, reply_mode, self.reply_limits) final_response_text = "\n".join(reply_chunks) + # 第二次过期判断: + # 1. 这一步专门防止“LLM 慢返回后补发过时回复”; + # 2. 即使消息进模型时还新鲜,等模型回完也可能已经跟不上群聊了; + # 3. 这种情况下直接放弃发送,比突然补回旧话更自然。 + if self._is_message_stale(message): + self._log_event( + "skip", + room_id=room_id, + sender=sender, + reason="stale_before_send", + trigger_type=trigger.trigger_type, + reply_mode=reply_mode, + topic=selected_topic, + age_sec=round(self._get_message_queue_age_sec(message), 2), + ) + return False, "stale_before_send" reply_dedup_expiry = int(self.cooldown_config.get("reply_dedup_window_sec", 90)) if not reply_chunks or self.dedup.should_skip_duplicate_reply( room_id=room_id, @@ -1218,6 +1264,22 @@ class AIAutoResponsePlugin(MessagePluginInterface): timestamp = str(int(float(message.get("timestamp") or 0))) return f"{room_id}:{sender}:{timestamp}:{preview_text(content, 48)}" + def _get_message_queue_age_sec(self, message: Dict[str, Any]) -> float: + queued_at = message.get("_queued_at_mono") + if queued_at in (None, ""): + return 0.0 + try: + return max(time.monotonic() - float(queued_at), 0.0) + except (TypeError, ValueError): + return 0.0 + + def _is_message_stale(self, message: Dict[str, Any]) -> bool: + # 这里只看“排队/等待总时长”,不依赖消息业务时间戳: + # 1. 队列老化才是补发过时回复的直接原因; + # 2. 不同上游消息时间字段格式不统一,而入队时间一定可控; + # 3. 这样实现最稳定,也最符合“超过多久就别回了”的产品语义。 + return self._get_message_queue_age_sec(message) >= float(self.message_expire_sec) + def _normalize_content(self, message: Dict[str, Any]) -> str: msg_type = message.get("type") content = str(message.get("content", "")).strip() diff --git a/plugins/ai_auto_response/runtime/logging.py b/plugins/ai_auto_response/runtime/logging.py index 27dab94..c9353a3 100644 --- a/plugins/ai_auto_response/runtime/logging.py +++ b/plugins/ai_auto_response/runtime/logging.py @@ -40,6 +40,9 @@ def build_log_summary(event: str, data: Dict[str, Any]) -> str: ).strip() if event == "skip": + age_text = "" + if data.get("age_sec") not in (None, ""): + age_text = f" age={data.get('age_sec')}" return ( f"[XIAONIU] SKIP room={room} user={sender} " f"reason={data.get('reason', '')} " @@ -48,6 +51,7 @@ def build_log_summary(event: str, data: Dict[str, Any]) -> str: f"topic={data.get('topic', '-') or '-'} " f"acc={data.get('acceptance_state', '-') or '-'} " f"solver={data.get('solver', '-') or '-'}" + f"{age_text}" ).strip() if event == "context":