新增定时任务重启漏执行补偿机制

变更项:1) 系统任务与插件调度重载后基于应执行时间和执行日志对账,判定是否漏执行。2) 仅在应执行时间已过且日志未覆盖时补跑一次,避免重复补偿。3) system_job_db 与 plugin_schedule_db 新增 get_latest_log_time 查询。4) 增加容差窗口与中文注释,降低误判概率。
This commit is contained in:
liuwei
2026-04-17 09:38:15 +08:00
parent 6af91756d3
commit 3226fabcec
4 changed files with 216 additions and 2 deletions

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json import json
from datetime import datetime
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from loguru import logger from loguru import logger
@@ -199,3 +200,18 @@ class PluginScheduleDBOperator(BaseDBOperator):
for row in rows: for row in rows:
self._parse_json_field(row, "detail_json") self._parse_json_field(row, "detail_json")
return rows return rows
def get_latest_log_time(self, schedule_id: int) -> Optional[datetime]:
    """Return the trigger time of the newest execution log for a schedule.

    Args:
        schedule_id: Identifier of the plugin schedule; coerced to ``int``
            before being bound to the query.

    Returns:
        The ``triggered_at`` value of the most recent row in
        ``t_plugin_schedule_logs`` for this schedule, or ``None`` when the
        query yields no row.
    """
    # The SQL text is kept flush-left so the literal stays unchanged.
    sql = """
SELECT triggered_at
FROM t_plugin_schedule_logs
WHERE schedule_id = %s
ORDER BY triggered_at DESC
LIMIT 1
"""
    params = (int(schedule_id),)
    newest = self.execute_query(sql, params, fetch_one=True)
    if not newest:
        return None
    return newest.get("triggered_at")

View File

@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import json import json
from datetime import datetime
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from loguru import logger from loguru import logger
@@ -169,3 +170,18 @@ class SystemJobDBOperator(BaseDBOperator):
elif detail is None: elif detail is None:
row["detail_json"] = {} row["detail_json"] = {}
return rows return rows
def get_latest_log_time(self, job_key: str) -> Optional[datetime]:
    """Return the trigger time of the newest execution log for a system job.

    Args:
        job_key: Key of the system job; coerced to ``str`` before being
            bound to the query.

    Returns:
        The ``triggered_at`` value of the most recent row in
        ``t_system_job_logs`` for this job, or ``None`` when the query
        yields no row.
    """
    # The SQL text is kept flush-left so the literal stays unchanged.
    sql = """
SELECT triggered_at
FROM t_system_job_logs
WHERE job_key = %s
ORDER BY triggered_at DESC
LIMIT 1
"""
    params = (str(job_key),)
    newest = self.execute_query(sql, params, fetch_one=True)
    if not newest:
        return None
    return newest.get("triggered_at")

View File

