为长任务插件接入后台任务模式

1. 为消息插件新增可配置的前台/后台分发模式,并在 robot 主链路中加入独立后台任务池,避免长任务长期占用前台 20 个消息处理槽位。

2. 放宽插件执行超时上限到 1800 秒,支持 200 秒以上长任务,同时保留熔断、统计和异常记录。

3. 为群聊总结和 AI 绘图启用后台执行配置,并将菜单插件默认输出改回文本模式。
This commit is contained in:
Liu
2026-05-01 11:23:52 +08:00
parent 1b6da6db1f
commit 0d1362f97e
7 changed files with 350 additions and 77 deletions

361
robot.py
View File

@@ -36,7 +36,7 @@ from utils.trace_context import set_current_trace_id, reset_current_trace_id
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import WxMessage, MessageType
# 定义全局信号量,限制最大并发 10
# 定义前台消息处理信号量,限制最多 20 条消息同时进入主处理链路
sem = asyncio.Semaphore(20)
@@ -144,6 +144,18 @@ class Robot:
# 通过类属性设置 admin_list而不是实例属性
GroupBotManager.admin_list = self.config.wx_config.get("admin", [])
self.recent_msg_ids = deque(maxlen=20)
# 长任务插件需要与“前台消息并发槽位”解耦:
# 1. 当前 `sem=20` 保护的是“正在处理消息的协程数”,并不适合被 200 秒级长任务长期占住;
# 2. 因此这里额外引入一个后台插件任务池,让长任务在独立并发池里运行;
# 3. 这样消息接收与轻量命令仍然能继续流动,后台长任务则在单独池子里排队执行。
runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {}
runtime_config = runtime_config if isinstance(runtime_config, dict) else {}
self.background_plugin_max_concurrency = self._safe_positive_int(
runtime_config.get("background_plugin_max_concurrency"),
6,
)
self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency)
self.background_plugin_tasks = set()
def apply_runtime_config(self, reload_catalog: bool = False) -> None:
"""把最新全局配置应用到当前运行中的关键对象。
@@ -164,6 +176,18 @@ class Robot:
# 管理员列表走 GroupBotManager 的类级缓存;只 reload Config 不会自动回写到这里。
GroupBotManager.admin_list = self.config.wx_config.get("admin", [])
# 后台插件任务池允许从全局配置热刷新:
# 1. 新值只影响后续新入队的后台任务,不会粗暴中断已在运行中的长任务;
# 2. 这里直接替换为新的 Semaphore逻辑简单且足够符合“后续生效”的预期
# 3. 若后续需要做到“平滑缩容”,再在这里补更细的迁移策略。
runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {}
runtime_config = runtime_config if isinstance(runtime_config, dict) else {}
self.background_plugin_max_concurrency = self._safe_positive_int(
runtime_config.get("background_plugin_max_concurrency"),
self.background_plugin_max_concurrency,
)
self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency)
# system_context 中保存的是 config 对象引用reload 后插件读取到的是最新字段。
# 但 LLMRegistry 自己还有一层短 TTL 缓存,因此保存全局 LLM 配置后需要显式清掉。
if reload_catalog:
@@ -174,7 +198,8 @@ class Robot:
"运行时配置已应用: "
f"admin_count={len(GroupBotManager.admin_list)}, "
f"email_sender={'ready' if self.email_sender else 'missing'}, "
f"llm_cache_reloaded={reload_catalog}"
f"llm_cache_reloaded={reload_catalog}, "
f"background_plugin_max_concurrency={self.background_plugin_max_concurrency}"
)
def _cleanup_migrated_system_jobs(self):
@@ -668,84 +693,73 @@ class Robot:
)
continue
processed, _ = await asyncio.wait_for(
plugin.process_message(plugin_msg),
timeout=protection_policy["process_timeout_seconds"],
)
self.plugin_manager.record_plugin_execution_success(
plugin,
process_time_ms=self._elapsed_ms(started_at),
)
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
command_name=command_name,
# 这里把“无异常执行完成”视为统计意义上的成功:
# 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”;
# 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低;
# 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。
process_result=True,
process_time_ms=self._elapsed_ms(started_at),
)
if processed:
dispatch_mode = self._resolve_message_plugin_dispatch_mode(plugin, plugin_msg)
if dispatch_mode == "background":
self._schedule_background_plugin_execution(
plugin=plugin,
plugin_msg=plugin_msg,
msg=msg,
command_name=command_name,
protection_policy=protection_policy,
)
self.LOG.info(
self._trace_message(
msg,
f"插件命中 plugin={plugin.name} command={command_name} "
f"cost_ms={self._elapsed_ms(started_at)}"
f"插件后台排队 plugin={plugin.name} command={command_name} "
f"timeout={protection_policy['process_timeout_seconds']}s "
f"background_pool={self.background_plugin_max_concurrency} "
f"pending={len(self.background_plugin_tasks)}"
)
)
# 后台模式一旦入队,就视为插件已经“认领”这条消息:
# 1. 这样可以避免后续插件继续命中同一条命令,造成重复回复;
# 2. 该模式适合命令式、排他式的长任务插件;
# 3. 如果未来需要“后台跑但不拦截”的能力,再单独扩展第三种分发模式。
return True
except asyncio.TimeoutError as timeout_error:
protection_policy = self._build_message_plugin_protection_policy(plugin)
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="timeout",
error_message=(
f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。"
),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=protection_policy["process_timeout_seconds"],
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=timeout_error,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 执行超时timeout={protection_policy['process_timeout_seconds']}s "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
)
)
try:
processed, _ = await asyncio.wait_for(
plugin.process_message(plugin_msg),
timeout=protection_policy["process_timeout_seconds"],
)
process_time_ms = self._record_plugin_execution_success(
plugin=plugin,
msg=msg,
command_name=command_name,
started_at=started_at,
)
if processed:
self.LOG.info(
self._trace_message(
msg,
f"插件命中 plugin={plugin.name} command={command_name} "
f"cost_ms={process_time_ms}"
)
)
return True
except asyncio.TimeoutError as timeout_error:
self._handle_plugin_execution_timeout(
plugin=plugin,
msg=msg,
command_name=command_name,
timeout_error=timeout_error,
started_at=started_at,
protection_policy=protection_policy,
)
except Exception as e:
self._handle_plugin_execution_exception(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
started_at=started_at,
protection_policy=protection_policy,
)
except Exception as e:
protection_policy = self._build_message_plugin_protection_policy(plugin)
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="error",
error_message=str(e),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=0,
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 处理消息失败: {e} "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
f"插件 {plugin.name} 处理失败: {e}"
)
)
@@ -855,11 +869,216 @@ class Robot:
)
return {
"process_timeout_seconds": max(10, min(int(resolved_timeout), 180)),
# 这里把上限放宽到 1800 秒:
# 1. 用户已经有 200 秒级别的长任务插件,原先 180 秒硬上限会被无条件截断;
# 2. 现在是否“长时间占住前台消息槽位”主要由 dispatch_mode 决定,而不是靠硬砍超时;
# 3. 仍然保留上限,避免误配置成无限等待把整个系统拖成不可控状态。
"process_timeout_seconds": max(10, min(int(resolved_timeout), 1800)),
"failure_threshold": max(2, min(int(failure_threshold), 10)),
"circuit_recovery_seconds": max(30, min(int(circuit_recovery_seconds), 900)),
}
def _resolve_message_plugin_dispatch_mode(self, plugin, plugin_msg: dict) -> str:
"""解析插件当前消息的分发模式。"""
try:
if hasattr(plugin, "get_message_dispatch_mode"):
return plugin.get_message_dispatch_mode(plugin_msg)
except Exception as e:
self.LOG.warning(
self._trace_message(
plugin_msg.get("full_wx_msg"),
f"读取插件分发模式失败,已回退 sync: plugin={plugin.name}, error={e}"
)
)
return "sync"
def _schedule_background_plugin_execution(
self,
*,
plugin,
plugin_msg: dict,
msg: WxMessage,
command_name: str,
protection_policy: dict,
) -> None:
"""把插件执行转入后台任务池。"""
task_name = f"plugin-bg:{plugin.name}:{self._get_trace_id(msg) or 'no-trace'}"
task = asyncio.create_task(
self._run_background_plugin_execution(
plugin=plugin,
plugin_msg=plugin_msg,
msg=msg,
command_name=command_name,
protection_policy=protection_policy,
),
name=task_name,
)
self.background_plugin_tasks.add(task)
task.add_done_callback(self.background_plugin_tasks.discard)
async def _run_background_plugin_execution(
self,
*,
plugin,
plugin_msg: dict,
msg: WxMessage,
command_name: str,
protection_policy: dict,
) -> None:
"""在独立后台任务池中执行长任务插件。"""
trace_token = set_current_trace_id(self._get_trace_id(msg))
queued_started_at = time.perf_counter()
try:
async with self.background_plugin_semaphore:
started_at = time.perf_counter()
queue_wait_ms = round((started_at - queued_started_at) * 1000, 2)
try:
processed, _ = await asyncio.wait_for(
plugin.process_message(plugin_msg),
timeout=protection_policy["process_timeout_seconds"],
)
process_time_ms = self._record_plugin_execution_success(
plugin=plugin,
msg=msg,
command_name=command_name,
started_at=started_at,
)
if processed:
self.LOG.info(
self._trace_message(
msg,
f"后台插件完成 plugin={plugin.name} command={command_name} "
f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}"
)
)
else:
self.LOG.warning(
self._trace_message(
msg,
f"后台插件执行完成但未拦截消息 plugin={plugin.name} command={command_name} "
f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}"
)
)
except asyncio.TimeoutError as timeout_error:
self._handle_plugin_execution_timeout(
plugin=plugin,
msg=msg,
command_name=command_name,
timeout_error=timeout_error,
started_at=started_at,
protection_policy=protection_policy,
)
except Exception as e:
self._handle_plugin_execution_exception(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
started_at=started_at,
protection_policy=protection_policy,
)
finally:
reset_current_trace_id(trace_token)
def _record_plugin_execution_success(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
started_at: float,
) -> float:
"""统一记录插件成功执行结果,并返回耗时。"""
process_time_ms = self._elapsed_ms(started_at)
self.plugin_manager.record_plugin_execution_success(
plugin,
process_time_ms=process_time_ms,
)
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
command_name=command_name,
# 这里把“无异常执行完成”视为统计意义上的成功:
# 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”;
# 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低;
# 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。
process_result=True,
process_time_ms=process_time_ms,
)
return process_time_ms
def _handle_plugin_execution_timeout(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
timeout_error: Exception,
started_at: float,
protection_policy: dict,
) -> None:
"""统一处理插件执行超时。"""
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="timeout",
error_message=(
f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。"
),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=protection_policy["process_timeout_seconds"],
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=timeout_error,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 执行超时timeout={protection_policy['process_timeout_seconds']}s "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
)
)
def _handle_plugin_execution_exception(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
error: Exception,
started_at: float,
protection_policy: dict,
) -> None:
"""统一处理插件执行异常。"""
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="error",
error_message=str(error),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=0,
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=error,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 处理消息失败: {error} "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
)
)
def _get_stats_collector_plugin(self):
"""获取运行中的统计收集插件实例。"""
# 统计插件已经从“事件订阅”切到“主链路直接回调”,