完善后台任务中心历史摘要视图

- 为系统任务和插件调度补充批量历史摘要查询,支持最近成功时间、最近失败原因与累计成功失败次数

- 任务列表接口合并内存运行态与数据库日志态,服务重启后后台仍可回看最近执行结果

- 系统任务页与插件调度页新增健康状态、历史执行摘要与插件调度快捷启停入口

- 更新工程优化文档,记录 7.3 第一阶段当前进展
This commit is contained in:
liuwei
2026-04-30 16:21:29 +08:00
parent 0d7fe5d6f0
commit 1db8681636
8 changed files with 496 additions and 7 deletions

View File

@@ -216,6 +216,22 @@ class PluginScheduleDBOperator(BaseDBOperator):
) or {}
return row.get("triggered_at")
@staticmethod
def _clean_schedule_ids(schedule_ids: List[int]) -> List[int]:
"""清洗批量查询用的调度 ID 列表。"""
clean_ids: List[int] = []
seen = set()
for item in schedule_ids or []:
text = str(item or "").strip()
if not text.isdigit():
continue
schedule_id = int(text)
if schedule_id in seen:
continue
clean_ids.append(schedule_id)
seen.add(schedule_id)
return clean_ids
def get_latest_logs_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
"""批量获取每个调度任务最新一条执行日志。
@@ -224,7 +240,7 @@ class PluginScheduleDBOperator(BaseDBOperator):
2. 进程重启后async_job 的运行时计数会重置,但数据库日志仍完整;
3. 这里提供批量查询接口,让上层可用日志数据兜底回填展示字段。
"""
clean_ids = [int(x) for x in schedule_ids if str(x).strip().isdigit()]
clean_ids = self._clean_schedule_ids(schedule_ids)
if not clean_ids:
return {}
@@ -247,3 +263,83 @@ class PluginScheduleDBOperator(BaseDBOperator):
if schedule_id > 0:
result[schedule_id] = row
return result
def get_schedule_history_summary_map(self, schedule_ids: List[int]) -> Dict[int, Dict[str, Any]]:
"""批量汇总调度任务的历史执行摘要。"""
clean_ids = self._clean_schedule_ids(schedule_ids)
if not clean_ids:
return {}
placeholders = ",".join(["%s"] * len(clean_ids))
summary_sql = f"""
SELECT
schedule_id,
MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
COUNT(*) AS total_count
FROM t_plugin_schedule_logs
WHERE schedule_id IN ({placeholders})
GROUP BY schedule_id
"""
latest_failed_sql = f"""
SELECT l.*
FROM t_plugin_schedule_logs l
INNER JOIN (
SELECT schedule_id, MAX(id) AS max_id
FROM t_plugin_schedule_logs
WHERE status = 'failed' AND schedule_id IN ({placeholders})
GROUP BY schedule_id
) t ON l.id = t.max_id
"""
summary_rows = self.execute_query(summary_sql, tuple(clean_ids)) or []
latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_ids)) or []
result: Dict[int, Dict[str, Any]] = {}
for row in summary_rows:
schedule_id = int(row.get("schedule_id") or 0)
if schedule_id <= 0:
continue
result[schedule_id] = {
"latest_success_at": row.get("latest_success_at"),
"latest_failed_at": row.get("latest_failed_at"),
"latest_failure_summary": "",
"latest_failure_detail": {},
"history_success_count": int(row.get("success_count") or 0),
"history_fail_count": int(row.get("fail_count") or 0),
"history_total_count": int(row.get("total_count") or 0),
}
for row in latest_failed_rows:
schedule_id = int(row.get("schedule_id") or 0)
if schedule_id <= 0:
continue
detail = row.get("detail_json")
if isinstance(detail, str):
try:
detail = json.loads(detail)
except json.JSONDecodeError:
detail = {}
elif detail is None:
detail = {}
history = result.setdefault(
schedule_id,
{
"latest_success_at": None,
"latest_failed_at": row.get("triggered_at"),
"latest_failure_summary": "",
"latest_failure_detail": {},
"history_success_count": 0,
"history_fail_count": 0,
"history_total_count": 0,
},
)
history["latest_failed_at"] = row.get("triggered_at")
history["latest_failure_summary"] = str(row.get("summary") or "").strip()
history["latest_failure_detail"] = detail or {}
return result

View File

