From e75fe04b775ad4689d1da6eefb063d465a46bccf Mon Sep 17 00:00:00 2001 From: Liu Date: Fri, 1 May 2026 12:45:34 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"=E4=B8=BA=E9=95=BF=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E6=8E=A5=E5=85=A5=E5=90=8E=E5=8F=B0=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=A8=A1=E5=BC=8F"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 0d1362f97e96c7ca25c017faf1a24a13f1df0e45. --- .../plugin_common/message_plugin_interface.py | 28 -- config.yaml | 7 - plugins/ai_gen_image/config.toml | 8 - plugins/message_summary/config.toml | 8 - plugins/robot_menu/config.toml | 2 +- plugins/robot_menu/main.py | 13 +- robot.py | 363 ++++-------------- 7 files changed, 78 insertions(+), 351 deletions(-) diff --git a/base/plugin_common/message_plugin_interface.py b/base/plugin_common/message_plugin_interface.py index 102bc63..aa71579 100644 --- a/base/plugin_common/message_plugin_interface.py +++ b/base/plugin_common/message_plugin_interface.py @@ -7,20 +7,6 @@ from utils.robot_cmd.robot_command import Feature class MessagePluginInterface(PluginInterface): """消息处理插件接口""" - @staticmethod - def normalize_message_dispatch_mode(raw_mode: Any) -> str: - """把插件声明的消息分发模式标准化为 `sync` 或 `background`。 - - 设计说明: - 1. `sync` 表示沿用当前主链路同步执行,插件会占用当前消息处理协程直到完成; - 2. `background` 表示命中后立即转入后台任务池,主消息链路尽快释放,不再占用前台并发槽位; - 3. 这里集中做别名兼容,后续插件只需要写 `background/async/queue` 这类语义值即可。 - """ - mode = str(raw_mode or "").strip().lower() - if mode in {"background", "async", "queued", "queue", "detached"}: - return "background" - return "sync" - @property def command_prefix(self) -> Optional[str]: """命令前缀,如 '/'""" @@ -87,20 +73,6 @@ class MessagePluginInterface(PluginInterface): """ raise NotImplementedError("子类必须实现此方法") - def get_message_dispatch_mode(self, message: Dict[str, Any]) -> str: - """返回当前消息应采用的执行模式。 - - 默认行为: - 1. 优先读取插件自身 `config.toml` 里的 `[runtime] message_dispatch_mode`; - 2. 若未配置,则回退为 `sync`,保持历史行为不变; - 3. 长任务插件如果需要“按命令动态切换前后台”,可以在子类中覆盖本方法。 - """ - plugin_config = getattr(self, "_config", {}) or {} - runtime_config = plugin_config.get("runtime", {}) if isinstance(plugin_config, dict) else {} - runtime_config = runtime_config if isinstance(runtime_config, dict) else {} - raw_mode = runtime_config.get("message_dispatch_mode") or runtime_config.get("dispatch_mode") or "sync" - return self.normalize_message_dispatch_mode(raw_mode) - # ---------------- 插件定时调度能力(可选实现) ---------------- def get_schedule_actions(self) -> List[Dict[str, Any]]: """返回插件支持的可调度动作定义列表。 diff --git a/config.yaml b/config.yaml index 42c18dd..3b60420 100644 --- a/config.yaml +++ b/config.yaml @@ -1,13 +1,6 @@ environment: "${ABOT_ENVIRONMENT:development}" plugin_dir: "${ABOT_PLUGIN_DIR:plugins}" -runtime: - # 后台长任务插件专用并发池大小: - # 1. 当前前台消息处理仍然保留固定 20 并发槽位; - # 2. 配置为 background 的长任务插件会转入这个独立池执行,不再长期占住前台槽位; - # 3. 如果模型/截图/报表类任务较多,可以适当调大;若机器较弱,建议保守一些。 - background_plugin_max_concurrency: "${ABOT_BACKGROUND_PLUGIN_MAX_CONCURRENCY:6}" - db_config: pool_name: "${ABOT_DB_POOL_NAME:wechat_boot_pool}" pool_size: "${ABOT_DB_POOL_SIZE:10}" diff --git a/plugins/ai_gen_image/config.toml b/plugins/ai_gen_image/config.toml index 7742419..653246c 100644 --- a/plugins/ai_gen_image/config.toml +++ b/plugins/ai_gen_image/config.toml @@ -36,11 +36,3 @@ legacy_model = "turbo" [AIGenImage.llm] scene = "image.generate" image_endpoint = "images/generations" - -[runtime] -# AI 绘图属于长耗时命令: -# 1. 上游模型生成、下载与落盘都可能持续几十秒到数分钟; -# 2. 这里启用 background,让它在后台任务池里跑,避免长期占住前台消息并发槽位; -# 3. 超时时间放宽到 7 分钟,兼容慢模型或高峰期图像生成延迟。 -message_dispatch_mode = "background" -plugin_process_timeout_seconds = 420 diff --git a/plugins/message_summary/config.toml b/plugins/message_summary/config.toml index fa94019..52a0763 100644 --- a/plugins/message_summary/config.toml +++ b/plugins/message_summary/config.toml @@ -26,11 +26,3 @@ summary_image_template_path = "plugins/message_summary/templates/gemini_summary_ template_viewport_width = 580 template_viewport_height = 960 template_device_scale_factor = 2.0 - -[runtime] -# 群总结属于典型长任务: -# 1. 生成期间会查库、调 LLM、渲染图片,整体耗时明显高于普通命令; -# 2. 因此这里改为 background,让命令命中后转入后台任务池,不再占住前台 20 个消息处理槽位; -# 3. 同时把插件总超时放宽到 10 分钟,避免长总结被全局保护策略过早截断。 -message_dispatch_mode = "background" -plugin_process_timeout_seconds = 600 diff --git a/plugins/robot_menu/config.toml b/plugins/robot_menu/config.toml index d632971..8301164 100644 --- a/plugins/robot_menu/config.toml +++ b/plugins/robot_menu/config.toml @@ -4,7 +4,7 @@ command = ["菜单", "功能菜单"] # 菜单输出模式: # - text:发送文本菜单(历史行为) # - image:先用 md2image 将 Markdown 渲染为图片后发送 -output_mode = "text" +output_mode = "image" # 图片生成失败时是否回退文本菜单: # - false:严格按图片模式,不发送完整菜单文本 # - true:优先保证可达,失败后改发文本 diff --git a/plugins/robot_menu/main.py b/plugins/robot_menu/main.py index 8f4720d..66b4cfe 100644 --- a/plugins/robot_menu/main.py +++ b/plugins/robot_menu/main.py @@ -94,14 +94,11 @@ class RobotMenuPlugin(MessagePluginInterface): self.sync_send_timeout_seconds = int( self._config.get("RobotMenu", {}).get("sync_send_timeout_seconds", 18) ) - # 只有在图片模式下,菜单插件才需要声明更紧的外层超时预算: - # 1. 文本模式本身非常快,没有必要覆盖全局默认超时; - # 2. 图片模式下才需要给“渲染 + 文本回退”预留一个略大于内层预算的缓冲区; - # 3. 这样默认文本模式下不会额外引入无意义的超时特化逻辑。 - if RobotMenuRenderTool.normalize_output_mode(output_mode) == "image": - self.plugin_process_timeout_seconds = max(12, self.sync_send_timeout_seconds + 8) - elif hasattr(self, "plugin_process_timeout_seconds"): - delattr(self, "plugin_process_timeout_seconds") + # 对外层插件保护显式声明一个更合适的总超时: + # 1. 内层菜单发送会在 sync_send_timeout_seconds 内决定“成功发图 / 回退文本 / 返回失败提示”; + # 2. 外层 wait_for 必须比内层稍长,给降级发送文本留出缓冲; + # 3. 这样可以避免过去“内外层都卡在 55 秒”时,外层先打断,导致降级逻辑来不及执行。 + self.plugin_process_timeout_seconds = max(12, self.sync_send_timeout_seconds + 8) # 菜单图片模板文件路径(相对仓库根目录): # 调整样式和布局时只改模板,不改 Python 逻辑。 self.image_template_path = str( diff --git a/robot.py b/robot.py index e5b007e..87e5d3a 100644 --- a/robot.py +++ b/robot.py @@ -36,7 +36,7 @@ from utils.trace_context import set_current_trace_id, reset_current_trace_id from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import WxMessage, MessageType -# 定义前台消息处理信号量,限制最多 20 条消息同时进入主处理链路 +# 定义全局信号量,限制最大并发 10 sem = asyncio.Semaphore(20) @@ -144,18 +144,6 @@ class Robot: # 通过类属性设置 admin_list,而不是实例属性 GroupBotManager.admin_list = self.config.wx_config.get("admin", []) self.recent_msg_ids = deque(maxlen=20) - # 长任务插件需要与“前台消息并发槽位”解耦: - # 1. 当前 `sem=20` 保护的是“正在处理消息的协程数”,并不适合被 200 秒级长任务长期占住; - # 2. 因此这里额外引入一个后台插件任务池,让长任务在独立并发池里运行; - # 3. 这样消息接收与轻量命令仍然能继续流动,后台长任务则在单独池子里排队执行。 - runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {} - runtime_config = runtime_config if isinstance(runtime_config, dict) else {} - self.background_plugin_max_concurrency = self._safe_positive_int( - runtime_config.get("background_plugin_max_concurrency"), - 6, - ) - self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency) - self.background_plugin_tasks = set() def apply_runtime_config(self, reload_catalog: bool = False) -> None: """把最新全局配置应用到当前运行中的关键对象。 @@ -176,18 +164,6 @@ class Robot: # 管理员列表走 GroupBotManager 的类级缓存;只 reload Config 不会自动回写到这里。 GroupBotManager.admin_list = self.config.wx_config.get("admin", []) - # 后台插件任务池允许从全局配置热刷新: - # 1. 新值只影响后续新入队的后台任务,不会粗暴中断已在运行中的长任务; - # 2. 这里直接替换为新的 Semaphore,逻辑简单且足够符合“后续生效”的预期; - # 3. 若后续需要做到“平滑缩容”,再在这里补更细的迁移策略。 - runtime_config = self.config.resolved_config.get("runtime", {}) if isinstance(self.config.resolved_config, dict) else {} - runtime_config = runtime_config if isinstance(runtime_config, dict) else {} - self.background_plugin_max_concurrency = self._safe_positive_int( - runtime_config.get("background_plugin_max_concurrency"), - self.background_plugin_max_concurrency, - ) - self.background_plugin_semaphore = asyncio.Semaphore(self.background_plugin_max_concurrency) - # system_context 中保存的是 config 对象引用,reload 后插件读取到的是最新字段。 # 但 LLMRegistry 自己还有一层短 TTL 缓存,因此保存全局 LLM 配置后需要显式清掉。 if reload_catalog: @@ -198,8 +174,7 @@ class Robot: "运行时配置已应用: " f"admin_count={len(GroupBotManager.admin_list)}, " f"email_sender={'ready' if self.email_sender else 'missing'}, " - f"llm_cache_reloaded={reload_catalog}, " - f"background_plugin_max_concurrency={self.background_plugin_max_concurrency}" + f"llm_cache_reloaded={reload_catalog}" ) def _cleanup_migrated_system_jobs(self): @@ -693,73 +668,84 @@ class Robot: ) continue - dispatch_mode = self._resolve_message_plugin_dispatch_mode(plugin, plugin_msg) - if dispatch_mode == "background": - self._schedule_background_plugin_execution( - plugin=plugin, - plugin_msg=plugin_msg, - msg=msg, - command_name=command_name, - protection_policy=protection_policy, - ) + processed, _ = await asyncio.wait_for( + plugin.process_message(plugin_msg), + timeout=protection_policy["process_timeout_seconds"], + ) + self.plugin_manager.record_plugin_execution_success( + plugin, + process_time_ms=self._elapsed_ms(started_at), + ) + self._record_plugin_call_result( + plugin=plugin, + msg=msg, + command_name=command_name, + # 这里把“无异常执行完成”视为统计意义上的成功: + # 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”; + # 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低; + # 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。 + process_result=True, + process_time_ms=self._elapsed_ms(started_at), + ) + if processed: self.LOG.info( self._trace_message( msg, - f"插件后台排队 plugin={plugin.name} command={command_name} " - f"timeout={protection_policy['process_timeout_seconds']}s " - f"background_pool={self.background_plugin_max_concurrency} " - f"pending={len(self.background_plugin_tasks)}" + f"插件命中 plugin={plugin.name} command={command_name} " + f"cost_ms={self._elapsed_ms(started_at)}" ) ) - # 后台模式一旦入队,就视为插件已经“认领”这条消息: - # 1. 这样可以避免后续插件继续命中同一条命令,造成重复回复; - # 2. 该模式适合命令式、排他式的长任务插件; - # 3. 如果未来需要“后台跑但不拦截”的能力,再单独扩展第三种分发模式。 return True - - try: - processed, _ = await asyncio.wait_for( - plugin.process_message(plugin_msg), - timeout=protection_policy["process_timeout_seconds"], - ) - process_time_ms = self._record_plugin_execution_success( - plugin=plugin, - msg=msg, - command_name=command_name, - started_at=started_at, - ) - if processed: - self.LOG.info( - self._trace_message( - msg, - f"插件命中 plugin={plugin.name} command={command_name} " - f"cost_ms={process_time_ms}" - ) - ) - return True - except asyncio.TimeoutError as timeout_error: - self._handle_plugin_execution_timeout( - plugin=plugin, - msg=msg, - command_name=command_name, - timeout_error=timeout_error, - started_at=started_at, - protection_policy=protection_policy, - ) - except Exception as e: - self._handle_plugin_execution_exception( - plugin=plugin, - msg=msg, - command_name=command_name, - error=e, - started_at=started_at, - protection_policy=protection_policy, - ) - except Exception as e: + except asyncio.TimeoutError as timeout_error: + protection_policy = self._build_message_plugin_protection_policy(plugin) + failure_record = self.plugin_manager.record_plugin_execution_failure( + plugin, + failure_type="timeout", + error_message=( + f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。" + ), + process_time_ms=self._elapsed_ms(started_at), + timeout_seconds=protection_policy["process_timeout_seconds"], + failure_threshold=protection_policy["failure_threshold"], + recovery_seconds=protection_policy["circuit_recovery_seconds"], + ) + self._record_plugin_call_error( + plugin=plugin, + msg=msg, + command_name=command_name, + error=timeout_error, + ) self.LOG.error( self._trace_message( msg, - f"插件 {plugin.name} 预处理失败: {e}" + f"插件 {plugin.name} 执行超时,timeout={protection_policy['process_timeout_seconds']}s " + f"circuit_state={failure_record.get('circuit_state')} " + f"consecutive_failures={failure_record.get('consecutive_failures')}" + ) + ) + except Exception as e: + protection_policy = self._build_message_plugin_protection_policy(plugin) + failure_record = self.plugin_manager.record_plugin_execution_failure( + plugin, + failure_type="error", + error_message=str(e), + process_time_ms=self._elapsed_ms(started_at), + timeout_seconds=0, + failure_threshold=protection_policy["failure_threshold"], + recovery_seconds=protection_policy["circuit_recovery_seconds"], + ) + self._record_plugin_call_error( + plugin=plugin, + msg=msg, + command_name=command_name, + error=e, + ) + self.LOG.error( + self._trace_message( + msg, + f"插件 {plugin.name} 处理消息失败: {e} " + f"circuit_state={failure_record.get('circuit_state')} " + f"consecutive_failures={failure_record.get('consecutive_failures')}" ) ) @@ -869,216 +855,11 @@ class Robot: ) return { - # 这里把上限放宽到 1800 秒: - # 1. 用户已经有 200 秒级别的长任务插件,原先 180 秒硬上限会被无条件截断; - # 2. 现在是否“长时间占住前台消息槽位”主要由 dispatch_mode 决定,而不是靠硬砍超时; - # 3. 仍然保留上限,避免误配置成无限等待把整个系统拖成不可控状态。 - "process_timeout_seconds": max(10, min(int(resolved_timeout), 1800)), + "process_timeout_seconds": max(10, min(int(resolved_timeout), 180)), "failure_threshold": max(2, min(int(failure_threshold), 10)), "circuit_recovery_seconds": max(30, min(int(circuit_recovery_seconds), 900)), } - def _resolve_message_plugin_dispatch_mode(self, plugin, plugin_msg: dict) -> str: - """解析插件当前消息的分发模式。""" - try: - if hasattr(plugin, "get_message_dispatch_mode"): - return plugin.get_message_dispatch_mode(plugin_msg) - except Exception as e: - self.LOG.warning( - self._trace_message( - plugin_msg.get("full_wx_msg"), - f"读取插件分发模式失败,已回退 sync: plugin={plugin.name}, error={e}" - ) - ) - return "sync" - - def _schedule_background_plugin_execution( - self, - *, - plugin, - plugin_msg: dict, - msg: WxMessage, - command_name: str, - protection_policy: dict, - ) -> None: - """把插件执行转入后台任务池。""" - task_name = f"plugin-bg:{plugin.name}:{self._get_trace_id(msg) or 'no-trace'}" - task = asyncio.create_task( - self._run_background_plugin_execution( - plugin=plugin, - plugin_msg=plugin_msg, - msg=msg, - command_name=command_name, - protection_policy=protection_policy, - ), - name=task_name, - ) - self.background_plugin_tasks.add(task) - task.add_done_callback(self.background_plugin_tasks.discard) - - async def _run_background_plugin_execution( - self, - *, - plugin, - plugin_msg: dict, - msg: WxMessage, - command_name: str, - protection_policy: dict, - ) -> None: - """在独立后台任务池中执行长任务插件。""" - trace_token = set_current_trace_id(self._get_trace_id(msg)) - queued_started_at = time.perf_counter() - try: - async with self.background_plugin_semaphore: - started_at = time.perf_counter() - queue_wait_ms = round((started_at - queued_started_at) * 1000, 2) - try: - processed, _ = await asyncio.wait_for( - plugin.process_message(plugin_msg), - timeout=protection_policy["process_timeout_seconds"], - ) - process_time_ms = self._record_plugin_execution_success( - plugin=plugin, - msg=msg, - command_name=command_name, - started_at=started_at, - ) - if processed: - self.LOG.info( - self._trace_message( - msg, - f"后台插件完成 plugin={plugin.name} command={command_name} " - f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}" - ) - ) - else: - self.LOG.warning( - self._trace_message( - msg, - f"后台插件执行完成但未拦截消息 plugin={plugin.name} command={command_name} " - f"cost_ms={process_time_ms} queue_wait_ms={queue_wait_ms}" - ) - ) - except asyncio.TimeoutError as timeout_error: - self._handle_plugin_execution_timeout( - plugin=plugin, - msg=msg, - command_name=command_name, - timeout_error=timeout_error, - started_at=started_at, - protection_policy=protection_policy, - ) - except Exception as e: - self._handle_plugin_execution_exception( - plugin=plugin, - msg=msg, - command_name=command_name, - error=e, - started_at=started_at, - protection_policy=protection_policy, - ) - finally: - reset_current_trace_id(trace_token) - - def _record_plugin_execution_success( - self, - *, - plugin, - msg: WxMessage, - command_name: str, - started_at: float, - ) -> float: - """统一记录插件成功执行结果,并返回耗时。""" - process_time_ms = self._elapsed_ms(started_at) - self.plugin_manager.record_plugin_execution_success( - plugin, - process_time_ms=process_time_ms, - ) - self._record_plugin_call_result( - plugin=plugin, - msg=msg, - command_name=command_name, - # 这里把“无异常执行完成”视为统计意义上的成功: - # 1. 很多插件返回 False 只是表示“本次不拦截”或“异步排队后继续放行”; - # 2. 若直接把 processed=False 记成失败,会把成功率统计严重拉低; - # 3. 真正的失败已经会走异常分支,因此统计层这里按“未抛错即成功”更合理。 - process_result=True, - process_time_ms=process_time_ms, - ) - return process_time_ms - - def _handle_plugin_execution_timeout( - self, - *, - plugin, - msg: WxMessage, - command_name: str, - timeout_error: Exception, - started_at: float, - protection_policy: dict, - ) -> None: - """统一处理插件执行超时。""" - failure_record = self.plugin_manager.record_plugin_execution_failure( - plugin, - failure_type="timeout", - error_message=( - f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。" - ), - process_time_ms=self._elapsed_ms(started_at), - timeout_seconds=protection_policy["process_timeout_seconds"], - failure_threshold=protection_policy["failure_threshold"], - recovery_seconds=protection_policy["circuit_recovery_seconds"], - ) - self._record_plugin_call_error( - plugin=plugin, - msg=msg, - command_name=command_name, - error=timeout_error, - ) - self.LOG.error( - self._trace_message( - msg, - f"插件 {plugin.name} 执行超时,timeout={protection_policy['process_timeout_seconds']}s " - f"circuit_state={failure_record.get('circuit_state')} " - f"consecutive_failures={failure_record.get('consecutive_failures')}" - ) - ) - - def _handle_plugin_execution_exception( - self, - *, - plugin, - msg: WxMessage, - command_name: str, - error: Exception, - started_at: float, - protection_policy: dict, - ) -> None: - """统一处理插件执行异常。""" - failure_record = self.plugin_manager.record_plugin_execution_failure( - plugin, - failure_type="error", - error_message=str(error), - process_time_ms=self._elapsed_ms(started_at), - timeout_seconds=0, - failure_threshold=protection_policy["failure_threshold"], - recovery_seconds=protection_policy["circuit_recovery_seconds"], - ) - self._record_plugin_call_error( - plugin=plugin, - msg=msg, - command_name=command_name, - error=error, - ) - self.LOG.error( - self._trace_message( - msg, - f"插件 {plugin.name} 处理消息失败: {error} " - f"circuit_state={failure_record.get('circuit_state')} " - f"consecutive_failures={failure_record.get('consecutive_failures')}" - ) - ) - def _get_stats_collector_plugin(self): """获取运行中的统计收集插件实例。""" # 统计插件已经从“事件订阅”切到“主链路直接回调”,