调试定时装饰器,优化代码

This commit is contained in:
liuwei
2025-03-20 15:22:12 +08:00
parent aec1ccb0b8
commit 56e63c049d

View File

@@ -49,29 +49,54 @@ class TaskRegistry:
def register_tasks_for_instance(self, instance):
"""为指定实例注册任务"""
class_name = instance.__class__.__name__
registered_count = 0
for task in self.tasks:
# 检查任务是否属于这个类
if task["class_name"] == class_name and hasattr(instance, task["method_name"]):
method = getattr(instance, task["method_name"])
# 注册任务
if hasattr(instance, 'add_job'):
LOG.info(f"{class_name} 添加定时任务: {task['method_name']}, cron: {task['cron']}")
instance.add_job(method, task["cron"], task["name"])
elif hasattr(instance, 'onEveryTime'):
# 将cron表达式转换为时间字符串 (简化处理,仅支持每天固定时间)
cron_parts = task["cron"].split()
if len(cron_parts) >= 2:
minute, hour = cron_parts[0], cron_parts[1]
# 移除可能的前导0
if minute.startswith('0') and len(minute) > 1:
minute = minute[1:]
if hour.startswith('0') and len(hour) > 1:
hour = hour[1:]
time_str = f"{hour}:{minute}"
LOG.info(f"{class_name} 添加定时任务: {task['method_name']}, 时间: {time_str}")
instance.onEveryTime(time_str, method)
# 使用schedule库注册任务
self._register_with_schedule(instance, method, task)
registered_count += 1
LOG.info(f"{class_name} 注册了 {registered_count} 个定时任务")
return registered_count
def _register_with_schedule(self, instance, method, task):
"""使用schedule库注册任务"""
import schedule
cron = task["cron"]
name = task["name"]
LOG.info(f"使用schedule注册任务: {name}, cron: {cron}")
# 解析cron表达式
parts = cron.split()
if len(parts) < 5:
LOG.error(f"无效的cron表达式: {cron}")
return
minute, hour, day, month, weekday = parts[:5]
# 处理简单的情况
if minute == "*/1" and hour == "*" and day == "*" and month == "*" and weekday == "*":
# 每分钟执行
LOG.info(f"设置每分钟执行任务: {name}")
schedule.every(1).minutes.do(method)
elif minute.isdigit() and hour.isdigit() and day == "*" and month == "*":
# 每天固定时间执行
time_str = f"{hour.zfill(2)}:{minute.zfill(2)}"
LOG.info(f"设置每天 {time_str} 执行任务: {name}")
schedule.every().day.at(time_str).do(method)
else:
LOG.warning(f"不支持的cron表达式: {cron}将使用onEveryTime方法")
# 尝试使用onEveryTime方法
if hasattr(instance, 'onEveryTime'):
if minute.isdigit() and hour.isdigit():
time_str = f"{hour.zfill(2)}:{minute.zfill(2)}"
LOG.info(f"使用onEveryTime注册任务: {name}, 时间: {time_str}")
instance.onEveryTime(time_str, method)
def scan_for_instances(self):
"""扫描已加载的模块,查找所有已创建的类实例"""