From 3226fabcec8da35d01f120ce3b4828572982f20f Mon Sep 17 00:00:00 2001 From: liuwei Date: Fri, 17 Apr 2026 09:38:15 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E9=87=8D=E5=90=AF=E6=BC=8F=E6=89=A7=E8=A1=8C=E8=A1=A5?= =?UTF-8?q?=E5=81=BF=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更项:1) 系统任务与插件调度重载后基于应执行时间和执行日志对账,判定是否漏执行。2) 仅在应执行时间已过且日志未覆盖时补跑一次,避免重复补偿。3) system_job_db 与 plugin_schedule_db 新增 get_latest_log_time 查询。4) 增加容差窗口与中文注释,降低误判概率。 --- db/plugin_schedule_db.py | 16 ++++++ db/system_job_db.py | 16 ++++++ utils/plugin_schedule_manager.py | 90 +++++++++++++++++++++++++++++- utils/system_jobs.py | 96 +++++++++++++++++++++++++++++++- 4 files changed, 216 insertions(+), 2 deletions(-) diff --git a/db/plugin_schedule_db.py b/db/plugin_schedule_db.py index 05f4ee1..1702552 100644 --- a/db/plugin_schedule_db.py +++ b/db/plugin_schedule_db.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import json +from datetime import datetime from typing import Any, Dict, List, Optional from loguru import logger @@ -199,3 +200,18 @@ class PluginScheduleDBOperator(BaseDBOperator): for row in rows: self._parse_json_field(row, "detail_json") return rows + + def get_latest_log_time(self, schedule_id: int) -> Optional[datetime]: + """获取调度任务最新一次执行日志时间。""" + row = self.execute_query( + """ + SELECT triggered_at + FROM t_plugin_schedule_logs + WHERE schedule_id = %s + ORDER BY triggered_at DESC + LIMIT 1 + """, + (int(schedule_id),), + fetch_one=True, + ) or {} + return row.get("triggered_at") diff --git a/db/system_job_db.py b/db/system_job_db.py index 9ec0e81..9a10d0f 100644 --- a/db/system_job_db.py +++ b/db/system_job_db.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import json +from datetime import datetime from typing import Any, Dict, List, Optional from loguru import logger @@ -169,3 +170,18 @@ class SystemJobDBOperator(BaseDBOperator): elif detail is None: row["detail_json"] = {} return rows + + def get_latest_log_time(self, job_key: str) -> Optional[datetime]: + """获取任务最新一次执行日志时间。""" + row = self.execute_query( + """ + SELECT triggered_at + FROM t_system_job_logs + WHERE job_key = %s + ORDER BY triggered_at DESC + LIMIT 1 + """, + (str(job_key),), + fetch_one=True, + ) or {} + return row.get("triggered_at") diff --git a/utils/plugin_schedule_manager.py b/utils/plugin_schedule_manager.py index 2fa24b8..cb4577d 100644 --- a/utils/plugin_schedule_manager.py +++ b/utils/plugin_schedule_manager.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- from __future__ import annotations -from datetime import datetime +import asyncio +from datetime import datetime, timedelta from typing import Any, Dict, List, Optional from loguru import logger @@ -18,6 +19,7 @@ class PluginScheduleManager: self.plugin_manager = plugin_manager self.db = plugin_schedule_db self._schedule_job_map: Dict[int, str] = {} + self._compensation_tolerance_seconds = 120 def init_and_load(self): self.db.init_tables() @@ -134,6 +136,79 @@ class PluginScheduleManager: enabled_groups.append(gid) return enabled_groups + @staticmethod + def _latest_expected_run_before_now(trigger_type: str, trigger_config: Dict[str, Any], now: datetime) -> datetime | None: + cfg = trigger_config or {} + if trigger_type == "every_seconds": + seconds = int(cfg.get("seconds") or 0) + if seconds <= 0: + return None + return now - timedelta(seconds=seconds) + + if trigger_type == "at_times": + time_list = cfg.get("time_list") or [] + candidates = [] + for text in time_list: + try: + tm = datetime.strptime(str(text), "%H:%M").time() + except Exception: + continue + dt = datetime.combine(now.date(), tm) + if dt > now: + dt -= timedelta(days=1) + candidates.append(dt) + return max(candidates) if candidates else None + + if trigger_type in ("every_weekday_time", "every_week_time"): + try: + weekday = int(cfg.get("weekday")) + tm = datetime.strptime(str(cfg.get("time_str") or ""), "%H:%M").time() + except Exception: + return None + days_ago = (now.weekday() - weekday + 7) % 7 + dt = datetime.combine((now - timedelta(days=days_ago)).date(), tm) + if dt > now: + dt -= timedelta(days=7) + return dt + + if trigger_type == "every_month_last_day_time": + try: + tm = datetime.strptime(str(cfg.get("time_str") or ""), "%H:%M").time() + except Exception: + return None + if now.month == 12: + next_month = datetime(now.year + 1, 1, 1) + else: + next_month = datetime(now.year, now.month + 1, 1) + last_day = next_month - timedelta(days=1) + dt = datetime.combine(last_day.date(), tm) + if dt > now: + if now.month == 1: + prev_next_month = datetime(now.year, 1, 1) + else: + prev_next_month = datetime(now.year, now.month, 1) + prev_last_day = prev_next_month - timedelta(days=1) + dt = datetime.combine(prev_last_day.date(), tm) + return dt + return None + + @staticmethod + def _run_coro_blocking(coro): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(coro) + return loop.create_task(coro) + + def _should_compensate_once(self, schedule_id: int, trigger_type: str, trigger_config: Dict[str, Any]) -> bool: + expected_at = self._latest_expected_run_before_now(trigger_type, trigger_config, datetime.now()) + if not expected_at: + return False + latest_log_at = self.db.get_latest_log_time(schedule_id) + if not latest_log_at: + return False + return latest_log_at < (expected_at - timedelta(seconds=self._compensation_tolerance_seconds)) + async def _run_one_schedule(self, schedule_row: Dict[str, Any]) -> Dict[str, Any]: schedule_id = int(schedule_row["id"]) action_key = schedule_row.get("action_key") @@ -202,6 +277,19 @@ class PluginScheduleManager: ) self._schedule_job_map[schedule_id] = job_id + # 重启/重载补偿:如果最近一次应执行时间已过且日志未覆盖,补跑一次。 + try: + trigger_type = row.get("trigger_type", "at_times") + trigger_config = row.get("trigger_config", {"time_list": ["09:00"]}) + if self._should_compensate_once(schedule_id, trigger_type, trigger_config): + logger.warning( + f"插件调度触发漏执行补偿: schedule_id={schedule_id}, " + f"plugin={row.get('plugin_name')}, action={row.get('action_key')}" + ) + self._run_coro_blocking(_runner()) + except Exception as e: + logger.error(f"插件调度漏执行补偿失败: schedule_id={schedule_id}, error={e}") + def list_schedules_with_runtime(self) -> List[Dict[str, Any]]: db_rows = self.db.list_schedules() runtime_rows = async_job.get_jobs_snapshot() diff --git a/utils/system_jobs.py b/utils/system_jobs.py index 18c4587..a6141cd 100644 --- a/utils/system_jobs.py +++ b/utils/system_jobs.py @@ -2,7 +2,8 @@ from __future__ import annotations import inspect -from datetime import datetime +import asyncio +from datetime import datetime, timedelta from typing import Any, Awaitable, Callable, Dict, List from loguru import logger @@ -60,6 +61,89 @@ class SystemJobLoader: self.db = system_job_db self._job_defs = {item["job_key"]: item for item in get_system_job_definitions(robot)} self._registered_job_ids: List[str] = [] + # 防止系统时钟误差导致“刚执行完又被判定漏跑”,给一个小容差窗口。 + self._compensation_tolerance_seconds = 120 + + @staticmethod + def _latest_expected_run_before_now(trigger_type: str, trigger_config: Dict[str, Any], now: datetime) -> datetime | None: + """根据调度配置计算“当前时刻之前最近一次应执行时间”。 + + 注意:这里只用于漏执行补偿判定,不用于替代 async_job 的正式调度。 + """ + cfg = trigger_config or {} + if trigger_type == "every_seconds": + seconds = int(cfg.get("seconds") or 0) + if seconds <= 0: + return None + return now - timedelta(seconds=seconds) + + if trigger_type == "at_times": + time_list = cfg.get("time_list") or [] + candidates = [] + for text in time_list: + try: + tm = datetime.strptime(str(text), "%H:%M").time() + except Exception: + continue + dt = datetime.combine(now.date(), tm) + if dt > now: + dt -= timedelta(days=1) + candidates.append(dt) + return max(candidates) if candidates else None + + if trigger_type in ("every_weekday_time", "every_week_time"): + try: + weekday = int(cfg.get("weekday")) + tm = datetime.strptime(str(cfg.get("time_str") or ""), "%H:%M").time() + except Exception: + return None + days_ago = (now.weekday() - weekday + 7) % 7 + dt = datetime.combine((now - timedelta(days=days_ago)).date(), tm) + if dt > now: + dt -= timedelta(days=7) + return dt + + if trigger_type == "every_month_last_day_time": + try: + tm = datetime.strptime(str(cfg.get("time_str") or ""), "%H:%M").time() + except Exception: + return None + # 先算本月最后一天 + if now.month == 12: + next_month = datetime(now.year + 1, 1, 1) + else: + next_month = datetime(now.year, now.month + 1, 1) + last_day = next_month - timedelta(days=1) + dt = datetime.combine(last_day.date(), tm) + if dt > now: + # 回退到上个月最后一天 + if now.month == 1: + prev_next_month = datetime(now.year, 1, 1) + else: + prev_next_month = datetime(now.year, now.month, 1) + prev_last_day = prev_next_month - timedelta(days=1) + dt = datetime.combine(prev_last_day.date(), tm) + return dt + return None + + @staticmethod + def _run_coro_blocking(coro): + """在同步上下文执行协程:无事件循环则阻塞执行,有事件循环则丢给当前循环。""" + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(coro) + return loop.create_task(coro) + + def _should_compensate_once(self, job_key: str, trigger_type: str, trigger_config: Dict[str, Any]) -> bool: + expected_at = self._latest_expected_run_before_now(trigger_type, trigger_config, datetime.now()) + if not expected_at: + return False + latest_log_at = self.db.get_latest_log_time(job_key) + if not latest_log_at: + # 没有历史执行日志时不做补偿,避免首次上线就批量触发一次。 + return False + return latest_log_at < (expected_at - timedelta(seconds=self._compensation_tolerance_seconds)) def init_and_load(self): self.db.init_tables() @@ -140,3 +224,13 @@ class SystemJobLoader: job_key=job_key, ) self._registered_job_ids.append(job_id) + + # 重载后执行一次漏执行补偿:若最近一次“应执行时间”晚于最新日志,则补跑一次。 + try: + trigger_type = row.get("trigger_type", definition["trigger_type"]) + trigger_config = row.get("trigger_config", definition["trigger_config"]) + if self._should_compensate_once(job_key, trigger_type, trigger_config): + logger.warning(f"系统任务触发漏执行补偿: job_key={job_key}") + self._run_coro_blocking(_wrapped_handler()) + except Exception as e: + logger.error(f"系统任务漏执行补偿失败: job_key={job_key}, error={e}")