Dict[str, Any]:
+ """构建插件执行保护的默认记录。"""
+ return {
+ "circuit_state": "closed",
+ "consecutive_failures": 0,
+ "consecutive_timeouts": 0,
+ "failure_count_total": 0,
+ "timeout_count_total": 0,
+ "success_count_total": 0,
+ "last_failure_at": 0.0,
+ "last_success_at": 0.0,
+ "last_error_message": "",
+ "last_failure_type": "",
+ "last_process_time_ms": 0.0,
+ "last_timeout_seconds": 0,
+ "circuit_opened_at": 0.0,
+ "circuit_until": 0.0,
+ "half_open_in_flight": False,
+ }
+
+ def _get_or_create_plugin_guard_record(self, module_name: str) -> Dict[str, Any]:
+ """读取或初始化插件执行保护记录。"""
+ if module_name not in self.plugin_guard_records:
+ self.plugin_guard_records[module_name] = self._build_default_guard_record()
+ return self.plugin_guard_records[module_name]
+
+ def reset_plugin_guard_state(self, plugin_or_module_name) -> None:
+ """重置插件执行保护状态。"""
+ module_name = ""
+ if isinstance(plugin_or_module_name, PluginInterface):
+ module_name = self._get_module_name_from_plugin(plugin_or_module_name) or ""
+ else:
+ module_name = str(plugin_or_module_name or "").strip()
+ if not module_name:
+ return
+
+ with self._guard_lock:
+ self.plugin_guard_records[module_name] = self._build_default_guard_record()
+
+ def get_plugin_guard_snapshot(self, plugin_or_module_name) -> Dict[str, Any]:
+ """读取适合后台展示的插件执行保护快照。"""
+ module_name = ""
+ if isinstance(plugin_or_module_name, PluginInterface):
+ module_name = self._get_module_name_from_plugin(plugin_or_module_name) or ""
+ else:
+ module_name = str(plugin_or_module_name or "").strip()
+
+ if not module_name:
+ return self._build_default_guard_record()
+
+ with self._guard_lock:
+ record = dict(self._get_or_create_plugin_guard_record(module_name))
+
+ now_ts = time.time()
+ circuit_until = float(record.get("circuit_until") or 0.0)
+ open_remaining_seconds = max(0, int(circuit_until - now_ts)) if circuit_until > 0 else 0
+ record["open_remaining_seconds"] = open_remaining_seconds
+ return record
+
+ def try_acquire_plugin_execution(
+ self,
+ plugin: PluginInterface,
+ *,
+ recovery_seconds: int,
+ ) -> Dict[str, Any]:
+ """判断插件当前是否允许继续执行。"""
+ module_name = self._get_module_name_from_plugin(plugin) or plugin.name
+ now_ts = time.time()
+
+ with self._guard_lock:
+ record = self._get_or_create_plugin_guard_record(module_name)
+ circuit_state = str(record.get("circuit_state", "closed") or "closed").strip().lower()
+ circuit_until = float(record.get("circuit_until") or 0.0)
+ recovery_seconds = max(int(recovery_seconds or 0), 30)
+
+ # 熔断打开期间直接拒绝新流量:
+ # 1. 目标是尽快切断持续失败插件对主链路的影响;
+ # 2. 若冷却时间未到,则所有新消息都应跳过该插件;
+ # 3. 返回剩余秒数,便于日志和后台展示更可读。
+ if circuit_state == "open" and circuit_until > now_ts:
+ return {
+ "allowed": False,
+ "reason": "circuit_open",
+ "open_remaining_seconds": max(0, int(circuit_until - now_ts)),
+ "circuit_state": circuit_state,
+ }
+
+ # 熔断窗口已过,放一个半开探测请求:
+ # 1. 只允许一个探测请求进入,避免刚恢复时被瞬时并发打爆;
+ # 2. 成功则后续 record_success 会关闭熔断并清零计数;
+ # 3. 失败则 record_failure 会重新打开熔断窗口。
+ if circuit_state == "open" and circuit_until <= now_ts:
+ record["circuit_state"] = "half_open"
+ record["half_open_in_flight"] = True
+ record["circuit_until"] = now_ts + recovery_seconds
+ return {
+ "allowed": True,
+ "reason": "half_open_probe",
+ "open_remaining_seconds": 0,
+ "circuit_state": "half_open",
+ }
+
+ # 半开状态下,如果已经有探测请求在飞,就继续拒绝后续请求。
+ if circuit_state == "half_open":
+ if bool(record.get("half_open_in_flight", False)):
+ return {
+ "allowed": False,
+ "reason": "half_open_probe_in_flight",
+ "open_remaining_seconds": max(0, int(float(record.get("circuit_until") or 0.0) - now_ts)),
+ "circuit_state": "half_open",
+ }
+ record["half_open_in_flight"] = True
+ return {
+ "allowed": True,
+ "reason": "half_open_probe",
+ "open_remaining_seconds": 0,
+ "circuit_state": "half_open",
+ }
+
+ return {
+ "allowed": True,
+ "reason": "closed",
+ "open_remaining_seconds": 0,
+ "circuit_state": "closed",
+ }
+
+ def record_plugin_execution_success(
+ self,
+ plugin: PluginInterface,
+ *,
+ process_time_ms: float,
+ ) -> None:
+ """记录插件一次成功执行。"""
+ module_name = self._get_module_name_from_plugin(plugin) or plugin.name
+ now_ts = time.time()
+
+ with self._guard_lock:
+ record = self._get_or_create_plugin_guard_record(module_name)
+ record["success_count_total"] = int(record.get("success_count_total", 0) or 0) + 1
+ record["consecutive_failures"] = 0
+ record["consecutive_timeouts"] = 0
+ record["last_success_at"] = now_ts
+ record["last_process_time_ms"] = float(process_time_ms or 0.0)
+ record["last_failure_type"] = ""
+ record["last_error_message"] = ""
+ record["last_timeout_seconds"] = 0
+ record["half_open_in_flight"] = False
+ # 半开探测成功后立即闭合熔断:
+ # 1. 说明插件已经至少恢复到可以完成一次请求;
+ # 2. 这里不保留历史 consecutive_failures,避免恢复后仍长期背着旧债;
+ # 3. 同时把 open 时间清空,后台能立即看到状态回归 closed。
+ record["circuit_state"] = "closed"
+ record["circuit_opened_at"] = 0.0
+ record["circuit_until"] = 0.0
+
+ def record_plugin_execution_failure(
+ self,
+ plugin: PluginInterface,
+ *,
+ failure_type: str,
+ error_message: str,
+ process_time_ms: float,
+ timeout_seconds: int,
+ failure_threshold: int,
+ recovery_seconds: int,
+ ) -> Dict[str, Any]:
+ """记录插件一次失败执行,并根据连续失败次数决定是否打开熔断。"""
+ module_name = self._get_module_name_from_plugin(plugin) or plugin.name
+ now_ts = time.time()
+ normalized_failure_type = str(failure_type or "error").strip().lower() or "error"
+ failure_threshold = max(int(failure_threshold or 0), 2)
+ recovery_seconds = max(int(recovery_seconds or 0), 30)
+
+ with self._guard_lock:
+ record = self._get_or_create_plugin_guard_record(module_name)
+ record["failure_count_total"] = int(record.get("failure_count_total", 0) or 0) + 1
+ record["consecutive_failures"] = int(record.get("consecutive_failures", 0) or 0) + 1
+ record["last_failure_at"] = now_ts
+ record["last_failure_type"] = normalized_failure_type
+ record["last_error_message"] = str(error_message or "").strip()
+ record["last_process_time_ms"] = float(process_time_ms or 0.0)
+ record["last_timeout_seconds"] = int(timeout_seconds or 0)
+ record["half_open_in_flight"] = False
+
+ if normalized_failure_type == "timeout":
+ record["timeout_count_total"] = int(record.get("timeout_count_total", 0) or 0) + 1
+ record["consecutive_timeouts"] = int(record.get("consecutive_timeouts", 0) or 0) + 1
+ else:
+ record["consecutive_timeouts"] = 0
+
+ should_open_circuit = (
+ str(record.get("circuit_state", "closed") or "closed").strip().lower() == "half_open"
+ or int(record.get("consecutive_failures", 0) or 0) >= failure_threshold
+ )
+
+ if should_open_circuit:
+ record["circuit_state"] = "open"
+ record["circuit_opened_at"] = now_ts
+ record["circuit_until"] = now_ts + recovery_seconds
+ else:
+ record["circuit_state"] = "closed"
+
+ return dict(record)
+
@staticmethod
def _is_sensitive_config_key(key: str) -> bool:
"""判断配置键是否属于敏感信息。"""
@@ -242,11 +452,13 @@ class PluginManager:
module_name: str,
config_overview: Dict[str, Any],
runtime_record: Dict[str, Any],
+ guard_snapshot: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, str]]:
"""根据插件元信息、配置和运行态生成治理诊断。"""
diagnostics = []
runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
runtime_message = str(runtime_record.get("message", "") or "").strip()
+ guard_snapshot = dict(guard_snapshot or {})
if runtime_state == "load_failed":
diagnostics.append(
@@ -292,6 +504,34 @@ class PluginManager:
)
)
+ circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower()
+ consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0)
+ open_remaining_seconds = int(guard_snapshot.get("open_remaining_seconds", 0) or 0)
+ if circuit_state == "open":
+ diagnostics.append(
+ self._build_diagnostic(
+ "error",
+ "circuit_open",
+ f"插件保护熔断已打开,预计 {open_remaining_seconds} 秒后再尝试半开恢复。",
+ )
+ )
+ elif circuit_state == "half_open":
+ diagnostics.append(
+ self._build_diagnostic(
+ "warning",
+ "circuit_half_open",
+ "插件当前处于半开探测状态,正在等待恢复结果。",
+ )
+ )
+ elif consecutive_failures > 0:
+ diagnostics.append(
+ self._build_diagnostic(
+ "info",
+ "recent_failures",
+ f"插件最近存在连续失败记录,当前连续失败次数为 {consecutive_failures}。",
+ )
+ )
+
if plugin is None:
return diagnostics
@@ -366,6 +606,7 @@ class PluginManager:
"""为已加载插件生成标准治理快照。"""
module_name = self._get_module_name_from_plugin(plugin) or "unknown"
runtime_record = self._get_module_runtime_state(module_name)
+ guard_snapshot = self.get_plugin_guard_snapshot(module_name)
config_path = plugin.get_config_path()
config_overview = self._read_plugin_config_overview(config_path)
commands = self._collect_plugin_commands(plugin)
@@ -376,6 +617,7 @@ class PluginManager:
module_name=module_name,
config_overview=config_overview,
runtime_record=runtime_record,
+ guard_snapshot=guard_snapshot,
)
governance_summary = self._summarize_governance_status(governance_diagnostics)
@@ -405,11 +647,13 @@ class PluginManager:
"governance_info_count": governance_summary["info_count"],
"runtime_state": runtime_record.get("state", "loaded"),
"runtime_message": runtime_record.get("message", ""),
+ "execution_guard": guard_snapshot,
}
def _build_unloaded_plugin_snapshot(self, module_name: str) -> Dict[str, Any]:
"""为未成功加载的插件模块生成治理快照。"""
runtime_record = self._get_module_runtime_state(module_name)
+ guard_snapshot = self.get_plugin_guard_snapshot(module_name)
config_path = os.path.join(self.plugin_dir, module_name, "config.toml")
if not os.path.exists(config_path):
config_path = os.path.join(self.plugin_dir, f"{module_name}", "config.toml")
@@ -419,6 +663,7 @@ class PluginManager:
module_name=module_name,
config_overview=config_overview,
runtime_record=runtime_record,
+ guard_snapshot=guard_snapshot,
)
governance_summary = self._summarize_governance_status(governance_diagnostics)
runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
@@ -454,6 +699,7 @@ class PluginManager:
"governance_info_count": governance_summary["info_count"],
"runtime_state": runtime_state or "discovered",
"runtime_message": runtime_record.get("message", ""),
+ "execution_guard": guard_snapshot,
}
@staticmethod
@@ -793,6 +1039,7 @@ class PluginManager:
self.module_to_display[module_name] = display_name
self.LOG.debug(f"PluginManager:添加缺失的模块映射 {module_name} -> {display_name}")
self._record_module_runtime_state(module_name, "loaded", "插件已在内存中复用现有实例。")
+ self.reset_plugin_guard_state(module_name)
self._inject_bot_to_plugin(plugin)
return plugin
except Exception as e:
@@ -872,6 +1119,7 @@ class PluginManager:
self.module_to_display[module_name] = display_name
self._refresh_module_file_state(module_name)
self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
+ self.reset_plugin_guard_state(module_name)
# self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}")
return plugin
@@ -927,6 +1175,7 @@ class PluginManager:
self.module_to_display[module_name] = display_name
self._refresh_module_file_state(module_name)
self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
+ self.reset_plugin_guard_state(module_name)
# self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}")
return plugin
@@ -1061,6 +1310,7 @@ class PluginManager:
if plugin.start():
plugin.status = PluginStatus.RUNNING
module_name = self._get_module_name_from_plugin(plugin) or name
+ self.reset_plugin_guard_state(module_name)
self._record_module_runtime_state(module_name, "running", "插件已启动并进入运行态。")
self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为在运行")
return True
diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md
index 5b922d1..9e361c9 100644
--- a/docs/工程优化与Feature清单.md
+++ b/docs/工程优化与Feature清单.md
@@ -433,6 +433,13 @@
- 防止单插件问题拖垮整体系统
+当前进展:
+
+- 第一阶段已完成:消息插件执行已增加统一超时保护,避免单插件长时间卡住主链路
+- 第一阶段已完成:已补充连续失败熔断、冷却后半开探测与自动恢复逻辑
+- 第一阶段已完成:插件治理快照与后台详情已可查看执行保护状态、连续失败与恢复剩余时间
+- 后续可继续补充插件级并发配额、失败原因聚合、后台手动解除熔断与更细粒度的隔离策略
+
建议内容:
- 插件处理超时控制
diff --git a/robot.py b/robot.py
index 912a3c7..87e5d3a 100644
--- a/robot.py
+++ b/robot.py
@@ -649,7 +649,33 @@ class Robot:
# 检查插件是否可以处理该消息
if plugin.can_process(plugin_msg):
- processed, _ = await plugin.process_message(plugin_msg)
+ protection_policy = self._build_message_plugin_protection_policy(plugin)
+ acquire_result = self.plugin_manager.try_acquire_plugin_execution(
+ plugin,
+ recovery_seconds=protection_policy["circuit_recovery_seconds"],
+ )
+ if not acquire_result.get("allowed", False):
+ # 熔断打开或半开探测占用时,这里只跳过当前插件:
+ # 1. 保护目标是避免单插件持续拖慢主链路,而不是直接关闭整个插件;
+ # 2. 后续插件仍然可以继续尝试处理当前消息,降低功能面损失;
+ # 3. 冷却结束后会自动进入半开恢复探测,无需人工介入恢复。
+ self.LOG.warning(
+ self._trace_message(
+ msg,
+ f"插件保护跳过 plugin={plugin.name} reason={acquire_result.get('reason')} "
+ f"remaining={acquire_result.get('open_remaining_seconds', 0)}s"
+ )
+ )
+ continue
+
+ processed, _ = await asyncio.wait_for(
+ plugin.process_message(plugin_msg),
+ timeout=protection_policy["process_timeout_seconds"],
+ )
+ self.plugin_manager.record_plugin_execution_success(
+ plugin,
+ process_time_ms=self._elapsed_ms(started_at),
+ )
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
@@ -670,14 +696,58 @@ class Robot:
)
)
return True
+ except asyncio.TimeoutError as timeout_error:
+ protection_policy = self._build_message_plugin_protection_policy(plugin)
+ failure_record = self.plugin_manager.record_plugin_execution_failure(
+ plugin,
+ failure_type="timeout",
+ error_message=(
+ f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。"
+ ),
+ process_time_ms=self._elapsed_ms(started_at),
+ timeout_seconds=protection_policy["process_timeout_seconds"],
+ failure_threshold=protection_policy["failure_threshold"],
+ recovery_seconds=protection_policy["circuit_recovery_seconds"],
+ )
+ self._record_plugin_call_error(
+ plugin=plugin,
+ msg=msg,
+ command_name=command_name,
+ error=timeout_error,
+ )
+ self.LOG.error(
+ self._trace_message(
+ msg,
+ f"插件 {plugin.name} 执行超时,timeout={protection_policy['process_timeout_seconds']}s "
+ f"circuit_state={failure_record.get('circuit_state')} "
+ f"consecutive_failures={failure_record.get('consecutive_failures')}"
+ )
+ )
except Exception as e:
+ protection_policy = self._build_message_plugin_protection_policy(plugin)
+ failure_record = self.plugin_manager.record_plugin_execution_failure(
+ plugin,
+ failure_type="error",
+ error_message=str(e),
+ process_time_ms=self._elapsed_ms(started_at),
+ timeout_seconds=0,
+ failure_threshold=protection_policy["failure_threshold"],
+ recovery_seconds=protection_policy["circuit_recovery_seconds"],
+ )
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
)
- self.LOG.error(self._trace_message(msg, f"插件 {plugin.name} 处理消息失败: {e}"))
+ self.LOG.error(
+ self._trace_message(
+ msg,
+ f"插件 {plugin.name} 处理消息失败: {e} "
+ f"circuit_state={failure_record.get('circuit_state')} "
+ f"consecutive_failures={failure_record.get('consecutive_failures')}"
+ )
+ )
return False
@@ -726,6 +796,70 @@ class Robot:
msg_type = getattr(getattr(msg, "msg_type", None), "name", "")
return f"[{msg_type or 'UNKNOWN'}]"
+ @staticmethod
+ def _safe_positive_int(value, default: int) -> int:
+ """把配置中的数字安全转成正整数。"""
+ try:
+ parsed = int(value)
+ return parsed if parsed > 0 else default
+ except (TypeError, ValueError):
+ return default
+
+ def _build_message_plugin_protection_policy(self, plugin) -> dict:
+ """构建消息插件执行保护策略。"""
+ plugin_config = getattr(plugin, "_config", {}) or {}
+ runtime_config = plugin_config.get("runtime", {}) if isinstance(plugin_config, dict) else {}
+ runtime_config = runtime_config if isinstance(runtime_config, dict) else {}
+ breaker_config = runtime_config.get("circuit_breaker", {}) if isinstance(runtime_config, dict) else {}
+ breaker_config = breaker_config if isinstance(breaker_config, dict) else {}
+
+ # 超时策略尽量遵循“显式配置优先,已有内部超时参数兜底”的思路:
+ # 1. 新插件如果有特殊需求,只需要在 runtime / circuit_breaker 下声明自己的超时;
+ # 2. 老插件不改代码也能自动复用现有的 request / llm / render 超时字段;
+ # 3. 最终统一加一个缓冲区,避免外层 wait_for 比插件内部自己的超时还更早打断。
+ explicit_timeout = (
+ runtime_config.get("plugin_process_timeout_seconds")
+ or runtime_config.get("message_timeout_seconds")
+ or breaker_config.get("timeout_seconds")
+ or getattr(plugin, "plugin_process_timeout_seconds", 0)
+ )
+ timeout_candidates = []
+ for attr_name in [
+ "llm_call_timeout_sec",
+ "_request_timeout_seconds",
+ "default_timeout",
+ "_image_render_timeout_seconds",
+ "image_render_timeout_seconds",
+ "_receive_timeout",
+ "_connect_timeout_seconds",
+ "_connect_timeout",
+ ]:
+ attr_value = getattr(plugin, attr_name, 0)
+ if isinstance(attr_value, (int, float)) and attr_value > 0:
+ timeout_candidates.append(int(attr_value))
+
+ if explicit_timeout:
+ resolved_timeout = self._safe_positive_int(explicit_timeout, 30)
+ elif timeout_candidates:
+ resolved_timeout = max(timeout_candidates) + 10
+ else:
+ resolved_timeout = 30
+
+ failure_threshold = self._safe_positive_int(
+ breaker_config.get("failure_threshold") or runtime_config.get("circuit_breaker_failure_threshold") or 3,
+ 3,
+ )
+ circuit_recovery_seconds = self._safe_positive_int(
+ breaker_config.get("recovery_seconds") or runtime_config.get("circuit_breaker_recovery_seconds") or 180,
+ 180,
+ )
+
+ return {
+ "process_timeout_seconds": max(10, min(int(resolved_timeout), 180)),
+ "failure_threshold": max(2, min(int(failure_threshold), 10)),
+ "circuit_recovery_seconds": max(30, min(int(circuit_recovery_seconds), 900)),
+ }
+
def _get_stats_collector_plugin(self):
"""获取运行中的统计收集插件实例。"""
# 统计插件已经从“事件订阅”切到“主链路直接回调”,