已按你说的改成“命中即跳出 + 后台入队处理”,插件调度仍然是独占策略,没有做并行插件处理。

主要改动在 main.py:

process_message(...) 现在只做入队,立刻返回 True, "queued",主链路立即释放。
原来的完整处理逻辑迁到 _process_message_impl(...),由后台 worker 消费队列执行。
新增队列 worker 循环 _message_worker_loop(...),并在 start()/stop() 管理生命周期。
增加 _ensure_workers_started(),避免插件在无事件循环阶段 create_task 报错(延迟到有 loop 时启动)。
保留了之前的 LLM 防阻塞措施:to_thread + semaphore + timeout。
配置也加在 config.toml 的 [runtime]:

llm_max_concurrency = 3
llm_call_timeout_sec = 120
queue_worker_count = 2
queue_maxsize = 500
已做语法校验:py_compile 通过。
This commit is contained in:
liuwei
2026-04-15 10:23:05 +08:00
parent 4cf5a05088
commit 265f3835b4
2 changed files with 161 additions and 14 deletions

View File

@@ -36,6 +36,12 @@ aliases = ["林志玲", "lingzhiling", "温柔", "温柔版"]
[api]
backend = "dify_workflow_ai_auto_response"
[runtime]
llm_max_concurrency = 3
llm_call_timeout_sec = 120
queue_worker_count = 2
queue_maxsize = 500
[mode]
group_default_mode = "social"
question_reply_timeout_sec = 12

View File

