插件化项目优化,支持将代码改造为插件,支持自动加载

This commit is contained in:
liuwei
2025-03-18 13:57:39 +08:00
parent 3c757161c4
commit bcca2dab28
13 changed files with 1033 additions and 388 deletions

View File

@@ -0,0 +1,85 @@
from enum import Enum, auto
from typing import Dict, Any, Callable, List
import threading
class EventType(Enum):
"""事件类型枚举"""
SYSTEM_STARTUP = auto()
SYSTEM_SHUTDOWN = auto()
PLUGIN_LOADED = auto()
PLUGIN_UNLOADED = auto()
MESSAGE_RECEIVED = auto()
MESSAGE_PROCESSED = auto()
CUSTOM_EVENT = auto()
class EventSystem:
"""事件系统,用于插件间通信"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super(EventSystem, cls).__new__(cls)
cls._instance._subscribers = {}
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not self._initialized:
self._subscribers = {}
self._initialized = True
def subscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
订阅事件
Args:
event_type: 事件类型
callback: 回调函数,接收事件数据
"""
if event_type not in self._subscribers:
self._subscribers[event_type] = []
if callback not in self._subscribers[event_type]:
self._subscribers[event_type].append(callback)
def unsubscribe(self, event_type: EventType, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
取消订阅事件
Args:
event_type: 事件类型
callback: 回调函数
"""
if event_type in self._subscribers and callback in self._subscribers[event_type]:
self._subscribers[event_type].remove(callback)
def publish(self, event_type: EventType, data: Dict[str, Any]) -> None:
"""
发布事件
Args:
event_type: 事件类型
data: 事件数据
"""
if event_type in self._subscribers:
for callback in self._subscribers[event_type]:
try:
callback(data)
except Exception as e:
print(f"事件处理错误: {e}")
def get_subscribers(self, event_type: EventType) -> List[Callable]:
"""
获取事件订阅者
Args:
event_type: 事件类型
Returns:
订阅者列表
"""
return self._subscribers.get(event_type, [])

View File

@@ -0,0 +1,47 @@
from typing import Dict, Any, Tuple, Optional, List
from plugin_common.plugin_interface import PluginInterface
class MessagePluginInterface(PluginInterface):
"""消息处理插件接口"""
@property
def command_prefix(self) -> Optional[str]:
"""命令前缀,如 '/'"""
return None
@property
def commands(self) -> List[str]:
"""支持的命令列表"""
return []
def can_process(self, message: Dict[str, Any]) -> bool:
"""
检查插件是否可以处理该消息
Args:
message: 消息字典,包含消息的各种属性
Returns:
是否可以处理
"""
# 默认实现:检查是否是命令
if self.command_prefix and self.commands:
content = message.get("content", "")
if content.startswith(self.command_prefix):
command = content[len(self.command_prefix):].split()[0]
return command in self.commands
return False
def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
处理消息
Args:
message: 消息字典,包含消息的各种属性,以及发送消息所需的对象
- wcf: WcfAPI对象可用于发送消息
- message_util: 消息工具类,提供更高级的消息处理功能
Returns:
(是否已处理, 处理结果)
"""
raise NotImplementedError("子类必须实现此方法")

View File

@@ -0,0 +1,167 @@
import os
import toml
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Any, List, Optional
class PluginStatus(Enum):
"""插件状态枚举"""
UNLOADED = 0 # 未加载
LOADED = 1 # 已加载但未启动
RUNNING = 2 # 运行中
STOPPED = 3 # 已停止
ERROR = 4 # 错误状态
class PluginInterface(ABC):
"""插件基础接口,所有插件必须实现此接口"""
@property
@abstractmethod
def name(self) -> str:
"""插件名称"""
pass
@property
@abstractmethod
def version(self) -> str:
"""插件版本"""
pass
@property
@abstractmethod
def description(self) -> str:
"""插件描述"""
pass
@property
@abstractmethod
def author(self) -> str:
"""插件作者"""
pass
@property
def dependencies(self) -> List[str]:
"""插件依赖,返回依赖的其他插件名称列表"""
return []
@property
def status(self) -> PluginStatus:
"""获取插件当前状态"""
return self._status
@status.setter
def status(self, value: PluginStatus):
"""设置插件状态"""
self._status = value
def __init__(self):
"""初始化插件"""
self._status = PluginStatus.UNLOADED
self._config = {}
self._plugin_path = ""
def load_config(self) -> bool:
"""
从插件目录下的config.toml加载配置
Returns:
加载是否成功
"""
try:
config_path = os.path.join(self._plugin_path, "config.toml")
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
plugin_config = toml.load(f)
self._config.update(plugin_config)
print(f"{config_path} 加载插件配置成功")
return True
else:
print(f"插件配置文件 {config_path} 不存在,使用默认配置")
return True # 配置文件不存在也视为成功,使用默认配置
except Exception as e:
print(f"加载插件配置失败: {e}")
return False
def set_plugin_path(self, path: str) -> None:
"""
设置插件路径
Args:
path: 插件路径
"""
self._plugin_path = path
def get_plugin_path(self) -> str:
"""
获取插件路径
Returns:
插件路径
"""
return self._plugin_path
@abstractmethod
def initialize(self, context: Dict[str, Any]) -> bool:
"""
初始化插件
Args:
context: 插件上下文,包含系统环境和配置信息
Returns:
初始化是否成功
"""
pass
@abstractmethod
def start(self) -> bool:
"""
启动插件
Returns:
启动是否成功
"""
pass
@abstractmethod
def stop(self) -> bool:
"""
停止插件
Returns:
停止是否成功
"""
pass
def configure(self, config: Dict[str, Any]) -> bool:
"""
配置插件
Args:
config: 插件配置
Returns:
配置是否成功
"""
self._config.update(config)
return True
def get_config(self) -> Dict[str, Any]:
"""
获取插件配置
Returns:
插件配置
"""
return self._config
def cleanup(self) -> bool:
"""
清理插件资源,在卸载前调用
Returns:
清理是否成功
"""
return True

View File

@@ -0,0 +1,284 @@
import importlib
import inspect
import os
import sys
from typing import Dict, List, Any, Optional, Type
from plugin_common.plugin_interface import PluginInterface, PluginStatus
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.scheduled_plugin_interface import ScheduledPluginInterface
from plugin_common.plugin_registry import PluginRegistry
from plugin_common.event_system import EventSystem, EventType
class PluginManager:
"""插件管理器,负责插件的加载、初始化、启动、停止和卸载"""
def __init__(self, plugin_dir: str = "plugins"):
"""
初始化插件管理器
Args:
plugin_dir: 插件目录
"""
self.plugin_dir = plugin_dir
self.plugins: Dict[str, PluginInterface] = {} # 插件实例字典
self.plugin_modules = {} # 插件模块字典
self.system_context = {} # 系统上下文
# 确保插件目录存在
if not os.path.exists(self.plugin_dir):
os.makedirs(self.plugin_dir)
# 将插件目录添加到Python路径
if self.plugin_dir not in sys.path:
sys.path.insert(0, self.plugin_dir)
def set_system_context(self, context: Dict[str, Any]):
"""
设置系统上下文
Args:
context: 系统上下文
"""
self.system_context = context
def discover_plugins(self) -> List[str]:
"""
发现可用插件
Returns:
插件模块名称列表
"""
plugin_modules = []
# 遍历插件目录
for item in os.listdir(self.plugin_dir):
if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"):
# 检查是否有__init__.py文件
if os.path.exists(os.path.join(self.plugin_dir, item, "__init__.py")):
plugin_modules.append(item)
elif item.endswith(".py") and not item.startswith("__"):
# 单文件插件
plugin_modules.append(item[:-3])
return plugin_modules
def load_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
"""
加载插件
Args:
plugin_name: 插件名称
Returns:
插件实例加载失败返回None
"""
try:
# 如果插件已加载,直接返回
if plugin_name in self.plugins:
return self.plugins[plugin_name]
# 确定插件路径
if os.path.isdir(os.path.join(self.plugin_dir, plugin_name)):
plugin_path = os.path.join(self.plugin_dir, plugin_name)
# 优先从main.py加载插件
main_module_path = f"{plugin_name}.main"
if os.path.exists(os.path.join(plugin_path, "main.py")):
try:
module = importlib.import_module(main_module_path)
self.plugin_modules[plugin_name] = module
except ImportError:
# 如果main.py导入失败尝试从__init__.py加载
module = importlib.import_module(plugin_name)
self.plugin_modules[plugin_name] = module
else:
# 如果没有main.py从__init__.py加载
module = importlib.import_module(plugin_name)
self.plugin_modules[plugin_name] = module
else:
# 单文件插件
plugin_path = self.plugin_dir
module = importlib.import_module(plugin_name)
self.plugin_modules[plugin_name] = module
# 查找插件类
plugin_class = None
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, PluginInterface) and
obj != PluginInterface and
obj != MessagePluginInterface and
obj != ScheduledPluginInterface):
plugin_class = obj
break
if plugin_class is None:
print(f"插件 {plugin_name} 中未找到有效的插件类")
return None
# 实例化插件
plugin = plugin_class()
plugin.status = PluginStatus.LOADED
# 设置插件路径
plugin.set_plugin_path(plugin_path)
# 加载插件配置
if not plugin.load_config():
print(f"插件 {plugin_name} 加载配置失败")
return None
# 初始化插件
if not plugin.initialize(self.system_context):
print(f"插件 {plugin_name} 初始化失败")
return None
# 注册插件
PluginRegistry().register(plugin)
# 存储插件实例
self.plugins[plugin.name] = plugin
# 发布插件加载事件
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
return plugin
except Exception as e:
print(f"加载插件 {plugin_name} 失败: {e}")
return None
def load_all_plugins(self) -> Dict[str, PluginInterface]:
"""
加载所有插件
Returns:
插件实例字典
"""
plugin_modules = self.discover_plugins()
for module_name in plugin_modules:
self.load_plugin(module_name)
return self.plugins
def unload_plugin(self, plugin_name: str) -> bool:
"""
卸载插件
Args:
plugin_name: 插件名称
Returns:
卸载是否成功
"""
if plugin_name not in self.plugins:
print(f"插件 {plugin_name} 未加载")
return False
plugin = self.plugins[plugin_name]
# 停止插件
if plugin.status == PluginStatus.RUNNING:
if not plugin.stop():
print(f"停止插件 {plugin_name} 失败")
return False
# 清理插件资源
if not plugin.cleanup():
print(f"清理插件 {plugin_name} 资源失败")
return False
# 注销插件
PluginRegistry().unregister(plugin_name)
# 移除插件实例
del self.plugins[plugin_name]
# 发布插件卸载事件
EventSystem().publish(EventType.PLUGIN_UNLOADED, {"plugin_name": plugin_name})
return True
def start_plugin(self, plugin_name: str) -> bool:
"""
启动插件
Args:
plugin_name: 插件名称
Returns:
启动是否成功
"""
if plugin_name not in self.plugins:
print(f"插件 {plugin_name} 未加载")
return False
plugin = self.plugins[plugin_name]
if plugin.status == PluginStatus.RUNNING:
print(f"插件 {plugin_name} 已经在运行")
return True
if plugin.start():
plugin.status = PluginStatus.RUNNING
return True
else:
plugin.status = PluginStatus.ERROR
return False
def stop_plugin(self, plugin_name: str) -> bool:
"""
停止插件
Args:
plugin_name: 插件名称
Returns:
停止是否成功
"""
if plugin_name not in self.plugins:
print(f"插件 {plugin_name} 未加载")
return False
plugin = self.plugins[plugin_name]
if plugin.status != PluginStatus.RUNNING:
print(f"插件 {plugin_name} 未在运行")
return True
if plugin.stop():
plugin.status = PluginStatus.STOPPED
return True
else:
plugin.status = PluginStatus.ERROR
return False
def reload_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
"""
重新加载插件
Args:
plugin_name: 插件名称
Returns:
插件实例重新加载失败返回None
"""
# 卸载插件
if plugin_name in self.plugins:
if not self.unload_plugin(plugin_name):
print(f"卸载插件 {plugin_name} 失败")
return None
# 重新导入模块
if plugin_name in self.plugin_modules:
try:
importlib.reload(self.plugin_modules[plugin_name])
except Exception as e:
print(f"重新导入插件模块 {plugin_name} 失败: {e}")
return None
# 加载插件
return self.load_plugin(plugin_name)

