@@ -165,23 +143,6 @@ new Vue({
if (status === 'running') return 'warning';
return 'info';
},
- healthTag(status) {
- if (status === 'healthy') return 'success';
- if (status === 'running') return 'warning';
- if (status === 'failed') return 'danger';
- if (status === 'disabled') return 'info';
- return '';
- },
- healthLabel(status) {
- const mapping = {
- healthy: '健康',
- running: '执行中',
- failed: '异常',
- disabled: '停用',
- idle: '待运行'
- };
- return mapping[status] || '待运行';
- },
async loadJobs() {
this.loading = true;
try {
@@ -308,10 +269,5 @@ new Vue({
.page-hero-copy h1{font-size:30px;line-height:1.1;margin-bottom:10px;color:#0f172a}
.page-hero-copy p{color:#64748b;font-size:14px}
.action-row{display:flex;align-items:center;gap:8px;flex-wrap:wrap}
-.cell-ellipsis{overflow:hidden;text-overflow:ellipsis;white-space:nowrap;color:#475569}
-.history-metrics{display:flex;align-items:center;justify-content:center;gap:8px}
-.metric-success{color:#16a34a;font-weight:600}
-.metric-fail{color:#dc2626;font-weight:600}
-.history-total{margin-top:4px;color:#64748b;font-size:12px}
{% endblock %}
diff --git a/db/plugin_schedule_db.py b/db/plugin_schedule_db.py
index ea93354..cc0a3da 100644
--- a/db/plugin_schedule_db.py
+++ b/db/plugin_schedule_db.py
@@ -216,22 +216,6 @@ class PluginScheduleDBOperator(BaseDBOperator):
) or {}
return row.get("triggered_at")
- @staticmethod
- def _clean_schedule_ids(schedule_ids: List[int]) -> List[int]:
- """清洗批量查询用的调度 ID 列表。"""
- clean_ids: List[int] = []
- seen = set()
- for item in schedule_ids or []:
- text = str(item or "").strip()
- if not text.isdigit():
- continue
- schedule_id = int(text)
- if schedule_id in seen:
- continue
- clean_ids.append(schedule_id)
- seen.add(schedule_id)
- return clean_ids
-
def get_latest_logs_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
"""批量获取每个调度任务最新一条执行日志。
@@ -240,7 +224,7 @@ class PluginScheduleDBOperator(BaseDBOperator):
2. 进程重启后,async_job 的运行时计数会重置,但数据库日志仍完整;
3. 这里提供批量查询接口,让上层可用日志数据兜底回填展示字段。
"""
- clean_ids = self._clean_schedule_ids(schedule_ids)
+ clean_ids = [int(x) for x in schedule_ids if str(x).strip().isdigit()]
if not clean_ids:
return {}
@@ -263,83 +247,3 @@ class PluginScheduleDBOperator(BaseDBOperator):
if schedule_id > 0:
result[schedule_id] = row
return result
-
- def get_schedule_history_summary_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
- """批量汇总调度任务的历史执行摘要。"""
- clean_ids = self._clean_schedule_ids(schedule_ids)
- if not clean_ids:
- return {}
-
- placeholders = ",".join(["%s"] * len(clean_ids))
- summary_sql = f"""
- SELECT
- schedule_id,
- MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
- MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
- SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
- SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
- COUNT(*) AS total_count
- FROM t_plugin_schedule_logs
- WHERE schedule_id IN ({placeholders})
- GROUP BY schedule_id
- """
- latest_failed_sql = f"""
- SELECT l.*
- FROM t_plugin_schedule_logs l
- INNER JOIN (
- SELECT schedule_id, MAX(id) AS max_id
- FROM t_plugin_schedule_logs
- WHERE status = 'failed' AND schedule_id IN ({placeholders})
- GROUP BY schedule_id
- ) t ON l.id = t.max_id
- """
-
- summary_rows = self.execute_query(summary_sql, tuple(clean_ids)) or []
- latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_ids)) or []
-
- result: Dict[int, Dict[str, Any]] = {}
- for row in summary_rows:
- schedule_id = int(row.get("schedule_id") or 0)
- if schedule_id <= 0:
- continue
- result[schedule_id] = {
- "latest_success_at": row.get("latest_success_at"),
- "latest_failed_at": row.get("latest_failed_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": int(row.get("success_count") or 0),
- "history_fail_count": int(row.get("fail_count") or 0),
- "history_total_count": int(row.get("total_count") or 0),
- }
-
- for row in latest_failed_rows:
- schedule_id = int(row.get("schedule_id") or 0)
- if schedule_id <= 0:
- continue
-
- detail = row.get("detail_json")
- if isinstance(detail, str):
- try:
- detail = json.loads(detail)
- except json.JSONDecodeError:
- detail = {}
- elif detail is None:
- detail = {}
-
- history = result.setdefault(
- schedule_id,
- {
- "latest_success_at": None,
- "latest_failed_at": row.get("triggered_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": 0,
- "history_fail_count": 0,
- "history_total_count": 0,
- },
- )
- history["latest_failed_at"] = row.get("triggered_at")
- history["latest_failure_summary"] = str(row.get("summary") or "").strip()
- history["latest_failure_detail"] = detail or {}
-
- return result
diff --git a/db/system_job_db.py b/db/system_job_db.py
index f7cb17d..9a10d0f 100644
--- a/db/system_job_db.py
+++ b/db/system_job_db.py
@@ -171,145 +171,6 @@ class SystemJobDBOperator(BaseDBOperator):
row["detail_json"] = {}
return rows
- @staticmethod
- def _clean_job_keys(job_keys: List[str]) -> List[str]:
- """清洗批量查询用的任务 key 列表。
-
- 设计说明:
- 1. 后台列表页会一次性请求多个任务的历史摘要,必须先去掉空值和重复值;
- 2. 统一在 DB Operator 层做清洗,避免上层每个调用方都重复写一遍;
- 3. 保持输入顺序,便于后续排查时能和原始列表一一对应。
- """
- clean_keys: List[str] = []
- seen = set()
- for item in job_keys or []:
- key = str(item or "").strip()
- if not key or key in seen:
- continue
- clean_keys.append(key)
- seen.add(key)
- return clean_keys
-
- def get_latest_logs_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
- """批量读取每个任务最新一条执行日志。"""
- clean_keys = self._clean_job_keys(job_keys)
- if not clean_keys:
- return {}
-
- placeholders = ",".join(["%s"] * len(clean_keys))
- sql = f"""
- SELECT l.*
- FROM t_system_job_logs l
- INNER JOIN (
- SELECT job_key, MAX(id) AS max_id
- FROM t_system_job_logs
- WHERE job_key IN ({placeholders})
- GROUP BY job_key
- ) t ON l.id = t.max_id
- """
- rows = self.execute_query(sql, tuple(clean_keys)) or []
- result: Dict[str, Dict[str, Any]] = {}
- for row in rows:
- detail = row.get("detail_json")
- if isinstance(detail, str):
- try:
- row["detail_json"] = json.loads(detail)
- except json.JSONDecodeError:
- row["detail_json"] = {}
- elif detail is None:
- row["detail_json"] = {}
-
- job_key = str(row.get("job_key") or "").strip()
- if job_key:
- result[job_key] = row
- return result
-
- def get_job_history_summary_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
- """批量汇总系统任务的执行历史摘要。
-
- 返回字段覆盖后台最常用的问题定位视角:
- 1. 最近成功时间,便于判断任务是否长期没有跑通;
- 2. 最近失败时间与失败摘要,便于列表页直接看到异常原因;
- 3. 累计成功/失败/总执行次数,便于粗看任务稳定性。
- """
- clean_keys = self._clean_job_keys(job_keys)
- if not clean_keys:
- return {}
-
- placeholders = ",".join(["%s"] * len(clean_keys))
- summary_sql = f"""
- SELECT
- job_key,
- MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
- MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
- SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
- SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
- COUNT(*) AS total_count
- FROM t_system_job_logs
- WHERE job_key IN ({placeholders})
- GROUP BY job_key
- """
- latest_failed_sql = f"""
- SELECT l.*
- FROM t_system_job_logs l
- INNER JOIN (
- SELECT job_key, MAX(id) AS max_id
- FROM t_system_job_logs
- WHERE status = 'failed' AND job_key IN ({placeholders})
- GROUP BY job_key
- ) t ON l.id = t.max_id
- """
-
- summary_rows = self.execute_query(summary_sql, tuple(clean_keys)) or []
- latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_keys)) or []
-
- result: Dict[str, Dict[str, Any]] = {}
- for row in summary_rows:
- job_key = str(row.get("job_key") or "").strip()
- if not job_key:
- continue
- result[job_key] = {
- "latest_success_at": row.get("latest_success_at"),
- "latest_failed_at": row.get("latest_failed_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": int(row.get("success_count") or 0),
- "history_fail_count": int(row.get("fail_count") or 0),
- "history_total_count": int(row.get("total_count") or 0),
- }
-
- for row in latest_failed_rows:
- job_key = str(row.get("job_key") or "").strip()
- if not job_key:
- continue
-
- detail = row.get("detail_json")
- if isinstance(detail, str):
- try:
- detail = json.loads(detail)
- except json.JSONDecodeError:
- detail = {}
- elif detail is None:
- detail = {}
-
- history = result.setdefault(
- job_key,
- {
- "latest_success_at": None,
- "latest_failed_at": row.get("triggered_at"),
- "latest_failure_summary": "",
- "latest_failure_detail": {},
- "history_success_count": 0,
- "history_fail_count": 0,
- "history_total_count": 0,
- },
- )
- history["latest_failed_at"] = row.get("triggered_at")
- history["latest_failure_summary"] = str(row.get("summary") or "").strip()
- history["latest_failure_detail"] = detail or {}
-
- return result
-
def get_latest_log_time(self, job_key: str) -> Optional[datetime]:
"""获取任务最新一次执行日志时间。"""
row = self.execute_query(
diff --git a/docs/工程优化与Feature清单.md b/docs/工程优化与Feature清单.md
index e22e72d..9e361c9 100644
--- a/docs/工程优化与Feature清单.md
+++ b/docs/工程优化与Feature清单.md
@@ -463,13 +463,6 @@
- 让定时任务真正可管理、可追踪
-当前进展:
-
-- 第一阶段已完成:系统任务页与插件调度页已补充历史执行摘要,可直接查看最近成功时间、最近失败原因与累计成功/失败次数
-- 第一阶段已完成:任务列表接口已合并内存运行态与数据库日志态,服务重启后后台仍可回看最近执行结果
-- 第一阶段已完成:插件调度页已补充快捷启停入口,减少仅为切换启用状态而进入编辑弹窗的操作成本
-- 后续可继续补充任务执行审计人、失败重试策略模板、筛选搜索与跨任务汇总看板
-
建议内容:
- 展示任务执行历史
diff --git a/utils/plugin_schedule_manager.py b/utils/plugin_schedule_manager.py
index 2b30afb..b9767fd 100644
--- a/utils/plugin_schedule_manager.py
+++ b/utils/plugin_schedule_manager.py
@@ -209,47 +209,6 @@ class PluginScheduleManager:
return False
return latest_log_at < (expected_at - timedelta(seconds=self._compensation_tolerance_seconds))
- @staticmethod
- def _build_schedule_health_status(
- *,
- enabled: bool,
- running: bool,
- last_status: str,
- latest_success_at,
- latest_failure_summary: str,
- ) -> str:
- """根据调度任务运行态和历史态生成后台健康标签。"""
- if not enabled:
- return "disabled"
- if running:
- return "running"
- # 只有“最近一次执行仍是失败”时才把健康态打成 failed,
- # 避免历史上曾失败过、但后面已经恢复成功的任务一直显示异常。
- if str(last_status or "").strip().lower() == "failed":
- return "failed"
- if latest_success_at or str(last_status or "").strip().lower() == "success":
- return "healthy"
- if str(latest_failure_summary or "").strip():
- return "degraded"
- return "idle"
-
- @staticmethod
- def _build_schedule_health_message(*, health_status: str, latest_success_at, latest_failure_summary: str) -> str:
- """生成调度任务列表里展示的简短健康说明。"""
- if health_status == "disabled":
- return "任务已停用"
- if health_status == "running":
- return "任务正在执行中"
- if health_status in ("failed", "degraded"):
- return str(latest_failure_summary or "最近存在失败记录").strip()
- if health_status == "healthy":
- if isinstance(latest_success_at, datetime):
- return f"最近成功于 {latest_success_at.strftime('%Y-%m-%d %H:%M:%S')}"
- if latest_success_at:
- return f"最近成功于 {latest_success_at}"
- return "任务近期执行正常"
- return "暂无执行记录"
-
async def _run_one_schedule(self, schedule_row: Dict[str, Any]) -> Dict[str, Any]:
schedule_id = int(schedule_row["id"])
action_key = schedule_row.get("action_key")
@@ -338,7 +297,6 @@ class PluginScheduleManager:
# 日志兜底:进程重启后内存态 last_run_at 会丢失,任务页需要从数据库最新日志恢复显示。
schedule_ids = [int(row.get("id")) for row in db_rows if row.get("id") is not None]
latest_log_by_schedule = self.db.get_latest_logs_map(schedule_ids)
- history_summary_by_schedule = self.db.get_schedule_history_summary_map(schedule_ids)
data = []
for row in db_rows:
@@ -346,7 +304,6 @@ class PluginScheduleManager:
key = f"plugin_schedule:{schedule_id}"
runtime = runtime_by_key.get(key, {})
latest_log = latest_log_by_schedule.get(schedule_id) or {}
- history_summary = history_summary_by_schedule.get(schedule_id) or {}
merged = dict(row)
merged["runtime_job_id"] = runtime.get("id")
merged["running"] = runtime.get("running", False)
@@ -362,24 +319,6 @@ class PluginScheduleManager:
merged["run_count"] = runtime.get("run_count", 0)
merged["success_count"] = runtime.get("success_count", 0)
merged["fail_count"] = runtime.get("fail_count", 0)
- merged["latest_success_at"] = history_summary.get("latest_success_at")
- merged["latest_failed_at"] = history_summary.get("latest_failed_at")
- merged["latest_failure_summary"] = str(history_summary.get("latest_failure_summary") or "").strip()
- merged["history_success_count"] = int(history_summary.get("history_success_count", 0) or 0)
- merged["history_fail_count"] = int(history_summary.get("history_fail_count", 0) or 0)
- merged["history_total_count"] = int(history_summary.get("history_total_count", 0) or 0)
- merged["health_status"] = self._build_schedule_health_status(
- enabled=bool(row.get("enabled", 0)),
- running=bool(runtime.get("running", False)),
- last_status=str(merged.get("last_status") or ""),
- latest_success_at=history_summary.get("latest_success_at"),
- latest_failure_summary=str(history_summary.get("latest_failure_summary") or ""),
- )
- merged["health_message"] = self._build_schedule_health_message(
- health_status=merged["health_status"],
- latest_success_at=history_summary.get("latest_success_at"),
- latest_failure_summary=str(history_summary.get("latest_failure_summary") or ""),
- )
data.append(merged)
return data