@@ -143,6 +165,23 @@ new Vue({
if (status === 'running') return 'warning';
return 'info';
},
+ healthTag(status) {
+ if (status === 'healthy') return 'success';
+ if (status === 'running') return 'warning';
+ if (status === 'failed') return 'danger';
+ if (status === 'disabled') return 'info';
+ return '';
+ },
+ healthLabel(status) {
+ const mapping = {
+ healthy: '健康',
+ running: '执行中',
+ failed: '异常',
+ disabled: '停用',
+ idle: '待运行'
+ };
+ return mapping[status] || '待运行';
+ },
async loadJobs() {
this.loading = true;
try {
@@ -269,5 +308,10 @@ new Vue({
.page-hero-copy h1{font-size:30px;line-height:1.1;margin-bottom:10px;color:#0f172a}
.page-hero-copy p{color:#64748b;font-size:14px}
.action-row{display:flex;align-items:center;gap:8px;flex-wrap:wrap}
+.cell-ellipsis{overflow:hidden;text-overflow:ellipsis;white-space:nowrap;color:#475569}
+.history-metrics{display:flex;align-items:center;justify-content:center;gap:8px}
+.metric-success{color:#16a34a;font-weight:600}
+.metric-fail{color:#dc2626;font-weight:600}
+.history-total{margin-top:4px;color:#64748b;font-size:12px}
{% endblock %}
diff --git a/db/plugin_schedule_db.py b/db/plugin_schedule_db.py
index cc0a3da..ea93354 100644
--- a/db/plugin_schedule_db.py
+++ b/db/plugin_schedule_db.py
@@ -216,6 +216,22 @@ class PluginScheduleDBOperator(BaseDBOperator):
) or {}
return row.get("triggered_at")
+ @staticmethod
+ def _clean_schedule_ids(schedule_ids: List[int]) -> List[int]:
+ """清洗批量查询用的调度 ID 列表。"""
+ clean_ids: List[int] = []
+ seen = set()
+ for item in schedule_ids or []:
+ text = str(item or "").strip()
+ if not text.isdigit():
+ continue
+ schedule_id = int(text)
+ if schedule_id in seen:
+ continue
+ clean_ids.append(schedule_id)
+ seen.add(schedule_id)
+ return clean_ids
+
def get_latest_logs_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
"""批量获取每个调度任务最新一条执行日志。
@@ -224,7 +240,7 @@ class PluginScheduleDBOperator(BaseDBOperator):
2. 进程重启后,async_job 的运行时计数会重置,但数据库日志仍完整;
3. 这里提供批量查询接口,让上层可用日志数据兜底回填展示字段。
"""
- clean_ids = [int(x) for x in schedule_ids if str(x).strip().isdigit()]
+ clean_ids = self._clean_schedule_ids(schedule_ids)
if not clean_ids:
return {}
@@ -247,3 +263,83 @@ class PluginScheduleDBOperator(BaseDBOperator):
if schedule_id > 0:
result[schedule_id] = row
return result
+
+ def get_schedule_history_summary_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
+ """批量汇总调度任务的历史执行摘要。"""
+ clean_ids = self._clean_schedule_ids(schedule_ids)
+ if not clean_ids:
+ return {}
+
+ placeholders = ",".join(["%s"] * len(clean_ids))
+ summary_sql = f"""
+ SELECT
+ schedule_id,
+ MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
+ MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
+ SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
+ COUNT(*) AS total_count
+ FROM t_plugin_schedule_logs
+ WHERE schedule_id IN ({placeholders})
+ GROUP BY schedule_id
+ """
+ latest_failed_sql = f"""
+ SELECT l.*
+ FROM t_plugin_schedule_logs l
+ INNER JOIN (
+ SELECT schedule_id, MAX(id) AS max_id
+ FROM t_plugin_schedule_logs
+ WHERE status = 'failed' AND schedule_id IN ({placeholders})
+ GROUP BY schedule_id
+ ) t ON l.id = t.max_id
+ """
+
+ summary_rows = self.execute_query(summary_sql, tuple(clean_ids)) or []
+ latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_ids)) or []
+
+ result: Dict[int, Dict[str, Any]] = {}
+ for row in summary_rows:
+ schedule_id = int(row.get("schedule_id") or 0)
+ if schedule_id <= 0:
+ continue
+ result[schedule_id] = {
+ "latest_success_at": row.get("latest_success_at"),
+ "latest_failed_at": row.get("latest_failed_at"),
+ "latest_failure_summary": "",
+ "latest_failure_detail": {},
+ "history_success_count": int(row.get("success_count") or 0),
+ "history_fail_count": int(row.get("fail_count") or 0),
+ "history_total_count": int(row.get("total_count") or 0),
+ }
+
+ for row in latest_failed_rows:
+ schedule_id = int(row.get("schedule_id") or 0)
+ if schedule_id <= 0:
+ continue
+
+ detail = row.get("detail_json")
+ if isinstance(detail, str):
+ try:
+ detail = json.loads(detail)
+ except json.JSONDecodeError:
+ detail = {}
+ elif detail is None:
+ detail = {}
+
+ history = result.setdefault(
+ schedule_id,
+ {
+ "latest_success_at": None,
+ "latest_failed_at": row.get("triggered_at"),
+ "latest_failure_summary": "",
+ "latest_failure_detail": {},
+ "history_success_count": 0,
+ "history_fail_count": 0,
+ "history_total_count": 0,
+ },
+ )
+ history["latest_failed_at"] = row.get("triggered_at")
+ history["latest_failure_summary"] = str(row.get("summary") or "").strip()
+ history["latest_failure_detail"] = detail or {}
+
+ return result
diff --git a/db/system_job_db.py b/db/system_job_db.py
index 9a10d0f..f7cb17d 100644
--- a/db/system_job_db.py
+++ b/db/system_job_db.py
@@ -171,6 +171,145 @@ class SystemJobDBOperator(BaseDBOperator):
row["detail_json"] = {}
return rows
+ @staticmethod
+ def _clean_job_keys(job_keys: List[str]) -> List[str]:
+ """清洗批量查询用的任务 key 列表。
+
+ 设计说明:
+ 1. 后台列表页会一次性请求多个任务的历史摘要,必须先去掉空值和重复值;
+ 2. 统一在 DB Operator 层做清洗,避免上层每个调用方都重复写一遍;
+ 3. 保持输入顺序,便于后续排查时能和原始列表一一对应。
+ """
+ clean_keys: List[str] = []
+ seen = set()
+ for item in job_keys or []:
+ key = str(item or "").strip()
+ if not key or key in seen:
+ continue
+ clean_keys.append(key)
+ seen.add(key)
+ return clean_keys
+
+ def get_latest_logs_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
+ """批量读取每个任务最新一条执行日志。"""
+ clean_keys = self._clean_job_keys(job_keys)
+ if not clean_keys:
+ return {}
+
+ placeholders = ",".join(["%s"] * len(clean_keys))
+ sql = f"""
+ SELECT l.*
+ FROM t_system_job_logs l
+ INNER JOIN (
+ SELECT job_key, MAX(id) AS max_id
+ FROM t_system_job_logs
+ WHERE job_key IN ({placeholders})
+ GROUP BY job_key
+ ) t ON l.id = t.max_id
+ """
+ rows = self.execute_query(sql, tuple(clean_keys)) or []
+ result: Dict[str, Dict[str, Any]] = {}
+ for row in rows:
+ detail = row.get("detail_json")
+ if isinstance(detail, str):
+ try:
+ row["detail_json"] = json.loads(detail)
+ except json.JSONDecodeError:
+ row["detail_json"] = {}
+ elif detail is None:
+ row["detail_json"] = {}
+
+ job_key = str(row.get("job_key") or "").strip()
+ if job_key:
+ result[job_key] = row
+ return result
+
+ def get_job_history_summary_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
+ """批量汇总系统任务的执行历史摘要。
+
+ 返回字段覆盖后台最常用的问题定位视角:
+ 1. 最近成功时间,便于判断任务是否长期没有跑通;
+ 2. 最近失败时间与失败摘要,便于列表页直接看到异常原因;
+ 3. 累计成功/失败/总执行次数,便于粗看任务稳定性。
+ """
+ clean_keys = self._clean_job_keys(job_keys)
+ if not clean_keys:
+ return {}
+
+ placeholders = ",".join(["%s"] * len(clean_keys))
+ summary_sql = f"""
+ SELECT
+ job_key,
+ MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
+ MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
+ SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
+ COUNT(*) AS total_count
+ FROM t_system_job_logs
+ WHERE job_key IN ({placeholders})
+ GROUP BY job_key
+ """
+ latest_failed_sql = f"""
+ SELECT l.*
+ FROM t_system_job_logs l
+ INNER JOIN (
+ SELECT job_key, MAX(id) AS max_id
+ FROM t_system_job_logs
+ WHERE status = 'failed' AND job_key IN ({placeholders})
+ GROUP BY job_key
+ ) t ON l.id = t.max_id
+ """
+
+ summary_rows = self.execute_query(summary_sql, tuple(clean_keys)) or []
+ latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_keys)) or []
+
+ result: Dict[str, Dict[str, Any]] = {}
+ for row in summary_rows:
+ job_key = str(row.get("job_key") or "").strip()
+ if not job_key:
+ continue
+ result[job_key] = {
+ "latest_success_at": row.get("latest_success_at"),
+ "latest_failed_at": row.get("latest_failed_at"),
+ "latest_failure_summary": "",
+ "latest_failure_detail": {},
+ "history_success_count": int(row.get("success_count") or 0),
+ "history_fail_count": int(row.get("fail_count") or 0),
+ "history_total_count": int(row.get("total_count") or 0),
+ }
+
+ for row in latest_failed_rows:
+ job_key = str(row.get("job_key") or "").strip()
+ if not job_key:
+ continue
+
+ detail = row.get("detail_json")
+ if isinstance(detail, str):
+ try:
+ detail = json.loads(detail)
+ except json.JSONDecodeError:
+ detail = {}
+ elif detail is None:
+ detail = {}
+
+ history = result.setdefault(
+ job_key,
+ {
+ "latest_success_at": None,
+ "latest_failed_at": row.get("triggered_at"),
+ "latest_failure_summary": "",
+ "latest_failure_detail": {},
+ "history_success_count": 0,
+ "history_fail_count": 0,
+ "history_total_count": 0,
+ },
+ )
+ history["latest_failed_at"] = row.get("triggered_at")
+ history["latest_failure_summary"] = str(row.get("summary") or "").strip()
+ history["latest_failure_detail"] = detail or {}
+
+ return result
+
def get_latest_log_time(self, job_key: str) -> Optional[datetime]:
"""获取任务最新一次执行日志时间。"""
row = self.execute_query(
diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md
index 9e361c9..e22e72d 100644
--- a/docs/工程优化与Feature清单.md
+++ b/docs/工程优化与Feature清单.md
@@ -463,6 +463,13 @@
- 让定时任务真正可管理、可追踪
+当前进展:
+
+- 第一阶段已完成:系统任务页与插件调度页已补充历史执行摘要,可直接查看最近成功时间、最近失败原因与累计成功/失败次数
+- 第一阶段已完成:任务列表接口已合并内存运行态与数据库日志态,服务重启后后台仍可回看最近执行结果
+- 第一阶段已完成:插件调度页已补充快捷启停入口,减少仅为切换启用状态而进入编辑弹窗的操作成本
+- 后续可继续补充任务执行审计人、失败重试策略模板、筛选搜索与跨任务汇总看板
+
建议内容:
- 展示任务执行历史
diff --git a/utils/plugin_schedule_manager.py b/utils/plugin_schedule_manager.py
index b9767fd..2b30afb 100644
--- a/utils/plugin_schedule_manager.py
+++ b/utils/plugin_schedule_manager.py
@@ -209,6 +209,47 @@ class PluginScheduleManager:
return False
return latest_log_at < (expected_at - timedelta(seconds=self._compensation_tolerance_seconds))
+ @staticmethod
+ def _build_schedule_health_status(
+ *,
+ enabled: bool,
+ running: bool,
+ last_status: str,
+ latest_success_at,
+ latest_failure_summary: str,
+ ) -> str:
+ """根据调度任务运行态和历史态生成后台健康标签。"""
+ if not enabled:
+ return "disabled"
+ if running:
+ return "running"
+ # 只有“最近一次执行仍是失败”时才把健康态打成 failed,
+ # 避免历史上曾失败过、但后面已经恢复成功的任务一直显示异常。
+ if str(last_status or "").strip().lower() == "failed":
+ return "failed"
+ if latest_success_at or str(last_status or "").strip().lower() == "success":
+ return "healthy"
+ if str(latest_failure_summary or "").strip():
+ return "degraded"
+ return "idle"
+
+ @staticmethod
+ def _build_schedule_health_message(*, health_status: str, latest_success_at, latest_failure_summary: str) -> str:
+ """生成调度任务列表里展示的简短健康说明。"""
+ if health_status == "disabled":
+ return "任务已停用"
+ if health_status == "running":
+ return "任务正在执行中"
+ if health_status in ("failed", "degraded"):
+ return str(latest_failure_summary or "最近存在失败记录").strip()
+ if health_status == "healthy":
+ if isinstance(latest_success_at, datetime):
+ return f"最近成功于 {latest_success_at.strftime('%Y-%m-%d %H:%M:%S')}"
+ if latest_success_at:
+ return f"最近成功于 {latest_success_at}"
+ return "任务近期执行正常"
+ return "暂无执行记录"
+
async def _run_one_schedule(self, schedule_row: Dict[str, Any]) -> Dict[str, Any]:
schedule_id = int(schedule_row["id"])
action_key = schedule_row.get("action_key")
@@ -297,6 +338,7 @@ class PluginScheduleManager:
# 日志兜底:进程重启后内存态 last_run_at 会丢失,任务页需要从数据库最新日志恢复显示。
schedule_ids = [int(row.get("id")) for row in db_rows if row.get("id") is not None]
latest_log_by_schedule = self.db.get_latest_logs_map(schedule_ids)
+ history_summary_by_schedule = self.db.get_schedule_history_summary_map(schedule_ids)
data = []
for row in db_rows:
@@ -304,6 +346,7 @@ class PluginScheduleManager:
key = f"plugin_schedule:{schedule_id}"
runtime = runtime_by_key.get(key, {})
latest_log = latest_log_by_schedule.get(schedule_id) or {}
+ history_summary = history_summary_by_schedule.get(schedule_id) or {}
merged = dict(row)
merged["runtime_job_id"] = runtime.get("id")
merged["running"] = runtime.get("running", False)
@@ -319,6 +362,24 @@ class PluginScheduleManager:
merged["run_count"] = runtime.get("run_count", 0)
merged["success_count"] = runtime.get("success_count", 0)
merged["fail_count"] = runtime.get("fail_count", 0)
+ merged["latest_success_at"] = history_summary.get("latest_success_at")
+ merged["latest_failed_at"] = history_summary.get("latest_failed_at")
+ merged["latest_failure_summary"] = str(history_summary.get("latest_failure_summary") or "").strip()
+ merged["history_success_count"] = int(history_summary.get("history_success_count", 0) or 0)
+ merged["history_fail_count"] = int(history_summary.get("history_fail_count", 0) or 0)
+ merged["history_total_count"] = int(history_summary.get("history_total_count", 0) or 0)
+ merged["health_status"] = self._build_schedule_health_status(
+ enabled=bool(row.get("enabled", 0)),
+ running=bool(runtime.get("running", False)),
+ last_status=str(merged.get("last_status") or ""),
+ latest_success_at=history_summary.get("latest_success_at"),
+ latest_failure_summary=str(history_summary.get("latest_failure_summary") or ""),
+ )
+ merged["health_message"] = self._build_schedule_health_message(
+ health_status=merged["health_status"],
+ latest_success_at=history_summary.get("latest_success_at"),
+ latest_failure_summary=str(history_summary.get("latest_failure_summary") or ""),
+ )
data.append(merged)
return data