系统定时任务日志持久化:新增入库与后台查询

- 新增系统任务日志表 t_system_job_logs,持久化记录每次执行结果、摘要、耗时、详情JSON\n- SystemJobLoader 注册任务时增加执行包装器:成功/失败均写入数据库日志,失败后继续抛出保证运行态状态一致\n- 系统任务后台日志接口改为查询数据库日志(不再依赖仅内存的 async_job logs),解决重启后日志丢失问题\n- 保持前端日志字段兼容,接口返回映射为 time/level/message 结构
This commit is contained in:
liuwei
2026-04-16 16:27:53 +08:00
parent 3873596399
commit 1a1306ec71
3 changed files with 110 additions and 8 deletions

View File

@@ -94,12 +94,24 @@ def api_trigger_job(job_key: str):
@system_jobs_bp.route("/api/jobs/<job_key>/logs", methods=["GET"]) @system_jobs_bp.route("/api/jobs/<job_key>/logs", methods=["GET"])
@login_required @login_required
def api_job_logs(job_key: str): def api_job_logs(job_key: str):
job_id = async_job.get_job_id_by_key(job_key) server = current_app.dashboard_server
if not job_id:
return jsonify({"success": True, "data": []})
limit = int(request.args.get("limit", 100)) limit = int(request.args.get("limit", 100))
logs = async_job.get_job_logs(job_id, limit=limit) db_logs = server.system_job_db.get_job_logs(job_key, limit=limit)
# 为了兼容前端既有表头(time/level/message),这里做一层字段映射。
logs = []
for row in db_logs:
status = str(row.get("status") or "")
level = "error" if status == "failed" else ("success" if status == "success" else "info")
logs.append(
{
"time": row.get("triggered_at"),
"level": level,
"message": row.get("summary") or "",
"status": status,
"duration_ms": row.get("duration_ms"),
"detail_json": row.get("detail_json") or {},
}
)
return jsonify({"success": True, "data": logs}) return jsonify({"success": True, "data": logs})

View File

@@ -15,7 +15,7 @@ class SystemJobDBOperator(BaseDBOperator):
super().__init__(db_manager) super().__init__(db_manager)
def init_tables(self) -> bool: def init_tables(self) -> bool:
"""初始化系统任务配置表。""" """初始化系统任务配置表与日志表"""
try: try:
self.execute_update( self.execute_update(
""" """
@@ -31,6 +31,21 @@ class SystemJobDBOperator(BaseDBOperator):
) )
""" """
) )
# 系统任务执行日志表:用于持久化记录每次任务执行结果,避免重启后日志丢失。
self.execute_update(
"""
CREATE TABLE IF NOT EXISTS t_system_job_logs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_key VARCHAR(64) NOT NULL,
triggered_at DATETIME DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(32) NOT NULL,
summary VARCHAR(255) DEFAULT '',
detail_json JSON DEFAULT NULL,
duration_ms INT DEFAULT NULL,
INDEX idx_job_time (job_key, triggered_at)
)
"""
)
return True return True
except Exception as e: except Exception as e:
logger.error(f"初始化 t_system_jobs 失败: {e}") logger.error(f"初始化 t_system_jobs 失败: {e}")
@@ -110,3 +125,47 @@ class SystemJobDBOperator(BaseDBOperator):
def delete_job(self, job_key: str) -> bool: def delete_job(self, job_key: str) -> bool:
return self.execute_update("DELETE FROM t_system_jobs WHERE job_key = %s", (job_key,)) return self.execute_update("DELETE FROM t_system_jobs WHERE job_key = %s", (job_key,))
def create_job_log(
self,
job_key: str,
status: str,
summary: str,
detail: Optional[Dict[str, Any]] = None,
duration_ms: Optional[int] = None,
) -> bool:
"""写入系统任务执行日志。"""
sql = """
INSERT INTO t_system_job_logs (job_key, status, summary, detail_json, duration_ms)
VALUES (%s, %s, %s, %s, %s)
"""
params = (
str(job_key),
str(status),
str(summary or ""),
json.dumps(detail or {}, ensure_ascii=False),
duration_ms if duration_ms is not None else None,
)
return self.execute_update(sql, params)
def get_job_logs(self, job_key: str, limit: int = 100) -> List[Dict[str, Any]]:
"""获取系统任务持久化日志。"""
rows = self.execute_query(
"""
SELECT * FROM t_system_job_logs
WHERE job_key = %s
ORDER BY triggered_at DESC
LIMIT %s
""",
(str(job_key), int(limit)),
) or []
for row in rows:
detail = row.get("detail_json")
if isinstance(detail, str):
try:
row["detail_json"] = json.loads(detail)
except json.JSONDecodeError:
row["detail_json"] = {}
elif detail is None:
row["detail_json"] = {}
return rows

View File

@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import annotations from __future__ import annotations
import inspect
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List from typing import Any, Awaitable, Callable, Dict, List
from loguru import logger from loguru import logger
@@ -99,9 +101,38 @@ class SystemJobLoader:
logger.warning(f"系统任务 {job_key} 在代码中无处理器,已跳过注册") logger.warning(f"系统任务 {job_key} 在代码中无处理器,已跳过注册")
continue continue
handler = definition["handler"] raw_handler = definition["handler"]
async def _wrapped_handler(_handler=raw_handler, _job_key=job_key):
"""系统任务执行包装器:执行业务并持久化日志。"""
started_at = datetime.now()
try:
result = _handler()
# 兼容同步/异步 handler 两种写法。
if inspect.isawaitable(result):
await result
duration_ms = int((datetime.now() - started_at).total_seconds() * 1000)
self.db.create_job_log(
_job_key,
"success",
"执行成功",
detail={"job_key": _job_key},
duration_ms=duration_ms,
)
except Exception as e:
duration_ms = int((datetime.now() - started_at).total_seconds() * 1000)
# 失败日志写库后继续抛出,让 async_job 运行态状态也能正确标记为 failed。
self.db.create_job_log(
_job_key,
"failed",
f"执行失败: {e}",
detail={"job_key": _job_key, "error": str(e)},
duration_ms=duration_ms,
)
raise
job_id = async_job.register_callable( job_id = async_job.register_callable(
func=handler, func=_wrapped_handler,
trigger_type=row.get("trigger_type", definition["trigger_type"]), trigger_type=row.get("trigger_type", definition["trigger_type"]),
trigger_config=row.get("trigger_config", definition["trigger_config"]), trigger_config=row.get("trigger_config", definition["trigger_config"]),
job_name=row.get("name") or definition["name"], job_name=row.get("name") or definition["name"],