From fc8af8ff7538a9b52931669111cda74eb7f7d3eb Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 22 Apr 2026 09:49:15 +0800 Subject: [PATCH] =?UTF-8?q?fix(schedule):=20=E4=BB=BB=E5=8A=A1=E9=A1=B5?= =?UTF-8?q?=E4=B8=8A=E6=AC=A1=E6=89=A7=E8=A1=8C=E6=97=B6=E9=97=B4=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E6=97=A5=E5=BF=97=E5=85=9C=E5=BA=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增批量查询接口 get_latest_logs_map,从 t_plugin_schedule_logs 获取各任务最新执行记录 - 插件任务列表拼装时优先用运行时数据,缺失则回填数据库最新日志 - 修复重启后任务页 last_run_at/last_status 显示为空的问题 --- db/plugin_schedule_db.py | 32 ++++++++++++++++++++++++++++++++ utils/plugin_schedule_manager.py | 16 ++++++++++++---- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/db/plugin_schedule_db.py b/db/plugin_schedule_db.py index 1702552..cc0a3da 100644 --- a/db/plugin_schedule_db.py +++ b/db/plugin_schedule_db.py @@ -215,3 +215,35 @@ class PluginScheduleDBOperator(BaseDBOperator): fetch_one=True, ) or {} return row.get("triggered_at") + + def get_latest_logs_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]: + """批量获取每个调度任务最新一条执行日志。 + + 设计说明: + 1. 后台任务页展示“上次执行时间/状态”时,不能只依赖内存态; + 2. 进程重启后,async_job 的运行时计数会重置,但数据库日志仍完整; + 3. 这里提供批量查询接口,让上层可用日志数据兜底回填展示字段。 + """ + clean_ids = [int(x) for x in schedule_ids if str(x).strip().isdigit()] + if not clean_ids: + return {} + + placeholders = ",".join(["%s"] * len(clean_ids)) + sql = f""" + SELECT l.* + FROM t_plugin_schedule_logs l + INNER JOIN ( + SELECT schedule_id, MAX(id) AS max_id + FROM t_plugin_schedule_logs + WHERE schedule_id IN ({placeholders}) + GROUP BY schedule_id + ) t ON l.id = t.max_id + """ + rows = self.execute_query(sql, tuple(clean_ids)) or [] + result: Dict[int, Dict[str, Any]] = {} + for row in rows: + self._parse_json_field(row, "detail_json") + schedule_id = int(row.get("schedule_id") or 0) + if schedule_id > 0: + result[schedule_id] = row + return result diff --git a/utils/plugin_schedule_manager.py b/utils/plugin_schedule_manager.py index cb4577d..b9767fd 100644 --- a/utils/plugin_schedule_manager.py +++ b/utils/plugin_schedule_manager.py @@ -294,19 +294,27 @@ class PluginScheduleManager: db_rows = self.db.list_schedules() runtime_rows = async_job.get_jobs_snapshot() runtime_by_key = {row.get("job_key"): row for row in runtime_rows if row.get("job_key")} + # 日志兜底:进程重启后内存态 last_run_at 会丢失,任务页需要从数据库最新日志恢复显示。 + schedule_ids = [int(row.get("id")) for row in db_rows if row.get("id") is not None] + latest_log_by_schedule = self.db.get_latest_logs_map(schedule_ids) data = [] for row in db_rows: - key = f"plugin_schedule:{row['id']}" + schedule_id = int(row["id"]) + key = f"plugin_schedule:{schedule_id}" runtime = runtime_by_key.get(key, {}) + latest_log = latest_log_by_schedule.get(schedule_id) or {} merged = dict(row) merged["runtime_job_id"] = runtime.get("id") merged["running"] = runtime.get("running", False) merged["trigger_text"] = runtime.get("trigger_text", "") merged["next_run_at"] = runtime.get("next_run_at") - merged["last_run_at"] = runtime.get("last_run_at") - merged["last_status"] = runtime.get("last_status") - merged["last_error"] = runtime.get("last_error") + # last_run_at 等字段优先取运行时;若缺失则用最新日志兜底,避免页面显示空白。 + merged["last_run_at"] = runtime.get("last_run_at") or latest_log.get("triggered_at") + merged["last_status"] = runtime.get("last_status") or latest_log.get("status") + merged["last_error"] = runtime.get("last_error") or "" + if not merged["last_error"] and str(merged["last_status"]) == "failed": + merged["last_error"] = str(latest_log.get("summary") or "") merged["last_duration_ms"] = runtime.get("last_duration_ms") merged["run_count"] = runtime.get("run_count", 0) merged["success_count"] = runtime.get("success_count", 0)