diff --git a/admin/dashboard/blueprints/system.py b/admin/dashboard/blueprints/system.py index 37db1ff..61a1000 100644 --- a/admin/dashboard/blueprints/system.py +++ b/admin/dashboard/blueprints/system.py @@ -16,6 +16,7 @@ from utils.markdown_to_image import get_md2img_health_snapshot, warmup_md2img_br from utils.ai.llm_registry import LLMRegistry from base.plugin_common.plugin_interface import PluginStatus from utils.ai.unified_llm import UnifiedLLMClient +from utils.decorator.async_job import async_job # 创建系统信息蓝图 system_bp = Blueprint('system', __name__) @@ -42,6 +43,442 @@ def _save_system_yaml(config_obj: dict) -> None: yaml.safe_dump(config_obj, f, allow_unicode=True, sort_keys=False) +def _safe_int(value, default: int = 0) -> int: + """把数据库 / Redis 返回的字符串数字安全转成整数。""" + try: + if value in (None, ""): + return default + return int(float(value)) + except (TypeError, ValueError): + return default + + +def _safe_float(value, default: float = 0.0) -> float: + """把数据库 / Redis 返回的值安全转成浮点数。""" + try: + if value in (None, ""): + return default + return float(value) + except (TypeError, ValueError): + return default + + +def _format_bytes_to_mb(value: int) -> float: + """把字节数转换为 MB,保留两位小数便于首页摘要展示。""" + return round((_safe_float(value, 0.0) / 1024 / 1024), 2) + + +def _extract_mysql_runtime_snapshot(db_manager) -> dict: + """采集 MySQL 运行态摘要。 + + 首页目标不是替代 DBA 工具,而是让管理员一眼判断: + 1. 数据库是不是活着; + 2. 当前连接压力高不高; + 3. 当前库规模是否已经明显变大; + 4. 有没有必要继续深入到更专业的监控页排查。 + """ + snapshot = { + "status": "healthy", + "summary": "连接正常", + "database": db_manager.get_mysql_database_name(), + "version": "", + "threads_connected": 0, + "threads_running": 0, + "max_connections": 0, + "connection_usage_percent": 0.0, + "questions_per_second": 0.0, + "uptime_seconds": 0, + "table_count": 0, + "schema_size_mb": 0.0, + "slow_query_threshold_ms": db_manager.get_slow_query_threshold_ms(), + } + + mysql_conn = db_manager.get_mysql_connection() + try: + with mysql_conn.cursor(dictionary=True) as cursor: + # 基础探活与版本识别: + # 1. SELECT VERSION() 成本极低; + # 2. 相比只做 SELECT 1,它还能顺便拿到版本信息; + # 3. 首页卡片里显示版本,方便线上排查“是不是某台库版本不一致”。 + cursor.execute("SELECT VERSION() AS version, DATABASE() AS database_name") + version_row = cursor.fetchone() or {} + snapshot["version"] = str(version_row.get("version") or "").strip() + snapshot["database"] = str(version_row.get("database_name") or snapshot["database"] or "").strip() + + cursor.execute( + """ + SHOW GLOBAL STATUS + WHERE Variable_name IN ('Threads_connected', 'Threads_running', 'Questions', 'Uptime') + """ + ) + status_rows = cursor.fetchall() or [] + status_map = { + str(row.get("Variable_name") or "").strip(): row.get("Value") + for row in status_rows + } + + cursor.execute( + """ + SHOW GLOBAL VARIABLES + WHERE Variable_name IN ('max_connections') + """ + ) + variable_rows = cursor.fetchall() or [] + variable_map = { + str(row.get("Variable_name") or "").strip(): row.get("Value") + for row in variable_rows + } + + # information_schema 聚合虽然比 SELECT 1 重一点,但仍属于轻量级元信息查询: + # 1. 只在首页 30 秒级刷新一次,成本可接受; + # 2. 能直接给出当前业务库表数量与体量变化; + # 3. 对判断“是不是消息表膨胀导致后台变慢”很有帮助。 + cursor.execute( + """ + SELECT + COUNT(*) AS table_count, + COALESCE(SUM(data_length + index_length), 0) AS schema_size_bytes + FROM information_schema.tables + WHERE table_schema = DATABASE() + """ + ) + schema_row = cursor.fetchone() or {} + + snapshot["threads_connected"] = _safe_int(status_map.get("Threads_connected")) + snapshot["threads_running"] = _safe_int(status_map.get("Threads_running")) + snapshot["max_connections"] = _safe_int(variable_map.get("max_connections")) + snapshot["uptime_seconds"] = _safe_int(status_map.get("Uptime")) + total_questions = _safe_int(status_map.get("Questions")) + if snapshot["uptime_seconds"] > 0: + snapshot["questions_per_second"] = round(total_questions / snapshot["uptime_seconds"], 2) + if snapshot["max_connections"] > 0: + snapshot["connection_usage_percent"] = round( + (snapshot["threads_connected"] / snapshot["max_connections"]) * 100, + 1, + ) + snapshot["table_count"] = _safe_int(schema_row.get("table_count")) + snapshot["schema_size_mb"] = _format_bytes_to_mb(schema_row.get("schema_size_bytes")) + + if snapshot["connection_usage_percent"] >= 80 or snapshot["threads_running"] >= 12: + snapshot["status"] = "warning" + snapshot["summary"] = ( + f"连接压力偏高:已连接 {snapshot['threads_connected']} / {snapshot['max_connections']}," + f"运行中线程 {snapshot['threads_running']}" + ) + else: + snapshot["summary"] = ( + f"连接正常:已连接 {snapshot['threads_connected']} / {snapshot['max_connections'] or '-'}," + f"QPS {snapshot['questions_per_second']}" + ) + return snapshot + except Exception as mysql_error: + snapshot["status"] = "danger" + snapshot["summary"] = f"MySQL 探测失败: {mysql_error}" + return snapshot + finally: + mysql_conn.close() + + +def _extract_redis_runtime_snapshot(db_manager) -> dict: + """采集 Redis 运行态摘要。""" + redis_config = getattr(db_manager, "redis_config", {}) or {} + snapshot = { + "status": "healthy", + "summary": "连接正常", + "db_index": _safe_int(redis_config.get("db", 0)), + "key_count": 0, + "connected_clients": 0, + "blocked_clients": 0, + "ops_per_sec": 0, + "used_memory_human": "", + "used_memory_peak_human": "", + "memory_usage_percent": 0.0, + "uptime_seconds": 0, + "hit_rate_percent": 0.0, + } + + try: + redis_conn = db_manager.get_redis_connection() + redis_conn.ping() + info = redis_conn.info() or {} + snapshot["key_count"] = _safe_int(redis_conn.dbsize()) + snapshot["connected_clients"] = _safe_int(info.get("connected_clients")) + snapshot["blocked_clients"] = _safe_int(info.get("blocked_clients")) + snapshot["ops_per_sec"] = _safe_int(info.get("instantaneous_ops_per_sec")) + snapshot["used_memory_human"] = str(info.get("used_memory_human") or "").strip() + snapshot["used_memory_peak_human"] = str(info.get("used_memory_peak_human") or "").strip() + snapshot["uptime_seconds"] = _safe_int(info.get("uptime_in_seconds")) + + maxmemory = _safe_int(info.get("maxmemory")) + used_memory = _safe_int(info.get("used_memory")) + if maxmemory > 0: + snapshot["memory_usage_percent"] = round((used_memory / maxmemory) * 100, 1) + + keyspace_hits = _safe_int(info.get("keyspace_hits")) + keyspace_misses = _safe_int(info.get("keyspace_misses")) + if (keyspace_hits + keyspace_misses) > 0: + snapshot["hit_rate_percent"] = round( + (keyspace_hits / (keyspace_hits + keyspace_misses)) * 100, + 1, + ) + + if snapshot["blocked_clients"] > 0 or snapshot["memory_usage_percent"] >= 80: + snapshot["status"] = "warning" + snapshot["summary"] = ( + f"缓存压力需关注:keys {snapshot['key_count']}," + f"clients {snapshot['connected_clients']},ops/s {snapshot['ops_per_sec']}" + ) + else: + snapshot["summary"] = ( + f"缓存正常:keys {snapshot['key_count']}," + f"clients {snapshot['connected_clients']},ops/s {snapshot['ops_per_sec']}" + ) + return snapshot + except Exception as redis_error: + snapshot["status"] = "danger" + snapshot["summary"] = f"Redis 探测失败: {redis_error}" + return snapshot + + +def _parse_snapshot_datetime(value: str | None) -> datetime | None: + """把首页摘要里常用的时间字符串安全转换为 datetime。""" + text = str(value or "").strip() + if not text: + return None + try: + return datetime.strptime(text, "%Y-%m-%d %H:%M:%S") + except ValueError: + return None + + +def _count_enabled_runtime_items(items) -> int: + """统计启用项数量。""" + rows = [] + if isinstance(items, dict): + rows = list(items.values()) + elif isinstance(items, list): + rows = list(items) + count = 0 + for row in rows: + if not isinstance(row, dict): + continue + if "enabled" not in row or bool(row.get("enabled", True)): + count += 1 + return count + + +def _extract_llm_catalog_summary() -> dict: + """提取首页 LLM 路由配置摘要。""" + try: + catalog = LLMRegistry.get_catalog() or {} + if catalog: + providers = catalog.get("providers", {}) or {} + dify_apps = catalog.get("dify_apps", {}) or {} + backends = catalog.get("backends", {}) or {} + scenes = catalog.get("scenes", {}) or {} + default_scene = str(catalog.get("default_scene") or "").strip() + default_backend = str(LLMRegistry.get_scene_backend_name(default_scene) or "").strip() if default_scene else "" + return { + "provider_count": _count_enabled_runtime_items(providers), + "scene_count": _count_enabled_runtime_items(scenes), + "target_count": _count_enabled_runtime_items(backends) + _count_enabled_runtime_items(dify_apps), + "default_scene": default_scene, + "default_backend": default_backend, + "has_routing": _count_enabled_runtime_items(scenes) > 0, + } + + legacy_llm = LLMRegistry.get_llm_config() or {} + scenes = legacy_llm.get("scenes", {}) or {} + backends = legacy_llm.get("backends", {}) or {} + default_backend = str(legacy_llm.get("default_backend") or "").strip() + return { + "provider_count": 0, + "scene_count": len(scenes) if isinstance(scenes, dict) else 0, + "target_count": len(backends) if isinstance(backends, dict) else 0, + "default_scene": "", + "default_backend": default_backend, + "has_routing": bool(scenes) or bool(default_backend), + } + except Exception as llm_catalog_error: + logger.warning(f"提取 LLM 路由摘要失败: {llm_catalog_error}") + return { + "provider_count": 0, + "scene_count": 0, + "target_count": 0, + "default_scene": "", + "default_backend": "", + "has_routing": False, + } + + +def _extract_ai_runtime_snapshot() -> dict: + """构建首页 LLM 运行态摘要。""" + runtime_snapshot = UnifiedLLMClient.get_runtime_snapshot() or {} + last_call = dict(runtime_snapshot.get("last_call") or {}) + catalog_summary = _extract_llm_catalog_summary() + + total_calls = _safe_int(runtime_snapshot.get("total_calls")) + failed_calls = _safe_int(runtime_snapshot.get("failed_calls")) + success_rate = _safe_float(runtime_snapshot.get("success_rate")) + avg_latency_ms = _safe_float(runtime_snapshot.get("avg_latency_ms")) + last_error = str(runtime_snapshot.get("last_error") or "").strip() + + snapshot = { + **runtime_snapshot, + "last_call": last_call, + "provider_count": catalog_summary.get("provider_count", 0), + "scene_count": catalog_summary.get("scene_count", 0), + "target_count": catalog_summary.get("target_count", 0), + "default_scene": catalog_summary.get("default_scene", ""), + "default_backend": catalog_summary.get("default_backend", ""), + "has_routing": bool(catalog_summary.get("has_routing")), + "last_provider": str(last_call.get("provider") or "").strip(), + "last_backend": str(last_call.get("backend") or "").strip(), + "last_scene": str(last_call.get("scene") or "").strip(), + "last_model": str(last_call.get("model") or "").strip(), + "last_timestamp": str(last_call.get("timestamp") or "").strip(), + "last_latency_ms": _safe_float(last_call.get("latency_ms")), + "last_error": last_error, + } + + if not snapshot["has_routing"]: + snapshot["status"] = "warning" + snapshot["summary"] = "当前未发现完整的 LLM 路由配置,建议先检查默认场景与后端绑定" + return snapshot + + if total_calls <= 0: + snapshot["status"] = "warning" + snapshot["summary"] = ( + f"已配置 {snapshot['scene_count']} 个场景、{snapshot['target_count']} 个目标," + "最近窗口内暂无统一 LLM 调用记录" + ) + return snapshot + + if failed_calls >= total_calls and total_calls > 0: + snapshot["status"] = "danger" + snapshot["summary"] = ( + f"最近 {total_calls} 次调用全部失败,成功率 {success_rate:.2f}%," + f"平均耗时 {avg_latency_ms:.2f}ms" + ) + return snapshot + + if failed_calls > 0 or last_error: + snapshot["status"] = "warning" + snapshot["summary"] = ( + f"最近 {total_calls} 次调用中失败 {failed_calls} 次,成功率 {success_rate:.2f}%," + f"平均耗时 {avg_latency_ms:.2f}ms" + ) + return snapshot + + snapshot["status"] = "healthy" + snapshot["summary"] = ( + f"最近 {total_calls} 次调用全部成功,成功率 {success_rate:.2f}%," + f"平均耗时 {avg_latency_ms:.2f}ms" + ) + return snapshot + + +def _extract_scheduler_runtime_snapshot() -> dict: + """聚合 async_job 运行态,生成首页任务调度摘要。""" + runtime_rows = async_job.get_jobs_snapshot() + next_run_candidates = [] + failed_rows = [] + system_job_count = 0 + plugin_job_count = 0 + + for row in runtime_rows: + job_key = str(row.get("job_key") or "").strip() + owner_name = str(row.get("owner_name") or "system").strip().lower() + next_run_at = _parse_snapshot_datetime(row.get("next_run_at")) + last_status = str(row.get("last_status") or "").strip().lower() + + if job_key.startswith("plugin_schedule:") or owner_name != "system": + plugin_job_count += 1 + else: + system_job_count += 1 + + if bool(row.get("enabled")) and next_run_at: + next_run_candidates.append(next_run_at) + if last_status in {"failed", "invalid_schedule"}: + failed_rows.append(row) + + latest_failed_row = {} + if failed_rows: + failed_rows.sort( + key=lambda row: ( + _parse_snapshot_datetime(row.get("updated_at")) + or _parse_snapshot_datetime(row.get("last_run_at")) + or datetime.min + ), + reverse=True, + ) + latest_failed_row = failed_rows[0] + + invalid_jobs = sum( + 1 for row in runtime_rows if str(row.get("last_status") or "").strip().lower() == "invalid_schedule" + ) + total_jobs = len(runtime_rows) + enabled_jobs = sum(1 for row in runtime_rows if bool(row.get("enabled"))) + running_jobs = sum(1 for row in runtime_rows if bool(row.get("running"))) + failed_jobs = len(failed_rows) + paused_jobs = total_jobs - enabled_jobs + never_run_jobs = sum(1 for row in runtime_rows if str(row.get("last_status") or "").strip().lower() == "never") + next_run_at_text = min(next_run_candidates).strftime("%Y-%m-%d %H:%M:%S") if next_run_candidates else "" + latest_failed_error = str(latest_failed_row.get("last_error") or "").strip() + if len(latest_failed_error) > 120: + latest_failed_error = f"{latest_failed_error[:117]}..." + + snapshot = { + "status": "healthy", + "summary": "任务调度运行正常", + "total_jobs": total_jobs, + "enabled_jobs": enabled_jobs, + "running_jobs": running_jobs, + "failed_jobs": failed_jobs, + "invalid_jobs": invalid_jobs, + "paused_jobs": paused_jobs, + "never_run_jobs": never_run_jobs, + "system_job_count": system_job_count, + "plugin_job_count": plugin_job_count, + "next_run_at": next_run_at_text, + "latest_failed_job_name": str(latest_failed_row.get("name") or "").strip(), + "latest_failed_error": latest_failed_error, + } + + if total_jobs <= 0: + snapshot["status"] = "warning" + snapshot["summary"] = "当前没有加载任何定时任务" + return snapshot + + if invalid_jobs > 0: + snapshot["status"] = "danger" + snapshot["summary"] = f"发现 {invalid_jobs} 个任务调度配置非法,建议立即检查任务页" + return snapshot + + if failed_jobs > 0: + snapshot["status"] = "warning" + snapshot["summary"] = ( + f"最近有 {failed_jobs} 个任务执行失败," + f"下一次执行 {next_run_at_text or '暂未计算'}" + ) + return snapshot + + if enabled_jobs <= 0: + snapshot["status"] = "warning" + snapshot["summary"] = "任务已加载,但当前没有启用中的调度任务" + return snapshot + + if running_jobs > 0: + snapshot["summary"] = ( + f"当前有 {running_jobs} 个任务执行中," + f"下一次执行 {next_run_at_text or '暂未计算'}" + ) + return snapshot + + snapshot["summary"] = f"已启用 {enabled_jobs} 个任务,下一次执行 {next_run_at_text or '暂未计算'}" + return snapshot + + def _legacy_llm_to_catalog(legacy_llm: dict) -> dict: """把旧 llm(backends/scenes) 结构转换为新目录结构(仅用于兜底展示)。 @@ -405,45 +842,11 @@ def api_system_health_summary(): _, recent_error_count = server.stats_db.get_error_logs(days=1, page=1, limit=1) # 基础设施健康: - # 1. MySQL 用最轻量的 SELECT 1 做可用性探测; - # 2. Redis 用 PING 验证连接池当前是否可拿到可用连接; + # 1. MySQL / Redis 都在这里做“首页摘要级”探测,而不是完整深度巡检; + # 2. 除了连通性,还补充少量负载指标,方便管理员快速判断是否需要继续下钻; # 3. 即使探测失败也只反馈到看板,不影响主接口整体返回。 - mysql_status = "healthy" - mysql_summary = "连接正常" - try: - mysql_conn = server.db_manager.get_mysql_connection() - try: - with mysql_conn.cursor() as cursor: - cursor.execute("SELECT 1") - cursor.fetchone() - finally: - mysql_conn.close() - except Exception as mysql_error: - mysql_status = "danger" - mysql_summary = f"MySQL 探测失败: {mysql_error}" - - redis_status = "healthy" - redis_summary = "连接正常" - try: - redis_conn = server.db_manager.get_redis_connection() - redis_conn.ping() - except Exception as redis_error: - redis_status = "danger" - redis_summary = f"Redis 探测失败: {redis_error}" - - # md2img 健康快照已经有现成实现,这里只做聚合,不主动预热运行时。 - md2img_snapshot = get_md2img_health_snapshot(ensure_runtime=False) or {} - browser_ready = bool( - md2img_snapshot.get("browser_ready") - or md2img_snapshot.get("playwright_ready") - or md2img_snapshot.get("ready") - ) - runtime_ready = bool( - md2img_snapshot.get("runtime_ready") - or md2img_snapshot.get("runtime_initialized") - or md2img_snapshot.get("initialized") - ) - md2img_healthy = runtime_ready and browser_ready + mysql_snapshot = _extract_mysql_runtime_snapshot(server.db_manager) + redis_snapshot = _extract_redis_runtime_snapshot(server.db_manager) # 首页只需要“够判断”的轻量结论,因此统一产出 status + summary 文本,前端无需重复拼装业务规则。 robot_running = bool(getattr(robot, "ipad_running", False)) @@ -470,37 +873,11 @@ def api_system_health_summary(): error_status = "healthy" error_summary = "近 24 小时未记录到异常" - if md2img_healthy: - md2img_status = "healthy" - md2img_summary = "运行时与浏览器均已就绪" - elif runtime_ready or browser_ready: - md2img_status = "warning" - md2img_summary = "运行时部分可用,建议检查预热状态" - else: - md2img_status = "danger" - md2img_summary = "运行时未就绪,相关转图能力可能不可用" + # 首页 AI 卡片升级为“运行态 + 路由摘要”,仍然保持被动观测,不主动探活。 + ai_runtime = _extract_ai_runtime_snapshot() - # AI 运行态: - # 1. 统一从 UnifiedLLMClient 最近调用窗口读取,避免各插件单独维护监控数据; - # 2. 若当前窗口还没有调用记录,就明确返回“暂无调用”,避免误判成异常。 - ai_runtime = UnifiedLLMClient.get_runtime_snapshot() - ai_total_calls = int(ai_runtime.get("total_calls") or 0) - ai_failed_calls = int(ai_runtime.get("failed_calls") or 0) - if ai_total_calls <= 0: - ai_status = "warning" - ai_summary = "最近窗口内暂无统一 LLM 调用记录" - elif ai_failed_calls > 0: - ai_status = "warning" - ai_summary = ( - f"最近 {ai_total_calls} 次调用中失败 {ai_failed_calls} 次," - f"平均耗时 {ai_runtime.get('avg_latency_ms', 0)}ms" - ) - else: - ai_status = "healthy" - ai_summary = ( - f"最近 {ai_total_calls} 次调用全部成功," - f"平均耗时 {ai_runtime.get('avg_latency_ms', 0)}ms" - ) + # Markdown 转图更适合保留在专门页面里排障,首页右侧改成更通用的任务调度摘要。 + scheduler_runtime = _extract_scheduler_runtime_snapshot() return jsonify({ "success": True, @@ -524,33 +901,28 @@ def api_system_health_summary(): "summary": error_summary, }, "infrastructure": { - "status": "healthy" if mysql_status == "healthy" and redis_status == "healthy" else "danger", + "status": ( + "danger" + if "danger" in {mysql_snapshot.get("status"), redis_snapshot.get("status")} + else ("warning" if "warning" in {mysql_snapshot.get("status"), redis_snapshot.get("status")} else "healthy") + ), "summary": ( "MySQL / Redis 均正常" - if mysql_status == "healthy" and redis_status == "healthy" - else "存在基础设施连接异常" + if mysql_snapshot.get("status") == "healthy" and redis_snapshot.get("status") == "healthy" + else ( + "基础设施连接正常,但部分负载指标需要关注" + if mysql_snapshot.get("status") != "danger" and redis_snapshot.get("status") != "danger" + else "存在基础设施连接异常" + ) ), - "mysql": { - "status": mysql_status, - "summary": mysql_summary, - }, - "redis": { - "status": redis_status, - "summary": redis_summary, - }, + "mysql": mysql_snapshot, + "redis": redis_snapshot, }, "ai_runtime": { - "status": ai_status, - "summary": ai_summary, **ai_runtime, }, - "md2img": { - "status": md2img_status, - "healthy": md2img_healthy, - "runtime_ready": runtime_ready, - "browser_ready": browser_ready, - "summary": md2img_summary, - "detail": md2img_snapshot, + "scheduler": { + **scheduler_runtime, }, } }) diff --git a/admin/dashboard/templates/index.html b/admin/dashboard/templates/index.html index d14657d..a93d1c4 100644 --- a/admin/dashboard/templates/index.html +++ b/admin/dashboard/templates/index.html @@ -131,7 +131,7 @@
把连接状态、插件运行、异常数量与转图运行时集中到一个面板里。
+把连接状态、插件运行、异常数量、LLM 运行态与任务调度集中到一个面板里。