From 7b08e8dec8d3710c6f929119b9d0a276c1794963 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 10 Jun 2025 17:28:13 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins/message_push_task/main.py | 189 ++++++++++++++++-------------- 1 file changed, 102 insertions(+), 87 deletions(-) diff --git a/plugins/message_push_task/main.py b/plugins/message_push_task/main.py index 1fe693c..736ee30 100644 --- a/plugins/message_push_task/main.py +++ b/plugins/message_push_task/main.py @@ -133,6 +133,25 @@ class MessagePushTask(MessagePluginInterface): try: self.LOG.info(f"开始处理定时任务: {task['task_id']}") + # 检查任务是否应该执行 + now = datetime.now() + schedule_time = task['schedule_time'] + + # 对于重复任务,检查是否到达执行时间点 + if task['schedule_type'] == 'recurring': + recurring_time = task.get('recurring_time') + if recurring_time: + try: + hour, minute = map(int, recurring_time.split(':')) + # 检查当前时间是否在目标时间的前后5秒内 + target_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + time_diff = abs((now - target_time).total_seconds()) + if time_diff > 5: # 允许5秒的误差 + continue + except (ValueError, AttributeError): + self.LOG.error(f"无效的执行时间格式: {recurring_time}") + continue + # 更新任务状态为执行中 self.db.update_task(task['task_id'], {'status': 'running'}) @@ -253,117 +272,113 @@ class MessagePushTask(MessagePluginInterface): except Exception as e: self.LOG.error(f"处理定时任务出错: {e}") - def _calculate_next_schedule_time(self, current_time: datetime, interval: str, end_time: datetime = None, - recurring_time: str = None, weekly_days: List[int] = None, - monthly_day: int = None) -> Optional[datetime]: - """计算下次执行时间 - - Args: - current_time: 当前执行时间 - interval: 重复间隔(daily/weekly/monthly) - end_time: 结束时间 - recurring_time: 重复执行时间(HH:mm格式) - weekly_days: 每周执行日列表(0-6,0表示周日) - monthly_day: 每月执行日(1-31) - - Returns: - 下次执行时间,如果已超过结束时间则返回None - """ + def _calculate_next_schedule_time(self, schedule_time: datetime, recurring_interval: str, + recurring_end: datetime, recurring_time: str, + weekly_days: str = None, monthly_day: int = None) -> Optional[datetime]: + """计算下次执行时间""" try: - if not end_time or current_time < end_time: - # 解析执行时间 - if recurring_time: - try: - hour, minute = map(int, recurring_time.split(':')) - if not (0 <= hour <= 23 and 0 <= minute <= 59): - self.LOG.error(f"无效的执行时间格式: {recurring_time}") - return None - except ValueError: - self.LOG.error(f"执行时间格式错误: {recurring_time}") - return None - else: - hour, minute = current_time.hour, current_time.minute + # 解析执行时间 + if recurring_time: + try: + hour, minute = map(int, recurring_time.split(':')) + except (ValueError, AttributeError): + self.LOG.error(f"无效的执行时间格式: {recurring_time}") + return None + else: + hour, minute = schedule_time.hour, schedule_time.minute - if interval == 'daily': - # 如果是首次执行,使用当前时间 - if current_time == datetime.now(): - next_time = current_time - else: - # 否则,设置为明天的同一时间 - next_time = current_time + timedelta(days=1) - next_time = next_time.replace(hour=hour, minute=minute) + # 获取当前时间 + now = datetime.now() + + # 如果已经超过结束时间,返回None + if recurring_end and now > recurring_end: + return None + + # 根据重复间隔计算下次执行时间 + if recurring_interval == 'daily': + # 每天执行 + next_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if next_time <= now: + next_time = next_time + timedelta(days=1) - elif interval == 'weekly' and weekly_days: - # 验证每周执行日 - if not all(0 <= day <= 6 for day in weekly_days): - self.LOG.error(f"无效的每周执行日: {weekly_days}") + elif recurring_interval == 'weekly': + # 每周执行 + try: + if isinstance(weekly_days, str): + weekly_days = json.loads(weekly_days) + if not weekly_days: return None - # 获取当前是周几(0-6,0表示周日) - current_weekday = current_time.weekday() + # 获取当前是周几(0-6,0是周一) + current_weekday = now.weekday() + # 找到下一个执行日 next_weekday = None for day in sorted(weekly_days): + day = int(day) if day > current_weekday: next_weekday = day break + + # 如果本周没有下一个执行日,取下周的第一个执行日 if next_weekday is None: - # 如果当前是本周最后一个执行日,则从下周第一个执行日开始 - next_weekday = min(weekly_days) + next_weekday = int(weekly_days[0]) days_ahead = 7 - current_weekday + next_weekday else: days_ahead = next_weekday - current_weekday - next_time = current_time + timedelta(days=days_ahead) - next_time = next_time.replace(hour=hour, minute=minute) + next_time = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + timedelta(days=days_ahead) + + except (json.JSONDecodeError, ValueError, IndexError) as e: + self.LOG.error(f"处理每周执行日失败: {e}") + return None - elif interval == 'monthly' and monthly_day: - # 验证每月执行日 - if not (1 <= monthly_day <= 31): - self.LOG.error(f"无效的每月执行日: {monthly_day}") + elif recurring_interval == 'monthly': + # 每月执行 + try: + if not monthly_day: return None # 获取当前日期 - current_day = current_time.day - # 计算下一个执行日期 - if current_day < monthly_day: - # 如果当前日期小于执行日期,则在本月执行 - try: - next_time = current_time.replace(day=monthly_day) - except ValueError: - # 处理无效日期(如2月30日) - if current_time.month == 12: - next_time = current_time.replace(year=current_time.year + 1, month=1, day=1) - else: - next_time = current_time.replace(month=current_time.month + 1, day=1) - else: - # 否则在下个月执行 - if current_time.month == 12: - next_time = current_time.replace(year=current_time.year + 1, month=1, day=1) - else: - next_time = current_time.replace(month=current_time.month + 1, day=1) - - # 尝试设置到指定的日期 - try: - next_time = next_time.replace(day=monthly_day) - except ValueError: - # 如果下个月的指定日期无效,则使用下个月的最后一天 - if next_time.month == 12: - next_time = next_time.replace(year=next_time.year + 1, month=1, day=1) - else: - next_time = next_time.replace(month=next_time.month + 1, day=1) - next_time = next_time - timedelta(days=1) + current_day = now.day + current_month = now.month + current_year = now.year - next_time = next_time.replace(hour=hour, minute=minute) - else: + # 如果当前日期已经过了执行日,移到下个月 + if current_day >= monthly_day: + if current_month == 12: + next_month = 1 + next_year = current_year + 1 + else: + next_month = current_month + 1 + next_year = current_year + else: + next_month = current_month + next_year = current_year + + # 处理无效日期(如2月30日) + try: + next_time = datetime(next_year, next_month, monthly_day, hour, minute) + except ValueError: + # 如果日期无效,使用该月的最后一天 + if next_month == 12: + last_day = 31 + else: + last_day = (datetime(next_year, next_month + 1, 1) - timedelta(days=1)).day + next_time = datetime(next_year, next_month, last_day, hour, minute) + + except Exception as e: + self.LOG.error(f"处理每月执行日失败: {e}") return None + else: + return None - # 检查是否超过结束时间 - if end_time and next_time > end_time: - return None + # 检查是否超过结束时间 + if recurring_end and next_time > recurring_end: + return None + + return next_time - return next_time - return None except Exception as e: self.LOG.error(f"计算下次执行时间失败: {e}") return None