From 265f3835b40bf04a37e22bbd354ff33d859f63d0 Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 15 Apr 2026 10:23:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B7=B2=E6=8C=89=E4=BD=A0=E8=AF=B4=E7=9A=84?= =?UTF-8?q?=E6=94=B9=E6=88=90=E2=80=9C=E5=91=BD=E4=B8=AD=E5=8D=B3=E8=B7=B3?= =?UTF-8?q?=E5=87=BA=20+=20=E5=90=8E=E5=8F=B0=E5=85=A5=E9=98=9F=E5=A4=84?= =?UTF-8?q?=E7=90=86=E2=80=9D=EF=BC=8C=E6=8F=92=E4=BB=B6=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E4=BB=8D=E7=84=B6=E6=98=AF=E7=8B=AC=E5=8D=A0=E7=AD=96=E7=95=A5?= =?UTF-8?q?=EF=BC=8C=E6=B2=A1=E6=9C=89=E5=81=9A=E5=B9=B6=E8=A1=8C=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=E5=A4=84=E7=90=86=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主要改动在 main.py: process_message(...) 现在只做入队,立刻返回 True, "queued",主链路立即释放;队列已满时丢弃该消息并返回 True, "queue_full"。 原来的完整处理逻辑迁到 _process_message_impl(...),由后台 worker 消费队列执行。 新增队列 worker 循环 _message_worker_loop(...),并在 start()/stop() 管理生命周期。 增加 _ensure_workers_started(),避免插件在无事件循环阶段 create_task 报错(延迟到有 loop 时启动)。 LLM 调用改走新增的 _call_llm_async(...),带防阻塞措施:to_thread + semaphore + timeout;超时返回 False, "llm_timeout"。 配置也加在 config.toml 的 [runtime]: llm_max_concurrency = 3 llm_call_timeout_sec = 120 queue_worker_count = 2 queue_maxsize = 500 已做语法校验:py_compile 通过。 --- plugins/ai_auto_response/config.toml | 6 + plugins/ai_auto_response/main.py | 169 ++++++++++++++++++++++++--- 2 files changed, 161 insertions(+), 14 deletions(-) diff --git a/plugins/ai_auto_response/config.toml b/plugins/ai_auto_response/config.toml index 514a7ac..aa1b61a 100644 --- a/plugins/ai_auto_response/config.toml +++ b/plugins/ai_auto_response/config.toml @@ -36,6 +36,12 @@ aliases = ["林志玲", "lingzhiling", "温柔", "温柔版"] [api] backend = "dify_workflow_ai_auto_response" +[runtime] +llm_max_concurrency = 3 +llm_call_timeout_sec = 120 +queue_worker_count = 2 +queue_maxsize = 500 + [mode] group_default_mode = "social" question_reply_timeout_sec = 12 diff --git a/plugins/ai_auto_response/main.py b/plugins/ai_auto_response/main.py index fcfc75e..6edcafe 100644 --- a/plugins/ai_auto_response/main.py +++ 
b/plugins/ai_auto_response/main.py @@ -1,5 +1,5 @@ from __future__ import annotations - +import asyncio import time import xml.etree.ElementTree as ET from typing import Any, Dict, List, Optional, Tuple @@ -92,6 +92,12 @@ class AIAutoResponsePlugin(MessagePluginInterface): self.group_messages: Dict[str, List[Dict]] = {} self.enable = True self.dedup = DedupManager() + self.llm_semaphore: Optional[asyncio.Semaphore] = None + self.llm_call_timeout_sec = 0 + self.message_queue: Optional[asyncio.Queue] = None + self.queue_worker_count = 1 + self.queue_maxsize = 200 + self.queue_workers: List[asyncio.Task] = [] def initialize(self, context: Dict[str, Any]) -> bool: self.LOG = logger @@ -131,21 +137,40 @@ class AIAutoResponsePlugin(MessagePluginInterface): self.cooldown = CooldownManager(self.cooldown_config) self.image_config = self._config.get("image", {}) or {} self.spam_config = self._config.get("spam_guard", {}) or {} + runtime_config = self._config.get("runtime", {}) or {} + llm_max_concurrency = max(int(runtime_config.get("llm_max_concurrency", 3) or 3), 1) + self.llm_semaphore = asyncio.Semaphore(llm_max_concurrency) + timeout_base = int((self._config.get("api", {}) or {}).get("timeout_seconds", 60) or 60) + timeout_fallback = max(timeout_base * 2, 90) + self.llm_call_timeout_sec = max(int(runtime_config.get("llm_call_timeout_sec", timeout_fallback) or timeout_fallback), 10) + self.queue_worker_count = max(int(runtime_config.get("queue_worker_count", 2) or 2), 1) + self.queue_maxsize = max(int(runtime_config.get("queue_maxsize", 500) or 500), 10) + self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize) try: self.redis_client = self.db_manager.get_redis_connection() if self.db_manager else None except Exception: self.redis_client = None self._synced_member_context_versions: Dict[str, str] = {} self.log_debug = bool((self._config.get("logging", {}) or {}).get("debug", True)) - self.LOG.debug(f"[{self.name}] 初始化完成") + self.LOG.debug( + f"[{self.name}] 初始化完成 
llm_max_concurrency={llm_max_concurrency} llm_call_timeout_sec={self.llm_call_timeout_sec} " + f"queue_worker_count={self.queue_worker_count} queue_maxsize={self.queue_maxsize}" + ) return True def start(self) -> bool: self.status = PluginStatus.RUNNING + if self.message_queue is None: + self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize) + self._ensure_workers_started() return True def stop(self) -> bool: self.status = PluginStatus.STOPPED + for worker in self.queue_workers: + if not worker.done(): + worker.cancel() + self.queue_workers = [] return True def can_process(self, message: Dict[str, Any]) -> bool: @@ -177,6 +202,32 @@ class AIAutoResponsePlugin(MessagePluginInterface): return True async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + room_id = message.get("roomid", "") + sender = message.get("sender", "") + if self.message_queue is None: + self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize) + self._ensure_workers_started() + queued_message = dict(message) + try: + self.message_queue.put_nowait(queued_message) + self._log_event( + "queued", + room_id=room_id, + sender=sender, + queue_size=self.message_queue.qsize(), + ) + return True, "queued" + except asyncio.QueueFull: + self._log_event( + "drop", + room_id=room_id, + sender=sender, + reason="queue_full", + queue_maxsize=self.queue_maxsize, + ) + return True, "queue_full" + + async def _process_message_impl(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: room_id = message.get("roomid", "") sender = message.get("sender", "") bot: WechatAPIClient = message.get("bot") @@ -450,18 +501,31 @@ class AIAutoResponsePlugin(MessagePluginInterface): system_prompt = self.persona_engine.build_system_prompt(group_profile, reply_mode) user_prompt = build_user_prompt(context, memory_hints) - raw_response = self._call_llm( - room_id=room_id, - sender=sender, - sender_name=sender_name, - content=content, - group_profile=group_profile, - 
memory_hints=memory_hints, - context=context, - system_prompt=system_prompt, - user_prompt=user_prompt, - image_urls=image_urls, - ) + try: + raw_response = await self._call_llm_async( + room_id=room_id, + sender=sender, + sender_name=sender_name, + content=content, + group_profile=group_profile, + memory_hints=memory_hints, + context=context, + system_prompt=system_prompt, + user_prompt=user_prompt, + image_urls=image_urls, + ) + except asyncio.TimeoutError: + self._log_event( + "model_timeout", + room_id=room_id, + sender=sender, + timeout_sec=self.llm_call_timeout_sec, + model=self.llm_client.model, + provider=self.llm_client.provider, + trigger_type=trigger.trigger_type, + reply_mode=reply_mode, + ) + return False, "llm_timeout" response = LLMResultParser.sanitize_response(raw_response, content) if not response: self._log_event( @@ -549,6 +613,48 @@ class AIAutoResponsePlugin(MessagePluginInterface): finally: self.dedup.finish_message_processing(message_key) + async def _message_worker_loop(self, worker_index: int) -> None: + if self.message_queue is None: + return + while self.status == PluginStatus.RUNNING: + try: + message = await self.message_queue.get() + except asyncio.CancelledError: + break + + room_id = message.get("roomid", "") + sender = message.get("sender", "") + try: + await self._process_message_impl(message) + except asyncio.CancelledError: + break + except Exception as exc: + self.LOG.exception(f"[{self.name}] 后台处理失败 worker={worker_index} room={room_id} sender={sender}: {exc}") + finally: + self.message_queue.task_done() + + def _ensure_workers_started(self) -> None: + if self.status != PluginStatus.RUNNING: + return + if self.message_queue is None: + self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize) + + alive_workers = [worker for worker in self.queue_workers if not worker.done()] + self.queue_workers = alive_workers + missing = self.queue_worker_count - len(self.queue_workers) + if missing <= 0: + return + + try: + 
asyncio.get_running_loop() + except RuntimeError: + return + + start_index = len(self.queue_workers) + 1 + for i in range(missing): + worker = asyncio.create_task(self._message_worker_loop(worker_index=start_index + i)) + self.queue_workers.append(worker) + def _append_group_message(self, room_id: str, message: Dict) -> None: items = self.group_messages.setdefault(room_id, []) items.append(message) @@ -599,6 +705,41 @@ class AIAutoResponsePlugin(MessagePluginInterface): image_urls=image_urls, ) + async def _call_llm_async( + self, + *, + room_id: str, + sender: str, + sender_name: str, + content: str, + group_profile: Dict, + memory_hints: Dict, + context: Dict, + system_prompt: str, + user_prompt: str, + image_urls: List[str], + ) -> str: + if self.llm_semaphore is None: + self.llm_semaphore = asyncio.Semaphore(1) + + async with self.llm_semaphore: + return await asyncio.wait_for( + asyncio.to_thread( + self._call_llm, + room_id=room_id, + sender=sender, + sender_name=sender_name, + content=content, + group_profile=group_profile, + memory_hints=memory_hints, + context=context, + system_prompt=system_prompt, + user_prompt=user_prompt, + image_urls=image_urls, + ), + timeout=self.llm_call_timeout_sec, + ) + def _build_dify_simple_inputs( self, *,