55 lines
1.6 KiB
Python
55 lines
1.6 KiB
Python
from abc import abstractmethod
|
|
from typing import Dict, Any, List, Tuple, Callable
|
|
|
|
from base.plugin_common.plugin_interface import PluginInterface
|
|
|
|
|
|
class ScheduledPluginInterface(PluginInterface):
|
|
"""定时任务插件接口,用于执行定时任务"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._jobs = [] # 存储注册的定时任务
|
|
|
|
@abstractmethod
|
|
def register_jobs(self) -> List[Tuple[str, Callable, Dict[str, Any]]]:
|
|
"""
|
|
注册定时任务
|
|
|
|
Returns:
|
|
任务列表,每个任务是一个元组 (job_id, job_func, job_params)
|
|
job_id: 任务ID
|
|
job_func: 任务函数
|
|
job_params: 任务参数,如{"trigger": "interval", "seconds": 60}
|
|
"""
|
|
pass
|
|
|
|
def start(self) -> bool:
|
|
"""
|
|
启动插件,注册定时任务
|
|
|
|
Returns:
|
|
启动是否成功
|
|
"""
|
|
try:
|
|
self._jobs = self.register_jobs()
|
|
# 实际注册任务的逻辑将由插件管理器实现
|
|
return True
|
|
except Exception as e:
|
|
print(f"启动定时任务插件 {self.name} 失败: {e}")
|
|
return False
|
|
|
|
def stop(self) -> bool:
|
|
"""
|
|
停止插件,取消定时任务
|
|
|
|
Returns:
|
|
停止是否成功
|
|
"""
|
|
try:
|
|
# 实际取消任务的逻辑将由插件管理器实现
|
|
self._jobs = []
|
|
return True
|
|
except Exception as e:
|
|
print(f"停止定时任务插件 {self.name} 失败: {e}")
|
|
return False |