Files
abot/db/plugin_schedule_db.py
liuwei 3226fabcec 新增定时任务重启漏执行补偿机制
变更项:1) 系统任务与插件调度重载后基于应执行时间和执行日志对账,判定是否漏执行。2) 仅在应执行时间已过且日志未覆盖时补跑一次,避免重复补偿。3) system_job_db 与 plugin_schedule_db 新增 get_latest_log_time 查询。4) 增加容差窗口与中文注释,降低误判概率。
2026-04-17 09:38:15 +08:00

218 lines
8.0 KiB
Python

# -*- coding: utf-8 -*-
import json
from datetime import datetime
from typing import Any, Dict, List, Optional
from loguru import logger
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class PluginScheduleDBOperator(BaseDBOperator):
"""插件定时任务配置与日志表操作。"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
def init_tables(self) -> bool:
try:
self.execute_update(
"""
CREATE TABLE IF NOT EXISTS t_plugin_schedules (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
plugin_name VARCHAR(128) NOT NULL,
action_key VARCHAR(64) NOT NULL,
action_name VARCHAR(128) NOT NULL,
description VARCHAR(255) DEFAULT '',
trigger_type VARCHAR(64) NOT NULL,
trigger_config JSON NOT NULL,
target_scope VARCHAR(64) NOT NULL DEFAULT 'all_enabled_groups',
target_config JSON DEFAULT NULL,
payload JSON DEFAULT NULL,
enabled TINYINT(1) NOT NULL DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_plugin_action (plugin_name, action_key)
)
"""
)
self.execute_update(
"""
CREATE TABLE IF NOT EXISTS t_plugin_schedule_logs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
schedule_id BIGINT NOT NULL,
triggered_at DATETIME DEFAULT CURRENT_TIMESTAMP,
status VARCHAR(32) NOT NULL,
summary VARCHAR(255) DEFAULT '',
detail_json JSON DEFAULT NULL,
INDEX idx_schedule_time (schedule_id, triggered_at)
)
"""
)
return True
except Exception as e:
logger.error(f"初始化插件调度表失败: {e}")
return False
@staticmethod
def _parse_json_field(row: Dict[str, Any], key: str):
value = row.get(key)
if isinstance(value, str):
try:
row[key] = json.loads(value)
except json.JSONDecodeError:
row[key] = {}
elif value is None:
row[key] = {}
def list_schedules(self) -> List[Dict[str, Any]]:
rows = self.execute_query("SELECT * FROM t_plugin_schedules ORDER BY plugin_name, action_name") or []
for row in rows:
self._parse_json_field(row, "trigger_config")
self._parse_json_field(row, "target_config")
self._parse_json_field(row, "payload")
return rows
def list_enabled_schedules(self) -> List[Dict[str, Any]]:
rows = self.execute_query(
"SELECT * FROM t_plugin_schedules WHERE enabled = 1 ORDER BY plugin_name, action_name"
) or []
for row in rows:
self._parse_json_field(row, "trigger_config")
self._parse_json_field(row, "target_config")
self._parse_json_field(row, "payload")
return rows
def get_schedule(self, schedule_id: int) -> Optional[Dict[str, Any]]:
row = self.execute_query(
"SELECT * FROM t_plugin_schedules WHERE id = %s",
(schedule_id,),
fetch_one=True,
)
if not row:
return None
self._parse_json_field(row, "trigger_config")
self._parse_json_field(row, "target_config")
self._parse_json_field(row, "payload")
return row
def get_schedule_by_plugin_action(self, plugin_name: str, action_key: str) -> Optional[Dict[str, Any]]:
"""按插件名+动作键查询调度配置。"""
row = self.execute_query(
"""
SELECT * FROM t_plugin_schedules
WHERE plugin_name = %s AND action_key = %s
LIMIT 1
""",
(plugin_name, action_key),
fetch_one=True,
)
if not row:
return None
self._parse_json_field(row, "trigger_config")
self._parse_json_field(row, "target_config")
self._parse_json_field(row, "payload")
return row
def upsert_default_schedule(self, data: Dict[str, Any]) -> bool:
try:
sql = """
INSERT INTO t_plugin_schedules (
plugin_name, action_key, action_name, description,
trigger_type, trigger_config, target_scope, target_config, payload, enabled
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
action_name = VALUES(action_name),
description = VALUES(description)
"""
params = (
data["plugin_name"],
data["action_key"],
data["action_name"],
data.get("description", ""),
data["trigger_type"],
json.dumps(data.get("trigger_config", {}), ensure_ascii=False),
data.get("target_scope", "all_enabled_groups"),
json.dumps(data.get("target_config", {}), ensure_ascii=False),
json.dumps(data.get("payload", {}), ensure_ascii=False),
1 if data.get("enabled", False) else 0,
)
return self.execute_update(sql, params)
except Exception as e:
logger.error(f"upsert 插件默认调度失败: {e}, data={data}")
return False
def update_schedule(self, schedule_id: int, updates: Dict[str, Any]) -> bool:
fields = []
values = []
for key in (
"action_name",
"description",
"trigger_type",
"target_scope",
"enabled",
):
if key in updates:
fields.append(f"{key} = %s")
if key == "enabled":
values.append(1 if updates[key] else 0)
else:
values.append(updates[key])
for key in ("trigger_config", "target_config", "payload"):
if key in updates:
fields.append(f"{key} = %s")
values.append(json.dumps(updates.get(key, {}), ensure_ascii=False))
if not fields:
return True
values.append(schedule_id)
sql = f"UPDATE t_plugin_schedules SET {', '.join(fields)} WHERE id = %s"
return self.execute_update(sql, tuple(values))
def create_log(self, schedule_id: int, status: str, summary: str, detail: Dict[str, Any]) -> bool:
sql = """
INSERT INTO t_plugin_schedule_logs (schedule_id, status, summary, detail_json)
VALUES (%s, %s, %s, %s)
"""
params = (
schedule_id,
status,
summary,
json.dumps(detail or {}, ensure_ascii=False),
)
return self.execute_update(sql, params)
def get_logs(self, schedule_id: int, limit: int = 100) -> List[Dict[str, Any]]:
rows = self.execute_query(
"""
SELECT * FROM t_plugin_schedule_logs
WHERE schedule_id = %s
ORDER BY triggered_at DESC
LIMIT %s
""",
(schedule_id, int(limit)),
) or []
for row in rows:
self._parse_json_field(row, "detail_json")
return rows
def get_latest_log_time(self, schedule_id: int) -> Optional[datetime]:
"""获取调度任务最新一次执行日志时间。"""
row = self.execute_query(
"""
SELECT triggered_at
FROM t_plugin_schedule_logs
WHERE schedule_id = %s
ORDER BY triggered_at DESC
LIMIT 1
""",
(int(schedule_id),),
fetch_one=True,
) or {}
return row.get("triggered_at")