from abc import abstractmethod from typing import Dict, Any, List, Tuple, Callable from base.plugin_common.plugin_interface import PluginInterface class ScheduledPluginInterface(PluginInterface): """定时任务插件接口,用于执行定时任务""" def __init__(self): super().__init__() self._jobs = [] # 存储注册的定时任务 @abstractmethod def register_jobs(self) -> List[Tuple[str, Callable, Dict[str, Any]]]: """ 注册定时任务 Returns: 任务列表,每个任务是一个元组 (job_id, job_func, job_params) job_id: 任务ID job_func: 任务函数 job_params: 任务参数,如{"trigger": "interval", "seconds": 60} """ pass def start(self) -> bool: """ 启动插件,注册定时任务 Returns: 启动是否成功 """ try: self._jobs = self.register_jobs() # 实际注册任务的逻辑将由插件管理器实现 return True except Exception as e: print(f"启动定时任务插件 {self.name} 失败: {e}") return False def stop(self) -> bool: """ 停止插件,取消定时任务 Returns: 停止是否成功 """ try: # 实际取消任务的逻辑将由插件管理器实现 self._jobs = [] return True except Exception as e: print(f"停止定时任务插件 {self.name} 失败: {e}") return False