from enum import Enum, auto
from typing import Dict, Any, Callable, List
import threading


class EventType(Enum):
    """Kinds of events that can be published on the event system."""

    SYSTEM_STARTUP = auto()
    SYSTEM_SHUTDOWN = auto()
    PLUGIN_LOADED = auto()
    PLUGIN_UNLOADED = auto()
    MESSAGE_RECEIVED = auto()
    MESSAGE_PROCESSED = auto()
    CUSTOM_EVENT = auto()


class EventSystem:
    """Process-wide publish/subscribe hub used for communication between plugins.

    Every call to ``EventSystem()`` returns the same instance. Creation of that
    instance is serialized with a class-level lock.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                instance = super().__new__(cls)
                # Attributes are populated before the instance is shared, so
                # no caller can observe it without a subscriber table.
                instance._subscribers = {}
                instance._initialized = False
                cls._instance = instance
        return cls._instance

    def __init__(self):
        # __init__ runs on every ``EventSystem()`` call. The subscriber table
        # is created in __new__ and must NOT be re-created here, otherwise
        # subscriptions registered between __new__ and a late first-time
        # __init__ would be lost.
        with self._lock:
            if not self._initialized:
                self._initialized = True

    def subscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None:
        """Register a callback for an event type.

        Registering the same callback twice for the same event type is a no-op.

        Args:
            event_type: Event type to listen for.
            callback: Callable invoked with the event data dict on publish.
        """
        handlers = self._subscribers.setdefault(event_type, [])
        if callback not in handlers:
            handlers.append(callback)

    def unsubscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None:
        """Remove a previously registered callback.

        Does nothing when the callback is not registered for ``event_type``.

        Args:
            event_type: Event type the callback was registered for.
            callback: Callable to remove.
        """
        handlers = self._subscribers.get(event_type)
        if handlers and callback in handlers:
            handlers.remove(callback)

    def publish(self, event_type: EventType, data: Dict[str, Any]) -> None:
        """Deliver an event to every callback subscribed to its type.

        Callbacks run synchronously in registration order. An exception raised
        by one callback is printed and does not prevent the remaining
        callbacks from running.

        Args:
            event_type: Event type being published.
            data: Event payload passed to each callback.
        """
        # Iterate over a snapshot: callbacks may subscribe/unsubscribe while
        # the event is being dispatched, and mutating the live list during
        # iteration would cause later subscribers to be skipped.
        for callback in list(self._subscribers.get(event_type, ())):
            try:
                callback(data)
            except Exception as e:
                # Deliberate best-effort delivery: report and continue.
                print(f"事件处理错误: {e}")

    def get_subscribers(self, event_type: EventType) -> List[Callable]:
        """Return the callbacks subscribed to an event type.

        Args:
            event_type: Event type to look up.

        Returns:
            A new list of the subscribed callbacks in registration order
            (empty when there are none). Mutating it does not affect the
            registered subscriptions.
        """
        return list(self._subscribers.get(event_type, ()))