加入周期任务里面的每个周期的具体时间

This commit is contained in:
liuwei
2025-06-10 16:27:51 +08:00
parent 0e43a7d488
commit 0570328152
3 changed files with 161 additions and 16 deletions

View File

@@ -1,5 +1,5 @@
import json
from datetime import datetime
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Tuple
from loguru import logger
@@ -224,7 +224,10 @@ class MessagePushTask(MessagePluginInterface):
next_time = self._calculate_next_schedule_time(
task['schedule_time'],
task['recurring_interval'],
task['recurring_end']
task['recurring_end'],
task['recurring_time'],
task['weekly_days'],
task['monthly_day']
)
if next_time:
self.db.update_task(task['task_id'], {
@@ -250,30 +253,108 @@ class MessagePushTask(MessagePluginInterface):
except Exception as e:
self.LOG.error(f"处理定时任务出错: {e}")
def _calculate_next_schedule_time(self, current_time: datetime, interval: str, end_time: datetime = None) -> \
Optional[datetime]:
def _calculate_next_schedule_time(self, current_time: datetime, interval: str, end_time: datetime = None,
recurring_time: str = None, weekly_days: List[int] = None,
monthly_day: int = None) -> Optional[datetime]:
"""计算下次执行时间
Args:
current_time: 当前执行时间
interval: 重复间隔daily/weekly/monthly
end_time: 结束时间
recurring_time: 重复执行时间HH:mm格式
weekly_days: 每周执行日列表0-60表示周日
monthly_day: 每月执行日1-31
Returns:
下次执行时间如果已超过结束时间则返回None
"""
try:
if not end_time or current_time < end_time:
# 解析执行时间
if recurring_time:
try:
hour, minute = map(int, recurring_time.split(':'))
if not (0 <= hour <= 23 and 0 <= minute <= 59):
self.LOG.error(f"无效的执行时间格式: {recurring_time}")
return None
except ValueError:
self.LOG.error(f"执行时间格式错误: {recurring_time}")
return None
else:
hour, minute = current_time.hour, current_time.minute
if interval == 'daily':
next_time = current_time.replace(day=current_time.day + 1)
elif interval == 'weekly':
next_time = current_time.replace(day=current_time.day + 7)
elif interval == 'monthly':
# 处理月份边界情况
if current_time.month == 12:
next_time = current_time.replace(year=current_time.year + 1, month=1)
# 如果是首次执行,使用当前时间
if current_time == datetime.now():
next_time = current_time
else:
next_time = current_time.replace(month=current_time.month + 1)
# 否则,设置为明天的同一时间
next_time = current_time + timedelta(days=1)
next_time = next_time.replace(hour=hour, minute=minute)
elif interval == 'weekly' and weekly_days:
# 验证每周执行日
if not all(0 <= day <= 6 for day in weekly_days):
self.LOG.error(f"无效的每周执行日: {weekly_days}")
return None
# 获取当前是周几0-60表示周日
current_weekday = current_time.weekday()
# 找到下一个执行日
next_weekday = None
for day in sorted(weekly_days):
if day > current_weekday:
next_weekday = day
break
if next_weekday is None:
# 如果当前是本周最后一个执行日,则从下周第一个执行日开始
next_weekday = min(weekly_days)
days_ahead = 7 - current_weekday + next_weekday
else:
days_ahead = next_weekday - current_weekday
next_time = current_time + timedelta(days=days_ahead)
next_time = next_time.replace(hour=hour, minute=minute)
elif interval == 'monthly' and monthly_day:
# 验证每月执行日
if not (1 <= monthly_day <= 31):
self.LOG.error(f"无效的每月执行日: {monthly_day}")
return None
# 获取当前日期
current_day = current_time.day
# 计算下一个执行日期
if current_day < monthly_day:
# 如果当前日期小于执行日期,则在本月执行
try:
next_time = current_time.replace(day=monthly_day)
except ValueError:
# 处理无效日期如2月30日
if current_time.month == 12:
next_time = current_time.replace(year=current_time.year + 1, month=1, day=1)
else:
next_time = current_time.replace(month=current_time.month + 1, day=1)
else:
# 否则在下个月执行
if current_time.month == 12:
next_time = current_time.replace(year=current_time.year + 1, month=1, day=1)
else:
next_time = current_time.replace(month=current_time.month + 1, day=1)
# 尝试设置到指定的日期
try:
next_time = next_time.replace(day=monthly_day)
except ValueError:
# 如果下个月的指定日期无效,则使用下个月的最后一天
if next_time.month == 12:
next_time = next_time.replace(year=next_time.year + 1, month=1, day=1)
else:
next_time = next_time.replace(month=next_time.month + 1, day=1)
next_time = next_time - timedelta(days=1)
next_time = next_time.replace(hour=hour, minute=minute)
else:
return None