import asyncio
from datetime import datetime, timedelta
from typing import Callable, Awaitable, List, Dict


class AsyncJob:
    """Registry of recurring coroutine jobs that run on one asyncio event loop.

    Jobs are registered with the decorator methods (``every_seconds``,
    ``at_times``, ...). Each decorator stores an endless scheduling loop in
    ``self.tasks`` and returns the decorated function unchanged, so it can
    still be called directly. ``run_all`` starts every stored loop.

    Wall-clock schedules use ``datetime.now()``, i.e. naive local time.
    """

    def __init__(self):
        # Zero-argument coroutine functions; each loops forever once started.
        self.tasks: List[Callable[[], Awaitable]] = []

    @staticmethod
    def _parse_clock(text: str):
        """Parse an "HH:MM" string into a ``datetime.time``.

        Raises:
            ValueError: if ``text`` does not match "%H:%M".
        """
        return datetime.strptime(text, "%H:%M").time()

    @staticmethod
    def _next_daily_run(clocks, after: datetime) -> datetime:
        """Return the earliest datetime strictly after ``after`` whose
        time-of-day is one of ``clocks`` (a non-empty list of ``time``)."""
        candidates = []
        for clock in clocks:
            candidate = datetime.combine(after.date(), clock)
            if candidate <= after:
                # Already passed (or is exactly now): use tomorrow's slot.
                candidate += timedelta(days=1)
            candidates.append(candidate)
        return min(candidates)

    @staticmethod
    def _next_weekly_run(weekday: int, clock, after: datetime) -> datetime:
        """Return the first datetime strictly after ``after`` that falls on
        ``weekday`` (0 = Monday) at time-of-day ``clock``."""
        days_ahead = (weekday - after.weekday()) % 7
        candidate = datetime.combine(
            after.date() + timedelta(days=days_ahead), clock
        )
        if candidate <= after:
            # Today is the right weekday but the time has passed.
            candidate += timedelta(days=7)
        return candidate

    @staticmethod
    async def _run_on_schedule(func: Callable, next_run: Callable):
        """Forever: sleep until ``next_run(after)``, then ``await func()``.

        ``next_run`` maps a reference datetime to the next due datetime
        strictly after it. The clock is re-read on every iteration so the
        time spent sleeping and running ``func`` never skews later waits.
        Slots that pass while ``func`` is still running are skipped.
        """
        last_fired = None
        while True:
            now = datetime.now()
            # If the sleep woke marginally early, `now` can still be before
            # the slot that just fired; never schedule that slot twice.
            after = now if last_fired is None else max(now, last_fired)
            target = next_run(after)
            await asyncio.sleep((target - now).total_seconds())
            await func()
            last_fired = target

    def every_seconds(self, seconds: int):
        """Decorator: run the coroutine function repeatedly.

        The job runs immediately when started, then sleeps ``seconds`` after
        each completed run (the interval is measured from the end of a run).
        """
        def decorator(func: Callable):
            async def wrapper():
                while True:
                    await func()
                    await asyncio.sleep(seconds)

            self.tasks.append(wrapper)
            return func

        return decorator

    def every_minutes(self, minutes: int):
        """Decorator: same as ``every_seconds(minutes * 60)``."""
        return self.every_seconds(minutes * 60)

    def every_hours(self, hours: int):
        """Decorator: same as ``every_seconds(hours * 3600)``."""
        return self.every_seconds(hours * 3600)

    def at_times(self, time_list: List[str]):
        """Decorator: run the coroutine function daily at each listed time.

        Args:
            time_list: non-empty list of "HH:MM" strings, in any order. The
                list is parsed once, when this method is called.

        Raises:
            ValueError: if ``time_list`` is empty (the scheduling loop would
                otherwise spin without ever yielding to the event loop) or
                contains a string that is not "HH:MM".
        """
        clocks = [self._parse_clock(t) for t in time_list]
        if not clocks:
            raise ValueError("at_times() requires at least one time")

        def decorator(func: Callable):
            async def wrapper():
                await self._run_on_schedule(
                    func, lambda after: self._next_daily_run(clocks, after)
                )

            self.tasks.append(wrapper)
            return func

        return decorator

    def every_weekday_time(self, weekday: int, time_str: str):
        """Decorator: run the coroutine function once a week.

        Args:
            weekday: day of the week, 0 = Monday. Values outside 0-6 wrap
                modulo 7.
            time_str: time of day as "HH:MM", e.g. "10:00".

        Raises:
            ValueError: if ``time_str`` is not "HH:MM".
        """
        clock = self._parse_clock(time_str)

        def decorator(func: Callable):
            async def wrapper():
                await self._run_on_schedule(
                    func,
                    lambda after: self._next_weekly_run(weekday, clock, after),
                )

            self.tasks.append(wrapper)
            return func

        return decorator

    async def run_all(self):
        """Start every registered job concurrently and wait on them.

        The jobs loop forever, so this normally never returns; an exception
        raised by any job propagates to the caller.
        """
        await asyncio.gather(*(task() for task in self.tasks))


# Shared module-level job manager.
async_job = AsyncJob()