@@ -1,5 +1,5 @@
from __future__ import annotations
import asyncio
import time
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional, Tuple
@@ -92,6 +92,12 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.group_messages: Dict[str, List[Dict]] = {}
self.enable = True
self.dedup = DedupManager()
self.llm_semaphore: Optional[asyncio.Semaphore] = None
self.llm_call_timeout_sec = 0
self.message_queue: Optional[asyncio.Queue] = None
self.queue_worker_count = 1
self.queue_maxsize = 200
self.queue_workers: List[asyncio.Task] = []
def initialize(self, context: Dict[str, Any]) -> bool:
self.LOG = logger
@@ -131,21 +137,40 @@ class AIAutoResponsePlugin(MessagePluginInterface):
self.cooldown = CooldownManager(self.cooldown_config)
self.image_config = self._config.get("image", {}) or {}
self.spam_config = self._config.get("spam_guard", {}) or {}
runtime_config = self._config.get("runtime", {}) or {}
llm_max_concurrency = max(int(runtime_config.get("llm_max_concurrency", 3) or 3), 1)
self.llm_semaphore = asyncio.Semaphore(llm_max_concurrency)
timeout_base = int((self._config.get("api", {}) or {}).get("timeout_seconds", 60) or 60)
timeout_fallback = max(timeout_base * 2, 90)
self.llm_call_timeout_sec = max(int(runtime_config.get("llm_call_timeout_sec", timeout_fallback) or timeout_fallback), 10)
self.queue_worker_count = max(int(runtime_config.get("queue_worker_count", 2) or 2), 1)
self.queue_maxsize = max(int(runtime_config.get("queue_maxsize", 500) or 500), 10)
self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize)
try:
self.redis_client = self.db_manager.get_redis_connection() if self.db_manager else None
except Exception:
self.redis_client = None
self._synced_member_context_versions: Dict[str, str] = {}
self.log_debug = bool((self._config.get("logging", {}) or {}).get("debug", True))
self.LOG.debug(f"[{self.name}] 初始化完成")
self.LOG.debug(
f"[{self.name}] 初始化完成 llm_max_concurrency={llm_max_concurrency} llm_call_timeout_sec={self.llm_call_timeout_sec} "
f"queue_worker_count={self.queue_worker_count} queue_maxsize={self.queue_maxsize}"
)
return True
def start(self) -> bool:
self.status = PluginStatus.RUNNING
if self.message_queue is None:
self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize)
self._ensure_workers_started()
return True
def stop(self) -> bool:
self.status = PluginStatus.STOPPED
for worker in self.queue_workers:
if not worker.done():
worker.cancel()
self.queue_workers = []
return True
def can_process(self, message: Dict[str, Any]) -> bool:
@@ -177,6 +202,32 @@ class AIAutoResponsePlugin(MessagePluginInterface):
return True
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
room_id = message.get("roomid", "")
sender = message.get("sender", "")
if self.message_queue is None:
self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize)
self._ensure_workers_started()
queued_message = dict(message)
try:
self.message_queue.put_nowait(queued_message)
self._log_event(
"queued",
room_id=room_id,
sender=sender,
queue_size=self.message_queue.qsize(),
)
return True, "queued"
except asyncio.QueueFull:
self._log_event(
"drop",
room_id=room_id,
sender=sender,
reason="queue_full",
queue_maxsize=self.queue_maxsize,
)
return True, "queue_full"
async def _process_message_impl(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
room_id = message.get("roomid", "")
sender = message.get("sender", "")
bot: WechatAPIClient = message.get("bot")
@@ -450,18 +501,31 @@ class AIAutoResponsePlugin(MessagePluginInterface):
system_prompt = self.persona_engine.build_system_prompt(group_profile, reply_mode)
user_prompt = build_user_prompt(context, memory_hints)
raw_response = self._call_llm(
room_id=room_id,
sender=sender,
sender_name=sender_name,
content=content,
group_profile=group_profile,
memory_hints=memory_hints,
context=context,
system_prompt=system_prompt,
user_prompt=user_prompt,
image_urls=image_urls,
)
try:
raw_response = await self._call_llm_async(
room_id=room_id,
sender=sender,
sender_name=sender_name,
content=content,
group_profile=group_profile,
memory_hints=memory_hints,
context=context,
system_prompt=system_prompt,
user_prompt=user_prompt,
image_urls=image_urls,
)
except asyncio.TimeoutError:
self._log_event(
"model_timeout",
room_id=room_id,
sender=sender,
timeout_sec=self.llm_call_timeout_sec,
model=self.llm_client.model,
provider=self.llm_client.provider,
trigger_type=trigger.trigger_type,
reply_mode=reply_mode,
)
return False, "llm_timeout"
response = LLMResultParser.sanitize_response(raw_response, content)
if not response:
self._log_event(
@@ -549,6 +613,48 @@ class AIAutoResponsePlugin(MessagePluginInterface):
finally:
self.dedup.finish_message_processing(message_key)
async def _message_worker_loop(self, worker_index: int) -> None:
if self.message_queue is None:
return
while self.status == PluginStatus.RUNNING:
try:
message = await self.message_queue.get()
except asyncio.CancelledError:
break
room_id = message.get("roomid", "")
sender = message.get("sender", "")
try:
await self._process_message_impl(message)
except asyncio.CancelledError:
break
except Exception as exc:
self.LOG.exception(f"[{self.name}] 后台处理失败 worker={worker_index} room={room_id} sender={sender}: {exc}")
finally:
self.message_queue.task_done()
def _ensure_workers_started(self) -> None:
if self.status != PluginStatus.RUNNING:
return
if self.message_queue is None:
self.message_queue = asyncio.Queue(maxsize=self.queue_maxsize)
alive_workers = [worker for worker in self.queue_workers if not worker.done()]
self.queue_workers = alive_workers
missing = self.queue_worker_count - len(self.queue_workers)
if missing <= 0:
return
try:
asyncio.get_running_loop()
except RuntimeError:
return
start_index = len(self.queue_workers) + 1
for i in range(missing):
worker = asyncio.create_task(self._message_worker_loop(worker_index=start_index + i))
self.queue_workers.append(worker)
def _append_group_message(self, room_id: str, message: Dict) -> None:
items = self.group_messages.setdefault(room_id, [])
items.append(message)
@@ -599,6 +705,41 @@ class AIAutoResponsePlugin(MessagePluginInterface):
image_urls=image_urls,
)
async def _call_llm_async(
self,
*,
room_id: str,
sender: str,
sender_name: str,
content: str,
group_profile: Dict,
memory_hints: Dict,
context: Dict,
system_prompt: str,
user_prompt: str,
image_urls: List[str],
) -> str:
if self.llm_semaphore is None:
self.llm_semaphore = asyncio.Semaphore(1)
async with self.llm_semaphore:
return await asyncio.wait_for(
asyncio.to_thread(
self._call_llm,
room_id=room_id,
sender=sender,
sender_name=sender_name,
content=content,
group_profile=group_profile,
memory_hints=memory_hints,
context=context,
system_prompt=system_prompt,
user_prompt=user_prompt,
image_urls=image_urls,
),
timeout=self.llm_call_timeout_sec,
)
def _build_dify_simple_inputs(
self,
*,