@@ -171,6 +171,145 @@ class SystemJobDBOperator(BaseDBOperator):
row["detail_json"] = {}
return rows
@staticmethod
def _clean_job_keys(job_keys: List[str]) -> List[str]:
"""清洗批量查询用的任务 key 列表。
设计说明:
1. 后台列表页会一次性请求多个任务的历史摘要,必须先去掉空值和重复值;
2. 统一在 DB Operator 层做清洗,避免上层每个调用方都重复写一遍;
3. 保持输入顺序,便于后续排查时能和原始列表一一对应。
"""
clean_keys: List[str] = []
seen = set()
for item in job_keys or []:
key = str(item or "").strip()
if not key or key in seen:
continue
clean_keys.append(key)
seen.add(key)
return clean_keys
def get_latest_logs_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
"""批量读取每个任务最新一条执行日志。"""
clean_keys = self._clean_job_keys(job_keys)
if not clean_keys:
return {}
placeholders = ",".join(["%s"] * len(clean_keys))
sql = f"""
SELECT l.*
FROM t_system_job_logs l
INNER JOIN (
SELECT job_key, MAX(id) AS max_id
FROM t_system_job_logs
WHERE job_key IN ({placeholders})
GROUP BY job_key
) t ON l.id = t.max_id
"""
rows = self.execute_query(sql, tuple(clean_keys)) or []
result: Dict[str, Dict[str, Any]] = {}
for row in rows:
detail = row.get("detail_json")
if isinstance(detail, str):
try:
row["detail_json"] = json.loads(detail)
except json.JSONDecodeError:
row["detail_json"] = {}
elif detail is None:
row["detail_json"] = {}
job_key = str(row.get("job_key") or "").strip()
if job_key:
result[job_key] = row
return result
def get_job_history_summary_map(self, job_keys: List[str]) -> Dict[str, Dict[str, Any]]:
"""批量汇总系统任务的执行历史摘要。
返回字段覆盖后台最常用的问题定位视角:
1. 最近成功时间,便于判断任务是否长期没有跑通;
2. 最近失败时间与失败摘要,便于列表页直接看到异常原因;
3. 累计成功/失败/总执行次数,便于粗看任务稳定性。
"""
clean_keys = self._clean_job_keys(job_keys)
if not clean_keys:
return {}
placeholders = ",".join(["%s"] * len(clean_keys))
summary_sql = f"""
SELECT
job_key,
MAX(CASE WHEN status = 'success' THEN triggered_at ELSE NULL END) AS latest_success_at,
MAX(CASE WHEN status = 'failed' THEN triggered_at ELSE NULL END) AS latest_failed_at,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS fail_count,
COUNT(*) AS total_count
FROM t_system_job_logs
WHERE job_key IN ({placeholders})
GROUP BY job_key
"""
latest_failed_sql = f"""
SELECT l.*
FROM t_system_job_logs l
INNER JOIN (
SELECT job_key, MAX(id) AS max_id
FROM t_system_job_logs
WHERE status = 'failed' AND job_key IN ({placeholders})
GROUP BY job_key
) t ON l.id = t.max_id
"""
summary_rows = self.execute_query(summary_sql, tuple(clean_keys)) or []
latest_failed_rows = self.execute_query(latest_failed_sql, tuple(clean_keys)) or []
result: Dict[str, Dict[str, Any]] = {}
for row in summary_rows:
job_key = str(row.get("job_key") or "").strip()
if not job_key:
continue
result[job_key] = {
"latest_success_at": row.get("latest_success_at"),
"latest_failed_at": row.get("latest_failed_at"),
"latest_failure_summary": "",
"latest_failure_detail": {},
"history_success_count": int(row.get("success_count") or 0),
"history_fail_count": int(row.get("fail_count") or 0),
"history_total_count": int(row.get("total_count") or 0),
}
for row in latest_failed_rows:
job_key = str(row.get("job_key") or "").strip()
if not job_key:
continue
detail = row.get("detail_json")
if isinstance(detail, str):
try:
detail = json.loads(detail)
except json.JSONDecodeError:
detail = {}
elif detail is None:
detail = {}
history = result.setdefault(
job_key,
{
"latest_success_at": None,
"latest_failed_at": row.get("triggered_at"),
"latest_failure_summary": "",
"latest_failure_detail": {},
"history_success_count": 0,
"history_fail_count": 0,
"history_total_count": 0,
},
)
history["latest_failed_at"] = row.get("triggered_at")
history["latest_failure_summary"] = str(row.get("summary") or "").strip()
history["latest_failure_detail"] = detail or {}
return result
def get_latest_log_time(self, job_key: str) -> Optional[datetime]:
"""获取任务最新一次执行日志时间。"""
row = self.execute_query(