Revert "为长任务插件接入后台任务模式"

This reverts commit 0d1362f97e.
This commit is contained in:
Liu
2026-05-01 12:12:54 +08:00
parent b59b61dade
commit a9c3518b4a
7 changed files with 78 additions and 351 deletions

View File

@@ -7,20 +7,6 @@ from utils.robot_cmd.robot_command import Feature
class MessagePluginInterface(PluginInterface):
"""消息处理插件接口"""
@staticmethod
def normalize_message_dispatch_mode(raw_mode: Any) -> str:
"""把插件声明的消息分发模式标准化为 `sync` 或 `background`。
设计说明:
1. `sync` 表示沿用当前主链路同步执行,插件会占用当前消息处理协程直到完成;
2. `background` 表示命中后立即转入后台任务池,主消息链路尽快释放,不再占用前台并发槽位;
3. 这里集中做别名兼容,后续插件只需要写 `background/async/queue` 这类语义值即可。
"""
mode = str(raw_mode or "").strip().lower()
if mode in {"background", "async", "queued", "queue", "detached"}:
return "background"
return "sync"
@property
def command_prefix(self) -> Optional[str]:
"""命令前缀,如 '/'"""
@@ -87,20 +73,6 @@ class MessagePluginInterface(PluginInterface):
"""
raise NotImplementedError("子类必须实现此方法")
def get_message_dispatch_mode(self, message: Dict[str, Any]) -> str:
"""返回当前消息应采用的执行模式。
默认行为:
1. 优先读取插件自身 `config.toml` 里的 `[runtime] message_dispatch_mode`
2. 若未配置,则回退为 `sync`,保持历史行为不变;
3. 长任务插件如果需要“按命令动态切换前后台”,可以在子类中覆盖本方法。
"""
plugin_config = getattr(self, "_config", {}) or {}
runtime_config = plugin_config.get("runtime", {}) if isinstance(plugin_config, dict) else {}
runtime_config = runtime_config if isinstance(runtime_config, dict) else {}
raw_mode = runtime_config.get("message_dispatch_mode") or runtime_config.get("dispatch_mode") or "sync"
return self.normalize_message_dispatch_mode(raw_mode)
# ---------------- 插件定时调度能力(可选实现) ----------------
def get_schedule_actions(self) -> List[Dict[str, Any]]:
"""返回插件支持的可调度动作定义列表。

View File

@@ -1,13 +1,6 @@
environment: "${ABOT_ENVIRONMENT:development}"
plugin_dir: "${ABOT_PLUGIN_DIR:plugins}"
runtime:
# 后台长任务插件专用并发池大小:
# 1. 当前前台消息处理仍然保留固定 20 并发槽位;
# 2. 配置为 background 的长任务插件会转入这个独立池执行,不再长期占住前台槽位;
# 3. 如果模型/截图/报表类任务较多,可以适当调大;若机器较弱,建议保守一些。
background_plugin_max_concurrency: "${ABOT_BACKGROUND_PLUGIN_MAX_CONCURRENCY:6}"
db_config:
pool_name: "${ABOT_DB_POOL_NAME:wechat_boot_pool}"
pool_size: "${ABOT_DB_POOL_SIZE:10}"

View File

@@ -36,11 +36,3 @@ legacy_model = "turbo"
[AIGenImage.llm]
scene = "image.generate"
image_endpoint = "images/generations"
[runtime]
# AI 绘图属于长耗时命令:
# 1. 上游模型生成、下载与落盘都可能持续几十秒到数分钟;
# 2. 这里启用 background让它在后台任务池里跑避免长期占住前台消息并发槽位
# 3. 超时时间放宽到 7 分钟,兼容慢模型或高峰期图像生成延迟。
message_dispatch_mode = "background"
plugin_process_timeout_seconds = 420

View File

@@ -26,11 +26,3 @@ summary_image_template_path = "plugins/message_summary/templates/gemini_summary_
template_viewport_width = 580
template_viewport_height = 960
template_device_scale_factor = 2.0
[runtime]
# 群总结属于典型长任务:
# 1. 生成期间会查库、调 LLM、渲染图片整体耗时明显高于普通命令
# 2. 因此这里改为 background让命令命中后转入后台任务池不再占住前台 20 个消息处理槽位;
# 3. 同时把插件总超时放宽到 10 分钟,避免长总结被全局保护策略过早截断。
message_dispatch_mode = "background"
plugin_process_timeout_seconds = 600

View File

