恢复首页基础设施详细状态与任务调度卡片

This commit is contained in:
liuwei
2026-05-06 08:52:54 +08:00
parent 3730694465
commit ef5db2babd
2 changed files with 762 additions and 106 deletions

View File

@@ -16,6 +16,7 @@ from utils.markdown_to_image import get_md2img_health_snapshot, warmup_md2img_br
from utils.ai.llm_registry import LLMRegistry
from base.plugin_common.plugin_interface import PluginStatus
from utils.ai.unified_llm import UnifiedLLMClient
from utils.decorator.async_job import async_job
# 创建系统信息蓝图
system_bp = Blueprint('system', __name__)
@@ -42,6 +43,442 @@ def _save_system_yaml(config_obj: dict) -> None:
yaml.safe_dump(config_obj, f, allow_unicode=True, sort_keys=False)
def _safe_int(value, default: int = 0) -> int:
"""把数据库 / Redis 返回的字符串数字安全转成整数。"""
try:
if value in (None, ""):
return default
return int(float(value))
except (TypeError, ValueError):
return default
def _safe_float(value, default: float = 0.0) -> float:
"""把数据库 / Redis 返回的值安全转成浮点数。"""
try:
if value in (None, ""):
return default
return float(value)
except (TypeError, ValueError):
return default
def _format_bytes_to_mb(value: int) -> float:
"""把字节数转换为 MB保留两位小数便于首页摘要展示。"""
return round((_safe_float(value, 0.0) / 1024 / 1024), 2)
def _extract_mysql_runtime_snapshot(db_manager) -> dict:
"""采集 MySQL 运行态摘要。
首页目标不是替代 DBA 工具,而是让管理员一眼判断:
1. 数据库是不是活着;
2. 当前连接压力高不高;
3. 当前库规模是否已经明显变大;
4. 有没有必要继续深入到更专业的监控页排查。
"""
snapshot = {
"status": "healthy",
"summary": "连接正常",
"database": db_manager.get_mysql_database_name(),
"version": "",
"threads_connected": 0,
"threads_running": 0,
"max_connections": 0,
"connection_usage_percent": 0.0,
"questions_per_second": 0.0,
"uptime_seconds": 0,
"table_count": 0,
"schema_size_mb": 0.0,
"slow_query_threshold_ms": db_manager.get_slow_query_threshold_ms(),
}
mysql_conn = db_manager.get_mysql_connection()
try:
with mysql_conn.cursor(dictionary=True) as cursor:
# 基础探活与版本识别:
# 1. SELECT VERSION() 成本极低;
# 2. 相比只做 SELECT 1它还能顺便拿到版本信息
# 3. 首页卡片里显示版本,方便线上排查“是不是某台库版本不一致”。
cursor.execute("SELECT VERSION() AS version, DATABASE() AS database_name")
version_row = cursor.fetchone() or {}
snapshot["version"] = str(version_row.get("version") or "").strip()
snapshot["database"] = str(version_row.get("database_name") or snapshot["database"] or "").strip()
cursor.execute(
"""
SHOW GLOBAL STATUS
WHERE Variable_name IN ('Threads_connected', 'Threads_running', 'Questions', 'Uptime')
"""
)
status_rows = cursor.fetchall() or []
status_map = {
str(row.get("Variable_name") or "").strip(): row.get("Value")
for row in status_rows
}
cursor.execute(
"""
SHOW GLOBAL VARIABLES
WHERE Variable_name IN ('max_connections')
"""
)
variable_rows = cursor.fetchall() or []
variable_map = {
str(row.get("Variable_name") or "").strip(): row.get("Value")
for row in variable_rows
}
# information_schema 聚合虽然比 SELECT 1 重一点,但仍属于轻量级元信息查询:
# 1. 只在首页 30 秒级刷新一次,成本可接受;
# 2. 能直接给出当前业务库表数量与体量变化;
# 3. 对判断“是不是消息表膨胀导致后台变慢”很有帮助。
cursor.execute(
"""
SELECT
COUNT(*) AS table_count,
COALESCE(SUM(data_length + index_length), 0) AS schema_size_bytes
FROM information_schema.tables
WHERE table_schema = DATABASE()
"""
)
schema_row = cursor.fetchone() or {}
snapshot["threads_connected"] = _safe_int(status_map.get("Threads_connected"))
snapshot["threads_running"] = _safe_int(status_map.get("Threads_running"))
snapshot["max_connections"] = _safe_int(variable_map.get("max_connections"))
snapshot["uptime_seconds"] = _safe_int(status_map.get("Uptime"))
total_questions = _safe_int(status_map.get("Questions"))
if snapshot["uptime_seconds"] > 0:
snapshot["questions_per_second"] = round(total_questions / snapshot["uptime_seconds"], 2)
if snapshot["max_connections"] > 0:
snapshot["connection_usage_percent"] = round(
(snapshot["threads_connected"] / snapshot["max_connections"]) * 100,
1,
)
snapshot["table_count"] = _safe_int(schema_row.get("table_count"))
snapshot["schema_size_mb"] = _format_bytes_to_mb(schema_row.get("schema_size_bytes"))
if snapshot["connection_usage_percent"] >= 80 or snapshot["threads_running"] >= 12:
snapshot["status"] = "warning"
snapshot["summary"] = (
f"连接压力偏高:已连接 {snapshot['threads_connected']} / {snapshot['max_connections']}"
f"运行中线程 {snapshot['threads_running']}"
)
else:
snapshot["summary"] = (
f"连接正常:已连接 {snapshot['threads_connected']} / {snapshot['max_connections'] or '-'}"
f"QPS {snapshot['questions_per_second']}"
)
return snapshot
except Exception as mysql_error:
snapshot["status"] = "danger"
snapshot["summary"] = f"MySQL 探测失败: {mysql_error}"
return snapshot
finally:
mysql_conn.close()
def _extract_redis_runtime_snapshot(db_manager) -> dict:
"""采集 Redis 运行态摘要。"""
redis_config = getattr(db_manager, "redis_config", {}) or {}
snapshot = {
"status": "healthy",
"summary": "连接正常",
"db_index": _safe_int(redis_config.get("db", 0)),
"key_count": 0,
"connected_clients": 0,
"blocked_clients": 0,
"ops_per_sec": 0,
"used_memory_human": "",
"used_memory_peak_human": "",
"memory_usage_percent": 0.0,
"uptime_seconds": 0,
"hit_rate_percent": 0.0,
}
try:
redis_conn = db_manager.get_redis_connection()
redis_conn.ping()
info = redis_conn.info() or {}
snapshot["key_count"] = _safe_int(redis_conn.dbsize())
snapshot["connected_clients"] = _safe_int(info.get("connected_clients"))
snapshot["blocked_clients"] = _safe_int(info.get("blocked_clients"))
snapshot["ops_per_sec"] = _safe_int(info.get("instantaneous_ops_per_sec"))
snapshot["used_memory_human"] = str(info.get("used_memory_human") or "").strip()
snapshot["used_memory_peak_human"] = str(info.get("used_memory_peak_human") or "").strip()
snapshot["uptime_seconds"] = _safe_int(info.get("uptime_in_seconds"))
maxmemory = _safe_int(info.get("maxmemory"))
used_memory = _safe_int(info.get("used_memory"))
if maxmemory > 0:
snapshot["memory_usage_percent"] = round((used_memory / maxmemory) * 100, 1)
keyspace_hits = _safe_int(info.get("keyspace_hits"))
keyspace_misses = _safe_int(info.get("keyspace_misses"))
if (keyspace_hits + keyspace_misses) > 0:
snapshot["hit_rate_percent"] = round(
(keyspace_hits / (keyspace_hits + keyspace_misses)) * 100,
1,
)
if snapshot["blocked_clients"] > 0 or snapshot["memory_usage_percent"] >= 80:
snapshot["status"] = "warning"
snapshot["summary"] = (
f"缓存压力需关注keys {snapshot['key_count']}"
f"clients {snapshot['connected_clients']}ops/s {snapshot['ops_per_sec']}"
)
else:
snapshot["summary"] = (
f"缓存正常keys {snapshot['key_count']}"
f"clients {snapshot['connected_clients']}ops/s {snapshot['ops_per_sec']}"
)
return snapshot
except Exception as redis_error:
snapshot["status"] = "danger"
snapshot["summary"] = f"Redis 探测失败: {redis_error}"
return snapshot
def _parse_snapshot_datetime(value: str | None) -> datetime | None:
"""把首页摘要里常用的时间字符串安全转换为 datetime。"""
text = str(value or "").strip()
if not text:
return None
try:
return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
except ValueError:
return None
def _count_enabled_runtime_items(items) -> int:
"""统计启用项数量。"""
rows = []
if isinstance(items, dict):
rows = list(items.values())
elif isinstance(items, list):
rows = list(items)
count = 0
for row in rows:
if not isinstance(row, dict):
continue
if "enabled" not in row or bool(row.get("enabled", True)):
count += 1
return count
def _extract_llm_catalog_summary() -> dict:
"""提取首页 LLM 路由配置摘要。"""
try:
catalog = LLMRegistry.get_catalog() or {}
if catalog:
providers = catalog.get("providers", {}) or {}
dify_apps = catalog.get("dify_apps", {}) or {}
backends = catalog.get("backends", {}) or {}
scenes = catalog.get("scenes", {}) or {}
default_scene = str(catalog.get("default_scene") or "").strip()
default_backend = str(LLMRegistry.get_scene_backend_name(default_scene) or "").strip() if default_scene else ""
return {
"provider_count": _count_enabled_runtime_items(providers),
"scene_count": _count_enabled_runtime_items(scenes),
"target_count": _count_enabled_runtime_items(backends) + _count_enabled_runtime_items(dify_apps),
"default_scene": default_scene,
"default_backend": default_backend,
"has_routing": _count_enabled_runtime_items(scenes) > 0,
}
legacy_llm = LLMRegistry.get_llm_config() or {}
scenes = legacy_llm.get("scenes", {}) or {}
backends = legacy_llm.get("backends", {}) or {}
default_backend = str(legacy_llm.get("default_backend") or "").strip()
return {
"provider_count": 0,
"scene_count": len(scenes) if isinstance(scenes, dict) else 0,
"target_count": len(backends) if isinstance(backends, dict) else 0,
"default_scene": "",
"default_backend": default_backend,
"has_routing": bool(scenes) or bool(default_backend),
}
except Exception as llm_catalog_error:
logger.warning(f"提取 LLM 路由摘要失败: {llm_catalog_error}")
return {
"provider_count": 0,
"scene_count": 0,
"target_count": 0,
"default_scene": "",
"default_backend": "",
"has_routing": False,
}
def _extract_ai_runtime_snapshot() -> dict:
"""构建首页 LLM 运行态摘要。"""
runtime_snapshot = UnifiedLLMClient.get_runtime_snapshot() or {}
last_call = dict(runtime_snapshot.get("last_call") or {})
catalog_summary = _extract_llm_catalog_summary()
total_calls = _safe_int(runtime_snapshot.get("total_calls"))
failed_calls = _safe_int(runtime_snapshot.get("failed_calls"))
success_rate = _safe_float(runtime_snapshot.get("success_rate"))
avg_latency_ms = _safe_float(runtime_snapshot.get("avg_latency_ms"))
last_error = str(runtime_snapshot.get("last_error") or "").strip()
snapshot = {
**runtime_snapshot,
"last_call": last_call,
"provider_count": catalog_summary.get("provider_count", 0),
"scene_count": catalog_summary.get("scene_count", 0),
"target_count": catalog_summary.get("target_count", 0),
"default_scene": catalog_summary.get("default_scene", ""),
"default_backend": catalog_summary.get("default_backend", ""),
"has_routing": bool(catalog_summary.get("has_routing")),
"last_provider": str(last_call.get("provider") or "").strip(),
"last_backend": str(last_call.get("backend") or "").strip(),
"last_scene": str(last_call.get("scene") or "").strip(),
"last_model": str(last_call.get("model") or "").strip(),
"last_timestamp": str(last_call.get("timestamp") or "").strip(),
"last_latency_ms": _safe_float(last_call.get("latency_ms")),
"last_error": last_error,
}
if not snapshot["has_routing"]:
snapshot["status"] = "warning"
snapshot["summary"] = "当前未发现完整的 LLM 路由配置,建议先检查默认场景与后端绑定"
return snapshot
if total_calls <= 0:
snapshot["status"] = "warning"
snapshot["summary"] = (
f"已配置 {snapshot['scene_count']} 个场景、{snapshot['target_count']} 个目标,"
"最近窗口内暂无统一 LLM 调用记录"
)
return snapshot
if failed_calls >= total_calls and total_calls > 0:
snapshot["status"] = "danger"
snapshot["summary"] = (
f"最近 {total_calls} 次调用全部失败,成功率 {success_rate:.2f}%"
f"平均耗时 {avg_latency_ms:.2f}ms"
)
return snapshot
if failed_calls > 0 or last_error:
snapshot["status"] = "warning"
snapshot["summary"] = (
f"最近 {total_calls} 次调用中失败 {failed_calls} 次,成功率 {success_rate:.2f}%"
f"平均耗时 {avg_latency_ms:.2f}ms"
)
return snapshot
snapshot["status"] = "healthy"
snapshot["summary"] = (
f"最近 {total_calls} 次调用全部成功,成功率 {success_rate:.2f}%"
f"平均耗时 {avg_latency_ms:.2f}ms"
)
return snapshot
def _extract_scheduler_runtime_snapshot() -> dict:
"""聚合 async_job 运行态,生成首页任务调度摘要。"""
runtime_rows = async_job.get_jobs_snapshot()
next_run_candidates = []
failed_rows = []
system_job_count = 0
plugin_job_count = 0
for row in runtime_rows:
job_key = str(row.get("job_key") or "").strip()
owner_name = str(row.get("owner_name") or "system").strip().lower()
next_run_at = _parse_snapshot_datetime(row.get("next_run_at"))
last_status = str(row.get("last_status") or "").strip().lower()
if job_key.startswith("plugin_schedule:") or owner_name != "system":
plugin_job_count += 1
else:
system_job_count += 1
if bool(row.get("enabled")) and next_run_at:
next_run_candidates.append(next_run_at)
if last_status in {"failed", "invalid_schedule"}:
failed_rows.append(row)
latest_failed_row = {}
if failed_rows:
failed_rows.sort(
key=lambda row: (
_parse_snapshot_datetime(row.get("updated_at"))
or _parse_snapshot_datetime(row.get("last_run_at"))
or datetime.min
),
reverse=True,
)
latest_failed_row = failed_rows[0]
invalid_jobs = sum(
1 for row in runtime_rows if str(row.get("last_status") or "").strip().lower() == "invalid_schedule"
)
total_jobs = len(runtime_rows)
enabled_jobs = sum(1 for row in runtime_rows if bool(row.get("enabled")))
running_jobs = sum(1 for row in runtime_rows if bool(row.get("running")))
failed_jobs = len(failed_rows)
paused_jobs = total_jobs - enabled_jobs
never_run_jobs = sum(1 for row in runtime_rows if str(row.get("last_status") or "").strip().lower() == "never")
next_run_at_text = min(next_run_candidates).strftime("%Y-%m-%d %H:%M:%S") if next_run_candidates else ""
latest_failed_error = str(latest_failed_row.get("last_error") or "").strip()
if len(latest_failed_error) > 120:
latest_failed_error = f"{latest_failed_error[:117]}..."
snapshot = {
"status": "healthy",
"summary": "任务调度运行正常",
"total_jobs": total_jobs,
"enabled_jobs": enabled_jobs,
"running_jobs": running_jobs,
"failed_jobs": failed_jobs,
"invalid_jobs": invalid_jobs,
"paused_jobs": paused_jobs,
"never_run_jobs": never_run_jobs,
"system_job_count": system_job_count,
"plugin_job_count": plugin_job_count,
"next_run_at": next_run_at_text,
"latest_failed_job_name": str(latest_failed_row.get("name") or "").strip(),
"latest_failed_error": latest_failed_error,
}
if total_jobs <= 0:
snapshot["status"] = "warning"
snapshot["summary"] = "当前没有加载任何定时任务"
return snapshot
if invalid_jobs > 0:
snapshot["status"] = "danger"
snapshot["summary"] = f"发现 {invalid_jobs} 个任务调度配置非法,建议立即检查任务页"
return snapshot
if failed_jobs > 0:
snapshot["status"] = "warning"
snapshot["summary"] = (
f"最近有 {failed_jobs} 个任务执行失败,"
f"下一次执行 {next_run_at_text or '暂未计算'}"
)
return snapshot
if enabled_jobs <= 0:
snapshot["status"] = "warning"
snapshot["summary"] = "任务已加载,但当前没有启用中的调度任务"
return snapshot
if running_jobs > 0:
snapshot["summary"] = (
f"当前有 {running_jobs} 个任务执行中,"
f"下一次执行 {next_run_at_text or '暂未计算'}"
)
return snapshot
snapshot["summary"] = f"已启用 {enabled_jobs} 个任务,下一次执行 {next_run_at_text or '暂未计算'}"
return snapshot
def _legacy_llm_to_catalog(legacy_llm: dict) -> dict:
"""把旧 llm(backends/scenes) 结构转换为新目录结构(仅用于兜底展示)。
@@ -405,45 +842,11 @@ def api_system_health_summary():
_, recent_error_count = server.stats_db.get_error_logs(days=1, page=1, limit=1)
# 基础设施健康:
# 1. MySQL 用最轻量的 SELECT 1 做可用性探测
# 2. Redis 用 PING 验证连接池当前是否可拿到可用连接
# 1. MySQL / Redis 都在这里做“首页摘要级”探测,而不是完整深度巡检
# 2. 除了连通性,还补充少量负载指标,方便管理员快速判断是否需要继续下钻
# 3. 即使探测失败也只反馈到看板,不影响主接口整体返回。
mysql_status = "healthy"
mysql_summary = "连接正常"
try:
mysql_conn = server.db_manager.get_mysql_connection()
try:
with mysql_conn.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
finally:
mysql_conn.close()
except Exception as mysql_error:
mysql_status = "danger"
mysql_summary = f"MySQL 探测失败: {mysql_error}"
redis_status = "healthy"
redis_summary = "连接正常"
try:
redis_conn = server.db_manager.get_redis_connection()
redis_conn.ping()
except Exception as redis_error:
redis_status = "danger"
redis_summary = f"Redis 探测失败: {redis_error}"
# md2img 健康快照已经有现成实现,这里只做聚合,不主动预热运行时。
md2img_snapshot = get_md2img_health_snapshot(ensure_runtime=False) or {}
browser_ready = bool(
md2img_snapshot.get("browser_ready")
or md2img_snapshot.get("playwright_ready")
or md2img_snapshot.get("ready")
)
runtime_ready = bool(
md2img_snapshot.get("runtime_ready")
or md2img_snapshot.get("runtime_initialized")
or md2img_snapshot.get("initialized")
)
md2img_healthy = runtime_ready and browser_ready
mysql_snapshot = _extract_mysql_runtime_snapshot(server.db_manager)
redis_snapshot = _extract_redis_runtime_snapshot(server.db_manager)
# 首页只需要“够判断”的轻量结论,因此统一产出 status + summary 文本,前端无需重复拼装业务规则。
robot_running = bool(getattr(robot, "ipad_running", False))
@@ -470,37 +873,11 @@ def api_system_health_summary():
error_status = "healthy"
error_summary = "近 24 小时未记录到异常"
if md2img_healthy:
md2img_status = "healthy"
md2img_summary = "运行时与浏览器均已就绪"
elif runtime_ready or browser_ready:
md2img_status = "warning"
md2img_summary = "运行时部分可用,建议检查预热状态"
else:
md2img_status = "danger"
md2img_summary = "运行时未就绪,相关转图能力可能不可用"
# 首页 AI 卡片升级为“运行态 + 路由摘要”,仍然保持被动观测,不主动探活。
ai_runtime = _extract_ai_runtime_snapshot()
# AI 运行态:
# 1. 统一从 UnifiedLLMClient 最近调用窗口读取,避免各插件单独维护监控数据;
# 2. 若当前窗口还没有调用记录,就明确返回“暂无调用”,避免误判成异常。
ai_runtime = UnifiedLLMClient.get_runtime_snapshot()
ai_total_calls = int(ai_runtime.get("total_calls") or 0)
ai_failed_calls = int(ai_runtime.get("failed_calls") or 0)
if ai_total_calls <= 0:
ai_status = "warning"
ai_summary = "最近窗口内暂无统一 LLM 调用记录"
elif ai_failed_calls > 0:
ai_status = "warning"
ai_summary = (
f"最近 {ai_total_calls} 次调用中失败 {ai_failed_calls} 次,"
f"平均耗时 {ai_runtime.get('avg_latency_ms', 0)}ms"
)
else:
ai_status = "healthy"
ai_summary = (
f"最近 {ai_total_calls} 次调用全部成功,"
f"平均耗时 {ai_runtime.get('avg_latency_ms', 0)}ms"
)
# Markdown 转图更适合保留在专门页面里排障,首页右侧改成更通用的任务调度摘要。
scheduler_runtime = _extract_scheduler_runtime_snapshot()
return jsonify({
"success": True,
@@ -524,33 +901,28 @@ def api_system_health_summary():
"summary": error_summary,
},
"infrastructure": {
"status": "healthy" if mysql_status == "healthy" and redis_status == "healthy" else "danger",
"status": (
"danger"
if "danger" in {mysql_snapshot.get("status"), redis_snapshot.get("status")}
else ("warning" if "warning" in {mysql_snapshot.get("status"), redis_snapshot.get("status")} else "healthy")
),
"summary": (
"MySQL / Redis 均正常"
if mysql_status == "healthy" and redis_status == "healthy"
else "存在基础设施连接异常"
if mysql_snapshot.get("status") == "healthy" and redis_snapshot.get("status") == "healthy"
else (
"基础设施连接正常,但部分负载指标需要关注"
if mysql_snapshot.get("status") != "danger" and redis_snapshot.get("status") != "danger"
else "存在基础设施连接异常"
)
),
"mysql": {
"status": mysql_status,
"summary": mysql_summary,
},
"redis": {
"status": redis_status,
"summary": redis_summary,
},
"mysql": mysql_snapshot,
"redis": redis_snapshot,
},
"ai_runtime": {
"status": ai_status,
"summary": ai_summary,
**ai_runtime,
},
"md2img": {
"status": md2img_status,
"healthy": md2img_healthy,
"runtime_ready": runtime_ready,
"browser_ready": browser_ready,
"summary": md2img_summary,
"detail": md2img_snapshot,
"scheduler": {
**scheduler_runtime,
},
}
})