fix(schedule): 任务页上次执行时间改为日志兜底
- 新增批量查询接口 get_latest_logs_map,从 t_plugin_schedule_logs 获取各任务最新执行记录 - 插件任务列表拼装时优先用运行时数据,缺失则回填数据库最新日志 - 修复重启后任务页 last_run_at/last_status 显示为空的问题
This commit is contained in:
@@ -215,3 +215,35 @@ class PluginScheduleDBOperator(BaseDBOperator):
|
||||
fetch_one=True,
|
||||
) or {}
|
||||
return row.get("triggered_at")
|
||||
|
||||
def get_latest_logs_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
|
||||
"""批量获取每个调度任务最新一条执行日志。
|
||||
|
||||
设计说明:
|
||||
1. 后台任务页展示“上次执行时间/状态”时,不能只依赖内存态;
|
||||
2. 进程重启后,async_job 的运行时计数会重置,但数据库日志仍完整;
|
||||
3. 这里提供批量查询接口,让上层可用日志数据兜底回填展示字段。
|
||||
"""
|
||||
clean_ids = [int(x) for x in schedule_ids if str(x).strip().isdigit()]
|
||||
if not clean_ids:
|
||||
return {}
|
||||
|
||||
placeholders = ",".join(["%s"] * len(clean_ids))
|
||||
sql = f"""
|
||||
SELECT l.*
|
||||
FROM t_plugin_schedule_logs l
|
||||
INNER JOIN (
|
||||
SELECT schedule_id, MAX(id) AS max_id
|
||||
FROM t_plugin_schedule_logs
|
||||
WHERE schedule_id IN ({placeholders})
|
||||
GROUP BY schedule_id
|
||||
) t ON l.id = t.max_id
|
||||
"""
|
||||
rows = self.execute_query(sql, tuple(clean_ids)) or []
|
||||
result: Dict[int, Dict[str, Any]] = {}
|
||||
for row in rows:
|
||||
self._parse_json_field(row, "detail_json")
|
||||
schedule_id = int(row.get("schedule_id") or 0)
|
||||
if schedule_id > 0:
|
||||
result[schedule_id] = row
|
||||
return result
|
||||
|
||||
@@ -294,19 +294,27 @@ class PluginScheduleManager:
|
||||
db_rows = self.db.list_schedules()
|
||||
runtime_rows = async_job.get_jobs_snapshot()
|
||||
runtime_by_key = {row.get("job_key"): row for row in runtime_rows if row.get("job_key")}
|
||||
# 日志兜底:进程重启后内存态 last_run_at 会丢失,任务页需要从数据库最新日志恢复显示。
|
||||
schedule_ids = [int(row.get("id")) for row in db_rows if row.get("id") is not None]
|
||||
latest_log_by_schedule = self.db.get_latest_logs_map(schedule_ids)
|
||||
|
||||
data = []
|
||||
for row in db_rows:
|
||||
key = f"plugin_schedule:{row['id']}"
|
||||
schedule_id = int(row["id"])
|
||||
key = f"plugin_schedule:{schedule_id}"
|
||||
runtime = runtime_by_key.get(key, {})
|
||||
latest_log = latest_log_by_schedule.get(schedule_id) or {}
|
||||
merged = dict(row)
|
||||
merged["runtime_job_id"] = runtime.get("id")
|
||||
merged["running"] = runtime.get("running", False)
|
||||
merged["trigger_text"] = runtime.get("trigger_text", "")
|
||||
merged["next_run_at"] = runtime.get("next_run_at")
|
||||
merged["last_run_at"] = runtime.get("last_run_at")
|
||||
merged["last_status"] = runtime.get("last_status")
|
||||
merged["last_error"] = runtime.get("last_error")
|
||||
# last_run_at 等字段优先取运行时;若缺失则用最新日志兜底,避免页面显示空白。
|
||||
merged["last_run_at"] = runtime.get("last_run_at") or latest_log.get("triggered_at")
|
||||
merged["last_status"] = runtime.get("last_status") or latest_log.get("status")
|
||||
merged["last_error"] = runtime.get("last_error") or ""
|
||||
if not merged["last_error"] and str(merged["last_status"]) == "failed":
|
||||
merged["last_error"] = str(latest_log.get("summary") or "")
|
||||
merged["last_duration_ms"] = runtime.get("last_duration_ms")
|
||||
merged["run_count"] = runtime.get("run_count", 0)
|
||||
merged["success_count"] = runtime.get("success_count", 0)
|
||||
|
||||
Reference in New Issue
Block a user