@@ -4,7 +4,7 @@ command = ["菜单", "功能菜单"]
# 菜单输出模式:
# - text发送文本菜单历史行为
# - image先用 md2image 将 Markdown 渲染为图片后发送
output_mode = "text"
output_mode = "image"
# 图片生成失败时是否回退文本菜单:
# - false严格按图片模式不发送完整菜单文本
# - true优先保证可达失败后改发文本

View File

@@ -94,14 +94,11 @@ class RobotMenuPlugin(MessagePluginInterface):
self.sync_send_timeout_seconds = int(
self._config.get("RobotMenu", {}).get("sync_send_timeout_seconds", 18)
)
# 只有在图片模式下,菜单插件才需要声明更紧的外层超时预算
# 1. 文本模式本身非常快,没有必要覆盖全局默认超时
# 2. 图片模式下才需要给“渲染 + 文本回退”预留一个略大于内层预算的缓冲
# 3. 这样默认文本模式下不会额外引入无意义的超时特化逻辑
if RobotMenuRenderTool.normalize_output_mode(output_mode) == "image":
self.plugin_process_timeout_seconds = max(12, self.sync_send_timeout_seconds + 8)
elif hasattr(self, "plugin_process_timeout_seconds"):
delattr(self, "plugin_process_timeout_seconds")
# 对外层插件保护显式声明一个更合适的总超时
# 1. 内层菜单发送会在 sync_send_timeout_seconds 内决定“成功发图 / 回退文本 / 返回失败提示”
# 2. 外层 wait_for 必须比内层稍长,给降级发送文本留出缓冲;
# 3. 这样可以避免过去“内外层都卡在 55 秒”时,外层先打断,导致降级逻辑来不及执行
self.plugin_process_timeout_seconds = max(12, self.sync_send_timeout_seconds + 8)
# 菜单图片模板文件路径(相对仓库根目录):
# 调整样式和布局时只改模板,不改 Python 逻辑。
self.image_template_path = str(

363
robot.py
View File

@@ -36,7 +36,7 @@ from utils.trace_context import set_current_trace_id, reset_current_trace_id
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import WxMessage, MessageType
# 定义前台消息处理信号量,限制最多 20 条消息同时进入主处理链路
# 定义全局信号量,限制最大并发 10
sem = asyncio.Semaphore(20)
@@ -144,18 +144,6 @@ class Robot:
# 通过类属性设置 admin_list而不是实例属性
GroupBotManager.admin_list = self.config.wx_config.get("admin", [])
self.recent_msg_ids = deque(maxlen=20)
# 长任务插件需要与“前台消息并发槽位”解耦:
# 1. 当前 `sem=20` 保护的是“正在处理消息的协程数”,并不适合被 200 秒级长任务长期占住;
# 2. 因此这里额外引入一个后台插件任务池,让长任务在独立并发池里运行;
# 3. 这样消息接收与轻量命令仍然能继续流动,后台长任务则在单独池子里排队执行。
runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {}
runtime_config = runtime_config if isinstance(runtime_config, dict) else {}
self.background_plugin_max_concurrency = self._safe_positive_int(
runtime_config.get("background_plugin_max_concurrency"),
6,
)
self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency)
self.background_plugin_tasks = set()
def apply_runtime_config(self, reload_catalog: bool = False) -> None:
"""把最新全局配置应用到当前运行中的关键对象。
@@ -176,18 +164,6 @@ class Robot:
# 管理员列表走 GroupBotManager 的类级缓存;只 reload Config 不会自动回写到这里。
GroupBotManager.admin_list = self.config.wx_config.get("admin", [])
# 后台插件任务池允许从全局配置热刷新:
# 1. 新值只影响后续新入队的后台任务,不会粗暴中断已在运行中的长任务;
# 2. 这里直接替换为新的 Semaphore逻辑简单且足够符合“后续生效”的预期
# 3. 若后续需要做到“平滑缩容”,再在这里补更细的迁移策略。
runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {}
runtime_config = runtime_config if isinstance(runtime_config, dict) else {}
self.background_plugin_max_concurrency = self._safe_positive_int(
runtime_config.get("background_plugin_max_concurrency"),
self.background_plugin_max_concurrency,
)
self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency)
# system_context 中保存的是 config 对象引用reload 后插件读取到的是最新字段。
# 但 LLMRegistry 自己还有一层短 TTL 缓存,因此保存全局 LLM 配置后需要显式清掉。
if reload_catalog:
@@ -198,8 +174,7 @@ class Robot:
"运行时配置已应用: "
f"admin_count={len(GroupBotManager.admin_list)}, "
f"email_sender={'ready' if self.email_sender else 'missing'}, "
f"llm_cache_reloaded={reload_catalog}, "
f"background_plugin_max_concurrency={self.background_plugin_max_concurrency}"
f"llm_cache_reloaded={reload_catalog}"
)
def _cleanup_migrated_system_jobs(self):
@@ -693,73 +668,84 @@ class Robot:
)
continue
dispatch_mode = self._resolve_message_plugin_dispatch_mode(plugin, plugin_msg)
if dispatch_mode == "background":
self._schedule_background_plugin_execution(
plugin=plugin,
plugin_msg=plugin_msg,
msg=msg,
command_name=command_name,
protection_policy=protection_policy,
)
processed, _ = await asyncio.wait_for(
plugin.process_message(plugin_msg),
timeout=protection_policy["process_timeout_seconds"],
)
self.plugin_manager.record_plugin_execution_success(
plugin,
process_time_ms=self._elapsed_ms(started_at),
)
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
command_name=command_name,
# 这里把“无异常执行完成”视为统计意义上的成功:
# 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”;
# 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低;
# 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。
process_result=True,
process_time_ms=self._elapsed_ms(started_at),
)
if processed:
self.LOG.info(
self._trace_message(
msg,
f"插件后台排队 plugin={plugin.name} command={command_name} "
f"timeout={protection_policy['process_timeout_seconds']}s "
f"background_pool={self.background_plugin_max_concurrency} "
f"pending={len(self.background_plugin_tasks)}"
f"插件命中 plugin={plugin.name} command={command_name} "
f"cost_ms={self._elapsed_ms(started_at)}"
)
)
# 后台模式一旦入队,就视为插件已经“认领”这条消息:
# 1. 这样可以避免后续插件继续命中同一条命令,造成重复回复;
# 2. 该模式适合命令式、排他式的长任务插件;
# 3. 如果未来需要“后台跑但不拦截”的能力,再单独扩展第三种分发模式。
return True
try:
processed, _ = await asyncio.wait_for(
plugin.process_message(plugin_msg),
timeout=protection_policy["process_timeout_seconds"],
)
process_time_ms = self._record_plugin_execution_success(
plugin=plugin,
msg=msg,
command_name=command_name,
started_at=started_at,
)
if processed:
self.LOG.info(
self._trace_message(
msg,
f"插件命中 plugin={plugin.name} command={command_name} "
f"cost_ms={process_time_ms}"
)
)
return True
except asyncio.TimeoutError as timeout_error:
self._handle_plugin_execution_timeout(
plugin=plugin,
msg=msg,
command_name=command_name,
timeout_error=timeout_error,
started_at=started_at,
protection_policy=protection_policy,
)
except Exception as e:
self._handle_plugin_execution_exception(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
started_at=started_at,
protection_policy=protection_policy,
)
except Exception as e:
except asyncio.TimeoutError as timeout_error:
protection_policy = self._build_message_plugin_protection_policy(plugin)
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="timeout",
error_message=(
f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。"
),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=protection_policy["process_timeout_seconds"],
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=timeout_error,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 预处理失败: {e}"
f"插件 {plugin.name} 执行超时timeout={protection_policy['process_timeout_seconds']}s "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
)
)
except Exception as e:
protection_policy = self._build_message_plugin_protection_policy(plugin)
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="error",
error_message=str(e),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=0,
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 处理消息失败: {e} "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
)
)
@@ -869,216 +855,11 @@ class Robot:
)
return {
# 这里把上限放宽到 1800 秒:
# 1. 用户已经有 200 秒级别的长任务插件,原先 180 秒硬上限会被无条件截断;
# 2. 现在是否“长时间占住前台消息槽位”主要由 dispatch_mode 决定,而不是靠硬砍超时;
# 3. 仍然保留上限,避免误配置成无限等待把整个系统拖成不可控状态。
"process_timeout_seconds": max(10, min(int(resolved_timeout), 1800)),
"process_timeout_seconds": max(10, min(int(resolved_timeout), 180)),
"failure_threshold": max(2, min(int(failure_threshold), 10)),
"circuit_recovery_seconds": max(30, min(int(circuit_recovery_seconds), 900)),
}
def _resolve_message_plugin_dispatch_mode(self, plugin, plugin_msg: dict) -> str:
"""解析插件当前消息的分发模式。"""
try:
if hasattr(plugin, "get_message_dispatch_mode"):
return plugin.get_message_dispatch_mode(plugin_msg)
except Exception as e:
self.LOG.warning(
self._trace_message(
plugin_msg.get("full_wx_msg"),
f"读取插件分发模式失败,已回退 sync: plugin={plugin.name}, error={e}"
)
)
return "sync"
def _schedule_background_plugin_execution(
self,
*,
plugin,
plugin_msg: dict,
msg: WxMessage,
command_name: str,
protection_policy: dict,
) -> None:
"""把插件执行转入后台任务池。"""
task_name = f"plugin-bg:{plugin.name}:{self._get_trace_id(msg) or 'no-trace'}"
task = asyncio.create_task(
self._run_background_plugin_execution(
plugin=plugin,
plugin_msg=plugin_msg,
msg=msg,
command_name=command_name,
protection_policy=protection_policy,
),
name=task_name,
)
self.background_plugin_tasks.add(task)
task.add_done_callback(self.background_plugin_tasks.discard)
async def _run_background_plugin_execution(
self,
*,
plugin,
plugin_msg: dict,
msg: WxMessage,
command_name: str,
protection_policy: dict,
) -> None:
"""在独立后台任务池中执行长任务插件。"""
trace_token = set_current_trace_id(self._get_trace_id(msg))
queued_started_at = time.perf_counter()
try:
async with self.background_plugin_semaphore:
started_at = time.perf_counter()
queue_wait_ms = round((started_at - queued_started_at) * 1000, 2)
try:
processed, _ = await asyncio.wait_for(
plugin.process_message(plugin_msg),
timeout=protection_policy["process_timeout_seconds"],
)
process_time_ms = self._record_plugin_execution_success(
plugin=plugin,
msg=msg,
command_name=command_name,
started_at=started_at,
)
if processed:
self.LOG.info(
self._trace_message(
msg,
f"后台插件完成 plugin={plugin.name} command={command_name} "
f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}"
)
)
else:
self.LOG.warning(
self._trace_message(
msg,
f"后台插件执行完成但未拦截消息 plugin={plugin.name} command={command_name} "
f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}"
)
)
except asyncio.TimeoutError as timeout_error:
self._handle_plugin_execution_timeout(
plugin=plugin,
msg=msg,
command_name=command_name,
timeout_error=timeout_error,
started_at=started_at,
protection_policy=protection_policy,
)
except Exception as e:
self._handle_plugin_execution_exception(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
started_at=started_at,
protection_policy=protection_policy,
)
finally:
reset_current_trace_id(trace_token)
def _record_plugin_execution_success(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
started_at: float,
) -> float:
"""统一记录插件成功执行结果,并返回耗时。"""
process_time_ms = self._elapsed_ms(started_at)
self.plugin_manager.record_plugin_execution_success(
plugin,
process_time_ms=process_time_ms,
)
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
command_name=command_name,
# 这里把“无异常执行完成”视为统计意义上的成功:
# 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”;
# 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低;
# 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。
process_result=True,
process_time_ms=process_time_ms,
)
return process_time_ms
def _handle_plugin_execution_timeout(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
timeout_error: Exception,
started_at: float,
protection_policy: dict,
) -> None:
"""统一处理插件执行超时。"""
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="timeout",
error_message=(
f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。"
),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=protection_policy["process_timeout_seconds"],
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=timeout_error,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 执行超时timeout={protection_policy['process_timeout_seconds']}s "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
)
)
def _handle_plugin_execution_exception(
self,
*,
plugin,
msg: WxMessage,
command_name: str,
error: Exception,
started_at: float,
protection_policy: dict,
) -> None:
"""统一处理插件执行异常。"""
failure_record = self.plugin_manager.record_plugin_execution_failure(
plugin,
failure_type="error",
error_message=str(error),
process_time_ms=self._elapsed_ms(started_at),
timeout_seconds=0,
failure_threshold=protection_policy["failure_threshold"],
recovery_seconds=protection_policy["circuit_recovery_seconds"],
)
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=error,
)
self.LOG.error(
self._trace_message(
msg,
f"插件 {plugin.name} 处理消息失败: {error} "
f"circuit_state={failure_record.get('circuit_state')} "
f"consecutive_failures={failure_record.get('consecutive_failures')}"
)
)
def _get_stats_collector_plugin(self):
"""获取运行中的统计收集插件实例。"""
# 统计插件已经从“事件订阅”切到“主链路直接回调”,