From 1a1306ec7160ec6ce892f27394aa8f1ee3ba9b52 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 16 Apr 2026 16:27:53 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=97=A5=E5=BF=97=E6=8C=81=E4=B9=85=E5=8C=96=EF=BC=9A?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=85=A5=E5=BA=93=E4=B8=8E=E5=90=8E=E5=8F=B0?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增系统任务日志表 t_system_job_logs,持久化记录每次执行结果、摘要、耗时、详情JSON\n- SystemJobLoader 注册任务时增加执行包装器:成功/失败均写入数据库日志,失败后继续抛出保证运行态状态一致\n- 系统任务后台日志接口改为查询数据库日志(不再依赖仅内存的 async_job logs),解决重启后日志丢失问题\n- 保持前端日志字段兼容,接口返回映射为 time/level/message 结构 --- admin/dashboard/blueprints/system_jobs.py | 22 ++++++-- db/system_job_db.py | 61 ++++++++++++++++++++++- utils/system_jobs.py | 35 ++++++++++++- 3 files changed, 110 insertions(+), 8 deletions(-) diff --git a/admin/dashboard/blueprints/system_jobs.py b/admin/dashboard/blueprints/system_jobs.py index ee2a884..d0dc029 100644 --- a/admin/dashboard/blueprints/system_jobs.py +++ b/admin/dashboard/blueprints/system_jobs.py @@ -94,12 +94,24 @@ def api_trigger_job(job_key: str): @system_jobs_bp.route("/api/jobs//logs", methods=["GET"]) @login_required def api_job_logs(job_key: str): - job_id = async_job.get_job_id_by_key(job_key) - if not job_id: - return jsonify({"success": True, "data": []}) - + server = current_app.dashboard_server limit = int(request.args.get("limit", 100)) - logs = async_job.get_job_logs(job_id, limit=limit) + db_logs = server.system_job_db.get_job_logs(job_key, limit=limit) + # 为了兼容前端既有表头(time/level/message),这里做一层字段映射。 + logs = [] + for row in db_logs: + status = str(row.get("status") or "") + level = "error" if status == "failed" else ("success" if status == "success" else "info") + logs.append( + { + "time": row.get("triggered_at"), + "level": level, + "message": row.get("summary") or "", + "status": status, + "duration_ms": row.get("duration_ms"), + "detail_json": row.get("detail_json") or {}, + } + ) return jsonify({"success": True, "data": logs}) diff --git a/db/system_job_db.py b/db/system_job_db.py index f855760..9ec0e81 100644 --- a/db/system_job_db.py +++ b/db/system_job_db.py @@ -15,7 +15,7 @@ class SystemJobDBOperator(BaseDBOperator): super().__init__(db_manager) def init_tables(self) -> bool: - """初始化系统任务配置表。""" + """初始化系统任务配置表与日志表。""" try: self.execute_update( """ @@ -31,6 +31,21 @@ class SystemJobDBOperator(BaseDBOperator): ) """ ) + # 系统任务执行日志表:用于持久化记录每次任务执行结果,避免重启后日志丢失。 + self.execute_update( + """ + CREATE TABLE IF NOT EXISTS t_system_job_logs ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + job_key VARCHAR(64) NOT NULL, + triggered_at DATETIME DEFAULT CURRENT_TIMESTAMP, + status VARCHAR(32) NOT NULL, + summary VARCHAR(255) DEFAULT '', + detail_json JSON DEFAULT NULL, + duration_ms INT DEFAULT NULL, + INDEX idx_job_time (job_key, triggered_at) + ) + """ + ) return True except Exception as e: logger.error(f"初始化 t_system_jobs 失败: {e}") @@ -110,3 +125,47 @@ class SystemJobDBOperator(BaseDBOperator): def delete_job(self, job_key: str) -> bool: return self.execute_update("DELETE FROM t_system_jobs WHERE job_key = %s", (job_key,)) + + def create_job_log( + self, + job_key: str, + status: str, + summary: str, + detail: Optional[Dict[str, Any]] = None, + duration_ms: Optional[int] = None, + ) -> bool: + """写入系统任务执行日志。""" + sql = """ + INSERT INTO t_system_job_logs (job_key, status, summary, detail_json, duration_ms) + VALUES (%s, %s, %s, %s, %s) + """ + params = ( + str(job_key), + str(status), + str(summary or ""), + json.dumps(detail or {}, ensure_ascii=False), + duration_ms if duration_ms is not None else None, + ) + return self.execute_update(sql, params) + + def get_job_logs(self, job_key: str, limit: int = 100) -> List[Dict[str, Any]]: + """获取系统任务持久化日志。""" + rows = self.execute_query( + """ + SELECT * FROM t_system_job_logs + WHERE job_key = %s + ORDER BY triggered_at DESC + LIMIT %s + """, + (str(job_key), int(limit)), + ) or [] + for row in rows: + detail = row.get("detail_json") + if isinstance(detail, str): + try: + row["detail_json"] = json.loads(detail) + except json.JSONDecodeError: + row["detail_json"] = {} + elif detail is None: + row["detail_json"] = {} + return rows diff --git a/utils/system_jobs.py b/utils/system_jobs.py index 03c9dac..18c4587 100644 --- a/utils/system_jobs.py +++ b/utils/system_jobs.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- from __future__ import annotations +import inspect +from datetime import datetime from typing import Any, Awaitable, Callable, Dict, List from loguru import logger @@ -99,9 +101,38 @@ class SystemJobLoader: logger.warning(f"系统任务 {job_key} 在代码中无处理器,已跳过注册") continue - handler = definition["handler"] + raw_handler = definition["handler"] + + async def _wrapped_handler(_handler=raw_handler, _job_key=job_key): + """系统任务执行包装器:执行业务并持久化日志。""" + started_at = datetime.now() + try: + result = _handler() + # 兼容同步/异步 handler 两种写法。 + if inspect.isawaitable(result): + await result + duration_ms = int((datetime.now() - started_at).total_seconds() * 1000) + self.db.create_job_log( + _job_key, + "success", + "执行成功", + detail={"job_key": _job_key}, + duration_ms=duration_ms, + ) + except Exception as e: + duration_ms = int((datetime.now() - started_at).total_seconds() * 1000) + # 失败日志写库后继续抛出,让 async_job 运行态状态也能正确标记为 failed。 + self.db.create_job_log( + _job_key, + "failed", + f"执行失败: {e}", + detail={"job_key": _job_key, "error": str(e)}, + duration_ms=duration_ms, + ) + raise + job_id = async_job.register_callable( - func=handler, + func=_wrapped_handler, trigger_type=row.get("trigger_type", definition["trigger_type"]), trigger_config=row.get("trigger_config", definition["trigger_config"]), job_name=row.get("name") or definition["name"],