Files
ProxyAuto/services/scheduler.py

125 lines
3.2 KiB
Python

"""后台定时任务调度器 - 支持每台机器独立调度"""
from __future__ import annotations
import logging
import threading
from datetime import datetime, timezone
from typing import Any
from zoneinfo import ZoneInfo
from apscheduler.schedulers.background import BackgroundScheduler
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Shared scheduler instance; stays None until get_scheduler() creates it.
_scheduler: BackgroundScheduler | None = None
# Held by get_scheduler() while it checks for and creates the shared scheduler.
_lock = threading.Lock()
# Use the Shanghai time zone (passed to the scheduler and used when
# formatting a job's next run time).
SHANGHAI_TZ = ZoneInfo("Asia/Shanghai")
def _get_job_id(machine_id: int) -> str:
return f"ip_change_machine_{machine_id}"
def get_scheduler() -> BackgroundScheduler:
    """Return the shared background scheduler, creating it on first use.

    The first call builds a ``BackgroundScheduler`` configured with
    ``SHANGHAI_TZ``, starts it, and stores it in the module-level
    ``_scheduler``. Later calls return that same instance. The check and
    the creation both happen while ``_lock`` is held.

    Returns:
        The module-wide ``BackgroundScheduler`` instance.
    """
    global _scheduler
    with _lock:
        if _scheduler is not None:
            return _scheduler
        _scheduler = BackgroundScheduler(timezone=SHANGHAI_TZ)
        _scheduler.start()
        return _scheduler
def get_scheduler_status() -> dict[str, Any]:
    """Summarize the state of the shared scheduler.

    Returns:
        A dict with two keys:

        * ``"running"`` - True when at least one job is registered. This
          reflects job presence, not whether the scheduler thread is alive.
        * ``"job_count"`` - number of jobs currently registered.
    """
    job_total = len(get_scheduler().get_jobs())
    return {
        "running": job_total > 0,
        "job_count": job_total,
    }
def get_machine_scheduler_status(machine_id: int) -> dict[str, Any]:
    """Report the scheduling state of a single machine's job.

    Args:
        machine_id: Identifier of the machine to look up.

    Returns:
        A dict with three keys:

        * ``"running"`` - True when a job is registered for the machine.
        * ``"next_run_time"`` - ISO-8601 string of the next run, converted
          to ``SHANGHAI_TZ``, or None when there is no job or the job has
          no next run time.
        * ``"job_id"`` - the job id derived from ``machine_id``.
    """
    scheduler = get_scheduler()
    job_id = _get_job_id(machine_id)
    job = scheduler.get_job(job_id)

    upcoming = job.next_run_time if job else None
    # Always express the timestamp in the Shanghai time zone.
    formatted = upcoming.astimezone(SHANGHAI_TZ).isoformat() if upcoming else None

    return {
        "running": bool(job),
        "next_run_time": formatted,
        "job_id": job_id,
    }
def _job_func(machine_id: int) -> None:
    """Scheduled-job entry point: run an IP change for one machine.

    Args:
        machine_id: Identifier of the machine passed through to
            ``run_ip_change_for_machine``.
    """
    # Imported at call time rather than at module load
    # (presumably to avoid a circular import - confirm).
    from services.ip_change import run_ip_change_for_machine as change_ip

    change_ip(machine_id)
def start_machine_auto(machine_id: int, interval_seconds: int) -> None:
    """Schedule (or reschedule) the recurring job for one machine.

    A job already registered under the same id is replaced.

    Args:
        machine_id: Identifier of the machine whose job is scheduled.
        interval_seconds: Requested interval between runs, in seconds.
            Values below 10 are raised to 10.
    """
    scheduler = get_scheduler()
    job_id = _get_job_id(machine_id)
    # Enforce a 10-second floor on the interval.
    effective_interval = max(10, interval_seconds)

    job_options = {
        "args": [machine_id],
        "seconds": effective_interval,
        "id": job_id,
        "replace_existing": True,
        "max_instances": 1,
        "coalesce": True,
        "misfire_grace_time": 30,
    }
    scheduler.add_job(_job_func, "interval", **job_options)

    logger.info(
        "Machine %d auto job scheduled: every %ss", machine_id, effective_interval
    )
def stop_machine_auto(machine_id: int) -> None:
    """Remove the recurring job for one machine, if one is registered.

    Does nothing when no job exists for the machine.

    Args:
        machine_id: Identifier of the machine whose job is removed.
    """
    scheduler = get_scheduler()
    job_id = _get_job_id(machine_id)

    # Nothing scheduled for this machine: leave quietly.
    if not scheduler.get_job(job_id):
        return

    scheduler.remove_job(job_id)
    logger.info("Machine %d auto job stopped", machine_id)
def update_all_schedulers() -> None:
    """Schedule a job for every machine flagged ``auto_enabled`` in the database.

    Each matching machine is passed to ``start_machine_auto`` with its
    ``change_interval_seconds``. Machines that are not ``auto_enabled`` are
    not touched here, so any job they already have is left in place. The
    database session is always closed, even if scheduling fails.
    """
    from database import ProxyMachine, get_session

    session = get_session()
    try:
        enabled_query = session.query(ProxyMachine).filter_by(auto_enabled=True)
        for machine in enabled_query.all():
            start_machine_auto(machine.id, machine.change_interval_seconds)
    finally:
        session.close()
# Legacy interface, kept for backward compatibility
def start_auto(interval_seconds: int) -> None:
    """Legacy entry point kept for backward compatibility; does nothing.

    Args:
        interval_seconds: Accepted for signature compatibility and ignored.
    """
def stop_auto() -> None:
    """Legacy entry point kept for backward compatibility; does nothing."""
def update_scheduler_from_config() -> None:
    """Legacy alias kept for backward compatibility.

    Delegates to ``update_all_schedulers``.
    """
    update_all_schedulers()