@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import annotations from __future__ import annotations
from datetime import datetime import asyncio
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from loguru import logger from loguru import logger
@@ -18,6 +19,7 @@ class PluginScheduleManager:
self.plugin_manager = plugin_manager self.plugin_manager = plugin_manager
self.db = plugin_schedule_db self.db = plugin_schedule_db
self._schedule_job_map: Dict[int, str] = {} self._schedule_job_map: Dict[int, str] = {}
self._compensation_tolerance_seconds = 120
def init_and_load(self): def init_and_load(self):
self.db.init_tables() self.db.init_tables()
@@ -134,6 +136,79 @@ class PluginScheduleManager:
enabled_groups.append(gid) enabled_groups.append(gid)
return enabled_groups return enabled_groups
@staticmethod
def _latest_expected_run_before_now(
    trigger_type: str,
    trigger_config: Dict[str, Any],
    now: datetime,
) -> datetime | None:
    """Compute the most recent moment, not later than ``now``, at which the trigger was due.

    The result is only meant for missed-run detection after a restart/reload.

    Supported trigger types and the config keys they read:
        * ``every_seconds`` - ``seconds``; the result is ``now - seconds``.
        * ``at_times`` - ``time_list`` of ``"HH:MM"`` strings; unparsable
          entries are ignored.
        * ``every_weekday_time`` / ``every_week_time`` - ``weekday`` and
          ``time_str``. ``weekday`` is matched against ``datetime.weekday()``
          (Monday == 0).
          NOTE(review): confirm the scheduler uses the same numbering.
        * ``every_month_last_day_time`` - ``time_str``.

    Args:
        trigger_type: Name of the trigger kind.
        trigger_config: Trigger settings; ``None`` is treated as empty.
        now: Reference time. The callers visible here pass a naive
            ``datetime.now()``; computed values are naive as well.

    Returns:
        The latest due time at or before ``now``, or ``None`` when the
        trigger type is unknown or its configuration cannot be interpreted.
    """

    def _parse_hhmm(text):
        # Returns a ``time`` for "HH:MM" input, or None when it cannot be parsed.
        try:
            return datetime.strptime(str(text), "%H:%M").time()
        except (TypeError, ValueError):
            return None

    cfg = trigger_config or {}

    if trigger_type == "every_seconds":
        # A malformed interval is treated like any other bad config (None)
        # instead of raising, matching the remaining branches.
        try:
            seconds = int(cfg.get("seconds") or 0)
            if seconds <= 0:
                return None
            return now - timedelta(seconds=seconds)
        except (TypeError, ValueError, OverflowError):
            return None

    if trigger_type == "at_times":
        candidates = []
        for text in cfg.get("time_list") or []:
            tm = _parse_hhmm(text)
            if tm is None:
                continue
            dt = datetime.combine(now.date(), tm)
            if dt > now:
                # Today's slot has not arrived yet; yesterday's was the last one.
                dt -= timedelta(days=1)
            candidates.append(dt)
        return max(candidates) if candidates else None

    if trigger_type in ("every_weekday_time", "every_week_time"):
        try:
            weekday = int(cfg.get("weekday"))
        except (TypeError, ValueError):
            return None
        tm = _parse_hhmm(cfg.get("time_str") or "")
        if tm is None:
            return None
        days_ago = (now.weekday() - weekday + 7) % 7
        dt = datetime.combine((now - timedelta(days=days_ago)).date(), tm)
        if dt > now:
            # Same weekday as today but later in the day: use last week's slot.
            dt -= timedelta(days=7)
        return dt

    if trigger_type == "every_month_last_day_time":
        tm = _parse_hhmm(cfg.get("time_str") or "")
        if tm is None:
            return None
        if now.month == 12:
            first_of_next_month = datetime(now.year + 1, 1, 1)
        else:
            first_of_next_month = datetime(now.year, now.month + 1, 1)
        dt = datetime.combine((first_of_next_month - timedelta(days=1)).date(), tm)
        if dt > now:
            # This month's slot is still ahead. The day before the first of the
            # current month is the previous month's last day, which also covers
            # January -> December of the previous year.
            first_of_this_month = datetime(now.year, now.month, 1)
            dt = datetime.combine((first_of_this_month - timedelta(days=1)).date(), tm)
        return dt

    return None
