@@ -165,23 +143,6 @@ new Vue({
if (status === 'running') return 'warning';
return 'info';
},
- healthTag(status) {
- if (status === 'healthy') return 'success';
- if (status === 'running') return 'warning';
- if (status === 'failed') return 'danger';
- if (status === 'disabled') return 'info';
- return '';
- },
- healthLabel(status) {
- const mapping = {
- healthy: '健康',
- running: '执行中',
- failed: '异常',
- disabled: '停用',
- idle: '待运行'
- };
- return mapping[status] || '待运行';
- },
async loadJobs() {
this.loading = true;
try {
@@ -308,10 +269,5 @@ new Vue({
.page-hero-copy h1{font-size:30px;line-height:1.1;margin-bottom:10px;color:#0f172a}
.page-hero-copy p{color:#64748b;font-size:14px}
.action-row{display:flex;align-items:center;gap:8px;flex-wrap:wrap}
-.cell-ellipsis{overflow:hidden;text-overflow:ellipsis;white-space:nowrap;color:#475569}
-.history-metrics{display:flex;align-items:center;justify-content:center;gap:8px}
-.metric-success{color:#16a34a;font-weight:600}
-.metric-fail{color:#dc2626;font-weight:600}
-.history-total{margin-top:4px;color:#64748b;font-size:12px}
{% endblock %}
diff --git a/base/plugin_common/plugin_manager.py b/base/plugin_common/plugin_manager.py
index 37e1f65..2537c24 100644
--- a/base/plugin_common/plugin_manager.py
+++ b/base/plugin_common/plugin_manager.py
@@ -6,7 +6,6 @@ import threading
import time
from typing import Dict, List, Any, Optional, Tuple
-import toml
from loguru import logger
from base.plugin_common.plugin_interface import PluginInterface, PluginStatus
@@ -57,17 +56,6 @@ class PluginManager:
self.module_to_display = {} # 模块名到显示名的映射
self.system_context = {} # 系统上下文
self.current_bot: Optional[WechatAPIClient] = None
- # 运行态记录用于给“插件治理中心”提供统一视图:
- # 1. 不仅记录已成功加载的插件,也记录“加载失败 / 配置禁用 / 手动停用”等状态;
- # 2. 后台治理页就不必再从日志里猜某个插件为什么没出现在列表里;
- # 3. 这里按 module_name 维度存储,便于和 plugins 目录天然对齐。
- self.plugin_runtime_records: Dict[str, Dict[str, Any]] = {}
- # 插件执行保护记录:
- # 1. 主要服务于消息插件的“超时保护 / 连续失败熔断 / 自动半开恢复探测”;
- # 2. 记录同样按 module_name 存储,便于和治理快照直接关联;
- # 3. 后续若扩展到定时任务插件,也可以沿用同一套结构。
- self.plugin_guard_records: Dict[str, Dict[str, Any]] = {}
- self._guard_lock = threading.RLock()
# 热加载相关
self._watcher_thread: Optional[threading.Thread] = None
@@ -86,858 +74,6 @@ class PluginManager:
if self.plugin_dir not in sys.path:
sys.path.insert(0, self.plugin_dir)
- def _record_module_runtime_state(
- self,
- module_name: str,
- state: str,
- message: str = "",
- detail: Optional[Dict[str, Any]] = None,
- ) -> None:
- """记录插件模块的运行态快照。"""
- if not module_name:
- return
- self.plugin_runtime_records[module_name] = {
- "state": str(state or "").strip().lower() or "unknown",
- "message": str(message or "").strip(),
- "detail": dict(detail or {}),
- "updated_at": float(time.time()),
- }
-
- def _get_module_runtime_state(self, module_name: str) -> Dict[str, Any]:
- """读取插件模块的最近一次运行态记录。"""
- return dict(self.plugin_runtime_records.get(module_name, {}) or {})
-
- def _build_default_guard_record(self) -> Dict[str, Any]:
- """构建插件执行保护的默认记录。"""
- return {
- "circuit_state": "closed",
- "consecutive_failures": 0,
- "consecutive_timeouts": 0,
- "failure_count_total": 0,
- "timeout_count_total": 0,
- "success_count_total": 0,
- "last_failure_at": 0.0,
- "last_success_at": 0.0,
- "last_error_message": "",
- "last_failure_type": "",
- "last_process_time_ms": 0.0,
- "last_timeout_seconds": 0,
- "circuit_opened_at": 0.0,
- "circuit_until": 0.0,
- "half_open_in_flight": False,
- }
-
- def _get_or_create_plugin_guard_record(self, module_name: str) -> Dict[str, Any]:
- """读取或初始化插件执行保护记录。"""
- if module_name not in self.plugin_guard_records:
- self.plugin_guard_records[module_name] = self._build_default_guard_record()
- return self.plugin_guard_records[module_name]
-
- def reset_plugin_guard_state(self, plugin_or_module_name) -> None:
- """重置插件执行保护状态。"""
- module_name = ""
- if isinstance(plugin_or_module_name, PluginInterface):
- module_name = self._get_module_name_from_plugin(plugin_or_module_name) or ""
- else:
- module_name = str(plugin_or_module_name or "").strip()
- if not module_name:
- return
-
- with self._guard_lock:
- self.plugin_guard_records[module_name] = self._build_default_guard_record()
-
- def get_plugin_guard_snapshot(self, plugin_or_module_name) -> Dict[str, Any]:
- """读取适合后台展示的插件执行保护快照。"""
- module_name = ""
- if isinstance(plugin_or_module_name, PluginInterface):
- module_name = self._get_module_name_from_plugin(plugin_or_module_name) or ""
- else:
- module_name = str(plugin_or_module_name or "").strip()
-
- if not module_name:
- return self._build_default_guard_record()
-
- with self._guard_lock:
- record = dict(self._get_or_create_plugin_guard_record(module_name))
-
- now_ts = time.time()
- circuit_until = float(record.get("circuit_until") or 0.0)
- open_remaining_seconds = max(0, int(circuit_until - now_ts)) if circuit_until > 0 else 0
- record["open_remaining_seconds"] = open_remaining_seconds
- return record
-
- def try_acquire_plugin_execution(
- self,
- plugin: PluginInterface,
- *,
- recovery_seconds: int,
- ) -> Dict[str, Any]:
- """判断插件当前是否允许继续执行。"""
- module_name = self._get_module_name_from_plugin(plugin) or plugin.name
- now_ts = time.time()
-
- with self._guard_lock:
- record = self._get_or_create_plugin_guard_record(module_name)
- circuit_state = str(record.get("circuit_state", "closed") or "closed").strip().lower()
- circuit_until = float(record.get("circuit_until") or 0.0)
- recovery_seconds = max(int(recovery_seconds or 0), 30)
-
- # 熔断打开期间直接拒绝新流量:
- # 1. 目标是尽快切断持续失败插件对主链路的影响;
- # 2. 若冷却时间未到,则所有新消息都应跳过该插件;
- # 3. 返回剩余秒数,便于日志和后台展示更可读。
- if circuit_state == "open" and circuit_until > now_ts:
- return {
- "allowed": False,
- "reason": "circuit_open",
- "open_remaining_seconds": max(0, int(circuit_until - now_ts)),
- "circuit_state": circuit_state,
- }
-
- # 熔断窗口已过,放一个半开探测请求:
- # 1. 只允许一个探测请求进入,避免刚恢复时被瞬时并发打爆;
- # 2. 成功则后续 record_success 会关闭熔断并清零计数;
- # 3. 失败则 record_failure 会重新打开熔断窗口。
- if circuit_state == "open" and circuit_until <= now_ts:
- record["circuit_state"] = "half_open"
- record["half_open_in_flight"] = True
- record["circuit_until"] = now_ts + recovery_seconds
- return {
- "allowed": True,
- "reason": "half_open_probe",
- "open_remaining_seconds": 0,
- "circuit_state": "half_open",
- }
-
- # 半开状态下,如果已经有探测请求在飞,就继续拒绝后续请求。
- if circuit_state == "half_open":
- if bool(record.get("half_open_in_flight", False)):
- return {
- "allowed": False,
- "reason": "half_open_probe_in_flight",
- "open_remaining_seconds": max(0, int(float(record.get("circuit_until") or 0.0) - now_ts)),
- "circuit_state": "half_open",
- }
- record["half_open_in_flight"] = True
- return {
- "allowed": True,
- "reason": "half_open_probe",
- "open_remaining_seconds": 0,
- "circuit_state": "half_open",
- }
-
- return {
- "allowed": True,
- "reason": "closed",
- "open_remaining_seconds": 0,
- "circuit_state": "closed",
- }
-
- def record_plugin_execution_success(
- self,
- plugin: PluginInterface,
- *,
- process_time_ms: float,
- ) -> None:
- """记录插件一次成功执行。"""
- module_name = self._get_module_name_from_plugin(plugin) or plugin.name
- now_ts = time.time()
-
- with self._guard_lock:
- record = self._get_or_create_plugin_guard_record(module_name)
- record["success_count_total"] = int(record.get("success_count_total", 0) or 0) + 1
- record["consecutive_failures"] = 0
- record["consecutive_timeouts"] = 0
- record["last_success_at"] = now_ts
- record["last_process_time_ms"] = float(process_time_ms or 0.0)
- record["last_failure_type"] = ""
- record["last_error_message"] = ""
- record["last_timeout_seconds"] = 0
- record["half_open_in_flight"] = False
- # 半开探测成功后立即闭合熔断:
- # 1. 说明插件已经至少恢复到可以完成一次请求;
- # 2. 这里不保留历史 consecutive_failures,避免恢复后仍长期背着旧债;
- # 3. 同时把 open 时间清空,后台能立即看到状态回归 closed。
- record["circuit_state"] = "closed"
- record["circuit_opened_at"] = 0.0
- record["circuit_until"] = 0.0
-
- def record_plugin_execution_failure(
- self,
- plugin: PluginInterface,
- *,
- failure_type: str,
- error_message: str,
- process_time_ms: float,
- timeout_seconds: int,
- failure_threshold: int,
- recovery_seconds: int,
- ) -> Dict[str, Any]:
- """记录插件一次失败执行,并根据连续失败次数决定是否打开熔断。"""
- module_name = self._get_module_name_from_plugin(plugin) or plugin.name
- now_ts = time.time()
- normalized_failure_type = str(failure_type or "error").strip().lower() or "error"
- failure_threshold = max(int(failure_threshold or 0), 2)
- recovery_seconds = max(int(recovery_seconds or 0), 30)
-
- with self._guard_lock:
- record = self._get_or_create_plugin_guard_record(module_name)
- record["failure_count_total"] = int(record.get("failure_count_total", 0) or 0) + 1
- record["consecutive_failures"] = int(record.get("consecutive_failures", 0) or 0) + 1
- record["last_failure_at"] = now_ts
- record["last_failure_type"] = normalized_failure_type
- record["last_error_message"] = str(error_message or "").strip()
- record["last_process_time_ms"] = float(process_time_ms or 0.0)
- record["last_timeout_seconds"] = int(timeout_seconds or 0)
- record["half_open_in_flight"] = False
-
- if normalized_failure_type == "timeout":
- record["timeout_count_total"] = int(record.get("timeout_count_total", 0) or 0) + 1
- record["consecutive_timeouts"] = int(record.get("consecutive_timeouts", 0) or 0) + 1
- else:
- record["consecutive_timeouts"] = 0
-
- should_open_circuit = (
- str(record.get("circuit_state", "closed") or "closed").strip().lower() == "half_open"
- or int(record.get("consecutive_failures", 0) or 0) >= failure_threshold
- )
-
- if should_open_circuit:
- record["circuit_state"] = "open"
- record["circuit_opened_at"] = now_ts
- record["circuit_until"] = now_ts + recovery_seconds
- else:
- record["circuit_state"] = "closed"
-
- return dict(record)
-
- @staticmethod
- def _is_sensitive_config_key(key: str) -> bool:
- """判断配置键是否属于敏感信息。"""
- lowered_key = str(key or "").strip().lower()
- return any(
- keyword in lowered_key
- for keyword in ["password", "secret", "token", "api_key", "apikey", "cookie", "client_secret"]
- )
-
- def _build_config_overview_from_mapping(
- self,
- config_obj: Optional[Dict[str, Any]],
- *,
- config_path: str,
- file_exists: bool,
- parse_ok: bool,
- parse_error: str = "",
- ) -> Dict[str, Any]:
- """从配置对象构建统一的配置概览。"""
- config_obj = dict(config_obj or {})
- top_level_keys = list(config_obj.keys())
- dict_section_names = []
- enabled_sections = []
- disabled_sections = []
- sensitive_paths = []
-
- def _walk_sensitive_fields(node, path: str) -> None:
- if isinstance(node, dict):
- for key, value in node.items():
- next_path = f"{path}.{key}"
- if isinstance(value, str) and self._is_sensitive_config_key(key) and str(value or "").strip():
- sensitive_paths.append(next_path)
- _walk_sensitive_fields(value, next_path)
- return
- if isinstance(node, list):
- for index, value in enumerate(node):
- _walk_sensitive_fields(value, f"{path}[{index}]")
-
- for section_name, section_value in config_obj.items():
- if not isinstance(section_value, dict):
- continue
- dict_section_names.append(section_name)
- if "enable" not in section_value:
- continue
- if bool(section_value.get("enable", True)):
- enabled_sections.append(section_name)
- else:
- disabled_sections.append(section_name)
-
- _walk_sensitive_fields(config_obj, "config")
-
- return {
- "path": config_path,
- "exists": bool(file_exists),
- "parse_ok": bool(parse_ok),
- "parse_error": str(parse_error or ""),
- "top_level_keys": top_level_keys,
- "top_level_key_count": len(top_level_keys),
- "dict_section_names": dict_section_names,
- "section_count": len(dict_section_names),
- "enabled_sections": enabled_sections,
- "enabled_section_count": len(enabled_sections),
- "disabled_sections": disabled_sections,
- "disabled_section_count": len(disabled_sections),
- "sensitive_field_paths": sensitive_paths,
- "sensitive_field_count": len(sensitive_paths),
- }
-
- def _read_plugin_config_overview(self, config_path: str) -> Dict[str, Any]:
- """读取插件配置文件并返回概览。"""
- if not config_path:
- return self._build_config_overview_from_mapping(
- {},
- config_path="",
- file_exists=False,
- parse_ok=False,
- parse_error="配置路径为空",
- )
-
- if not os.path.exists(config_path):
- return self._build_config_overview_from_mapping(
- {},
- config_path=config_path,
- file_exists=False,
- parse_ok=True,
- )
-
- try:
- with open(config_path, "r", encoding="utf-8") as config_file:
- config_obj = toml.load(config_file)
- return self._build_config_overview_from_mapping(
- config_obj,
- config_path=config_path,
- file_exists=True,
- parse_ok=True,
- )
- except Exception as e:
- return self._build_config_overview_from_mapping(
- {},
- config_path=config_path,
- file_exists=True,
- parse_ok=False,
- parse_error=str(e),
- )
-
- @staticmethod
- def _build_diagnostic(level: str, code: str, message: str) -> Dict[str, str]:
- """统一治理诊断项结构。"""
- return {
- "level": str(level or "").strip().lower() or "info",
- "code": str(code or "").strip(),
- "message": str(message or "").strip(),
- }
-
- def _collect_plugin_types(self, plugin: PluginInterface) -> List[str]:
- """识别插件能力类型。"""
- plugin_types = []
- if isinstance(plugin, MessagePluginInterface):
- plugin_types.append("message")
- if isinstance(plugin, ScheduledPluginInterface):
- plugin_types.append("scheduled")
- if not plugin_types:
- plugin_types.append("generic")
- return plugin_types
-
- def _collect_plugin_commands(self, plugin: PluginInterface) -> List[str]:
- """统一读取插件声明的命令列表。"""
- commands = getattr(plugin, "commands", []) or getattr(plugin, "_commands", []) or []
- if isinstance(commands, (list, tuple, set)):
- return [str(item).strip() for item in commands if str(item or "").strip()]
- if isinstance(commands, str) and commands.strip():
- return [commands.strip()]
- return []
-
- def _build_governance_diagnostics(
- self,
- *,
- plugin: Optional[PluginInterface],
- module_name: str,
- config_overview: Dict[str, Any],
- runtime_record: Dict[str, Any],
- guard_snapshot: Optional[Dict[str, Any]] = None,
- ) -> List[Dict[str, str]]:
- """根据插件元信息、配置和运行态生成治理诊断。"""
- diagnostics = []
- runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
- runtime_message = str(runtime_record.get("message", "") or "").strip()
- guard_snapshot = dict(guard_snapshot or {})
-
- if runtime_state == "load_failed":
- diagnostics.append(
- self._build_diagnostic(
- "error",
- "load_failed",
- runtime_message or f"插件模块 `{module_name}` 加载失败,请先排查导入、初始化或依赖问题。",
- )
- )
- elif runtime_state == "disabled_by_config":
- diagnostics.append(
- self._build_diagnostic(
- "info",
- "disabled_by_config",
- runtime_message or "插件已在配置中禁用,当前未进入运行态。",
- )
- )
-
- if not config_overview.get("exists"):
- diagnostics.append(
- self._build_diagnostic(
- "info",
- "config_missing",
- "未发现 config.toml,当前插件将完全依赖默认参数或代码内置配置。",
- )
- )
- elif not config_overview.get("parse_ok"):
- diagnostics.append(
- self._build_diagnostic(
- "error",
- "config_parse_failed",
- f"配置文件解析失败:{config_overview.get('parse_error', '未知错误')}",
- )
- )
-
- sensitive_count = int(config_overview.get("sensitive_field_count", 0) or 0)
- if sensitive_count > 0:
- diagnostics.append(
- self._build_diagnostic(
- "warning",
- "config_contains_sensitive_fields",
- f"配置文件中检测到 {sensitive_count} 个敏感字段,建议逐步迁移到全局配置或环境变量。",
- )
- )
-
- circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower()
- consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0)
- open_remaining_seconds = int(guard_snapshot.get("open_remaining_seconds", 0) or 0)
- if circuit_state == "open":
- diagnostics.append(
- self._build_diagnostic(
- "error",
- "circuit_open",
- f"插件保护熔断已打开,预计 {open_remaining_seconds} 秒后再尝试半开恢复。",
- )
- )
- elif circuit_state == "half_open":
- diagnostics.append(
- self._build_diagnostic(
- "warning",
- "circuit_half_open",
- "插件当前处于半开探测状态,正在等待恢复结果。",
- )
- )
- elif consecutive_failures > 0:
- diagnostics.append(
- self._build_diagnostic(
- "info",
- "recent_failures",
- f"插件最近存在连续失败记录,当前连续失败次数为 {consecutive_failures}。",
- )
- )
-
- if plugin is None:
- return diagnostics
-
- version = str(getattr(plugin, "version", "") or "").strip()
- author = str(getattr(plugin, "author", "") or "").strip()
- description = str(getattr(plugin, "description", "") or "").strip()
- if not version or version.upper() == "N/A":
- diagnostics.append(self._build_diagnostic("warning", "missing_version", "插件未声明版本号,不利于后续升级和兼容治理。"))
- if not author or author.upper() == "N/A":
- diagnostics.append(self._build_diagnostic("info", "missing_author", "插件未声明作者信息,后续定位维护人会比较困难。"))
- if not description or description.upper() == "N/A":
- diagnostics.append(self._build_diagnostic("info", "missing_description", "插件未声明描述信息,后台可读性较弱。"))
-
- dependencies = list(getattr(plugin, "dependencies", []) or [])
- for dependency_name in dependencies:
- if dependency_name not in self.plugins:
- diagnostics.append(
- self._build_diagnostic(
- "warning",
- "missing_dependency",
- f"声明依赖插件 `{dependency_name}` 当前未加载,存在运行时能力缺失风险。",
- )
- )
-
- if isinstance(plugin, MessagePluginInterface):
- commands = self._collect_plugin_commands(plugin)
- if not commands:
- diagnostics.append(
- self._build_diagnostic(
- "info",
- "missing_commands",
- "消息插件未声明命令列表,后台无法准确展示其触发入口。",
- )
- )
- feature_key = str(getattr(plugin, "feature_key", "") or "").strip()
- if not feature_key:
- diagnostics.append(
- self._build_diagnostic(
- "info",
- "missing_feature_key",
- "消息插件未声明 feature_key,将无法纳入统一群级权限治理。",
- )
- )
-
- return diagnostics
-
- @staticmethod
- def _summarize_governance_status(diagnostics: List[Dict[str, str]]) -> Dict[str, Any]:
- """把诊断列表汇总为后台更容易消费的治理状态。"""
- level_priority = {"healthy": 0, "info": 1, "warning": 2, "error": 3}
- level_counts = {"error": 0, "warning": 0, "info": 0}
- governance_status = "healthy"
-
- for item in diagnostics or []:
- level = str(item.get("level", "") or "info").strip().lower()
- if level in level_counts:
- level_counts[level] += 1
- if level_priority.get(level, 0) > level_priority.get(governance_status, 0):
- governance_status = level
-
- if governance_status == "info" and level_counts["warning"] == 0 and level_counts["error"] == 0:
- governance_status = "healthy"
-
- return {
- "status": governance_status,
- "error_count": level_counts["error"],
- "warning_count": level_counts["warning"],
- "info_count": level_counts["info"],
- }
-
- @staticmethod
- def _format_runtime_timestamp(timestamp_value: Any) -> str:
- """把运行态中的 unix 时间戳转成后台可读文本。"""
- try:
- normalized = float(timestamp_value or 0.0)
- except (TypeError, ValueError):
- return ""
- if normalized <= 0:
- return ""
- try:
- return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(normalized))
- except (OverflowError, OSError, ValueError):
- return ""
-
- @staticmethod
- def _safe_percent(numerator: Any, denominator: Any) -> float:
- """安全计算百分比,避免分母为空时抛异常。"""
- try:
- denominator_value = float(denominator or 0.0)
- if denominator_value <= 0:
- return 0.0
- return round((float(numerator or 0.0) / denominator_value) * 100, 2)
- except (TypeError, ValueError, ZeroDivisionError):
- return 0.0
-
- def _build_execution_summary(self, guard_snapshot: Dict[str, Any]) -> Dict[str, Any]:
- """把执行保护记录转换成更适合后台页面展示的执行摘要。
-
- 设计考虑:
- 1. 原始 execution_guard 更偏底层状态,前端直接消费会充满规则判断;
- 2. 这里统一补出成功率、总执行次数、最近成功/失败时间、最近错误摘要;
- 3. 未来如果还要做“高风险插件排行”“慢插件排行”,也能直接复用该摘要。
- """
- guard_snapshot = dict(guard_snapshot or {})
- success_count_total = int(guard_snapshot.get("success_count_total", 0) or 0)
- failure_count_total = int(guard_snapshot.get("failure_count_total", 0) or 0)
- timeout_count_total = int(guard_snapshot.get("timeout_count_total", 0) or 0)
- consecutive_failures = int(guard_snapshot.get("consecutive_failures", 0) or 0)
- consecutive_timeouts = int(guard_snapshot.get("consecutive_timeouts", 0) or 0)
- last_process_time_ms = round(float(guard_snapshot.get("last_process_time_ms", 0.0) or 0.0), 2)
- circuit_state = str(guard_snapshot.get("circuit_state", "closed") or "closed").strip().lower()
- last_error_message = str(guard_snapshot.get("last_error_message") or "").strip()
- if len(last_error_message) > 240:
- last_error_message = f"{last_error_message[:237]}..."
-
- total_executions = success_count_total + failure_count_total
- success_rate = self._safe_percent(success_count_total, total_executions)
- timeout_rate = self._safe_percent(timeout_count_total, total_executions)
- last_success_at_text = self._format_runtime_timestamp(guard_snapshot.get("last_success_at"))
- last_failure_at_text = self._format_runtime_timestamp(guard_snapshot.get("last_failure_at"))
-
- status = "info"
- summary = "暂无执行样本"
- if total_executions > 0:
- status = "healthy"
- summary = (
- f"累计执行 {total_executions} 次,成功率 {success_rate}%,"
- f"最近耗时 {last_process_time_ms}ms"
- )
-
- # 熔断打开是最明确的高风险信号,应优先标记为 error。
- if circuit_state == "open":
- status = "error"
- summary = (
- f"插件当前处于熔断中,连续失败 {consecutive_failures} 次,"
- f"恢复剩余 {int(guard_snapshot.get('open_remaining_seconds', 0) or 0)}s"
- )
- elif failure_count_total > 0 or timeout_count_total > 0 or consecutive_failures > 0 or consecutive_timeouts > 0:
- status = "warning"
- summary = (
- f"累计失败 {failure_count_total} 次,超时 {timeout_count_total} 次,"
- f"成功率 {success_rate}%"
- )
-
- return {
- "status": status,
- "summary": summary,
- "total_executions": total_executions,
- "success_count_total": success_count_total,
- "failure_count_total": failure_count_total,
- "timeout_count_total": timeout_count_total,
- "success_rate": success_rate,
- "timeout_rate": timeout_rate,
- "consecutive_failures": consecutive_failures,
- "consecutive_timeouts": consecutive_timeouts,
- "last_process_time_ms": last_process_time_ms,
- "last_success_at_text": last_success_at_text,
- "last_failure_at_text": last_failure_at_text,
- "last_error_message": last_error_message,
- "last_failure_type": str(guard_snapshot.get("last_failure_type") or "").strip(),
- "last_timeout_seconds": int(guard_snapshot.get("last_timeout_seconds", 0) or 0),
- "circuit_state": circuit_state,
- "open_remaining_seconds": int(guard_snapshot.get("open_remaining_seconds", 0) or 0),
- }
-
- def _build_plugin_snapshot(self, plugin: PluginInterface) -> Dict[str, Any]:
- """为已加载插件生成标准治理快照。"""
- module_name = self._get_module_name_from_plugin(plugin) or "unknown"
- runtime_record = self._get_module_runtime_state(module_name)
- guard_snapshot = self.get_plugin_guard_snapshot(module_name)
- execution_summary = self._build_execution_summary(guard_snapshot)
- config_path = plugin.get_config_path()
- config_overview = self._read_plugin_config_overview(config_path)
- commands = self._collect_plugin_commands(plugin)
- feature_key = str(getattr(plugin, "feature_key", "") or "").strip()
- feature_description = str(getattr(plugin, "feature_description", "") or "").strip()
- governance_diagnostics = self._build_governance_diagnostics(
- plugin=plugin,
- module_name=module_name,
- config_overview=config_overview,
- runtime_record=runtime_record,
- guard_snapshot=guard_snapshot,
- )
- governance_summary = self._summarize_governance_status(governance_diagnostics)
-
- return {
- "name": plugin.name,
- "module_name": module_name,
- "version": getattr(plugin, "version", "N/A"),
- "author": getattr(plugin, "author", "N/A"),
- "description": getattr(plugin, "description", "N/A"),
- "status": plugin.status.name if hasattr(plugin, "status") else "UNKNOWN",
- "status_label": self._status_to_label(plugin.status.name if hasattr(plugin, "status") else "UNKNOWN"),
- "plugin_types": self._collect_plugin_types(plugin),
- "commands": commands,
- "command_count": len(commands),
- "command_prefix": getattr(plugin, "command_prefix", ""),
- "dependencies": list(getattr(plugin, "dependencies", []) or []),
- "feature_key": feature_key,
- "feature_description": feature_description,
- "supports_group_switch": bool(getattr(plugin, "feature", None)),
- "config": getattr(plugin, "_config", {}),
- "config_path": config_path,
- "config_overview": config_overview,
- "governance_diagnostics": governance_diagnostics,
- "governance_status": governance_summary["status"],
- "governance_error_count": governance_summary["error_count"],
- "governance_warning_count": governance_summary["warning_count"],
- "governance_info_count": governance_summary["info_count"],
- "runtime_state": runtime_record.get("state", "loaded"),
- "runtime_message": runtime_record.get("message", ""),
- "execution_guard": guard_snapshot,
- "execution_summary": execution_summary,
- }
-
- def _build_unloaded_plugin_snapshot(self, module_name: str) -> Dict[str, Any]:
- """为未成功加载的插件模块生成治理快照。"""
- runtime_record = self._get_module_runtime_state(module_name)
- guard_snapshot = self.get_plugin_guard_snapshot(module_name)
- execution_summary = self._build_execution_summary(guard_snapshot)
- config_path = os.path.join(self.plugin_dir, module_name, "config.toml")
- if not os.path.exists(config_path):
- config_path = os.path.join(self.plugin_dir, f"{module_name}", "config.toml")
- config_overview = self._read_plugin_config_overview(config_path)
- governance_diagnostics = self._build_governance_diagnostics(
- plugin=None,
- module_name=module_name,
- config_overview=config_overview,
- runtime_record=runtime_record,
- guard_snapshot=guard_snapshot,
- )
- governance_summary = self._summarize_governance_status(governance_diagnostics)
- runtime_state = str(runtime_record.get("state", "") or "").strip().lower()
- status = "DISCOVERED"
- if runtime_state == "load_failed":
- status = "ERROR"
- elif runtime_state == "disabled_by_config":
- status = "STOPPED"
-
- return {
- "name": module_name,
- "module_name": module_name,
- "version": "N/A",
- "author": "N/A",
- "description": runtime_record.get("message", "") or "插件模块已发现,但当前未进入加载态。",
- "status": status,
- "status_label": self._status_to_label(status),
- "plugin_types": ["unknown"],
- "commands": [],
- "command_count": 0,
- "command_prefix": "",
- "dependencies": [],
- "feature_key": "",
- "feature_description": "",
- "supports_group_switch": False,
- "config": {},
- "config_path": config_path,
- "config_overview": config_overview,
- "governance_diagnostics": governance_diagnostics,
- "governance_status": governance_summary["status"],
- "governance_error_count": governance_summary["error_count"],
- "governance_warning_count": governance_summary["warning_count"],
- "governance_info_count": governance_summary["info_count"],
- "runtime_state": runtime_state or "discovered",
- "runtime_message": runtime_record.get("message", ""),
- "execution_guard": guard_snapshot,
- "execution_summary": execution_summary,
- }
-
- @staticmethod
- def _normalize_snapshot_dependency_key(value: Any) -> str:
- """把依赖引用统一规整成可匹配的 key。"""
- return str(value or "").strip().lower()
-
- def _enrich_dependency_relationships(self, snapshots: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """为插件快照补齐依赖关系摘要。
-
- 设计目标:
- 1. 后台既要看“我依赖谁”,也要看“谁依赖我”;
- 2. 依赖声明有可能写插件名,也可能写模块名,因此这里做双 key 兼容;
- 3. 缺失依赖需要单独产出,便于页面上做风险聚合与高亮。
- """
- normalized_snapshots = [dict(item or {}) for item in (snapshots or [])]
- lookup_by_key: Dict[str, Dict[str, Any]] = {}
-
- for snapshot in normalized_snapshots:
- module_name = str(snapshot.get("module_name") or "").strip()
- display_name = str(snapshot.get("name") or "").strip()
- if module_name:
- lookup_by_key[self._normalize_snapshot_dependency_key(module_name)] = snapshot
- if display_name:
- lookup_by_key[self._normalize_snapshot_dependency_key(display_name)] = snapshot
-
- for snapshot in normalized_snapshots:
- snapshot["resolved_dependencies"] = []
- snapshot["missing_dependencies"] = []
- snapshot["dependent_plugins"] = []
-
- for snapshot in normalized_snapshots:
- dependencies = list(snapshot.get("dependencies", []) or [])
- resolved_dependencies = []
- missing_dependencies = []
-
- for dependency_name in dependencies:
- normalized_key = self._normalize_snapshot_dependency_key(dependency_name)
- target_snapshot = lookup_by_key.get(normalized_key)
- if target_snapshot:
- dependency_row = {
- "name": str(target_snapshot.get("name") or "").strip(),
- "module_name": str(target_snapshot.get("module_name") or "").strip(),
- "status": str(target_snapshot.get("status") or "").strip(),
- "status_label": str(target_snapshot.get("status_label") or "").strip(),
- "governance_status": str(target_snapshot.get("governance_status") or "").strip(),
- }
- resolved_dependencies.append(dependency_row)
- target_snapshot.setdefault("dependent_plugins", []).append(
- {
- "name": str(snapshot.get("name") or "").strip(),
- "module_name": str(snapshot.get("module_name") or "").strip(),
- "status": str(snapshot.get("status") or "").strip(),
- "status_label": str(snapshot.get("status_label") or "").strip(),
- "governance_status": str(snapshot.get("governance_status") or "").strip(),
- }
- )
- else:
- missing_dependencies.append(
- {
- "name": str(dependency_name or "").strip(),
- }
- )
-
- snapshot["resolved_dependencies"] = resolved_dependencies
- snapshot["missing_dependencies"] = missing_dependencies
- snapshot["dependency_summary"] = {
- "declared_count": len(dependencies),
- "resolved_count": len(resolved_dependencies),
- "missing_count": len(missing_dependencies),
- "dependent_count": len(snapshot.get("dependent_plugins", []) or []),
- "has_missing": len(missing_dependencies) > 0,
- }
-
- for snapshot in normalized_snapshots:
- dependent_plugins = list(snapshot.get("dependent_plugins", []) or [])
- dependent_plugins.sort(key=lambda item: (str(item.get("name") or ""), str(item.get("module_name") or "")))
- snapshot["dependent_plugins"] = dependent_plugins
-
- return normalized_snapshots
-
- @staticmethod
- def _status_to_label(status: str) -> str:
- """把运行态状态码转换成中文展示文案。"""
- status_map = {
- "RUNNING": "运行中",
- "STOPPED": "已停用",
- "LOADED": "已加载",
- "UNLOADED": "未加载",
- "ERROR": "异常",
- "DISCOVERED": "待处理",
- "UNKNOWN": "未知",
- }
- return status_map.get(str(status or "").strip().upper(), "未知")
-
- def get_plugin_snapshots(self) -> List[Dict[str, Any]]:
- """返回插件治理中心使用的统一快照列表。"""
- snapshots = []
- loaded_module_names = set()
- discovered_module_names = set(self.discover_plugins())
-
- for plugin in self.plugins.values():
- snapshot = self._build_plugin_snapshot(plugin)
- snapshots.append(snapshot)
- loaded_module_names.add(snapshot["module_name"])
-
- # 这里把“目录已存在但插件未成功加载”的模块也补进列表:
- # 1. 否则后台只能看到成功插件,看不到真正需要排查的失败模块;
- # 2. 这类插件往往正是治理中心最该暴露的问题;
- # 3. 统一补成快照后,前端无需区分“已加载”与“未加载”两套数据源。
- for module_name in sorted(discovered_module_names - loaded_module_names):
- snapshots.append(self._build_unloaded_plugin_snapshot(module_name))
-
- snapshots = self._enrich_dependency_relationships(snapshots)
-
- snapshots.sort(
- key=lambda item: (
- 0 if item.get("status") == "RUNNING" else 1,
- 0 if item.get("governance_status") == "error" else 1 if item.get("governance_status") == "warning" else 2,
- str(item.get("module_name", "")),
- )
- )
- return snapshots
-
- def get_plugin_snapshot(self, name: str) -> Optional[Dict[str, Any]]:
- """按模块名或展示名获取单个插件治理快照。"""
- target_name = str(name or "").strip()
- if not target_name:
- return None
-
- display_name, plugin = self.find_plugin_by_name(target_name)
- if plugin:
- return self._build_plugin_snapshot(plugin)
-
- for snapshot in self.get_plugin_snapshots():
- if snapshot.get("module_name") == target_name or snapshot.get("name") == target_name:
- return snapshot
- return None
-
def set_system_context(self, context: Dict[str, Any]):
"""
设置系统上下文
@@ -1218,8 +354,6 @@ class PluginManager:
if module_name not in self.module_to_display:
self.module_to_display[module_name] = display_name
self.LOG.debug(f"PluginManager:添加缺失的模块映射 {module_name} -> {display_name}")
- self._record_module_runtime_state(module_name, "loaded", "插件已在内存中复用现有实例。")
- self.reset_plugin_guard_state(module_name)
self._inject_bot_to_plugin(plugin)
return plugin
except Exception as e:
@@ -1238,7 +372,6 @@ class PluginManager:
self.plugin_modules[module_name] = module
except ImportError as e:
self.LOG.error(f"PluginManager:导入插件模块 {module_path} 失败: {e}")
- self._record_module_runtime_state(module_name, "load_failed", f"导入插件模块失败: {e}")
return None
else:
# 单文件插件
@@ -1248,7 +381,6 @@ class PluginManager:
self.plugin_modules[module_name] = module
except ImportError as e:
self.LOG.error(f"PluginManager:导入单文件插件 {module_name} 失败: {e}")
- self._record_module_runtime_state(module_name, "load_failed", f"导入单文件插件失败: {e}")
return None
# 查找插件类
@@ -1274,14 +406,12 @@ class PluginManager:
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"PluginManager:插件模块 {module_name} 加载配置失败")
- self._record_module_runtime_state(module_name, "load_failed", "插件配置加载失败。")
async_job.remove_jobs_by_owner(plugin)
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"PluginManager:插件模块 {module_name} 初始化失败")
- self._record_module_runtime_state(module_name, "load_failed", "插件初始化失败。")
async_job.remove_jobs_by_owner(plugin)
return None
self._inject_bot_to_plugin(plugin)
@@ -1298,17 +428,13 @@ class PluginManager:
# 添加模块名到显示名的映射
self.module_to_display[module_name] = display_name
self._refresh_module_file_state(module_name)
- self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
- self.reset_plugin_guard_state(module_name)
# self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}")
return plugin
else:
self.LOG.error(f"PluginManager:插件模块 {module_name} 的 get_plugin() 返回的不是有效的插件实例")
- self._record_module_runtime_state(module_name, "load_failed", "get_plugin() 未返回有效的插件实例。")
else:
self.LOG.error(f"PluginManager:插件模块 {module_name} 中未找到有效的插件类或 get_plugin 函数")
- self._record_module_runtime_state(module_name, "load_failed", "未找到有效的插件类或 get_plugin 函数。")
return None
# 实例化插件
@@ -1322,7 +448,6 @@ class PluginManager:
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"PluginManager:插件模块 {module_name} 加载配置失败")
- self._record_module_runtime_state(module_name, "load_failed", "插件配置加载失败。")
async_job.remove_jobs_by_owner(plugin)
return None
@@ -1330,14 +455,12 @@ class PluginManager:
for section in plugin._config.values():
if isinstance(section, dict) and not section.get("enable", True):
self.LOG.debug(f"PluginManager:插件 {module_name} 已禁用,跳过加载")
- self._record_module_runtime_state(module_name, "disabled_by_config", "插件在配置中已禁用,启动时已跳过加载。")
async_job.remove_jobs_by_owner(plugin)
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"PluginManager:插件模块 {module_name} 初始化失败")
- self._record_module_runtime_state(module_name, "load_failed", "插件初始化失败。")
async_job.remove_jobs_by_owner(plugin)
return None
self._inject_bot_to_plugin(plugin)
@@ -1354,8 +477,6 @@ class PluginManager:
# 添加模块名到显示名的映射
self.module_to_display[module_name] = display_name
self._refresh_module_file_state(module_name)
- self._record_module_runtime_state(module_name, "loaded", "插件已成功加载。")
- self.reset_plugin_guard_state(module_name)
# self.LOG.info(f"PluginManager:添加模块映射 {module_name} -> {display_name}")
return plugin
@@ -1364,7 +485,6 @@ class PluginManager:
plugin_obj = locals().get("plugin")
if plugin_obj is not None:
async_job.remove_jobs_by_owner(plugin_obj)
- self._record_module_runtime_state(module_name, "load_failed", f"插件加载异常: {e}")
self.LOG.exception(f"PluginManager:加载插件模块 {module_name} 失败: {e}", exc_info=True)
return None
@@ -1489,15 +609,10 @@ class PluginManager:
if plugin.start():
plugin.status = PluginStatus.RUNNING
- module_name = self._get_module_name_from_plugin(plugin) or name
- self.reset_plugin_guard_state(module_name)
- self._record_module_runtime_state(module_name, "running", "插件已启动并进入运行态。")
self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为在运行")
return True
else:
plugin.status = PluginStatus.ERROR
- module_name = self._get_module_name_from_plugin(plugin) or name
- self._record_module_runtime_state(module_name, "load_failed", "插件启动失败,状态已标记为异常。")
self.LOG.debug(f"PluginManager:插件 {display_name} 状态变更为异常")
return False
@@ -1524,14 +639,10 @@ class PluginManager:
if plugin.stop():
plugin.status = PluginStatus.STOPPED
- module_name = self._get_module_name_from_plugin(plugin) or name
- self._record_module_runtime_state(module_name, "stopped", "插件已手动停用。")
self.LOG.debug(f"插件 {display_name} 状态变更为已停止")
return True
else:
plugin.status = PluginStatus.ERROR
- module_name = self._get_module_name_from_plugin(plugin) or name
- self._record_module_runtime_state(module_name, "load_failed", "插件停用失败,状态已标记为异常。")
self.LOG.debug(f"插件 {display_name} 状态变更为异常")
return False
diff --git a/db/base.py b/db/base.py
index 5211455..66d08b2 100644
--- a/db/base.py
+++ b/db/base.py
@@ -1,6 +1,4 @@
# -*- coding: utf-8 -*-
-import time
-
from loguru import logger
from typing import List, Dict, Any, Optional, Tuple, Union
@@ -14,62 +12,19 @@ class BaseDBOperator:
self.db_manager = db_manager
self.LOG = logger
- @staticmethod
- def _compact_sql(sql: str) -> str:
- """把 SQL 压成单行,便于日志里快速定位问题。"""
- return " ".join(str(sql or "").split())
-
- @classmethod
- def _truncate_text(cls, value, max_length: int = 240) -> str:
- """截断长文本,避免日志被超长 SQL 或参数刷屏。"""
- text = str(value or "")
- if len(text) <= max_length:
- return text
- return f"{text[:max_length]}..."
-
- def _log_sql_timing(self, operation: str, sql: str, params, elapsed_ms: float, affected_rows: Optional[int] = None) -> None:
- """记录慢 SQL 日志。
-
- 设计说明:
- 1. 只在超过阈值时输出 warning,避免日常日志噪声过大;
- 2. 统一输出压缩后的 SQL 与截断参数,便于线上排查具体慢点;
- 3. 查询/更新/批量/事务都走同一入口,后续如果要接后台审计也更容易扩展。
- """
- if not self.db_manager.is_slow_query_log_enabled():
- return
-
- threshold_ms = self.db_manager.get_slow_query_threshold_ms()
- if elapsed_ms < threshold_ms:
- return
-
- affected_text = ""
- if affected_rows is not None:
- affected_text = f" affected_rows={affected_rows}"
- self.LOG.warning(
- f"检测到慢SQL operation={operation} cost_ms={round(elapsed_ms, 2)} threshold_ms={threshold_ms}"
- f"{affected_text} sql={self._truncate_text(self._compact_sql(sql), 400)} "
- f"params={self._truncate_text(params, 240)}"
- )
-
def execute_query(self, sql: str, params: Optional[tuple] = None, fetch_one: bool = False) -> Union[
List[Dict], Dict, None]:
"""执行查询SQL"""
conn = self.db_manager.get_mysql_connection()
- started_at = time.perf_counter()
try:
with conn.cursor(dictionary=True) as cursor:
cursor.execute(sql, params or ())
- elapsed_ms = (time.perf_counter() - started_at) * 1000
if fetch_one:
- result = cursor.fetchone()
- self._log_sql_timing("query_one", sql, params, elapsed_ms, 1 if result else 0)
- return result
- result = cursor.fetchall()
- self._log_sql_timing("query", sql, params, elapsed_ms, len(result or []))
- return result
+ return cursor.fetchone()
+ return cursor.fetchall()
except Exception as e:
self.LOG.error(
- f"执行查询SQL出错: {e}, SQL: {sql}, 参数: {str(params)[:200] + '...' if len(str(params)) > 200 else params}"
+ f"执行更新SQL出错: {e}, SQL: {sql}, 参数: {str(params)[:200] + '...' if len(str(params)) > 200 else params}"
)
return None
finally:
@@ -78,13 +33,10 @@ class BaseDBOperator:
def execute_update(self, sql: str, params: Optional[tuple] = None) -> bool:
"""执行更新SQL"""
conn = self.db_manager.get_mysql_connection()
- started_at = time.perf_counter()
try:
with conn.cursor() as cursor:
cursor.execute(sql, params or ())
- affected_rows = cursor.rowcount
conn.commit()
- self._log_sql_timing("update", sql, params, (time.perf_counter() - started_at) * 1000, affected_rows)
return True
except Exception as e:
self.LOG.error(
@@ -101,19 +53,10 @@ class BaseDBOperator:
return True
conn = self.db_manager.get_mysql_connection()
- started_at = time.perf_counter()
try:
with conn.cursor() as cursor:
cursor.executemany(sql, params_list)
- affected_rows = cursor.rowcount
conn.commit()
- self._log_sql_timing(
- "batch_update",
- sql,
- f"params_count={len(params_list)}",
- (time.perf_counter() - started_at) * 1000,
- affected_rows,
- )
return True
except Exception as e:
self.LOG.error(f"批量执行SQL出错: {e}, SQL: {sql}, 参数数量: {len(params_list)}")
@@ -128,18 +71,11 @@ class BaseDBOperator:
return True
conn = self.db_manager.get_mysql_connection()
- started_at = time.perf_counter()
try:
with conn.cursor() as cursor:
for sql, params in operations:
cursor.execute(sql, params)
conn.commit()
- self._log_sql_timing(
- "transaction",
- f"{len(operations)} statements",
- f"operations={len(operations)}",
- (time.perf_counter() - started_at) * 1000,
- )
return True
except Exception as e:
self.LOG.error(f"执行事务出错: {e}, 操作数量: {len(operations)}")
diff --git a/db/connection.py b/db/connection.py
index 10bc57f..17964ff 100644
--- a/db/connection.py
+++ b/db/connection.py
@@ -39,13 +39,7 @@ class DBConnectionManager:
self.LOG = logger
self.mysql_pool = None
self.redis_pool = None
- # 保存原始配置快照,供慢 SQL 阈值、库名探测等公共能力复用:
- # 1. BaseDBOperator 需要读取数据库名,去 information_schema 中检查索引;
- # 2. 慢 SQL 记录需要统一读取阈值配置,而不是每个 DB Operator 各自硬编码;
- # 3. 这里做浅拷贝即可,避免后续外部修改传入 dict 时影响内部状态。
- self.mysql_config = dict(mysql_config or {})
- self.redis_config = dict(redis_config or {})
-
+
# 初始化MySQL连接池
if mysql_config:
self.init_mysql_pool(mysql_config)
@@ -64,8 +58,6 @@ class DBConnectionManager:
if not config:
self.LOG.warning("MySQL配置为空,跳过初始化")
return
-
- self.mysql_config = dict(config or {})
# 准备连接池配置
pool_config = {
@@ -98,8 +90,6 @@ class DBConnectionManager:
if not config:
self.LOG.warning("Redis配置为空,跳过初始化")
return
-
- self.redis_config = dict(config or {})
self.redis_pool = redis.ConnectionPool(
host=config.get('host', 'localhost'),
@@ -127,26 +117,6 @@ class DBConnectionManager:
raise Exception("MySQL连接池未初始化")
return self.mysql_pool.get_connection()
-
- def get_mysql_database_name(self) -> str:
- """返回当前 MySQL 目标库名。"""
- return str(self.mysql_config.get('database', '') or '').strip()
-
- def get_slow_query_threshold_ms(self) -> int:
- """读取慢 SQL 阈值,默认 500ms。"""
- try:
- threshold = int(self.mysql_config.get('slow_query_threshold_ms', 500) or 500)
- return threshold if threshold > 0 else 500
- except (TypeError, ValueError):
- return 500
-
- def is_slow_query_log_enabled(self) -> bool:
- """是否启用慢 SQL 日志。"""
- raw_value = self.mysql_config.get('enable_slow_query_log', True)
- if isinstance(raw_value, str):
- normalized = raw_value.strip().lower()
- return normalized not in {'0', 'false', 'off', 'no'}
- return bool(raw_value)
def get_redis_connection(self):
"""获取Redis连接
@@ -170,4 +140,4 @@ class DBConnectionManager:
# 关闭Redis连接池
if self.redis_pool:
self.redis_pool.disconnect()
- self.redis_pool = None
+ self.redis_pool = None
\ No newline at end of file
diff --git a/db/message_storage.py b/db/message_storage.py
index ee62ea9..9bbf852 100644
--- a/db/message_storage.py
+++ b/db/message_storage.py
@@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
-from datetime import datetime, timedelta
+from datetime import datetime
import json
-from threading import Lock
from typing import Dict, List, Optional
from db.base import BaseDBOperator
@@ -13,103 +12,8 @@ from wechat_ipad.models.message import WxMessage
class MessageStorageDB(BaseDBOperator):
"""消息存储相关数据库操作"""
- _performance_ready = False
- _performance_lock = Lock()
-
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
- self._ensure_performance_primitives()
-
- @staticmethod
- def _normalize_datetime_text(value) -> str:
- """把日期/时间对象统一转成数据库可比较的标准字符串。"""
- if isinstance(value, datetime):
- return value.strftime("%Y-%m-%d %H:%M:%S")
- return str(value or "").strip()
-
- @classmethod
- def _build_day_time_range(cls, target_date: str) -> tuple[str, str]:
- """把 `YYYY-MM-DD` 日期转换成 `[00:00:00, 次日00:00:00)` 时间范围。"""
- start_dt = datetime.strptime(str(target_date or "").strip(), "%Y-%m-%d")
- end_dt = start_dt.replace(hour=0, minute=0, second=0, microsecond=0)
- next_day_dt = end_dt + timedelta(days=1)
- return (
- end_dt.strftime("%Y-%m-%d 00:00:00"),
- next_day_dt.strftime("%Y-%m-%d 00:00:00"),
- )
-
- @classmethod
- def _build_day_bounds(cls, start_date: str, end_date: str) -> tuple[str, str]:
- """把日期区间转换成适合索引命中的时间范围。"""
- start_dt = datetime.strptime(str(start_date or "").strip(), "%Y-%m-%d")
- end_dt = datetime.strptime(str(end_date or "").strip(), "%Y-%m-%d")
- if end_dt < start_dt:
- start_dt, end_dt = end_dt, start_dt
- next_day_dt = end_dt + timedelta(days=1)
- return (
- start_dt.strftime("%Y-%m-%d 00:00:00"),
- next_day_dt.strftime("%Y-%m-%d 00:00:00"),
- )
-
- def _ensure_performance_primitives(self) -> None:
- """确保消息存储相关的关键索引存在。
-
- 设计说明:
- 1. 这一步只补“高频查询明确受益”的索引,不做激进表结构重写;
- 2. 使用 information_schema 做存在性检查,保证重复启动时仍然幂等;
- 3. 只在进程内执行一次,避免每次 new MessageStorageDB 都重复打元数据查询。
- """
- if self.__class__._performance_ready:
- return
-
- with self.__class__._performance_lock:
- if self.__class__._performance_ready:
- return
-
- self._ensure_index_exists(
- table_name="messages",
- index_name="idx_group_sender_timestamp",
- create_sql="CREATE INDEX idx_group_sender_timestamp ON messages (group_id, sender, timestamp)",
- )
- self._ensure_index_exists(
- table_name="messages",
- index_name="idx_group_type_timestamp",
- create_sql="CREATE INDEX idx_group_type_timestamp ON messages (group_id, message_type, timestamp)",
- )
- self._ensure_index_exists(
- table_name="messages",
- index_name="idx_media_pending_lookup",
- create_sql="CREATE INDEX idx_media_pending_lookup ON messages (message_type, image_path, timestamp, group_id)",
- )
- self.__class__._performance_ready = True
-
- def _ensure_index_exists(self, table_name: str, index_name: str, create_sql: str) -> None:
- """按需补建单个索引。"""
- database_name = self.db_manager.get_mysql_database_name()
- if not database_name:
- return
-
- existing = self.execute_query(
- """
- SELECT 1
- FROM information_schema.statistics
- WHERE table_schema = %s
- AND table_name = %s
- AND index_name = %s
- LIMIT 1
- """,
- (database_name, table_name, index_name),
- fetch_one=True,
- )
- if existing:
- return
-
- # 索引补建属于“性能自愈”动作:
- # 1. 不要求用户手工跑 migration,服务启动时可自动补齐;
- # 2. 若线上库字段类型和预期不一致,失败后只记日志,不阻断主流程;
- # 3. 这样先拿到可观测收益,再决定后续是否做更完整的 schema migration。
- if not self.execute_update(create_sql):
- self.LOG.warning(f"消息表索引补建失败,请人工检查: table={table_name}, index={index_name}")
def archive_message(self, msg: WxMessage) -> bool:
"""存档消息
@@ -348,12 +252,10 @@ class MessageStorageDB(BaseDBOperator):
def get_member_messages_on_date(self, group_id: str, wxid: str, target_date: str, limit: int = 120) -> List[Dict]:
"""获取成员在某一天的消息"""
- start_time, end_time = self._build_day_time_range(target_date)
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
- WHERE timestamp >= %s
- AND timestamp < %s
+ WHERE DATE(timestamp) = %s
AND group_id = %s
AND sender = %s
AND message_type IN (1, 49)
@@ -362,16 +264,14 @@ class MessageStorageDB(BaseDBOperator):
ORDER BY timestamp ASC
LIMIT %s
"""
- return self.execute_query(sql, (start_time, end_time, group_id, wxid, limit)) or []
+ return self.execute_query(sql, (target_date, group_id, wxid, limit)) or []
def get_member_messages_for_group_date(self, group_id: str, target_date: str, limit: int = 5000) -> List[Dict]:
"""获取群在某一天的全部文本消息"""
- start_time, end_time = self._build_day_time_range(target_date)
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
- WHERE timestamp >= %s
- AND timestamp < %s
+ WHERE DATE(timestamp) = %s
AND group_id = %s
AND sender IS NOT NULL
AND sender <> ''
@@ -381,7 +281,7 @@ class MessageStorageDB(BaseDBOperator):
ORDER BY timestamp ASC
LIMIT %s
"""
- return self.execute_query(sql, (start_time, end_time, group_id, limit)) or []
+ return self.execute_query(sql, (target_date, group_id, limit)) or []
def get_recent_group_chat_messages(self, group_id: str, limit: int = 20) -> List[Dict]:
"""获取群聊最近消息"""
@@ -415,15 +315,13 @@ class MessageStorageDB(BaseDBOperator):
def get_message_count_by_date(self, date: str) -> List[Dict]:
"""获取指定日期的消息统计"""
- start_time, end_time = self._build_day_time_range(date)
sql = """
SELECT group_id, sender, COUNT(*) as count
FROM messages
- WHERE timestamp >= %s
- AND timestamp < %s
+ WHERE DATE(timestamp) = %s
GROUP BY group_id, sender
"""
- return self.execute_query(sql, (start_time, end_time)) or []
+ return self.execute_query(sql, (date,)) or []
def get_speech_ranking(self, date: str, group_id: str, limit: int = 20) -> List[Dict]:
"""获取指定日期和群组的发言排名"""
@@ -582,19 +480,14 @@ class MessageStorageDB(BaseDBOperator):
params.append(group_id)
if start_date:
- start_bound = f"{str(start_date).strip()} 00:00:00"
- sql_count += " AND timestamp >= %s "
- sql_data += " AND timestamp >= %s "
- params.append(start_bound)
+ sql_count += " AND DATE(timestamp) >= %s "
+ sql_data += " AND DATE(timestamp) >= %s "
+ params.append(start_date)
if end_date:
- _, end_bound = self._build_day_bounds(
- start_date or str(end_date).strip(),
- str(end_date).strip(),
- )
- sql_count += " AND timestamp < %s "
- sql_data += " AND timestamp < %s "
- params.append(end_bound)
+ sql_count += " AND DATE(timestamp) <= %s "
+ sql_data += " AND DATE(timestamp) <= %s "
+ params.append(end_date)
if search_text:
sql_count += " AND content LIKE %s "
@@ -772,8 +665,8 @@ class MessageStorageDB(BaseDBOperator):
"""
return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True)
- def get_messages_by_calendar_range(self, group_id: str, start_date: str, end_date: str = None,
- min_content_length: int = 6, max_results: int = 5000) -> List[Dict]:
+ def get_messages_by_date_range(self, group_id: str, start_date: str, end_date: str = None,
+ min_content_length: int = 6, max_results: int = 5000) -> List[Dict]:
"""按日期范围获取消息(支持按天总结)
Args:
@@ -789,13 +682,11 @@ class MessageStorageDB(BaseDBOperator):
if end_date is None:
end_date = start_date
- start_time, end_time = self._build_day_bounds(start_date, end_date)
-
sql = """
SELECT timestamp, sender, content, message_type
FROM messages
- WHERE timestamp >= %s
- AND timestamp < %s
+ WHERE DATE(timestamp) >= %s
+ AND DATE(timestamp) <= %s
AND group_id = %s
AND message_type IN (1, 49)
AND LENGTH(content) > %s
@@ -804,7 +695,7 @@ class MessageStorageDB(BaseDBOperator):
ORDER BY timestamp ASC
LIMIT %s
"""
- params = (start_time, end_time, group_id, min_content_length, max_results)
+ params = (start_date, end_date, group_id, min_content_length, max_results)
return self.execute_query(sql, params) or []
def get_messages_for_summary(self, group_id: str, hours_ago: int = 8,
@@ -858,8 +749,8 @@ class MessageStorageDB(BaseDBOperator):
AND content NOT LIKE '/%'
ORDER BY timestamp ASC
"""
- params = (self._normalize_datetime_text(start_time),
- self._normalize_datetime_text(end_time),
+ params = (start_time.strftime('%Y-%m-%d %H:%M:%S'),
+ end_time.strftime('%Y-%m-%d %H:%M:%S'),
group_id)
return self.execute_query(sql, params) or []
@@ -885,8 +776,8 @@ class MessageStorageDB(BaseDBOperator):
AND CHAR_LENGTH(content) < 300
AND content NOT LIKE '/%'
"""
- params = (self._normalize_datetime_text(start_time),
- self._normalize_datetime_text(end_time),
+ params = (start_time.strftime('%Y-%m-%d %H:%M:%S'),
+ end_time.strftime('%Y-%m-%d %H:%M:%S'),
group_id)
result = self.execute_query(sql, params)
return result[0]['count'] if result else 0
@@ -910,8 +801,8 @@ class MessageStorageDB(BaseDBOperator):
AND sender <> ''
"""
params = (
- self._normalize_datetime_text(start_time),
- self._normalize_datetime_text(end_time),
+ start_time.strftime('%Y-%m-%d %H:%M:%S'),
+ end_time.strftime('%Y-%m-%d %H:%M:%S'),
group_id,
)
result = self.execute_query(sql, params, fetch_one=True) or {}
diff --git a/db/plugin_schedule_db.py b/db/plugin_schedule_db.py
index ea93354..cc0a3da 100644
--- a/db/plugin_schedule_db.py
+++ b/db/plugin_schedule_db.py
@@ -216,22 +216,6 @@ class PluginScheduleDBOperator(BaseDBOperator):
) or {}
return row.get("triggered_at")
- @staticmethod
- def _clean_schedule_ids(schedule_ids: List[int]) -> List[int]:
- """清洗批量查询用的调度 ID 列表。"""
- clean_ids: List[int] = []
- seen = set()
- for item in schedule_ids or []:
- text = str(item or "").strip()
- if not text.isdigit():
- continue
- schedule_id = int(text)
- if schedule_id in seen:
- continue
- clean_ids.append(schedule_id)
- seen.add(schedule_id)
- return clean_ids
-
def get_latest_logs_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
"""批量获取每个调度任务最新一条执行日志。
@@ -240,7 +224,7 @@ class PluginScheduleDBOperator(BaseDBOperator):
2. 进程重启后,async_job 的运行时计数会重置,但数据库日志仍完整;
3. 这里提供批量查询接口,让上层可用日志数据兜底回填展示字段。
"""
- clean_ids = self._clean_schedule_ids(schedule_ids)
+ clean_ids = [int(x) for x in schedule_ids if str(x).strip().isdigit()]
if not clean_ids:
return {}
@@ -263,83 +247,3 @@ class PluginScheduleDBOperator(BaseDBOperator):
if schedule_id > 0:
result[schedule_id] = row
return result
-
- def get_schedule_history_summary_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
- """批量汇总调度任务的历史执行摘要。"""
- clean_ids = self._clean_schedule_ids(schedule_ids)
- if not clean_ids:
- return {}
-
- placeholders = ",".join(["%s"] * len(clean_ids))
- summary_sql = f"""
- SELECT
- schedule_id,
- MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
- MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
- SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
- SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
- COUNT(*) AS total_count
- FROM t_plugin_schedule_logs
- WHERE schedule_id IN ({placeholders})
- GROUP BY schedule_id
- """
- latest_failed_sql = f"""
- SELECT l.*
- FROM t_plugin_schedule_logs l
- INNER JOIN (
- SELECT schedule_id, MAX(id) AS max_id
- FROM t_plugin_schedule_logs
- WHERE status = 'failed' AND schedule_id IN ({placeholders})
- GROUP BY schedule_id
- ) t ON l.id = t.max_id
- """
-
- summary_rows = self.execute_query(summary_sql, tuple(clean_ids)) or []
- latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_ids)) or []
-
- result: Dict[int, Dict[str, Any]] = {}
- for row in summary_rows:
- schedule_id = int(row.get("schedule_id") or 0)
- if schedule_id <= 0:
- continue
- result[schedule_id] = {
- "latest_success_at": row.get("latest_success_at"),
- "latest_failed_at": row.get("latest_failed_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": int(row.get("success_count") or 0),
- "history_fail_count": int(row.get("fail_count") or 0),
- "history_total_count": int(row.get("total_count") or 0),
- }
-
- for row in latest_failed_rows:
- schedule_id = int(row.get("schedule_id") or 0)
- if schedule_id <= 0:
- continue
-
- detail = row.get("detail_json")
- if isinstance(detail, str):
- try:
- detail = json.loads(detail)
- except json.JSONDecodeError:
- detail = {}
- elif detail is None:
- detail = {}
-
- history = result.setdefault(
- schedule_id,
- {
- "latest_success_at": None,
- "latest_failed_at": row.get("triggered_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": 0,
- "history_fail_count": 0,
- "history_total_count": 0,
- },
- )
- history["latest_failed_at"] = row.get("triggered_at")
- history["latest_failure_summary"] = str(row.get("summary") or "").strip()
- history["latest_failure_detail"] = detail or {}
-
- return result
diff --git a/db/scripts/init.sql b/db/scripts/init.sql
index 909bb06..5db917a 100644
--- a/db/scripts/init.sql
+++ b/db/scripts/init.sql
@@ -52,12 +52,6 @@ create or replace index idx_date_timestamp
create or replace index idx_group_timestamp
on message_archive.messages (group_id, timestamp);
-create or replace index idx_group_sender_timestamp
- on message_archive.messages (group_id, sender, timestamp);
-
-create or replace index idx_group_type_timestamp
- on message_archive.messages (group_id, message_type, timestamp);
-
create or replace index idx_message_sender
on message_archive.messages (sender);
@@ -67,9 +61,6 @@ create or replace index idx_message_type
create or replace index messages_message_id_index
on message_archive.messages (message_id);
-create or replace index idx_media_pending_lookup
- on message_archive.messages (message_type, image_path, timestamp, group_id);
-
create or replace table message_archive.t_emoji_assets
(
md5 varchar(64) not null comment '表情MD5'
diff --git a/db/system_job_db.py b/db/system_job_db.py
index f7cb17d..9a10d0f 100644
--- a/db/system_job_db.py
+++ b/db/system_job_db.py
@@ -171,145 +171,6 @@ class SystemJobDBOperator(BaseDBOperator):
row["detail_json"] = {}
return rows
- @staticmethod
- def _clean_job_keys(job_keys: List[str]) -> List[str]:
- """清洗批量查询用的任务 key 列表。
-
- 设计说明:
- 1. 后台列表页会一次性请求多个任务的历史摘要,必须先去掉空值和重复值;
- 2. 统一在 DB Operator 层做清洗,避免上层每个调用方都重复写一遍;
- 3. 保持输入顺序,便于后续排查时能和原始列表一一对应。
- """
- clean_keys: List[str] = []
- seen = set()
- for item in job_keys or []:
- key = str(item or "").strip()
- if not key or key in seen:
- continue
- clean_keys.append(key)
- seen.add(key)
- return clean_keys
-
- def get_latest_logs_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
- """批量读取每个任务最新一条执行日志。"""
- clean_keys = self._clean_job_keys(job_keys)
- if not clean_keys:
- return {}
-
- placeholders = ",".join(["%s"] * len(clean_keys))
- sql = f"""
- SELECT l.*
- FROM t_system_job_logs l
- INNER JOIN (
- SELECT job_key, MAX(id) AS max_id
- FROM t_system_job_logs
- WHERE job_key IN ({placeholders})
- GROUP BY job_key
- ) t ON l.id = t.max_id
- """
- rows = self.execute_query(sql, tuple(clean_keys)) or []
- result: Dict[str, Dict[str, Any]] = {}
- for row in rows:
- detail = row.get("detail_json")
- if isinstance(detail, str):
- try:
- row["detail_json"] = json.loads(detail)
- except json.JSONDecodeError:
- row["detail_json"] = {}
- elif detail is None:
- row["detail_json"] = {}
-
- job_key = str(row.get("job_key") or "").strip()
- if job_key:
- result[job_key] = row
- return result
-
- def get_job_history_summary_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
- """批量汇总系统任务的执行历史摘要。
-
- 返回字段覆盖后台最常用的问题定位视角:
- 1. 最近成功时间,便于判断任务是否长期没有跑通;
- 2. 最近失败时间与失败摘要,便于列表页直接看到异常原因;
- 3. 累计成功/失败/总执行次数,便于粗看任务稳定性。
- """
- clean_keys = self._clean_job_keys(job_keys)
- if not clean_keys:
- return {}
-
- placeholders = ",".join(["%s"] * len(clean_keys))
- summary_sql = f"""
- SELECT
- job_key,
- MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
- MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
- SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
- SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
- COUNT(*) AS total_count
- FROM t_system_job_logs
- WHERE job_key IN ({placeholders})
- GROUP BY job_key
- """
- latest_failed_sql = f"""
- SELECT l.*
- FROM t_system_job_logs l
- INNER JOIN (
- SELECT job_key, MAX(id) AS max_id
- FROM t_system_job_logs
- WHERE status = 'failed' AND job_key IN ({placeholders})
- GROUP BY job_key
- ) t ON l.id = t.max_id
- """
-
- summary_rows = self.execute_query(summary_sql, tuple(clean_keys)) or []
- latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_keys)) or []
-
- result: Dict[str, Dict[str, Any]] = {}
- for row in summary_rows:
- job_key = str(row.get("job_key") or "").strip()
- if not job_key:
- continue
- result[job_key] = {
- "latest_success_at": row.get("latest_success_at"),
- "latest_failed_at": row.get("latest_failed_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": int(row.get("success_count") or 0),
- "history_fail_count": int(row.get("fail_count") or 0),
- "history_total_count": int(row.get("total_count") or 0),
- }
-
- for row in latest_failed_rows:
- job_key = str(row.get("job_key") or "").strip()
- if not job_key:
- continue
-
- detail = row.get("detail_json")
- if isinstance(detail, str):
- try:
- detail = json.loads(detail)
- except json.JSONDecodeError:
- detail = {}
- elif detail is None:
- detail = {}
-
- history = result.setdefault(
- job_key,
- {
- "latest_success_at": None,
- "latest_failed_at": row.get("triggered_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": 0,
- "history_fail_count": 0,
- "history_total_count": 0,
- },
- )
- history["latest_failed_at"] = row.get("triggered_at")
- history["latest_failure_summary"] = str(row.get("summary") or "").strip()
- history["latest_failure_detail"] = detail or {}
-
- return result
-
def get_latest_log_time(self, job_key: str) -> Optional[datetime]:
"""获取任务最新一次执行日志时间。"""
row = self.execute_query(
diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md
index b105441..0478d49 100644
--- a/docs/工程优化与Feature清单.md
+++ b/docs/工程优化与Feature清单.md
@@ -400,15 +400,6 @@
- 把插件系统从“可加载”升级为“可治理”
-当前进展:
-
-- 第一阶段已完成:`PluginManager` 已输出统一插件治理快照,后台不再只展示“加载成功的插件”
-- 第一阶段已完成:后台插件管理页已补充治理健康、能力类型、Feature Key、依赖与配置概览信息
-- 第一阶段已完成:插件配置保存前已增加格式校验,避免坏配置直接写回线上文件
-- 第二阶段已完成:插件管理页已补充执行表现摘要、最近错误信息与高风险/慢插件排行,便于快速定位运行异常插件
-- 第二阶段已完成:插件快照已补充依赖拓扑摘要,后台可直接查看核心依赖插件、缺失依赖风险与上下游关系
-- 后续可继续补充插件错误历史、性能排名、依赖图与熔断/隔离控制
-
建议内容:
- 插件元信息页面
@@ -435,13 +426,6 @@
- 防止单插件问题拖垮整体系统
-当前进展:
-
-- 第一阶段已完成:消息插件执行已增加统一超时保护,避免单插件长时间卡住主链路
-- 第一阶段已完成:已补充连续失败熔断、冷却后半开探测与自动恢复逻辑
-- 第一阶段已完成:插件治理快照与后台详情已可查看执行保护状态、连续失败与恢复剩余时间
-- 后续可继续补充插件级并发配额、失败原因聚合、后台手动解除熔断与更细粒度的隔离策略
-
建议内容:
- 插件处理超时控制
@@ -465,13 +449,6 @@
- 让定时任务真正可管理、可追踪
-当前进展:
-
-- 第一阶段已完成:系统任务页与插件调度页已补充历史执行摘要,可直接查看最近成功时间、最近失败原因与累计成功/失败次数
-- 第一阶段已完成:任务列表接口已合并内存运行态与数据库日志态,服务重启后后台仍可回看最近执行结果
-- 第一阶段已完成:插件调度页已补充快捷启停入口,减少仅为切换启用状态而进入编辑弹窗的操作成本
-- 后续可继续补充任务执行审计人、失败重试策略模板、筛选搜索与跨任务汇总看板
-
建议内容:
- 展示任务执行历史
@@ -499,14 +476,6 @@
- 提高高消息量场景下的吞吐与查询效率
-当前进展:
-
-- 第一阶段已完成:数据库公共层已增加慢 SQL 记录能力,可按 `db_config.slow_query_threshold_ms` 阈值输出慢查询日志
-- 第一阶段已完成:消息存储层启动时会自动补齐关键查询索引,优先覆盖群消息范围查询、成员消息回溯与待处理媒体扫描场景
-- 第一阶段已完成:多处按日期查询已改为时间范围查询,避免 `DATE(timestamp)` 直接作用在索引列上导致索引失效
-- 第一阶段已完成:已修正消息存储层重复定义的日期范围方法,避免按天汇总查询误走错误实现
-- 后续可继续补充统计报表快照表、Redis key 扫描替换方案、后台慢 SQL 看板与更多统计表索引治理
-
建议内容:
- 梳理消息表与统计表索引
@@ -587,13 +556,6 @@
- 降低普通用户与管理员的使用门槛
-当前进展:
-
-- 第一阶段已完成:`菜单 指令清单 / 功能清单 / 命令清单 / 帮助` 已改为基于运行中插件快照自动生成
-- 第一阶段已完成:指令清单已按当前群真实可用状态过滤,管理员可额外看到未启用命令与管理命令
-- 第二阶段已完成:后台已新增“命令索引”页面,可按群查看真实可用命令、未启用命令、自动能力与管理员触发示例
-- 后续可继续补充插件触发示例模板、命令分类标签与更细粒度的使用说明
-
建议内容:
- 自动生成按插件分类的帮助菜单
diff --git a/plugins/robot_menu/main.py b/plugins/robot_menu/main.py
index ee9a880..1a75b33 100644
--- a/plugins/robot_menu/main.py
+++ b/plugins/robot_menu/main.py
@@ -20,7 +20,7 @@ class RobotMenuPlugin(MessagePluginInterface):
# 功能权限常量
FEATURE_KEY = "ROBOT_MENU"
- FEATURE_DESCRIPTION = "📋 功能菜单 [菜单 | 菜单 状态 | 菜单 指令清单]"
+ FEATURE_DESCRIPTION = "📋 功能菜单 [菜单 - 显示功能菜单 | 菜单 状态 - 显示功能状态]"
@property
def name(self) -> str:
@@ -263,31 +263,6 @@ class RobotMenuPlugin(MessagePluginInterface):
)
return True, "显示功能状态"
- if cmd_name in {"指令清单", "功能清单", "命令清单", "帮助"}:
- # 指令清单改为直接从插件快照自动生成:
- # 1. 展示当前群“真实可用”的命令,而不是手工维护的固定文案;
- # 2. 管理员额外看到未启用项与管理命令,普通用户只看到能直接用的内容;
- # 3. 这样后续新增/删除插件后,菜单无需手动同步修改。
- command_catalog_text = self.menu_renderer.build_command_catalog_text(
- roomid if roomid else sender,
- sender,
- )
- command_catalog_markdown = self.menu_renderer.build_command_catalog_markdown(
- roomid if roomid else sender,
- sender,
- )
- await self.menu_renderer.send_menu_content(
- bot=bot,
- target=target,
- sender=sender,
- revoke=revoke,
- text_content=command_catalog_text,
- markdown_content=command_catalog_markdown,
- html_content="",
- revoke_seconds=120,
- )
- return True, "显示指令清单"
-
# 处理群列表命令
if cmd_name.upper() == "群列表":
group_list_text = self.get_group_list()
diff --git a/plugins/robot_menu/menu_render_tool.py b/plugins/robot_menu/menu_render_tool.py
index b6286c6..bdb6b0e 100644
--- a/plugins/robot_menu/menu_render_tool.py
+++ b/plugins/robot_menu/menu_render_tool.py
@@ -7,7 +7,6 @@ from typing import Any, Optional, Tuple
from loguru import logger as default_logger
-from base.plugin_common.plugin_manager import PluginManager
from utils.markdown_to_image import convert_md_str_to_image, html_to_image
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus
@@ -190,283 +189,6 @@ class RobotMenuRenderTool:
},
)
- @staticmethod
- def _get_plugin_manager() -> PluginManager:
- """获取当前运行中的插件管理器单例。"""
- return PluginManager.get_instance()
-
- @staticmethod
- def _resolve_snapshot_group_status(snapshot: dict, group_id: str) -> dict:
- """解析插件在当前群里的可用状态。
-
- 规则说明:
- 1. 插件必须先处于 RUNNING,才可能被认为“可用”;
- 2. 若插件支持群级开关,则继续读取该群的 feature 权限;
- 3. 若插件没有群级开关,则视为“运行即全局可用”。
- """
- normalized_snapshot = dict(snapshot or {})
- status = str(normalized_snapshot.get("status") or "").strip().upper()
- supports_group_switch = bool(normalized_snapshot.get("supports_group_switch"))
- feature_key = str(normalized_snapshot.get("feature_key") or "").strip()
-
- if status != "RUNNING":
- return {
- "available": False,
- "reason": "插件未运行",
- "reason_code": "plugin_not_running",
- }
-
- if not group_id or not supports_group_switch or not feature_key:
- return {
- "available": True,
- "reason": "全局可用",
- "reason_code": "global_available",
- }
-
- feature = Feature.get_feature(feature_key)
- if feature is None:
- return {
- "available": True,
- "reason": "未绑定群级开关,按运行中处理",
- "reason_code": "feature_not_registered",
- }
-
- permission = GroupBotManager.get_group_permission(group_id, feature)
- if permission == PermissionStatus.ENABLED:
- return {
- "available": True,
- "reason": "本群已启用",
- "reason_code": "group_enabled",
- }
- return {
- "available": False,
- "reason": "本群未启用",
- "reason_code": "group_disabled",
- }
-
- @staticmethod
- def _format_plugin_command(example_command: str, command_prefix: str) -> str:
- """把插件命令和前缀拼成最终展示文本。"""
- prefix = str(command_prefix or "").strip()
- command = str(example_command or "").strip()
- if not prefix:
- return command
- return f"{prefix}{command}"
-
- def _build_plugin_command_entry(self, snapshot: dict, group_id: str) -> Optional[dict]:
- """把插件快照转换为菜单可展示的命令项。"""
- normalized_snapshot = dict(snapshot or {})
- commands = list(normalized_snapshot.get("commands", []) or [])
- plugin_types = list(normalized_snapshot.get("plugin_types", []) or [])
- if not commands and "scheduled" not in plugin_types:
- return None
-
- availability = self._resolve_snapshot_group_status(normalized_snapshot, group_id)
- command_prefix = str(normalized_snapshot.get("command_prefix") or "").strip()
- primary_command = self._format_plugin_command(commands[0], command_prefix) if commands else ""
- alias_commands = [
- self._format_plugin_command(command_text, command_prefix)
- for command_text in commands[1:4]
- if str(command_text or "").strip()
- ]
-
- if "message" in plugin_types:
- category = "message"
- category_label = "消息指令"
- elif "scheduled" in plugin_types:
- category = "scheduled"
- category_label = "自动任务"
- else:
- category = "generic"
- category_label = "通用能力"
-
- return {
- "name": str(normalized_snapshot.get("name") or "").strip(),
- "module_name": str(normalized_snapshot.get("module_name") or "").strip(),
- "description": str(normalized_snapshot.get("description") or "").strip() or "暂无描述",
- "category": category,
- "category_label": category_label,
- "commands": commands,
- "primary_command": primary_command,
- "alias_commands": alias_commands,
- "supports_group_switch": bool(normalized_snapshot.get("supports_group_switch")),
- "feature_key": str(normalized_snapshot.get("feature_key") or "").strip(),
- "available": bool(availability.get("available")),
- "availability_reason": str(availability.get("reason") or "").strip(),
- "availability_code": str(availability.get("reason_code") or "").strip(),
- "status_label": str(normalized_snapshot.get("status_label") or "").strip(),
- }
-
- def _collect_command_catalog(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict:
- """采集当前群和当前身份视角下的命令清单。
-
- 输出结构分三层:
- 1. 普通用户可直接用的命令;
- 2. 自动/定时能力;
- 3. 管理员附加能力与未启用项。
- """
- plugin_manager = self._get_plugin_manager()
- snapshots = plugin_manager.get_plugin_snapshots()
- if force_admin is None:
- is_admin = bool(GroupBotManager.is_admin_for_group(requester_id, group_id)) if group_id else bool(GroupBotManager.is_admin(requester_id))
- else:
- is_admin = bool(force_admin)
-
- available_manual = []
- available_auto = []
- unavailable_manual = []
-
- for snapshot in snapshots:
- entry = self._build_plugin_command_entry(snapshot, group_id)
- if not entry:
- continue
- if entry["category"] == "scheduled":
- if entry["available"]:
- available_auto.append(entry)
- continue
- if entry["available"]:
- available_manual.append(entry)
- else:
- unavailable_manual.append(entry)
-
- available_manual.sort(key=lambda item: (item["category"], item["name"], item["primary_command"]))
- available_auto.sort(key=lambda item: (item["name"], item["primary_command"]))
- unavailable_manual.sort(key=lambda item: (item["availability_code"], item["name"]))
-
- admin_commands = []
- if is_admin:
- admin_commands = [
- {"title": "查看功能状态", "example": "菜单 状态", "description": "查看当前群所有功能开关状态"},
- {"title": "启用某个功能", "example": "菜单 启用 功能序号", "description": "按菜单序号启用某项功能"},
- {"title": "关闭某个功能", "example": "菜单 关闭 功能序号", "description": "按菜单序号关闭某项功能"},
- {"title": "查看群管理员", "example": "菜单 管理员 列表", "description": "查看当前群管理员清单"},
- {"title": "添加群管理员", "example": "菜单 管理员 添加 @某人", "description": "把某个群成员加入本群管理员"},
- {"title": "删除群管理员", "example": "菜单 管理员 删除 @某人", "description": "移除某个群管理员"},
- ]
- if GroupBotManager.is_admin(requester_id):
- admin_commands.append(
- {"title": "查看托管群列表", "example": "菜单 群列表", "description": "查看所有已启用机器人的群"}
- )
-
- return {
- "group_id": str(group_id or "").strip(),
- "requester_id": str(requester_id or "").strip(),
- "is_admin": is_admin,
- "available_manual": available_manual,
- "available_auto": available_auto,
- "unavailable_manual": unavailable_manual,
- "admin_commands": admin_commands,
- "generated_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
- }
-
- def build_command_catalog_data(self, group_id: str, requester_id: str, force_admin: Optional[bool] = None) -> dict:
- """对外暴露统一的命令目录结构,供机器人菜单和后台页面共同复用。"""
- return self._collect_command_catalog(group_id, requester_id, force_admin=force_admin)
-
- def build_command_catalog_text(self, group_id: str, requester_id: str) -> str:
- """构建适合直接发送给用户的文本版命令清单。"""
- catalog = self.build_command_catalog_data(group_id, requester_id)
- lines = [
- "📚 当前群指令清单",
- f"群ID:{catalog['group_id'] or '私聊'}",
- f"生成时间:{catalog['generated_at']}",
- "",
- "一、当前可直接使用的命令",
- ]
-
- if catalog["available_manual"]:
- for item in catalog["available_manual"]:
- lines.append(f"【{item['name']}】{item['description']}")
- if item["primary_command"]:
- lines.append(f"主指令:{item['primary_command']}")
- if item["alias_commands"]:
- lines.append(f"别名:{' / '.join(item['alias_commands'])}")
- lines.append("")
- else:
- lines.append("当前没有可直接使用的手动命令")
- lines.append("")
-
- lines.append("二、自动/定时能力")
- if catalog["available_auto"]:
- for item in catalog["available_auto"]:
- lines.append(f"【{item['name']}】{item['description']}")
- lines.append("触发方式:自动或定时运行")
- lines.append("")
- else:
- lines.append("当前没有已启用的自动能力")
- lines.append("")
-
- if catalog["is_admin"]:
- lines.append("三、管理员额外可见")
- if catalog["unavailable_manual"]:
- lines.append("未启用或暂不可用命令:")
- for item in catalog["unavailable_manual"]:
- primary = item["primary_command"] or "无手动指令"
- lines.append(f"- {item['name']}:{primary}({item['availability_reason']})")
- lines.append("")
- else:
- lines.append("当前没有未启用的命令项")
- lines.append("")
-
- lines.append("管理命令:")
- for item in catalog["admin_commands"]:
- lines.append(f"- {item['example']}:{item['description']}")
- lines.append("")
-
- lines.append("提示:发送“菜单”查看功能开关;发送“菜单 状态”查看本群功能状态。")
- return "\n".join(lines).strip()
-
- def build_command_catalog_markdown(self, group_id: str, requester_id: str) -> str:
- """构建适合图片渲染的 Markdown 版指令清单。"""
- catalog = self.build_command_catalog_data(group_id, requester_id)
- lines = [
- "# 机器人指令清单",
- "",
- f"- 目标:`{catalog['group_id'] or '私聊'}`",
- f"- 生成时间:`{catalog['generated_at']}`",
- "",
- "## 当前可直接使用的命令",
- ]
-
- if catalog["available_manual"]:
- for item in catalog["available_manual"]:
- lines.append(f"### {item['name']}")
- lines.append(f"- 说明:{item['description']}")
- if item["primary_command"]:
- lines.append(f"- 主指令:`{item['primary_command']}`")
- if item["alias_commands"]:
- alias_text = " / ".join(f"`{alias}`" for alias in item["alias_commands"])
- lines.append(f"- 别名:{alias_text}")
- lines.append("")
- else:
- lines.append("- 当前没有可直接使用的手动命令")
- lines.append("")
-
- lines.append("## 自动/定时能力")
- if catalog["available_auto"]:
- for item in catalog["available_auto"]:
- lines.append(f"- **{item['name']}**:{item['description']}")
- else:
- lines.append("- 当前没有已启用的自动能力")
- lines.append("")
-
- if catalog["is_admin"]:
- lines.append("## 管理员额外可见")
- if catalog["unavailable_manual"]:
- lines.append("### 未启用或暂不可用命令")
- for item in catalog["unavailable_manual"]:
- primary = item["primary_command"] or "无手动指令"
- lines.append(f"- **{item['name']}**:`{primary}`({item['availability_reason']})")
- lines.append("")
-
- lines.append("### 管理命令")
- for item in catalog["admin_commands"]:
- lines.append(f"- `{item['example']}`:{item['description']}")
- lines.append("")
-
- lines.append("> 提示:发送 `菜单` 查看功能开关;发送 `菜单 状态` 查看本群功能状态。")
- return "\n".join(lines)
-
async def send_menu_content(
self,
bot: WechatAPIClient,
diff --git a/robot.py b/robot.py
index 87e5d3a..912a3c7 100644
--- a/robot.py
+++ b/robot.py
@@ -649,33 +649,7 @@ class Robot:
# 检查插件是否可以处理该消息
if plugin.can_process(plugin_msg):
- protection_policy = self._build_message_plugin_protection_policy(plugin)
- acquire_result = self.plugin_manager.try_acquire_plugin_execution(
- plugin,
- recovery_seconds=protection_policy["circuit_recovery_seconds"],
- )
- if not acquire_result.get("allowed", False):
- # 熔断打开或半开探测占用时,这里只跳过当前插件:
- # 1. 保护目标是避免单插件持续拖慢主链路,而不是直接关闭整个插件;
- # 2. 后续插件仍然可以继续尝试处理当前消息,降低功能面损失;
- # 3. 冷却结束后会自动进入半开恢复探测,无需人工介入恢复。
- self.LOG.warning(
- self._trace_message(
- msg,
- f"插件保护跳过 plugin={plugin.name} reason={acquire_result.get('reason')} "
- f"remaining={acquire_result.get('open_remaining_seconds', 0)}s"
- )
- )
- continue
-
- processed, _ = await asyncio.wait_for(
- plugin.process_message(plugin_msg),
- timeout=protection_policy["process_timeout_seconds"],
- )
- self.plugin_manager.record_plugin_execution_success(
- plugin,
- process_time_ms=self._elapsed_ms(started_at),
- )
+ processed, _ = await plugin.process_message(plugin_msg)
self._record_plugin_call_result(
plugin=plugin,
msg=msg,
@@ -696,58 +670,14 @@ class Robot:
)
)
return True
- except asyncio.TimeoutError as timeout_error:
- protection_policy = self._build_message_plugin_protection_policy(plugin)
- failure_record = self.plugin_manager.record_plugin_execution_failure(
- plugin,
- failure_type="timeout",
- error_message=(
- f"插件执行超时,超过 {protection_policy['process_timeout_seconds']} 秒仍未完成。"
- ),
- process_time_ms=self._elapsed_ms(started_at),
- timeout_seconds=protection_policy["process_timeout_seconds"],
- failure_threshold=protection_policy["failure_threshold"],
- recovery_seconds=protection_policy["circuit_recovery_seconds"],
- )
- self._record_plugin_call_error(
- plugin=plugin,
- msg=msg,
- command_name=command_name,
- error=timeout_error,
- )
- self.LOG.error(
- self._trace_message(
- msg,
- f"插件 {plugin.name} 执行超时,timeout={protection_policy['process_timeout_seconds']}s "
- f"circuit_state={failure_record.get('circuit_state')} "
- f"consecutive_failures={failure_record.get('consecutive_failures')}"
- )
- )
except Exception as e:
- protection_policy = self._build_message_plugin_protection_policy(plugin)
- failure_record = self.plugin_manager.record_plugin_execution_failure(
- plugin,
- failure_type="error",
- error_message=str(e),
- process_time_ms=self._elapsed_ms(started_at),
- timeout_seconds=0,
- failure_threshold=protection_policy["failure_threshold"],
- recovery_seconds=protection_policy["circuit_recovery_seconds"],
- )
self._record_plugin_call_error(
plugin=plugin,
msg=msg,
command_name=command_name,
error=e,
)
- self.LOG.error(
- self._trace_message(
- msg,
- f"插件 {plugin.name} 处理消息失败: {e} "
- f"circuit_state={failure_record.get('circuit_state')} "
- f"consecutive_failures={failure_record.get('consecutive_failures')}"
- )
- )
+ self.LOG.error(self._trace_message(msg, f"插件 {plugin.name} 处理消息失败: {e}"))
return False
@@ -796,70 +726,6 @@ class Robot:
msg_type = getattr(getattr(msg, "msg_type", None), "name", "")
return f"[{msg_type or 'UNKNOWN'}]"
- @staticmethod
- def _safe_positive_int(value, default: int) -> int:
- """把配置中的数字安全转成正整数。"""
- try:
- parsed = int(value)
- return parsed if parsed > 0 else default
- except (TypeError, ValueError):
- return default
-
- def _build_message_plugin_protection_policy(self, plugin) -> dict:
- """构建消息插件执行保护策略。"""
- plugin_config = getattr(plugin, "_config", {}) or {}
- runtime_config = plugin_config.get("runtime", {}) if isinstance(plugin_config, dict) else {}
- runtime_config = runtime_config if isinstance(runtime_config, dict) else {}
- breaker_config = runtime_config.get("circuit_breaker", {}) if isinstance(runtime_config, dict) else {}
- breaker_config = breaker_config if isinstance(breaker_config, dict) else {}
-
- # 超时策略尽量遵循“显式配置优先,已有内部超时参数兜底”的思路:
- # 1. 新插件如果有特殊需求,只需要在 runtime / circuit_breaker 下声明自己的超时;
- # 2. 老插件不改代码也能自动复用现有的 request / llm / render 超时字段;
- # 3. 最终统一加一个缓冲区,避免外层 wait_for 比插件内部自己的超时还更早打断。
- explicit_timeout = (
- runtime_config.get("plugin_process_timeout_seconds")
- or runtime_config.get("message_timeout_seconds")
- or breaker_config.get("timeout_seconds")
- or getattr(plugin, "plugin_process_timeout_seconds", 0)
- )
- timeout_candidates = []
- for attr_name in [
- "llm_call_timeout_sec",
- "_request_timeout_seconds",
- "default_timeout",
- "_image_render_timeout_seconds",
- "image_render_timeout_seconds",
- "_receive_timeout",
- "_connect_timeout_seconds",
- "_connect_timeout",
- ]:
- attr_value = getattr(plugin, attr_name, 0)
- if isinstance(attr_value, (int, float)) and attr_value > 0:
- timeout_candidates.append(int(attr_value))
-
- if explicit_timeout:
- resolved_timeout = self._safe_positive_int(explicit_timeout, 30)
- elif timeout_candidates:
- resolved_timeout = max(timeout_candidates) + 10
- else:
- resolved_timeout = 30
-
- failure_threshold = self._safe_positive_int(
- breaker_config.get("failure_threshold") or runtime_config.get("circuit_breaker_failure_threshold") or 3,
- 3,
- )
- circuit_recovery_seconds = self._safe_positive_int(
- breaker_config.get("recovery_seconds") or runtime_config.get("circuit_breaker_recovery_seconds") or 180,
- 180,
- )
-
- return {
- "process_timeout_seconds": max(10, min(int(resolved_timeout), 180)),
- "failure_threshold": max(2, min(int(failure_threshold), 10)),
- "circuit_recovery_seconds": max(30, min(int(circuit_recovery_seconds), 900)),
- }
-
def _get_stats_collector_plugin(self):
"""获取运行中的统计收集插件实例。"""
# 统计插件已经从“事件订阅”切到“主链路直接回调”,
diff --git a/utils/plugin_schedule_manager.py b/utils/plugin_schedule_manager.py
index 2b30afb..b9767fd 100644
--- a/utils/plugin_schedule_manager.py
+++ b/utils/plugin_schedule_manager.py
@@ -209,47 +209,6 @@ class PluginScheduleManager:
return False
return latest_log_at < (expected_at - timedelta(seconds=self._compensation_tolerance_seconds))
- @staticmethod
- def _build_schedule_health_status(
- *,
- enabled: bool,
- running: bool,
- last_status: str,
- latest_success_at,
- latest_failure_summary: str,
- ) -> str:
- """根据调度任务运行态和历史态生成后台健康标签。"""
- if not enabled:
- return "disabled"
- if running:
- return "running"
- # 只有“最近一次执行仍是失败”时才把健康态打成 failed,
- # 避免历史上曾失败过、但后面已经恢复成功的任务一直显示异常。
- if str(last_status or "").strip().lower() == "failed":
- return "failed"
- if latest_success_at or str(last_status or "").strip().lower() == "success":
- return "healthy"
- if str(latest_failure_summary or "").strip():
- return "degraded"
- return "idle"
-
- @staticmethod
- def _build_schedule_health_message(*, health_status: str, latest_success_at, latest_failure_summary: str) -> str:
- """生成调度任务列表里展示的简短健康说明。"""
- if health_status == "disabled":
- return "任务已停用"
- if health_status == "running":
- return "任务正在执行中"
- if health_status in ("failed", "degraded"):
- return str(latest_failure_summary or "最近存在失败记录").strip()
- if health_status == "healthy":
- if isinstance(latest_success_at, datetime):
- return f"最近成功于 {latest_success_at.strftime('%Y-%m-%d %H:%M:%S')}"
- if latest_success_at:
- return f"最近成功于 {latest_success_at}"
- return "任务近期执行正常"
- return "暂无执行记录"
-
async def _run_one_schedule(self, schedule_row: Dict[str, Any]) -> Dict[str, Any]:
schedule_id = int(schedule_row["id"])
action_key = schedule_row.get("action_key")
@@ -338,7 +297,6 @@ class PluginScheduleManager:
# 日志兜底:进程重启后内存态 last_run_at 会丢失,任务页需要从数据库最新日志恢复显示。
schedule_ids = [int(row.get("id")) for row in db_rows if row.get("id") is not None]
latest_log_by_schedule = self.db.get_latest_logs_map(schedule_ids)
- history_summary_by_schedule = self.db.get_schedule_history_summary_map(schedule_ids)
data = []
for row in db_rows:
@@ -346,7 +304,6 @@ class PluginScheduleManager:
key = f"plugin_schedule:{schedule_id}"
runtime = runtime_by_key.get(key, {})
latest_log = latest_log_by_schedule.get(schedule_id) or {}
- history_summary = history_summary_by_schedule.get(schedule_id) or {}
merged = dict(row)
merged["runtime_job_id"] = runtime.get("id")
merged["running"] = runtime.get("running", False)
@@ -362,24 +319,6 @@ class PluginScheduleManager:
merged["run_count"] = runtime.get("run_count", 0)
merged["success_count"] = runtime.get("success_count", 0)
merged["fail_count"] = runtime.get("fail_count", 0)
- merged["latest_success_at"] = history_summary.get("latest_success_at")
- merged["latest_failed_at"] = history_summary.get("latest_failed_at")
- merged["latest_failure_summary"] = str(history_summary.get("latest_failure_summary") or "").strip()
- merged["history_success_count"] = int(history_summary.get("history_success_count", 0) or 0)
- merged["history_fail_count"] = int(history_summary.get("history_fail_count", 0) or 0)
- merged["history_total_count"] = int(history_summary.get("history_total_count", 0) or 0)
- merged["health_status"] = self._build_schedule_health_status(
- enabled=bool(row.get("enabled", 0)),
- running=bool(runtime.get("running", False)),
- last_status=str(merged.get("last_status") or ""),
- latest_success_at=history_summary.get("latest_success_at"),
- latest_failure_summary=str(history_summary.get("latest_failure_summary") or ""),
- )
- merged["health_message"] = self._build_schedule_health_message(
- health_status=merged["health_status"],
- latest_success_at=history_summary.get("latest_success_at"),
- latest_failure_summary=str(history_summary.get("latest_failure_summary") or ""),
- )
data.append(merged)
return data
diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py
index a28160d..2ab01d6 100644
--- a/utils/wechat/message_to_db.py
+++ b/utils/wechat/message_to_db.py
@@ -883,7 +883,7 @@ class MessageStorage:
end_date = current_time.strftime('%Y-%m-%d')
# 使用新的按日期查询方法
- messages = self.message_db.get_messages_by_calendar_range(
+ messages = self.message_db.get_messages_by_date_range(
group_id,
start_date=start_date,
end_date=end_date,