80 lines
2.5 KiB
Python
80 lines
2.5 KiB
Python
import asyncio
|
||
from datetime import datetime, timedelta
|
||
from typing import Callable, Awaitable, List, Dict
|
||
|
||
|
||
class AsyncJob:
    """Registry of recurring coroutine jobs driven by asyncio.

    Jobs are registered through the decorator factories below. Each
    registration appends a zero-argument runner coroutine function to
    ``self.tasks``; ``run_all`` starts every runner concurrently. Every
    decorator returns the decorated function itself (not the runner), so the
    function can still be awaited directly.

    Wall-clock schedules use naive local time from ``datetime.now()``.
    """

    def __init__(self):
        # One runner coroutine function per registered job.
        self.tasks: List[Callable[[], Awaitable]] = []

    # ------------------------------------------------------------------
    # Pure scheduling helpers (no I/O, no sleeping).
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_clock(text: str):
        """Parse an ``"HH:MM"`` string into a ``datetime.time``.

        Raises:
            ValueError: If ``text`` does not match the ``%H:%M`` format.
        """
        return datetime.strptime(text, "%H:%M").time()

    @staticmethod
    def _next_daily_run(now: datetime, clock_times) -> datetime:
        """Return the earliest daily occurrence strictly after ``now``.

        Args:
            now: Reference moment.
            clock_times: Non-empty iterable of ``datetime.time`` values.

        Returns:
            The soonest datetime, today or tomorrow, whose time of day is one
            of ``clock_times`` and which lies strictly after ``now``.
        """
        candidates = []
        for clock in clock_times:
            candidate = datetime.combine(now.date(), clock)
            if candidate <= now:
                # Today's slot is already over; use tomorrow's.
                candidate += timedelta(days=1)
            candidates.append(candidate)
        return min(candidates)

    @staticmethod
    def _next_weekly_run(now: datetime, weekday: int, clock) -> datetime:
        """Return the next ``weekday`` at ``clock`` strictly after ``now``.

        Args:
            now: Reference moment.
            weekday: Day of week, 0 = Monday (as in ``datetime.weekday()``).
            clock: ``datetime.time`` of day at which the job should run.
        """
        days_ahead = (weekday - now.weekday() + 7) % 7
        candidate = datetime.combine(now.date() + timedelta(days=days_ahead), clock)
        if candidate <= now:
            # Same weekday but the time has already passed: next week.
            candidate += timedelta(days=7)
        return candidate

    @staticmethod
    async def _run_on_schedule(func: Callable, next_run: Callable) -> None:
        """Forever: sleep until ``next_run(reference)``, then ``await func()``.

        Args:
            func: Zero-argument coroutine function to execute.
            next_run: Maps a reference ``datetime`` to the next run time
                strictly after that reference.
        """
        previous = None
        while True:
            # Re-read the clock on every pass so time spent sleeping or
            # running ``func`` is accounted for.
            now = datetime.now()
            # If the timer fired marginally before the wall-clock target,
            # ``now`` is still earlier than the slot just served; anchoring
            # on that slot prevents scheduling it a second time.
            reference = now if previous is None or now > previous else previous
            target = next_run(reference)
            await asyncio.sleep((target - now).total_seconds())
            await func()
            previous = target

    # ------------------------------------------------------------------
    # Decorator factories.
    # ------------------------------------------------------------------

    def every_seconds(self, seconds: float):
        """Register a job that runs immediately, then every ``seconds`` seconds.

        The interval is measured from the end of one run to the start of the
        next, because the runner sleeps only after ``func`` has finished.
        """
        def decorator(func: Callable):
            async def wrapper():
                while True:
                    await func()
                    await asyncio.sleep(seconds)

            self.tasks.append(wrapper)
            return func

        return decorator

    def every_minutes(self, minutes: float):
        """Register a job that runs immediately, then every ``minutes`` minutes."""
        return self.every_seconds(minutes * 60)

    def every_hours(self, hours: float):
        """Register a job that runs immediately, then every ``hours`` hours."""
        return self.every_seconds(hours * 3600)

    def at_times(self, time_list: List[str]):
        """Register a job that runs every day at each ``"HH:MM"`` in ``time_list``.

        Entries may be given in any order; the runner always waits for the
        nearest upcoming one. An empty ``time_list`` registers a runner that
        finishes immediately instead of spinning.

        Raises (from the runner, when it starts):
            ValueError: If an entry does not match ``%H:%M``.
        """
        def decorator(func: Callable):
            async def wrapper():
                # Parse once up front; the strings never change per iteration.
                clocks = [self._parse_clock(entry) for entry in time_list]
                if not clocks:
                    # Nothing to schedule. Looping here would never await and
                    # would therefore block the whole event loop.
                    return
                await self._run_on_schedule(
                    func, lambda reference: self._next_daily_run(reference, clocks)
                )

            self.tasks.append(wrapper)
            return func

        return decorator

    def every_weekday_time(self, weekday: int, time_str: str):
        """Register a job that runs weekly on ``weekday`` at ``time_str``.

        Args:
            weekday: Day of week, 0 = Monday.
            time_str: Time of day as ``"HH:MM"``, e.g. ``"10:00"``.

        Raises (from the runner, when it starts):
            ValueError: If ``time_str`` does not match ``%H:%M``.
        """
        def decorator(func: Callable):
            async def wrapper():
                clock = self._parse_clock(time_str)
                await self._run_on_schedule(
                    func,
                    lambda reference: self._next_weekly_run(reference, weekday, clock),
                )

            self.tasks.append(wrapper)
            return func

        return decorator

    async def run_all(self):
        """Start every registered runner concurrently and wait on them all.

        Runners normally never finish, so this call normally never returns;
        an exception raised by any job propagates out of this coroutine.
        """
        await asyncio.gather(*(task() for task in self.tasks))
|
||
|
||
|
||
# Module-level singleton job manager.
|
||
# Shared AsyncJob instance: register jobs through its decorators and start
# them with `await async_job.run_all()`.
async_job = AsyncJob()
|