View File

@@ -0,0 +1,94 @@
from typing import Dict, List, Optional, Type
from plugin_common.plugin_interface import PluginInterface, PluginStatus
class PluginRegistry:
"""插件注册表,维护已加载插件的信息和状态"""
_instance = None
def __new__(cls):
"""单例模式"""
if cls._instance is None:
cls._instance = super(PluginRegistry, cls).__new__(cls)
cls._instance._plugins = {}
return cls._instance
def register(self, plugin: PluginInterface) -> bool:
"""
注册插件
Args:
plugin: 插件实例
Returns:
注册是否成功
"""
if plugin.name in self._plugins:
print(f"插件 {plugin.name} 已存在")
return False
self._plugins[plugin.name] = plugin
return True
def unregister(self, plugin_name: str) -> bool:
"""
注销插件
Args:
plugin_name: 插件名称
Returns:
注销是否成功
"""
if plugin_name not in self._plugins:
print(f"插件 {plugin_name} 不存在")
return False
del self._plugins[plugin_name]
return True
def get_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
"""
获取插件实例
Args:
plugin_name: 插件名称
Returns:
插件实例不存在返回None
"""
return self._plugins.get(plugin_name)
def get_all_plugins(self) -> Dict[str, PluginInterface]:
"""
获取所有插件
Returns:
插件字典,键为插件名称,值为插件实例
"""
return self._plugins.copy()
def get_plugins_by_status(self, status: PluginStatus) -> List[PluginInterface]:
"""
获取指定状态的插件
Args:
status: 插件状态
Returns:
插件列表
"""
return [p for p in self._plugins.values() if p.status == status]
def get_plugins_by_type(self, plugin_type: Type) -> List[PluginInterface]:
"""
获取指定类型的插件
Args:
plugin_type: 插件类型
Returns:
插件列表
"""
return [p for p in self._plugins.values() if isinstance(p, plugin_type)]

