diff --git a/base/plugin_common/message_plugin_interface.py b/base/plugin_common/message_plugin_interface.py index aa71579..102bc63 100644 --- a/base/plugin_common/message_plugin_interface.py +++ b/base/plugin_common/message_plugin_interface.py @@ -7,6 +7,20 @@ from utils.robot_cmd.robot_command import Feature class MessagePluginInterface(PluginInterface): """消息处理插件接口""" + @staticmethod + def normalize_message_dispatch_mode(raw_mode: Any) -> str: + """把插件声明的消息分发模式标准化为 `sync` 或 `background`。 + + 设计说明: + 1. `sync` 表示沿用当前主链路同步执行,插件会占用当前消息处理协程直到完成; + 2. `background` 表示命中后立即转入后台任务池,主消息链路尽快释放,不再占用前台并发槽位; + 3. 这里集中做别名兼容,后续插件只需要写 `background/async/queue` 这类语义值即可。 + """ + mode = str(raw_mode or "").strip().lower() + if mode in {"background", "async", "queued", "queue", "detached"}: + return "background" + return "sync" + @property def command_prefix(self) -> Optional[str]: """命令前缀,如 '/'""" @@ -73,6 +87,20 @@ class MessagePluginInterface(PluginInterface): """ raise NotImplementedError("子类必须实现此方法") + def get_message_dispatch_mode(self, message: Dict[str, Any]) -> str: + """返回当前消息应采用的执行模式。 + + 默认行为: + 1. 优先读取插件自身 `config.toml` 里的 `[runtime] message_dispatch_mode`; + 2. 若未配置,则回退为 `sync`,保持历史行为不变; + 3. 长任务插件如果需要“按命令动态切换前后台”,可以在子类中覆盖本方法。 + """ + plugin_config = getattr(self, "_config", {}) or {} + runtime_config = plugin_config.get("runtime", {}) if isinstance(plugin_config, dict) else {} + runtime_config = runtime_config if isinstance(runtime_config, dict) else {} + raw_mode = runtime_config.get("message_dispatch_mode") or runtime_config.get("dispatch_mode") or "sync" + return self.normalize_message_dispatch_mode(raw_mode) + # ---------------- 插件定时调度能力(可选实现) ---------------- def get_schedule_actions(self) -> List[Dict[str, Any]]: """返回插件支持的可调度动作定义列表。 diff --git a/config.yaml b/config.yaml index 3b60420..42c18dd 100644 --- a/config.yaml +++ b/config.yaml @@ -1,6 +1,13 @@ environment: "${ABOT_ENVIRONMENT:development}" plugin_dir: "${ABOT_PLUGIN_DIR:plugins}" +runtime: + # 后台长任务插件专用并发池大小: + # 1. 当前前台消息处理仍然保留固定 20 并发槽位; + # 2. 配置为 background 的长任务插件会转入这个独立池执行,不再长期占住前台槽位; + # 3. 如果模型/截图/报表类任务较多,可以适当调大;若机器较弱,建议保守一些。 + background_plugin_max_concurrency: "${ABOT_BACKGROUND_PLUGIN_MAX_CONCURRENCY:6}" + db_config: pool_name: "${ABOT_DB_POOL_NAME:wechat_boot_pool}" pool_size: "${ABOT_DB_POOL_SIZE:10}" diff --git a/plugins/ai_gen_image/config.toml b/plugins/ai_gen_image/config.toml index 653246c..7742419 100644 --- a/plugins/ai_gen_image/config.toml +++ b/plugins/ai_gen_image/config.toml @@ -36,3 +36,11 @@ legacy_model = "turbo" [AIGenImage.llm] scene = "image.generate" image_endpoint = "images/generations" + +[runtime] +# AI 绘图属于长耗时命令: +# 1. 上游模型生成、下载与落盘都可能持续几十秒到数分钟; +# 2. 这里启用 background,让它在后台任务池里跑,避免长期占住前台消息并发槽位; +# 3. 超时时间放宽到 7 分钟,兼容慢模型或高峰期图像生成延迟。 +message_dispatch_mode = "background" +plugin_process_timeout_seconds = 420 diff --git a/plugins/message_summary/config.toml b/plugins/message_summary/config.toml index 52a0763..fa94019 100644 --- a/plugins/message_summary/config.toml +++ b/plugins/message_summary/config.toml @@ -26,3 +26,11 @@ summary_image_template_path = "plugins/message_summary/templates/gemini_summary_ template_viewport_width = 580 template_viewport_height = 960 template_device_scale_factor = 2.0 + +[runtime] +# 群总结属于典型长任务: +# 1. 生成期间会查库、调 LLM、渲染图片,整体耗时明显高于普通命令; +# 2. 因此这里改为 background,让命令命中后转入后台任务池,不再占住前台 20 个消息处理槽位; +# 3. 同时把插件总超时放宽到 10 分钟,避免长总结被全局保护策略过早截断。 +message_dispatch_mode = "background" +plugin_process_timeout_seconds = 600 diff --git a/plugins/robot_menu/config.toml b/plugins/robot_menu/config.toml index 8301164..d632971 100644 --- a/plugins/robot_menu/config.toml +++ b/plugins/robot_menu/config.toml @@ -4,7 +4,7 @@ command = ["菜单", "功能菜单"] # 菜单输出模式: # - text:发送文本菜单(历史行为) # - image:先用 md2image 将 Markdown 渲染为图片后发送 -output_mode = "image" +output_mode = "text" # 图片生成失败时是否回退文本菜单: # - false:严格按图片模式,不发送完整菜单文本 # - true:优先保证可达,失败后改发文本 diff --git a/plugins/robot_menu/main.py b/plugins/robot_menu/main.py index 66b4cfe..8f4720d 100644 --- a/plugins/robot_menu/main.py +++ b/plugins/robot_menu/main.py @@ -94,11 +94,14 @@ class RobotMenuPlugin(MessagePluginInterface): self.sync_send_timeout_seconds = int( self._config.get("RobotMenu", {}).get("sync_send_timeout_seconds", 18) ) - # 对外层插件保护显式声明一个更合适的总超时: - # 1. 内层菜单发送会在 sync_send_timeout_seconds 内决定“成功发图 / 回退文本 / 返回失败提示”; - # 2. 外层 wait_for 必须比内层稍长,给降级发送文本留出缓冲; - # 3. 这样可以避免过去“内外层都卡在 55 秒”时,外层先打断,导致降级逻辑来不及执行。 - self.plugin_process_timeout_seconds = max(12, self.sync_send_timeout_seconds + 8) + # 只有在图片模式下,菜单插件才需要声明更紧的外层超时预算: + # 1. 文本模式本身非常快,没有必要覆盖全局默认超时; + # 2. 图片模式下才需要给“渲染 + 文本回退”预留一个略大于内层预算的缓冲区; + # 3. 这样默认文本模式下不会额外引入无意义的超时特化逻辑。 + if RobotMenuRenderTool.normalize_output_mode(output_mode) == "image": + self.plugin_process_timeout_seconds = max(12, self.sync_send_timeout_seconds + 8) + elif hasattr(self, "plugin_process_timeout_seconds"): + delattr(self, "plugin_process_timeout_seconds") # 菜单图片模板文件路径(相对仓库根目录): # 调整样式和布局时只改模板,不改 Python 逻辑。 self.image_template_path = str( diff --git a/robot.py b/robot.py index 87e5d3a..e5b007e 100644 --- a/robot.py +++ b/robot.py @@ -36,7 +36,7 @@ from utils.trace_context import set_current_trace_id, reset_current_trace_id from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import WxMessage, MessageType -# 定义全局信号量,限制最大并发 10 +# 定义前台消息处理信号量,限制最多 20 条消息同时进入主处理链路 sem = asyncio.Semaphore(20) @@ -144,6 +144,18 @@ class Robot: # 通过类属性设置 admin_list,而不是实例属性 GroupBotManager.admin_list = self.config.wx_config.get("admin", []) self.recent_msg_ids = deque(maxlen=20) + # 长任务插件需要与“前台消息并发槽位”解耦: + # 1. 当前 `sem=20` 保护的是“正在处理消息的协程数”,并不适合被 200 秒级长任务长期占住; + # 2. 因此这里额外引入一个后台插件任务池,让长任务在独立并发池里运行; + # 3. 这样消息接收与轻量命令仍然能继续流动,后台长任务则在单独池子里排队执行。 + runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {} + runtime_config = runtime_config if isinstance(runtime_config, dict) else {} + self.background_plugin_max_concurrency = self._safe_positive_int( + runtime_config.get("background_plugin_max_concurrency"), + 6, + ) + self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency) + self.background_plugin_tasks = set() def apply_runtime_config(self, reload_catalog: bool = False) -> None: """把最新全局配置应用到当前运行中的关键对象。 @@ -164,6 +176,18 @@ class Robot: # 管理员列表走 GroupBotManager 的类级缓存;只 reload Config 不会自动回写到这里。 GroupBotManager.admin_list = self.config.wx_config.get("admin", []) + # 后台插件任务池允许从全局配置热刷新: + # 1. 新值只影响后续新入队的后台任务,不会粗暴中断已在运行中的长任务; + # 2. 这里直接替换为新的 Semaphore,逻辑简单且足够符合“后续生效”的预期; + # 3. 若后续需要做到“平滑缩容”,再在这里补更细的迁移策略。 + runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {} + runtime_config = runtime_config if isinstance(runtime_config, dict) else {} + self.background_plugin_max_concurrency = self._safe_positive_int( + runtime_config.get("background_plugin_max_concurrency"), + self.background_plugin_max_concurrency, + ) + self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency) + # system_context 中保存的是 config 对象引用,reload 后插件读取到的是最新字段。 # 但 LLMRegistry 自己还有一层短 TTL 缓存,因此保存全局 LLM 配置后需要显式清掉。 if reload_catalog: @@ -174,7 +198,8 @@ class Robot: "运行时配置已应用: " f"admin_count={len(GroupBotManager.admin_list)}, " f"email_sender={'ready' if self.email_sender else 'missing'}, " - f"llm_cache_reloaded={reload_catalog}" + f"llm_cache_reloaded={reload_catalog}, " + f"background_plugin_max_concurrency={self.background_plugin_max_concurrency}" ) def _cleanup_migrated_system_jobs(self): @@ -668,84 +693,73 @@ class Robot: ) continue - processed, _ = await asyncio.wait_for( - plugin.process_message(plugin_msg), - timeout=protection_policy["process_timeout_seconds"], - ) - self.plugin_manager.record_plugin_execution_success( - plugin, - process_time_ms=self._elapsed_ms(started_at), - ) - self._record_plugin_call_result( - plugin=plugin, - msg=msg, - command_name=command_name, - # 这里把“无异常执行完成”视为统计意义上的成功: - # 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”; - # 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低; - # 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。 - process_result=True, - process_time_ms=self._elapsed_ms(started_at), - ) - if processed: + dispatch_mode = self._resolve_message_plugin_dispatch_mode(plugin, plugin_msg) + if dispatch_mode == "background": + self._schedule_background_plugin_execution( + plugin=plugin, + plugin_msg=plugin_msg, + msg=msg, + command_name=command_name, + protection_policy=protection_policy, + ) self.LOG.info( self._trace_message( msg, - f"插件命中 plugin={plugin.name} command={command_name} " - f"cost_ms={self._elapsed_ms(started_at)}" + f"插件后台排队 plugin={plugin.name} command={command_name} " + f"timeout={protection_policy['process_timeout_seconds']}s " + f"background_pool={self.background_plugin_max_concurrency} " + f"pending={len(self.background_plugin_tasks)}" ) ) + # 后台模式一旦入队,就视为插件已经“认领”这条消息: + # 1. 这样可以避免后续插件继续命中同一条命令,造成重复回复; + # 2. 该模式适合命令式、排他式的长任务插件; + # 3. 如果未来需要“后台跑但不拦截”的能力,再单独扩展第三种分发模式。 return True - except asyncio.TimeoutError as timeout_error: - protection_policy = self._build_message_plugin_protection_policy(plugin) - failure_record = self.plugin_manager.record_plugin_execution_failure( - plugin, - failure_type="timeout", - error_message=( - f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。" - ), - process_time_ms=self._elapsed_ms(started_at), - timeout_seconds=protection_policy["process_timeout_seconds"], - failure_threshold=protection_policy["failure_threshold"], - recovery_seconds=protection_policy["circuit_recovery_seconds"], - ) - self._record_plugin_call_error( - plugin=plugin, - msg=msg, - command_name=command_name, - error=timeout_error, - ) - self.LOG.error( - self._trace_message( - msg, - f"插件 {plugin.name} 执行超时,timeout={protection_policy['process_timeout_seconds']}s " - f"circuit_state={failure_record.get('circuit_state')} " - f"consecutive_failures={failure_record.get('consecutive_failures')}" - ) - ) + + try: + processed, _ = await asyncio.wait_for( + plugin.process_message(plugin_msg), + timeout=protection_policy["process_timeout_seconds"], + ) + process_time_ms = self._record_plugin_execution_success( + plugin=plugin, + msg=msg, + command_name=command_name, + started_at=started_at, + ) + if processed: + self.LOG.info( + self._trace_message( + msg, + f"插件命中 plugin={plugin.name} command={command_name} " + f"cost_ms={process_time_ms}" + ) + ) + return True + except asyncio.TimeoutError as timeout_error: + self._handle_plugin_execution_timeout( + plugin=plugin, + msg=msg, + command_name=command_name, + timeout_error=timeout_error, + started_at=started_at, + protection_policy=protection_policy, + ) + except Exception as e: + self._handle_plugin_execution_exception( + plugin=plugin, + msg=msg, + command_name=command_name, + error=e, + started_at=started_at, + protection_policy=protection_policy, + ) except Exception as e: - protection_policy = self._build_message_plugin_protection_policy(plugin) - failure_record = self.plugin_manager.record_plugin_execution_failure( - plugin, - failure_type="error", - error_message=str(e), - process_time_ms=self._elapsed_ms(started_at), - timeout_seconds=0, - failure_threshold=protection_policy["failure_threshold"], - recovery_seconds=protection_policy["circuit_recovery_seconds"], - ) - self._record_plugin_call_error( - plugin=plugin, - msg=msg, - command_name=command_name, - error=e, - ) self.LOG.error( self._trace_message( msg, - f"插件 {plugin.name} 处理消息失败: {e} " - f"circuit_state={failure_record.get('circuit_state')} " - f"consecutive_failures={failure_record.get('consecutive_failures')}" + f"插件 {plugin.name} 预处理失败: {e}" ) ) @@ -855,11 +869,216 @@ class Robot: ) return { - "process_timeout_seconds": max(10, min(int(resolved_timeout), 180)), + # 这里把上限放宽到 1800 秒: + # 1. 用户已经有 200 秒级别的长任务插件,原先 180 秒硬上限会被无条件截断; + # 2. 现在是否“长时间占住前台消息槽位”主要由 dispatch_mode 决定,而不是靠硬砍超时; + # 3. 仍然保留上限,避免误配置成无限等待把整个系统拖成不可控状态。 + "process_timeout_seconds": max(10, min(int(resolved_timeout), 1800)), "failure_threshold": max(2, min(int(failure_threshold), 10)), "circuit_recovery_seconds": max(30, min(int(circuit_recovery_seconds), 900)), } + def _resolve_message_plugin_dispatch_mode(self, plugin, plugin_msg: dict) -> str: + """解析插件当前消息的分发模式。""" + try: + if hasattr(plugin, "get_message_dispatch_mode"): + return plugin.get_message_dispatch_mode(plugin_msg) + except Exception as e: + self.LOG.warning( + self._trace_message( + plugin_msg.get("full_wx_msg"), + f"读取插件分发模式失败,已回退 sync: plugin={plugin.name}, error={e}" + ) + ) + return "sync" + + def _schedule_background_plugin_execution( + self, + *, + plugin, + plugin_msg: dict, + msg: WxMessage, + command_name: str, + protection_policy: dict, + ) -> None: + """把插件执行转入后台任务池。""" + task_name = f"plugin-bg:{plugin.name}:{self._get_trace_id(msg) or 'no-trace'}" + task = asyncio.create_task( + self._run_background_plugin_execution( + plugin=plugin, + plugin_msg=plugin_msg, + msg=msg, + command_name=command_name, + protection_policy=protection_policy, + ), + name=task_name, + ) + self.background_plugin_tasks.add(task) + task.add_done_callback(self.background_plugin_tasks.discard) + + async def _run_background_plugin_execution( + self, + *, + plugin, + plugin_msg: dict, + msg: WxMessage, + command_name: str, + protection_policy: dict, + ) -> None: + """在独立后台任务池中执行长任务插件。""" + trace_token = set_current_trace_id(self._get_trace_id(msg)) + queued_started_at = time.perf_counter() + try: + async with self.background_plugin_semaphore: + started_at = time.perf_counter() + queue_wait_ms = round((started_at - queued_started_at) * 1000, 2) + try: + processed, _ = await asyncio.wait_for( + plugin.process_message(plugin_msg), + timeout=protection_policy["process_timeout_seconds"], + ) + process_time_ms = self._record_plugin_execution_success( + plugin=plugin, + msg=msg, + command_name=command_name, + started_at=started_at, + ) + if processed: + self.LOG.info( + self._trace_message( + msg, + f"后台插件完成 plugin={plugin.name} command={command_name} " + f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}" + ) + ) + else: + self.LOG.warning( + self._trace_message( + msg, + f"后台插件执行完成但未拦截消息 plugin={plugin.name} command={command_name} " + f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}" + ) + ) + except asyncio.TimeoutError as timeout_error: + self._handle_plugin_execution_timeout( + plugin=plugin, + msg=msg, + command_name=command_name, + timeout_error=timeout_error, + started_at=started_at, + protection_policy=protection_policy, + ) + except Exception as e: + self._handle_plugin_execution_exception( + plugin=plugin, + msg=msg, + command_name=command_name, + error=e, + started_at=started_at, + protection_policy=protection_policy, + ) + finally: + reset_current_trace_id(trace_token) + + def _record_plugin_execution_success( + self, + *, + plugin, + msg: WxMessage, + command_name: str, + started_at: float, + ) -> float: + """统一记录插件成功执行结果,并返回耗时。""" + process_time_ms = self._elapsed_ms(started_at) + self.plugin_manager.record_plugin_execution_success( + plugin, + process_time_ms=process_time_ms, + ) + self._record_plugin_call_result( + plugin=plugin, + msg=msg, + command_name=command_name, + # 这里把“无异常执行完成”视为统计意义上的成功: + # 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”; + # 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低; + # 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。 + process_result=True, + process_time_ms=process_time_ms, + ) + return process_time_ms + + def _handle_plugin_execution_timeout( + self, + *, + plugin, + msg: WxMessage, + command_name: str, + timeout_error: Exception, + started_at: float, + protection_policy: dict, + ) -> None: + """统一处理插件执行超时。""" + failure_record = self.plugin_manager.record_plugin_execution_failure( + plugin, + failure_type="timeout", + error_message=( + f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。" + ), + process_time_ms=self._elapsed_ms(started_at), + timeout_seconds=protection_policy["process_timeout_seconds"], + failure_threshold=protection_policy["failure_threshold"], + recovery_seconds=protection_policy["circuit_recovery_seconds"], + ) + self._record_plugin_call_error( + plugin=plugin, + msg=msg, + command_name=command_name, + error=timeout_error, + ) + self.LOG.error( + self._trace_message( + msg, + f"插件 {plugin.name} 执行超时,timeout={protection_policy['process_timeout_seconds']}s " + f"circuit_state={failure_record.get('circuit_state')} " + f"consecutive_failures={failure_record.get('consecutive_failures')}" + ) + ) + + def _handle_plugin_execution_exception( + self, + *, + plugin, + msg: WxMessage, + command_name: str, + error: Exception, + started_at: float, + protection_policy: dict, + ) -> None: + """统一处理插件执行异常。""" + failure_record = self.plugin_manager.record_plugin_execution_failure( + plugin, + failure_type="error", + error_message=str(error), + process_time_ms=self._elapsed_ms(started_at), + timeout_seconds=0, + failure_threshold=protection_policy["failure_threshold"], + recovery_seconds=protection_policy["circuit_recovery_seconds"], + ) + self._record_plugin_call_error( + plugin=plugin, + msg=msg, + command_name=command_name, + error=error, + ) + self.LOG.error( + self._trace_message( + msg, + f"插件 {plugin.name} 处理消息失败: {error} " + f"circuit_state={failure_record.get('circuit_state')} " + f"consecutive_failures={failure_record.get('consecutive_failures')}" + ) + ) + def _get_stats_collector_plugin(self): """获取运行中的统计收集插件实例。""" # 统计插件已经从“事件订阅”切到“主链路直接回调”,