From 0d7fe5d6f0ae6091b148cfe19f0d03277e070b41 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 30 Apr 2026 16:15:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E6=8F=92=E4=BB=B6=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E4=BF=9D=E6=8A=A4=E4=B8=8E=E7=86=94=E6=96=AD=E6=81=A2?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 为消息插件执行增加统一超时保护,避免单插件长时间卡住消息主链路 - 增加连续失败熔断、冷却后半开探测与成功自动恢复逻辑 - 将插件执行保护状态接入治理快照与后台详情,便于查看连续失败和恢复剩余时间 - 更新工程优化文档,记录 7.2 第一阶段当前进展 --- admin/dashboard/templates/plugins_manage.html | 20 ++ base/plugin_common/plugin_manager.py | 250 ++++++++++++++++++ docs/工程优化与Feature清单.md | 7 + robot.py | 138 +++++++++- 4 files changed, 413 insertions(+), 2 deletions(-) diff --git a/admin/dashboard/templates/plugins_manage.html b/admin/dashboard/templates/plugins_manage.html index 678fd58..5021119 100644 --- a/admin/dashboard/templates/plugins_manage.html +++ b/admin/dashboard/templates/plugins_manage.html @@ -261,6 +261,26 @@ {% raw %}{{ `解析错误:${selectedPlugin.config_overview.parse_error}` }}{% endraw %} + +
+
+ 熔断状态 + {% raw %}{{ selectedPlugin.execution_guard.circuit_state || 'closed' }}{% endraw %} +
+
+ 连续失败 + {% raw %}{{ selectedPlugin.execution_guard.consecutive_failures || 0 }}{% endraw %} +
+
+ 连续超时 + {% raw %}{{ selectedPlugin.execution_guard.consecutive_timeouts || 0 }}{% endraw %} +
+
+ 恢复剩余 + {% raw %}{{ `${selectedPlugin.execution_guard.open_remaining_seconds || 0}s` }}{% endraw %} +
+
+
Dict[str, Any]: + """构建插件执行保护的默认记录。""" + return { + "circuit_state": "closed", + "consecutive_failures": 0, + "consecutive_timeouts": 0, + "failure_count_total": 0, + "timeout_count_total": 0, + "success_count_total": 0, + "last_failure_at": 0.0, + "last_success_at": 0.0, + "last_error_message": "", + "last_failure_type": "", + "last_process_time_ms": 0.0, + "last_timeout_seconds": 0, + "circuit_opened_at": 0.0, + "circuit_until": 0.0, + "half_open_in_flight": False, + } + + def _get_or_create_plugin_guard_record(self, module_name: str) -> Dict[str, Any]: + """读取或初始化插件执行保护记录。""" + if module_name not in self.plugin_guard_records: + self.plugin_guard_records[module_name] = self._build_default_guard_record() + return self.plugin_guard_records[module_name] + + def reset_plugin_guard_state(self, plugin_or_module_name) -> None: + """重置插件执行保护状态。""" + module_name = "" + if isinstance(plugin_or_module_name, PluginInterface): + module_name = self._get_module_name_from_plugin(plugin_or_module_name) or "" + else: + module_name = str(plugin_or_module_name or "").strip() + if not module_name: + return + + with self._guard_lock: + self.plugin_guard_records[module_name] = self._build_default_guard_record() + + def get_plugin_guard_snapshot(self, plugin_or_module_name) -> Dict[str, Any]: + """读取适合后台展示的插件执行保护快照。""" + module_name = "" + if isinstance(plugin_or_module_name, PluginInterface): + module_name = self._get_module_name_from_plugin(plugin_or_module_name) or "" + else: + module_name = str(plugin_or_module_name or "").strip() + + if not module_name: + return self._build_default_guard_record() + + with self._guard_lock: + record = dict(self._get_or_create_plugin_guard_record(module_name)) + + now_ts = time.time() + circuit_until = float(record.get("circuit_until") or 0.0) + open_remaining_seconds = max(0, int(circuit_until - now_ts)) if circuit_until > 0 else 0 + record["open_remaining_seconds"] = open_remaining_seconds + return record + + def try_acquire_plugin_execution( + self, + plugin: PluginInterface, + *, + recovery_seconds: int, + ) -> Dict[str, Any]: + """判断插件当前是否允许继续执行。""" + module_name = self._get_module_name_from_plugin(plugin) or plugin.name + now_ts = time.time() + + with self._guard_lock: + record = self._get_or_create_plugin_guard_record(module_name) + circuit_state = str(record.get("circuit_state", "closed") or "closed").strip().lower() + circuit_until = float(record.get("circuit_until") or 0.0) + recovery_seconds = max(int(recovery_seconds or 0), 30) + + # 熔断打开期间直接拒绝新流量: + # 1. 目标是尽快切断持续失败插件对主链路的影响; + # 2. 若冷却时间未到,则所有新消息都应跳过该插件; + # 3. 返回剩余秒数,便于日志和后台展示更可读。 + if circuit_state == "open" and circuit_until > now_ts: + return { + "allowed": False, + "reason": "circuit_open", + "open_remaining_seconds": max(0, int(circuit_until - now_ts)), + "circuit_state": circuit_state, + } + + # 熔断窗口已过,放一个半开探测请求: + # 1. 只允许一个探测请求进入,避免刚恢复时被瞬时并发打爆; + # 2. 成功则后续 record_success 会关闭熔断并清零计数; + # 3. 失败则 record_failure 会重新打开熔断窗口。 + if circuit_state == "open" and circuit_until <= now_ts: + record["circuit_state"] = "half_open" + record["half_open_in_flight"] = True + record["circuit_until"] = now_ts + recovery_seconds + return { + "allowed": True, + "reason": "half_open_probe", + "open_remaining_seconds": 0, + "circuit_state": "half_open", + } + + # 半开状态下,如果已经有探测请求在飞,就继续拒绝后续请求。 + if circuit_state == "half_open": + if bool(record.get("half_open_in_flight", False)): + return { + "allowed": False, + "reason": "half_open_probe_in_flight", + "open_remaining_seconds": max(0, int(float(record.get("circuit_until") or 0.0) - now_ts)), + "circuit_state": "half_open", + } + record["half_open_in_flight"] = True + return { + "allowed": True, + "reason": "half_open_probe", + "open_remaining_seconds": 0, + "circuit_state": "half_open", + } + + return { + "allowed": True, + "reason": "closed", + "open_remaining_seconds": 0, + "circuit_state": "closed", + } + + def record_plugin_execution_success( + self, + plugin: PluginInterface, + *, + process_time_ms: float, + ) -> None: + """记录插件一次成功执行。""" + module_name = self._get_module_name_from_plugin(plugin) or plugin.name + now_ts = time.time() + + with self._guard_lock: + record = self._get_or_create_plugin_guard_record(module_name) + record["success_count_total"] = int(record.get("success_count_total", 0) or 0) + 1 + record["consecutive_failures"] = 0 + record["consecutive_timeouts"] = 0 + record["last_success_at"] = now_ts + record["last_process_time_ms"] = float(process_time_ms or 0.0) + record["last_failure_type"] = "" + record["last_error_message"] = "" + record["last_timeout_seconds"] = 0 + record["half_open_in_flight"] = False + # 半开探测成功后立即闭合熔断: + # 1. 说明插件已经至少恢复到可以完成一次请求; + # 2. 这里不保留历史 consecutive_failures,避免恢复后仍长期背着旧债; + # 3. 同时把 open 时间清空,后台能立即看到状态回归 closed。 + record["circuit_state"] = "closed" + record["circuit_opened_at"] = 0.0 + record["circuit_until"] = 0.0 + + def record_plugin_execution_failure( + self, + plugin: PluginInterface, + *, + failure_type: str, + error_message: str, + process_time_ms: float, + timeout_seconds: int, + failure_threshold: int, + recovery_seconds: int, + ) -> Dict[str, Any]: + """记录插件一次失败执行,并根据连续失败次数决定是否打开熔断。""" + module_name = self._get_module_name_from_plugin(plugin) or plugin.name + now_ts = time.time() + normalized_failure_type = str(failure_type or "error").strip().lower() or "error" + failure_threshold = max(int(failure_threshold or 0), 2) + recovery_seconds = max(int(recovery_seconds or 0), 30) + + with self._guard_lock: + record = self._get_or_create_plugin_guard_record(module_name) + record["failure_count_total"] = int(record.get("failure_count_total", 0) or 0) + 1 + record["consecutive_failures"] = int(record.get("consecutive_failures", 0) or 0) + 1 + record["last_failure_at"] = now_ts + record["last_failure_type"] = normalized_failure_type + record["last_error_message"] = str(error_message or "").strip() + record["last_process_time_ms"] = float(process_time_ms or 0.0) + record["last_timeout_seconds"] = int(timeout_seconds or 0) + record["half_open_in_flight"] = False + + if normalized_failure_type == "timeout": + record["timeout_count_total"] = int(record.get("timeout_count_total", 0) or 0) + 1 + record["consecutive_timeouts"] = int(record.get("consecutive_timeouts", 0) or 0) + 1 + else: + record["consecutive_timeouts"] = 0 + + should_open_circuit = ( + str(record.get("circuit_state", "closed") or "closed").strip().lower() == "half_open" + or int(record.get("consecutive_failures", 0) or 0) >= failure_threshold + ) + + if should_open_circuit: + record["circuit_state"] = "open" + record["circuit_opened_at"] = now_ts + record["circuit_until"] = now_ts + recovery_seconds + else: + record["circuit_state"] = "closed" + + return dict(record) + @staticmethod def _is_sensitive_config_key(key: str) -> bool: """判断配置键是否属于敏感信息。""" @@ -242,11 +452,13 @@ class PluginManager: module_name: str, config_overview: Dict[str, Any], runtime_record: Dict[str, Any], + guard_snapshot: Optional[Dict[str, Any]] = None, ) -> List[Dict[str, str]]: """根据插件元信息、配置和运行态生成治理诊断。""" diagnostics = [] runtime_state = str(runtime_record.get("state", "") or "").strip().lower() runtime_message = str(runtime_record.get("message", "") or "").strip() + guard_snapshot = dict(guard_snapshot or {}) if runtime_state == "load_failed": diagnostics.append( @@ -292,6 +504,34 @@ class PluginManager: ) ) + circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower() + consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0) + open_remaining_seconds = int(guard_snapshot.get("open_remaining_seconds", 0) or 0) + if circuit_state == "open": + diagnostics.append( + self._build_diagnostic( + "error", + "circuit_open", + f"插件保护熔断已打开,预计 {open_remaining_seconds} 秒后再尝试半开恢复。", + ) + ) + elif circuit_state == "half_open": + diagnostics.append( + self._build_diagnostic( + "warning", + "circuit_half_open", + "插件当前处于半开探测状态,正在等待恢复结果。", + ) + ) + elif consecutive_failures > 0: + diagnostics.append( + self._build_diagnostic( + "info", + "recent_failures", + f"插件最近存在连续失败记录,当前连续失败次数为 {consecutive_failures}。", + ) + ) + if plugin is None: return diagnostics @@ -366,6 +606,7 @@ class PluginManager: """为已加载插件生成标准治理快照。""" module_name = self._get_module_name_from_plugin(plugin) or "unknown" runtime_record = self._get_module_runtime_state(module_name) + guard_snapshot = self.get_plugin_guard_snapshot(module_name) config_path = plugin.get_config_path() config_overview = self._read_plugin_config_overview(config_path) commands = self._collect_plugin_commands(plugin) @@ -376,6 +617,7 @@ class PluginManager: module_name=module_name, config_overview=config_overview, runtime_record=runtime_record, + guard_snapshot=guard_snapshot, ) governance_summary = self._summarize_governance_status(governance_diagnostics) @@ -405,11 +647,13 @@ class PluginManager: "governance_info_count": governance_summary["info_count"], "runtime_state": runtime_record.get("state", "loaded"), "runtime_message": runtime_record.get("message", ""), + "execution_guard": guard_snapshot, } def _build_unloaded_plugin_snapshot(self, module_name: str) -> Dict[str, Any]: """为未成功加载的插件模块生成治理快照。""" runtime_record = self._get_module_runtime_state(module_name) + guard_snapshot = self.get_plugin_guard_snapshot(module_name) config_path = os.path.join(self.plugin_dir, module_name, "config.toml") if not os.path.exists(config_path): config_path = os.path.join(self.plugin_dir, f"{module_name}", "config.toml") @@ -419,6 +663,7 @@ class PluginManager: module_name=module_name, config_overview=config_overview, runtime_record=runtime_record, + guard_snapshot=guard_snapshot, ) governance_summary = self._summarize_governance_status(governance_diagnostics) runtime_state = str(runtime_record.get("state", "") or "").strip().lower() @@ -454,6 +699,7 @@ class PluginManager: "governance_info_count": governance_summary["info_count"], "runtime_state": runtime_state or "discovered", "runtime_message": runtime_record.get("message", ""), + "execution_guard": guard_snapshot, } @staticmethod @@ -793,6 +1039,7 @@ class PluginManager: self.module_to_display[module_name] = display_name self.LOG.debug(f"PluginManager:添加缺失的模块映射 {module_name} -> {display_name}") self._record_module_runtime_state(module_name, "loaded", "插件已在内存中复用现有实例。") + self.reset_plugin_guard_state(module_name) self._inject_bot_to_plugin(plugin) return plugin except Exception as e: @@ -872,6 +1119,7 @@ class PluginManager: self.module_to_display[module_name] = display_name self._refresh_module_file_state(module_name) self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。") + self.reset_plugin_guard_state(module_name) # self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}") return plugin @@ -927,6 +1175,7 @@ class PluginManager: self.module_to_display[module_name] = display_name self._refresh_module_file_state(module_name) self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。") + self.reset_plugin_guard_state(module_name) # self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}") return plugin @@ -1061,6 +1310,7 @@ class PluginManager: if plugin.start(): plugin.status = PluginStatus.RUNNING module_name = self._get_module_name_from_plugin(plugin) or name + self.reset_plugin_guard_state(module_name) self._record_module_runtime_state(module_name, "running", "插件已启动并进入运行态。") self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为在运行") return True diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md index 5b922d1..9e361c9 100644 --- a/docs/工程优化与Feature清单.md +++ b/docs/工程优化与Feature清单.md @@ -433,6 +433,13 @@ - 防止单插件问题拖垮整体系统 +当前进展: + +- 第一阶段已完成:消息插件执行已增加统一超时保护,避免单插件长时间卡住主链路 +- 第一阶段已完成:已补充连续失败熔断、冷却后半开探测与自动恢复逻辑 +- 第一阶段已完成:插件治理快照与后台详情已可查看执行保护状态、连续失败与恢复剩余时间 +- 后续可继续补充插件级并发配额、失败原因聚合、后台手动解除熔断与更细粒度的隔离策略 + 建议内容: - 插件处理超时控制 diff --git a/robot.py b/robot.py index 912a3c7..87e5d3a 100644 --- a/robot.py +++ b/robot.py @@ -649,7 +649,33 @@ class Robot: # 检查插件是否可以处理该消息 if plugin.can_process(plugin_msg): - processed, _ = await plugin.process_message(plugin_msg) + protection_policy = self._build_message_plugin_protection_policy(plugin) + acquire_result = self.plugin_manager.try_acquire_plugin_execution( + plugin, + recovery_seconds=protection_policy["circuit_recovery_seconds"], + ) + if not acquire_result.get("allowed", False): + # 熔断打开或半开探测占用时,这里只跳过当前插件: + # 1. 保护目标是避免单插件持续拖慢主链路,而不是直接关闭整个插件; + # 2. 后续插件仍然可以继续尝试处理当前消息,降低功能面损失; + # 3. 冷却结束后会自动进入半开恢复探测,无需人工介入恢复。 + self.LOG.warning( + self._trace_message( + msg, + f"插件保护跳过 plugin={plugin.name} reason={acquire_result.get('reason')} " + f"remaining={acquire_result.get('open_remaining_seconds', 0)}s" + ) + ) + continue + + processed, _ = await asyncio.wait_for( + plugin.process_message(plugin_msg), + timeout=protection_policy["process_timeout_seconds"], + ) + self.plugin_manager.record_plugin_execution_success( + plugin, + process_time_ms=self._elapsed_ms(started_at), + ) self._record_plugin_call_result( plugin=plugin, msg=msg, @@ -670,14 +696,58 @@ class Robot: ) ) return True + except asyncio.TimeoutError as timeout_error: + protection_policy = self._build_message_plugin_protection_policy(plugin) + failure_record = self.plugin_manager.record_plugin_execution_failure( + plugin, + failure_type="timeout", + error_message=( + f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。" + ), + process_time_ms=self._elapsed_ms(started_at), + timeout_seconds=protection_policy["process_timeout_seconds"], + failure_threshold=protection_policy["failure_threshold"], + recovery_seconds=protection_policy["circuit_recovery_seconds"], + ) + self._record_plugin_call_error( + plugin=plugin, + msg=msg, + command_name=command_name, + error=timeout_error, + ) + self.LOG.error( + self._trace_message( + msg, + f"插件 {plugin.name} 执行超时,timeout={protection_policy['process_timeout_seconds']}s " + f"circuit_state={failure_record.get('circuit_state')} " + f"consecutive_failures={failure_record.get('consecutive_failures')}" + ) + ) except Exception as e: + protection_policy = self._build_message_plugin_protection_policy(plugin) + failure_record = self.plugin_manager.record_plugin_execution_failure( + plugin, + failure_type="error", + error_message=str(e), + process_time_ms=self._elapsed_ms(started_at), + timeout_seconds=0, + failure_threshold=protection_policy["failure_threshold"], + recovery_seconds=protection_policy["circuit_recovery_seconds"], + ) self._record_plugin_call_error( plugin=plugin, msg=msg, command_name=command_name, error=e, ) - self.LOG.error(self._trace_message(msg, f"插件 {plugin.name} 处理消息失败: {e}")) + self.LOG.error( + self._trace_message( + msg, + f"插件 {plugin.name} 处理消息失败: {e} " + f"circuit_state={failure_record.get('circuit_state')} " + f"consecutive_failures={failure_record.get('consecutive_failures')}" + ) + ) return False @@ -726,6 +796,70 @@ class Robot: msg_type = getattr(getattr(msg, "msg_type", None), "name", "") return f"[{msg_type or 'UNKNOWN'}]" + @staticmethod + def _safe_positive_int(value, default: int) -> int: + """把配置中的数字安全转成正整数。""" + try: + parsed = int(value) + return parsed if parsed > 0 else default + except (TypeError, ValueError): + return default + + def _build_message_plugin_protection_policy(self, plugin) -> dict: + """构建消息插件执行保护策略。""" + plugin_config = getattr(plugin, "_config", {}) or {} + runtime_config = plugin_config.get("runtime", {}) if isinstance(plugin_config, dict) else {} + runtime_config = runtime_config if isinstance(runtime_config, dict) else {} + breaker_config = runtime_config.get("circuit_breaker", {}) if isinstance(runtime_config, dict) else {} + breaker_config = breaker_config if isinstance(breaker_config, dict) else {} + + # 超时策略尽量遵循“显式配置优先,已有内部超时参数兜底”的思路: + # 1. 新插件如果有特殊需求,只需要在 runtime / circuit_breaker 下声明自己的超时; + # 2. 老插件不改代码也能自动复用现有的 request / llm / render 超时字段; + # 3. 最终统一加一个缓冲区,避免外层 wait_for 比插件内部自己的超时还更早打断。 + explicit_timeout = ( + runtime_config.get("plugin_process_timeout_seconds") + or runtime_config.get("message_timeout_seconds") + or breaker_config.get("timeout_seconds") + or getattr(plugin, "plugin_process_timeout_seconds", 0) + ) + timeout_candidates = [] + for attr_name in [ + "llm_call_timeout_sec", + "_request_timeout_seconds", + "default_timeout", + "_image_render_timeout_seconds", + "image_render_timeout_seconds", + "_receive_timeout", + "_connect_timeout_seconds", + "_connect_timeout", + ]: + attr_value = getattr(plugin, attr_name, 0) + if isinstance(attr_value, (int, float)) and attr_value > 0: + timeout_candidates.append(int(attr_value)) + + if explicit_timeout: + resolved_timeout = self._safe_positive_int(explicit_timeout, 30) + elif timeout_candidates: + resolved_timeout = max(timeout_candidates) + 10 + else: + resolved_timeout = 30 + + failure_threshold = self._safe_positive_int( + breaker_config.get("failure_threshold") or runtime_config.get("circuit_breaker_failure_threshold") or 3, + 3, + ) + circuit_recovery_seconds = self._safe_positive_int( + breaker_config.get("recovery_seconds") or runtime_config.get("circuit_breaker_recovery_seconds") or 180, + 180, + ) + + return { + "process_timeout_seconds": max(10, min(int(resolved_timeout), 180)), + "failure_threshold": max(2, min(int(failure_threshold), 10)), + "circuit_recovery_seconds": max(30, min(int(circuit_recovery_seconds), 900)), + } + def _get_stats_collector_plugin(self): """获取运行中的统计收集插件实例。""" # 统计插件已经从“事件订阅”切到“主链路直接回调”,