diff --git a/admin/dashboard/blueprints/system_jobs.py b/admin/dashboard/blueprints/system_jobs.py index ee2a884..d0dc029 100644 --- a/admin/dashboard/blueprints/system_jobs.py +++ b/admin/dashboard/blueprints/system_jobs.py @@ -94,12 +94,24 @@ def api_trigger_job(job_key: str): @system_jobs_bp.route("/api/jobs//logs", methods=["GET"]) @login_required def api_job_logs(job_key: str): - job_id = async_job.get_job_id_by_key(job_key) - if not job_id: - return jsonify({"success": True, "data": []}) - + server = current_app.dashboard_server limit = int(request.args.get("limit", 100)) - logs = async_job.get_job_logs(job_id, limit=limit) + db_logs = server.system_job_db.get_job_logs(job_key, limit=limit) + # 为了兼容前端既有表头(time/level/message),这里做一层字段映射。 + logs = [] + for row in db_logs: + status = str(row.get("status") or "") + level = "error" if status == "failed" else ("success" if status == "success" else "info") + logs.append( + { + "time": row.get("triggered_at"), + "level": level, + "message": row.get("summary") or "", + "status": status, + "duration_ms": row.get("duration_ms"), + "detail_json": row.get("detail_json") or {}, + } + ) return jsonify({"success": True, "data": logs}) diff --git a/db/system_job_db.py b/db/system_job_db.py index f855760..9ec0e81 100644 --- a/db/system_job_db.py +++ b/db/system_job_db.py @@ -15,7 +15,7 @@ class SystemJobDBOperator(BaseDBOperator): super().__init__(db_manager) def init_tables(self) -> bool: - """初始化系统任务配置表。""" + """初始化系统任务配置表与日志表。""" try: self.execute_update( """ @@ -31,6 +31,21 @@ class SystemJobDBOperator(BaseDBOperator): ) """ ) + # 系统任务执行日志表:用于持久化记录每次任务执行结果,避免重启后日志丢失。 + self.execute_update( + """ + CREATE TABLE IF NOT EXISTS t_system_job_logs ( + id BIGINT PRIMARY KEY AUTO_INCREMENT, + job_key VARCHAR(64) NOT NULL, + triggered_at DATETIME DEFAULT CURRENT_TIMESTAMP, + status VARCHAR(32) NOT NULL, + summary VARCHAR(255) DEFAULT '', + detail_json JSON DEFAULT NULL, + duration_ms INT DEFAULT NULL, + INDEX idx_job_time (job_key, triggered_at) + ) + """ + ) return True except Exception as e: logger.error(f"初始化 t_system_jobs 失败: {e}") @@ -110,3 +125,47 @@ class SystemJobDBOperator(BaseDBOperator): def delete_job(self, job_key: str) -> bool: return self.execute_update("DELETE FROM t_system_jobs WHERE job_key = %s", (job_key,)) + + def create_job_log( + self, + job_key: str, + status: str, + summary: str, + detail: Optional[Dict[str, Any]] = None, + duration_ms: Optional[int] = None, + ) -> bool: + """写入系统任务执行日志。""" + sql = """ + INSERT INTO t_system_job_logs (job_key, status, summary, detail_json, duration_ms) + VALUES (%s, %s, %s, %s, %s) + """ + params = ( + str(job_key), + str(status), + str(summary or ""), + json.dumps(detail or {}, ensure_ascii=False), + duration_ms if duration_ms is not None else None, + ) + return self.execute_update(sql, params) + + def get_job_logs(self, job_key: str, limit: int = 100) -> List[Dict[str, Any]]: + """获取系统任务持久化日志。""" + rows = self.execute_query( + """ + SELECT * FROM t_system_job_logs + WHERE job_key = %s + ORDER BY triggered_at DESC + LIMIT %s + """, + (str(job_key), int(limit)), + ) or [] + for row in rows: + detail = row.get("detail_json") + if isinstance(detail, str): + try: + row["detail_json"] = json.loads(detail) + except json.JSONDecodeError: + row["detail_json"] = {} + elif detail is None: + row["detail_json"] = {} + return rows diff --git a/utils/system_jobs.py b/utils/system_jobs.py index 03c9dac..18c4587 100644 --- a/utils/system_jobs.py +++ b/utils/system_jobs.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- from __future__ import annotations +import inspect +from datetime import datetime from typing import Any, Awaitable, Callable, Dict, List from loguru import logger @@ -99,9 +101,38 @@ class SystemJobLoader: logger.warning(f"系统任务 {job_key} 在代码中无处理器,已跳过注册") continue - handler = definition["handler"] + raw_handler = definition["handler"] + + async def _wrapped_handler(_handler=raw_handler, _job_key=job_key): + """系统任务执行包装器:执行业务并持久化日志。""" + started_at = datetime.now() + try: + result = _handler() + # 兼容同步/异步 handler 两种写法。 + if inspect.isawaitable(result): + await result + duration_ms = int((datetime.now() - started_at).total_seconds() * 1000) + self.db.create_job_log( + _job_key, + "success", + "执行成功", + detail={"job_key": _job_key}, + duration_ms=duration_ms, + ) + except Exception as e: + duration_ms = int((datetime.now() - started_at).total_seconds() * 1000) + # 失败日志写库后继续抛出,让 async_job 运行态状态也能正确标记为 failed。 + self.db.create_job_log( + _job_key, + "failed", + f"执行失败: {e}", + detail={"job_key": _job_key, "error": str(e)}, + duration_ms=duration_ms, + ) + raise + job_id = async_job.register_callable( - func=handler, + func=_wrapped_handler, trigger_type=row.get("trigger_type", definition["trigger_type"]), trigger_config=row.get("trigger_config", definition["trigger_config"]), job_name=row.get("name") or definition["name"],