@staticmethod
def _run_coro_blocking(coro):
    """Execute ``coro`` from synchronous code, depending on event-loop state.

    * No event loop is running in the current thread: the coroutine is run
      to completion with ``asyncio.run`` and its result is returned.
    * An event loop is already running: the coroutine is scheduled on that
      loop and the ``asyncio.Task`` is returned immediately. Despite the
      method name, this path does not wait for completion.

    Args:
        coro: Coroutine object to execute.

    Returns:
        The coroutine's result (blocking path) or the scheduled task
        (running-loop path).
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)

    def _report_failure(done_task):
        # The task finishes after the scheduling call has returned, so an
        # exception raised inside it never reaches the caller's try/except.
        # Log it here so a failed compensation run is not lost silently.
        if done_task.cancelled():
            return
        exc = done_task.exception()
        if exc is not None:
            logger.error(f"漏执行补偿任务执行失败: error={exc!r}")

    task = loop.create_task(coro)
    task.add_done_callback(_report_failure)
    # NOTE: the event loop only keeps weak references to tasks; callers that
    # must guarantee completion should hold on to the returned task.
    return task
def _should_compensate_once(self, schedule_id: int, trigger_type: str, trigger_config: Dict[str, Any]) -> bool:
    """Decide whether a schedule missed its latest due run and needs one make-up run.

    A make-up run is requested when the newest execution log is older than
    the latest due time minus ``self._compensation_tolerance_seconds``.

    Args:
        schedule_id: Identifier passed to ``self.db.get_latest_log_time``.
        trigger_type: Trigger kind, forwarded to
            ``_latest_expected_run_before_now``.
        trigger_config: Trigger settings, forwarded likewise.

    Returns:
        ``True`` when a make-up run should be started, otherwise ``False``.

    Raises:
        ValueError: If the stored log time is text that is not ISO formatted.
    """
    expected_at = self._latest_expected_run_before_now(trigger_type, trigger_config, datetime.now())
    if not expected_at:
        return False

    latest_log_at = self.db.get_latest_log_time(schedule_id)
    if not latest_log_at:
        # Without any execution history nothing is compensated, so a first
        # deployment does not trigger every schedule at once.
        return False

    # The log time comes straight from the database layer. Normalise it to a
    # naive local datetime so it can be compared with ``datetime.now()``
    # instead of failing with TypeError.
    if isinstance(latest_log_at, str):
        latest_log_at = datetime.fromisoformat(latest_log_at.strip())
    if latest_log_at.tzinfo is not None and latest_log_at.utcoffset() is not None:
        latest_log_at = latest_log_at.astimezone().replace(tzinfo=None)

    tolerance = timedelta(seconds=self._compensation_tolerance_seconds)
    return latest_log_at < expected_at - tolerance
async def _run_one_schedule(self, schedule_row: Dict[str, Any]) -> Dict[str, Any]: async def _run_one_schedule(self, schedule_row: Dict[str, Any]) -> Dict[str, Any]:
schedule_id = int(schedule_row["id"]) schedule_id = int(schedule_row["id"])
action_key = schedule_row.get("action_key") action_key = schedule_row.get("action_key")
@@ -202,6 +277,19 @@ class PluginScheduleManager:
) )
self._schedule_job_map[schedule_id] = job_id self._schedule_job_map[schedule_id] = job_id
# 重启/重载补偿:如果最近一次应执行时间已过且日志未覆盖,补跑一次。
try:
trigger_type = row.get("trigger_type", "at_times")
trigger_config = row.get("trigger_config", {"time_list": ["09:00"]})
if self._should_compensate_once(schedule_id, trigger_type, trigger_config):
logger.warning(
f"插件调度触发漏执行补偿: schedule_id={schedule_id}, "
f"plugin={row.get('plugin_name')}, action={row.get('action_key')}"
)
self._run_coro_blocking(_runner())
except Exception as e:
logger.error(f"插件调度漏执行补偿失败: schedule_id={schedule_id}, error={e}")
def list_schedules_with_runtime(self) -> List[Dict[str, Any]]: def list_schedules_with_runtime(self) -> List[Dict[str, Any]]:
db_rows = self.db.list_schedules() db_rows = self.db.list_schedules()
runtime_rows = async_job.get_jobs_snapshot() runtime_rows = async_job.get_jobs_snapshot()

View File

@@ -2,7 +2,8 @@
from __future__ import annotations from __future__ import annotations
import inspect import inspect
from datetime import datetime import asyncio
from datetime import datetime, timedelta
from typing import Any, Awaitable, Callable, Dict, List from typing import Any, Awaitable, Callable, Dict, List
from loguru import logger from loguru import logger
@@ -60,6 +61,89 @@ class SystemJobLoader:
self.db = system_job_db self.db = system_job_db
self._job_defs = {item["job_key"]: item for item in get_system_job_definitions(robot)} self._job_defs = {item["job_key"]: item for item in get_system_job_definitions(robot)}
self._registered_job_ids: List[str] = [] self._registered_job_ids: List[str] = []
# 防止系统时钟误差导致“刚执行完又被判定漏跑”,给一个小容差窗口。
self._compensation_tolerance_seconds = 120
@staticmethod
def _latest_expected_run_before_now(
    trigger_type: str,
    trigger_config: Dict[str, Any],
    now: datetime,
) -> datetime | None:
    """Compute the most recent moment, not later than ``now``, at which the trigger was due.

    The result is only meant for missed-run detection; it does not replace
    the regular scheduling done by ``async_job``.

    Supported trigger types and the config keys they read:
        * ``every_seconds`` - ``seconds``; the result is ``now - seconds``.
        * ``at_times`` - ``time_list`` of ``"HH:MM"`` strings; unparsable
          entries are ignored.
        * ``every_weekday_time`` / ``every_week_time`` - ``weekday`` and
          ``time_str``. ``weekday`` is matched against ``datetime.weekday()``
          (Monday == 0).
          NOTE(review): confirm the scheduler uses the same numbering.
        * ``every_month_last_day_time`` - ``time_str``.

    Args:
        trigger_type: Name of the trigger kind.
        trigger_config: Trigger settings; ``None`` is treated as empty.
        now: Reference time. The callers visible here pass a naive
            ``datetime.now()``; computed values are naive as well.

    Returns:
        The latest due time at or before ``now``, or ``None`` when the
        trigger type is unknown or its configuration cannot be interpreted.
    """

    def _parse_hhmm(text):
        # Returns a ``time`` for "HH:MM" input, or None when it cannot be parsed.
        try:
            return datetime.strptime(str(text), "%H:%M").time()
        except (TypeError, ValueError):
            return None

    cfg = trigger_config or {}

    if trigger_type == "every_seconds":
        # A malformed interval is treated like any other bad config (None)
        # instead of raising, matching the remaining branches.
        try:
            seconds = int(cfg.get("seconds") or 0)
            if seconds <= 0:
                return None
            return now - timedelta(seconds=seconds)
        except (TypeError, ValueError, OverflowError):
            return None

    if trigger_type == "at_times":
        candidates = []
        for text in cfg.get("time_list") or []:
            tm = _parse_hhmm(text)
            if tm is None:
                continue
            dt = datetime.combine(now.date(), tm)
            if dt > now:
                # Today's slot has not arrived yet; yesterday's was the last one.
                dt -= timedelta(days=1)
            candidates.append(dt)
        return max(candidates) if candidates else None

    if trigger_type in ("every_weekday_time", "every_week_time"):
        try:
            weekday = int(cfg.get("weekday"))
        except (TypeError, ValueError):
            return None
        tm = _parse_hhmm(cfg.get("time_str") or "")
        if tm is None:
            return None
        days_ago = (now.weekday() - weekday + 7) % 7
        dt = datetime.combine((now - timedelta(days=days_ago)).date(), tm)
        if dt > now:
            # Same weekday as today but later in the day: use last week's slot.
            dt -= timedelta(days=7)
        return dt

    if trigger_type == "every_month_last_day_time":
        tm = _parse_hhmm(cfg.get("time_str") or "")
        if tm is None:
            return None
        # Start with the last day of the current month.
        if now.month == 12:
            first_of_next_month = datetime(now.year + 1, 1, 1)
        else:
            first_of_next_month = datetime(now.year, now.month + 1, 1)
        dt = datetime.combine((first_of_next_month - timedelta(days=1)).date(), tm)
        if dt > now:
            # Fall back to the previous month's last day. The day before the
            # first of the current month is that day, which also covers
            # January -> December of the previous year.
            first_of_this_month = datetime(now.year, now.month, 1)
            dt = datetime.combine((first_of_this_month - timedelta(days=1)).date(), tm)
        return dt

    return None
@staticmethod
def _run_coro_blocking(coro):
    """Execute ``coro`` from synchronous code, depending on event-loop state.

    * No event loop is running in the current thread: the coroutine is run
      to completion with ``asyncio.run`` and its result is returned.
    * An event loop is already running: the coroutine is handed to that
      loop and the ``asyncio.Task`` is returned immediately. Despite the
      method name, this path does not wait for completion.

    Args:
        coro: Coroutine object to execute.

    Returns:
        The coroutine's result (blocking path) or the scheduled task
        (running-loop path).
    """
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)

    def _report_failure(done_task):
        # The task finishes after the scheduling call has returned, so an
        # exception raised inside it never reaches the caller's try/except.
        # Log it here so a failed compensation run is not lost silently.
        if done_task.cancelled():
            return
        exc = done_task.exception()
        if exc is not None:
            logger.error(f"漏执行补偿任务执行失败: error={exc!r}")

    task = loop.create_task(coro)
    task.add_done_callback(_report_failure)
    # NOTE: the event loop only keeps weak references to tasks; callers that
    # must guarantee completion should hold on to the returned task.
    return task
def _should_compensate_once(self, job_key: str, trigger_type: str, trigger_config: Dict[str, Any]) -> bool:
    """Decide whether a system job missed its latest due run and needs one make-up run.

    A make-up run is requested when the newest execution log is older than
    the latest due time minus ``self._compensation_tolerance_seconds``.

    Args:
        job_key: Job key passed to ``self.db.get_latest_log_time``.
        trigger_type: Trigger kind, forwarded to
            ``_latest_expected_run_before_now``.
        trigger_config: Trigger settings, forwarded likewise.

    Returns:
        ``True`` when a make-up run should be started, otherwise ``False``.

    Raises:
        ValueError: If the stored log time is text that is not ISO formatted.
    """
    expected_at = self._latest_expected_run_before_now(trigger_type, trigger_config, datetime.now())
    if not expected_at:
        return False

    latest_log_at = self.db.get_latest_log_time(job_key)
    if not latest_log_at:
        # Without any execution history nothing is compensated, so a first
        # deployment does not trigger every job at once.
        return False

    # The log time comes straight from the database layer. Normalise it to a
    # naive local datetime so it can be compared with ``datetime.now()``
    # instead of failing with TypeError.
    if isinstance(latest_log_at, str):
        latest_log_at = datetime.fromisoformat(latest_log_at.strip())
    if latest_log_at.tzinfo is not None and latest_log_at.utcoffset() is not None:
        latest_log_at = latest_log_at.astimezone().replace(tzinfo=None)

    tolerance = timedelta(seconds=self._compensation_tolerance_seconds)
    return latest_log_at < expected_at - tolerance
def init_and_load(self): def init_and_load(self):
self.db.init_tables() self.db.init_tables()
@@ -140,3 +224,13 @@ class SystemJobLoader:
job_key=job_key, job_key=job_key,
) )
self._registered_job_ids.append(job_id) self._registered_job_ids.append(job_id)
# 重载后执行一次漏执行补偿:若最近一次“应执行时间”晚于最新日志,则补跑一次。
try:
trigger_type = row.get("trigger_type", definition["trigger_type"])
trigger_config = row.get("trigger_config", definition["trigger_config"])
if self._should_compensate_once(job_key, trigger_type, trigger_config):
logger.warning(f"系统任务触发漏执行补偿: job_key={job_key}")
self._run_coro_blocking(_wrapped_handler())
except Exception as e:
logger.error(f"系统任务漏执行补偿失败: job_key={job_key}, error={e}")