Files
abot/base/plugin_common/scheduled_plugin_interface.py

55 lines
1.6 KiB
Python

from abc import abstractmethod
from typing import Dict, Any, List, Tuple, Callable
from base.plugin_common.plugin_interface import PluginInterface
class ScheduledPluginInterface(PluginInterface):
"""定时任务插件接口,用于执行定时任务"""
def __init__(self):
super().__init__()
self._jobs = [] # 存储注册的定时任务
@abstractmethod
def register_jobs(self) -> List[Tuple[str, Callable, Dict[str, Any]]]:
"""
注册定时任务
Returns:
任务列表,每个任务是一个元组 (job_id, job_func, job_params)
job_id: 任务ID
job_func: 任务函数
job_params: 任务参数,如{"trigger": "interval", "seconds": 60}
"""
pass
def start(self) -> bool:
"""
启动插件,注册定时任务
Returns:
启动是否成功
"""
try:
self._jobs = self.register_jobs()
# 实际注册任务的逻辑将由插件管理器实现
return True
except Exception as e:
print(f"启动定时任务插件 {self.name} 失败: {e}")
return False
def stop(self) -> bool:
"""
停止插件,取消定时任务
Returns:
停止是否成功
"""
try:
# 实际取消任务的逻辑将由插件管理器实现
self._jobs = []
return True
except Exception as e:
print(f"停止定时任务插件 {self.name} 失败: {e}")
return False