Files
abot/db/system_job_db.py
2026-05-01 12:45:40 +08:00

188 lines
6.8 KiB
Python

# -*- coding: utf-8 -*-
import json
from datetime import datetime
from typing import Any, Dict, List, Optional
from loguru import logger
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class SystemJobDBOperator(BaseDBOperator):
    """Database operations for system-level scheduled jobs and their run logs.

    Works on two tables: ``t_system_jobs`` (job configuration) and
    ``t_system_job_logs`` (one row per job execution). All SQL is issued
    through ``self.execute_query`` / ``self.execute_update``, which are not
    defined in this class and are expected to come from the base class.
    """

    # Width of t_system_job_logs.summary (VARCHAR(255), see init_tables()).
    _SUMMARY_MAX_LEN = 255

    def __init__(self, db_manager: DBConnectionManager):
        """Bind the operator to the given connection manager."""
        super().__init__(db_manager)

    @staticmethod
    def _decode_json_field(
        row: Dict[str, Any], field: str, *, none_as_empty: bool = False
    ) -> None:
        """Decode a JSON column of ``row`` in place.

        A string value is parsed with ``json.loads``; an unparsable string is
        replaced by ``{}``. Any non-string value is left untouched, except
        that ``None`` becomes ``{}`` when ``none_as_empty`` is true.
        """
        value = row.get(field)
        if isinstance(value, str):
            try:
                row[field] = json.loads(value)
            except json.JSONDecodeError:
                row[field] = {}
        elif value is None and none_as_empty:
            row[field] = {}

    def init_tables(self) -> bool:
        """Create the job configuration table and the job log table if missing.

        Returns:
            True when both CREATE statements ran without raising; False when
            an exception was raised (the error is logged).
        """
        # NOTE(review): the return value of execute_update() is ignored here.
        # If it reports failure by returning False instead of raising, this
        # method still returns True -- confirm against BaseDBOperator.
        try:
            self.execute_update(
                """
                CREATE TABLE IF NOT EXISTS t_system_jobs (
                    job_key VARCHAR(64) PRIMARY KEY,
                    name VARCHAR(128) NOT NULL,
                    description VARCHAR(255) DEFAULT '',
                    trigger_type VARCHAR(64) NOT NULL,
                    trigger_config JSON NOT NULL,
                    enabled TINYINT(1) NOT NULL DEFAULT 1,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
                )
                """
            )
            # Execution log table: persists the outcome of every job run so
            # that the history is not lost when the process restarts.
            self.execute_update(
                """
                CREATE TABLE IF NOT EXISTS t_system_job_logs (
                    id BIGINT PRIMARY KEY AUTO_INCREMENT,
                    job_key VARCHAR(64) NOT NULL,
                    triggered_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    status VARCHAR(32) NOT NULL,
                    summary VARCHAR(255) DEFAULT '',
                    detail_json JSON DEFAULT NULL,
                    duration_ms INT DEFAULT NULL,
                    INDEX idx_job_time (job_key, triggered_at)
                )
                """
            )
            return True
        except Exception as e:
            logger.error(f"初始化 t_system_jobs 失败: {e}")
            return False

    def list_jobs(self) -> List[Dict[str, Any]]:
        """Return all job rows ordered by creation time (oldest first).

        ``trigger_config`` is decoded from its JSON string form when the
        driver returns it as ``str``. An empty list is returned when the
        query yields nothing (or a falsy result).
        """
        rows = self.execute_query("SELECT * FROM t_system_jobs ORDER BY created_at ASC") or []
        for row in rows:
            self._decode_json_field(row, "trigger_config")
        return rows

    def get_job(self, job_key: str) -> Optional[Dict[str, Any]]:
        """Return the job row for ``job_key``, or None when no row is found.

        ``trigger_config`` is decoded the same way as in ``list_jobs``.
        """
        row = self.execute_query(
            "SELECT * FROM t_system_jobs WHERE job_key = %s", (job_key,), fetch_one=True
        )
        if not row:
            return None
        self._decode_json_field(row, "trigger_config")
        return row

    def upsert_job(self, data: Dict[str, Any]) -> bool:
        """Insert a job, or overwrite its configuration if the key exists.

        Args:
            data: Must contain ``job_key``, ``name`` and ``trigger_type``.
                ``description`` defaults to ``""``, ``trigger_config`` to
                ``{}`` and ``enabled`` to True.

        Returns:
            The result of ``execute_update``; False when building or running
            the statement raised (e.g. a required key is missing). The error
            is logged.
        """
        try:
            sql = """
                INSERT INTO t_system_jobs (
                    job_key, name, description, trigger_type, trigger_config, enabled
                ) VALUES (%s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    name = VALUES(name),
                    description = VALUES(description),
                    trigger_type = VALUES(trigger_type),
                    trigger_config = VALUES(trigger_config),
                    enabled = VALUES(enabled)
            """
            params = (
                data["job_key"],
                data["name"],
                data.get("description", ""),
                data["trigger_type"],
                json.dumps(data.get("trigger_config", {}), ensure_ascii=False),
                1 if data.get("enabled", True) else 0,
            )
            return self.execute_update(sql, params)
        except Exception as e:
            logger.error(f"upsert 系统任务失败: {e}, data={data}")
            return False

    def update_job(self, job_key: str, updates: Dict[str, Any]) -> bool:
        """Update selected columns of an existing job.

        Only ``name``, ``description``, ``trigger_type``, ``enabled`` and
        ``trigger_config`` are honoured; any other key in ``updates`` is
        ignored.

        Returns:
            True when ``updates`` contains nothing to change, otherwise the
            result of ``execute_update``.
        """
        assignments: List[str] = []
        values: List[Any] = []
        # Column names come from this fixed tuple, never from the caller, so
        # interpolating them into the SQL text below is safe.
        for column in ("name", "description", "trigger_type", "enabled"):
            if column not in updates:
                continue
            assignments.append(f"{column} = %s")
            raw = updates[column]
            # `enabled` is stored as TINYINT(1): normalise truthiness to 0/1.
            values.append((1 if raw else 0) if column == "enabled" else raw)
        if "trigger_config" in updates:
            assignments.append("trigger_config = %s")
            values.append(json.dumps(updates["trigger_config"], ensure_ascii=False))
        if not assignments:
            return True
        values.append(job_key)
        sql = f"UPDATE t_system_jobs SET {', '.join(assignments)} WHERE job_key = %s"
        return self.execute_update(sql, tuple(values))

    def delete_job(self, job_key: str) -> bool:
        """Delete the job row identified by ``job_key``."""
        return self.execute_update("DELETE FROM t_system_jobs WHERE job_key = %s", (job_key,))

    def create_job_log(
        self,
        job_key: str,
        status: str,
        summary: str,
        detail: Optional[Dict[str, Any]] = None,
        duration_ms: Optional[int] = None,
    ) -> bool:
        """Persist one execution log entry for a system job.

        Args:
            job_key: Key of the job that ran.
            status: Outcome label stored in the ``status`` column.
            summary: Short description; ``None``/empty becomes ``""`` and
                text longer than the column width is truncated.
            detail: Optional structured payload, stored as JSON (``{}`` when
                omitted or empty).
            duration_ms: Run duration in milliseconds, or None if unknown.

        Returns:
            The result of ``execute_update``.
        """
        sql = """
            INSERT INTO t_system_job_logs (job_key, status, summary, detail_json, duration_ms)
            VALUES (%s, %s, %s, %s, %s)
        """
        params = (
            str(job_key),
            str(status),
            # Keep the text within VARCHAR(255) so an over-long summary cannot
            # make the INSERT fail and drop the whole log entry.
            str(summary or "")[: self._SUMMARY_MAX_LEN],
            # Deliberate best-effort: a log payload may carry datetimes or
            # exception objects; stringify them instead of raising TypeError.
            json.dumps(detail or {}, ensure_ascii=False, default=str),
            duration_ms,
        )
        return self.execute_update(sql, params)

    def get_job_logs(self, job_key: str, limit: int = 100) -> List[Dict[str, Any]]:
        """Return the most recent persisted log rows for a job, newest first.

        Args:
            job_key: Key of the job whose logs are requested.
            limit: Maximum number of rows; negative values are treated as 0.

        Returns:
            Log rows with ``detail_json`` decoded to a Python object
            (``{}`` when the column is NULL or not valid JSON).
        """
        # LIMIT does not accept a negative value.
        row_limit = max(int(limit), 0)
        # triggered_at has one-second resolution, so the auto-increment id is
        # used as a tiebreaker to keep the order of same-second runs stable.
        rows = self.execute_query(
            """
            SELECT * FROM t_system_job_logs
            WHERE job_key = %s
            ORDER BY triggered_at DESC, id DESC
            LIMIT %s
            """,
            (str(job_key), row_limit),
        ) or []
        for row in rows:
            self._decode_json_field(row, "detail_json", none_as_empty=True)
        return rows

    def get_latest_log_time(self, job_key: str) -> Optional[datetime]:
        """Return ``triggered_at`` of the newest log row for a job.

        Returns None when the job has no log rows (or the query result is
        falsy).
        """
        row = self.execute_query(
            """
            SELECT triggered_at
            FROM t_system_job_logs
            WHERE job_key = %s
            ORDER BY triggered_at DESC
            LIMIT 1
            """,
            (str(job_key),),
            fetch_one=True,
        ) or {}
        return row.get("triggered_at")