View File

@@ -0,0 +1,55 @@
from abc import abstractmethod
from typing import Dict, Any, List, Tuple, Callable
from plugin_common.plugin_interface import PluginInterface, PluginStatus
class ScheduledPluginInterface(PluginInterface):
"""定时任务插件接口,用于执行定时任务"""
def __init__(self):
super().__init__()
self._jobs = [] # 存储注册的定时任务
@abstractmethod
def register_jobs(self) -> List[Tuple[str, Callable, Dict[str, Any]]]:
"""
注册定时任务
Returns:
任务列表,每个任务是一个元组 (job_id, job_func, job_params)
job_id: 任务ID
job_func: 任务函数
job_params: 任务参数,如{"trigger": "interval", "seconds": 60}
"""
pass
def start(self) -> bool:
"""
启动插件,注册定时任务
Returns:
启动是否成功
"""
try:
self._jobs = self.register_jobs()
# 实际注册任务的逻辑将由插件管理器实现
return True
except Exception as e:
print(f"启动定时任务插件 {self.name} 失败: {e}")
return False
def stop(self) -> bool:
"""
停止插件,取消定时任务
Returns:
停止是否成功
"""
try:
# 实际取消任务的逻辑将由插件管理器实现
self._jobs = []
return True
except Exception as e:
print(f"停止定时任务插件 {self.name} 失败: {e}")
return False