Files
abot/job_decorators.py
2025-03-20 15:17:33 +08:00

171 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import functools
import inspect
import logging
import weakref
import sys
from typing import Callable, Optional, Dict, List, Any
LOG = logging.getLogger("JobDecorator")
# 任务注册表类,管理所有任务和实例
class TaskRegistry:
_instance = None
@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = TaskRegistry()
return cls._instance
def __init__(self):
if TaskRegistry._instance is not None:
raise RuntimeError("请使用 TaskRegistry.get_instance() 获取实例")
self.tasks = [] # 存储所有被装饰的任务
self.instances = [] # 存储所有实例的弱引用
def register_task(self, task_info):
"""注册任务信息"""
self.tasks.append(task_info)
LOG.info(f"已注册任务: {task_info['name']}, cron: {task_info['cron']}")
def register_instance(self, instance):
"""注册实例到实例列表"""
# 检查实例是否已注册
for ref in self.instances:
if ref() is instance:
return
# 添加实例的弱引用
self.instances.append(weakref.ref(instance))
LOG.info(f"已注册实例: {instance.__class__.__name__}")
# 清理已失效的弱引用
self.instances = [ref for ref in self.instances if ref() is not None]
# 尝试为这个实例注册任务
self.register_tasks_for_instance(instance)
def register_tasks_for_instance(self, instance):
"""为指定实例注册任务"""
class_name = instance.__class__.__name__
for task in self.tasks:
# 检查任务是否属于这个类
if task["class_name"] == class_name and hasattr(instance, task["method_name"]):
method = getattr(instance, task["method_name"])
# 注册任务
if hasattr(instance, 'add_job'):
LOG.info(f"{class_name} 添加定时任务: {task['method_name']}, cron: {task['cron']}")
instance.add_job(method, task["cron"], task["name"])
elif hasattr(instance, 'onEveryTime'):
# 将cron表达式转换为时间字符串 (简化处理,仅支持每天固定时间)
cron_parts = task["cron"].split()
if len(cron_parts) >= 2:
minute, hour = cron_parts[0], cron_parts[1]
# 移除可能的前导0
if minute.startswith('0') and len(minute) > 1:
minute = minute[1:]
if hour.startswith('0') and len(hour) > 1:
hour = hour[1:]
time_str = f"{hour}:{minute}"
LOG.info(f"{class_name} 添加定时任务: {task['method_name']}, 时间: {time_str}")
instance.onEveryTime(time_str, method)
def scan_for_instances(self):
"""扫描已加载的模块,查找所有已创建的类实例"""
LOG.info("扫描已加载的模块,查找类实例...")
# 获取所有已加载的模块
for module_name, module in sys.modules.items():
# 跳过内置模块和标准库模块
if module_name.startswith('_') or not hasattr(module, '__file__') or module.__file__ is None:
continue
# 查找模块中的所有全局变量
for var_name in dir(module):
try:
var = getattr(module, var_name)
# 检查是否是类实例
if isinstance(var, object) and not isinstance(var, type) and hasattr(var, '__class__'):
# 检查类是否有被装饰的方法
class_name = var.__class__.__name__
has_decorated_method = False
for task in self.tasks:
if task["class_name"] == class_name and hasattr(var, task["method_name"]):
has_decorated_method = True
break
if has_decorated_method:
LOG.info(f"在模块 {module_name} 中找到实例: {class_name}")
self.register_instance(var)
except Exception as e:
# 忽略获取属性时的错误
pass
def initialize(self):
"""初始化任务系统,扫描所有已创建的实例并注册任务"""
LOG.info("初始化任务系统")
# 扫描已加载的模块,查找所有已创建的类实例
self.scan_for_instances()
# 清理已失效的弱引用
self.instances = [ref for ref in self.instances if ref() is not None]
# 为所有实例注册任务
for ref in self.instances:
instance = ref()
if instance:
self.register_tasks_for_instance(instance)
# 输出已注册的任务信息
LOG.info(f"已注册 {len(self.tasks)} 个任务")
for task in self.tasks:
LOG.info(f"任务: {task['name']}, 方法: {task['method_name']}, cron: {task['cron']}")
# 获取任务注册表实例
registry = TaskRegistry.get_instance()
def scheduled_job(cron: str, name: Optional[str] = None, enabled: bool = True):
"""
定时任务装饰器,用于标记需要定时执行的方法
:param cron: cron表达式例如 "0 0 * * *" 表示每天0点执行一次
:param name: 任务名称,默认使用方法名
:param enabled: 是否启用该任务默认为True
"""
def decorator(func):
task_name = name or func.__name__
# 记录任务信息
task_info = {
"func": func,
"cron": cron,
"name": task_name,
"enabled": enabled,
"method_name": func.__name__,
"class_name": func.__qualname__.split('.')[0] if '.' in func.__qualname__ else None
}
registry.register_task(task_info)
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 如果是实例方法的第一次调用,注册实例
if args and not isinstance(args[0], type) and hasattr(args[0], '__class__'):
registry.register_instance(args[0])
return func(*args, **kwargs)
return wrapper
return decorator
def initialize_job_system():
"""初始化任务系统,扫描所有已创建的实例并注册任务"""
registry.initialize()