397 lines
14 KiB
Python
397 lines
14 KiB
Python
import importlib
|
||
import inspect
|
||
import logging
|
||
import os
|
||
import sys
|
||
from typing import Dict, List, Any, Optional, Type
|
||
|
||
from plugin_common.plugin_interface import PluginInterface, PluginStatus
|
||
from plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from plugin_common.scheduled_plugin_interface import ScheduledPluginInterface
|
||
from plugin_common.plugin_registry import PluginRegistry
|
||
from plugin_common.event_system import EventSystem, EventType
|
||
|
||
|
||
class PluginManager:
|
||
"""插件管理器,负责插件的加载、初始化、启动、停止和卸载"""
|
||
|
||
def __init__(self, plugin_dir: str = "plugins"):
|
||
"""
|
||
初始化插件管理器
|
||
|
||
Args:
|
||
plugin_dir: 插件目录
|
||
"""
|
||
self.plugin_dir = plugin_dir
|
||
self.plugins: Dict[str, PluginInterface] = {} # 插件实例字典
|
||
self.plugin_modules = {} # 插件模块字典
|
||
self.system_context = {} # 系统上下文
|
||
|
||
self.LOG = logging.getLogger(__name__)
|
||
# 确保插件目录存在
|
||
if not os.path.exists(self.plugin_dir):
|
||
os.makedirs(self.plugin_dir)
|
||
|
||
# 将插件目录添加到Python路径
|
||
if self.plugin_dir not in sys.path:
|
||
sys.path.insert(0, self.plugin_dir)
|
||
|
||
def set_system_context(self, context: Dict[str, Any]):
|
||
"""
|
||
设置系统上下文
|
||
|
||
Args:
|
||
context: 系统上下文
|
||
"""
|
||
self.system_context = context
|
||
|
||
def discover_plugins(self) -> List[str]:
|
||
"""
|
||
发现可用插件
|
||
|
||
Returns:
|
||
插件模块名称列表
|
||
"""
|
||
plugin_modules = []
|
||
|
||
# 遍历插件目录
|
||
for item in os.listdir(self.plugin_dir):
|
||
if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"):
|
||
# 检查是否有main.py文件
|
||
if os.path.exists(os.path.join(self.plugin_dir, item, "main.py")):
|
||
plugin_modules.append(item)
|
||
elif item.endswith(".py") and not item.startswith("__"):
|
||
# 单文件插件
|
||
plugin_modules.append(item[:-3])
|
||
self.LOG.info(f"PluginManager:发现插件模块: {plugin_modules}")
|
||
return plugin_modules
|
||
|
||
def load_all_plugins(self) -> Dict[str, PluginInterface]:
|
||
"""
|
||
加载所有插件
|
||
|
||
Returns:
|
||
插件实例字典
|
||
"""
|
||
plugin_modules = self.discover_plugins()
|
||
loaded_plugins = []
|
||
|
||
for plugin_name in plugin_modules:
|
||
try:
|
||
plugin = self.load_plugin(plugin_name)
|
||
if plugin:
|
||
loaded_plugins.append(plugin_name)
|
||
# 自动启动插件
|
||
self.start_plugin(plugin.name)
|
||
except Exception as e:
|
||
self.LOG.error(f"PluginManager:加载插件 {plugin_name} 时发生错误: {str(e)}", exc_info=True)
|
||
|
||
self.LOG.info(f"PluginManager:成功加载插件: {loaded_plugins}")
|
||
return self.plugins
|
||
|
||
def load_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
|
||
"""
|
||
加载插件
|
||
|
||
Args:
|
||
plugin_name: 插件名称
|
||
|
||
Returns:
|
||
插件实例,加载失败返回None
|
||
"""
|
||
try:
|
||
# 如果插件已加载,直接返回
|
||
if plugin_name in self.plugins:
|
||
return self.plugins[plugin_name]
|
||
|
||
# 确定插件路径和模块路径
|
||
plugin_path = os.path.join(self.plugin_dir, plugin_name)
|
||
|
||
# 加载模块
|
||
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, "main.py")):
|
||
# 目录插件,从main.py加载
|
||
module_path = f"plugins.{plugin_name}.main"
|
||
try:
|
||
module = importlib.import_module(module_path)
|
||
self.plugin_modules[plugin_name] = module
|
||
except ImportError as e:
|
||
self.LOG.error(f"PluginManager:导入插件模块 {module_path} 失败: {e}")
|
||
return None
|
||
else:
|
||
# 单文件插件
|
||
plugin_path = self.plugin_dir
|
||
try:
|
||
module = importlib.import_module(plugin_name)
|
||
self.plugin_modules[plugin_name] = module
|
||
except ImportError as e:
|
||
self.LOG.error(f"PluginManager:导入单文件插件 {plugin_name} 失败: {e}")
|
||
return None
|
||
|
||
# 查找插件类
|
||
plugin_class = None
|
||
for name, obj in inspect.getmembers(module):
|
||
if (inspect.isclass(obj) and
|
||
issubclass(obj, PluginInterface) and
|
||
obj != PluginInterface and
|
||
obj != MessagePluginInterface and
|
||
obj != ScheduledPluginInterface):
|
||
plugin_class = obj
|
||
break
|
||
|
||
# 如果没有找到插件类,尝试查找get_plugin函数
|
||
if plugin_class is None:
|
||
get_plugin_func = getattr(module, "get_plugin", None)
|
||
if callable(get_plugin_func):
|
||
plugin = get_plugin_func()
|
||
if isinstance(plugin, PluginInterface):
|
||
# 设置插件路径
|
||
plugin.set_plugin_path(plugin_path)
|
||
|
||
# 加载插件配置
|
||
if not plugin.load_config():
|
||
self.LOG.error(f"PluginManager:插件 {plugin_name} 加载配置失败")
|
||
return None
|
||
|
||
# 初始化插件
|
||
if not plugin.initialize(self.system_context):
|
||
self.LOG.error(f"PluginManager:插件 {plugin_name} 初始化失败")
|
||
return None
|
||
|
||
# 注册插件
|
||
PluginRegistry().register(plugin)
|
||
|
||
# 存储插件实例
|
||
self.plugins[plugin.name] = plugin
|
||
|
||
# 发布插件加载事件
|
||
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
|
||
|
||
return plugin
|
||
else:
|
||
self.LOG.error(f"PluginManager:插件 {plugin_name} 的 get_plugin() 返回的不是有效的插件实例")
|
||
else:
|
||
self.LOG.error(f"PluginManager:插件 {plugin_name} 中未找到有效的插件类或 get_plugin 函数")
|
||
return None
|
||
|
||
# 实例化插件
|
||
plugin = plugin_class()
|
||
plugin.status = PluginStatus.LOADED
|
||
|
||
# 设置插件路径
|
||
plugin.set_plugin_path(plugin_path)
|
||
|
||
# 加载插件配置
|
||
if not plugin.load_config():
|
||
self.LOG.error(f"PluginManager:插件 {plugin_name} 加载配置失败")
|
||
return None
|
||
|
||
# 初始化插件
|
||
if not plugin.initialize(self.system_context):
|
||
self.LOG.error(f"PluginManager:插件 {plugin_name} 初始化失败")
|
||
return None
|
||
|
||
# 注册插件
|
||
PluginRegistry().register(plugin)
|
||
|
||
# 存储插件实例
|
||
self.plugins[plugin.name] = plugin
|
||
|
||
# 发布插件加载事件
|
||
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
|
||
|
||
return plugin
|
||
|
||
except Exception as e:
|
||
self.LOG.error(f"PluginManager:加载插件 {plugin_name} 失败: {e}", exc_info=True)
|
||
return None
|
||
|
||
def unload_plugin(self, plugin_name: str) -> bool:
|
||
"""
|
||
卸载插件
|
||
|
||
Args:
|
||
plugin_name: 插件名称
|
||
|
||
Returns:
|
||
卸载是否成功
|
||
"""
|
||
if plugin_name not in self.plugins:
|
||
self.LOG.info(f"PluginManager:插件 {plugin_name} 未加载")
|
||
return False
|
||
|
||
plugin = self.plugins[plugin_name]
|
||
|
||
# 停止插件
|
||
if plugin.status == PluginStatus.RUNNING:
|
||
if not plugin.stop():
|
||
self.LOG.info(f"PluginManager:停止插件 {plugin_name} 失败")
|
||
return False
|
||
plugin.status = PluginStatus.STOPPED # 确保状态更新
|
||
|
||
# 清理插件资源
|
||
if not plugin.cleanup():
|
||
self.LOG.info(f"PluginManager:清理插件 {plugin_name} 资源失败")
|
||
return False
|
||
|
||
# 设置状态为未加载
|
||
plugin.status = PluginStatus.UNLOADED
|
||
|
||
# 注销插件
|
||
PluginRegistry().unregister(plugin_name)
|
||
|
||
# 移除插件实例
|
||
del self.plugins[plugin_name]
|
||
|
||
# 发布插件卸载事件
|
||
EventSystem().publish(EventType.PLUGIN_UNLOADED, {"plugin_name": plugin_name})
|
||
|
||
return True
|
||
|
||
def reload_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
|
||
"""
|
||
重新加载插件
|
||
|
||
Args:
|
||
plugin_name: 插件名称
|
||
|
||
Returns:
|
||
插件实例,重新加载失败返回None
|
||
"""
|
||
# 记录原插件状态
|
||
was_running = False
|
||
if plugin_name in self.plugins:
|
||
was_running = self.plugins[plugin_name].status == PluginStatus.RUNNING
|
||
|
||
# 卸载插件
|
||
if not self.unload_plugin(plugin_name):
|
||
self.LOG.info(f"卸载插件 {plugin_name} 失败")
|
||
return None
|
||
|
||
# 重新导入模块
|
||
if plugin_name in self.plugin_modules:
|
||
try:
|
||
importlib.reload(self.plugin_modules[plugin_name])
|
||
except Exception as e:
|
||
self.LOG.info(f"重新导入插件模块 {plugin_name} 失败: {e}")
|
||
return None
|
||
|
||
# 加载插件
|
||
plugin = self.load_plugin(plugin_name)
|
||
|
||
# 如果原来是运行状态,则重新启动
|
||
if plugin and was_running:
|
||
self.start_plugin(plugin.name)
|
||
|
||
return plugin
|
||
|
||
def start_plugin(self, plugin_name: str) -> bool:
|
||
"""
|
||
启动插件
|
||
|
||
Args:
|
||
plugin_name: 插件名称
|
||
|
||
Returns:
|
||
启动是否成功
|
||
"""
|
||
if plugin_name not in self.plugins:
|
||
self.LOG.info(f"PluginManager:插件 {plugin_name} 未加载")
|
||
return False
|
||
|
||
plugin = self.plugins[plugin_name]
|
||
|
||
if plugin.status == PluginStatus.RUNNING:
|
||
self.LOG.info(f"PluginManager:插件 {plugin_name} 已经在运行")
|
||
return True
|
||
|
||
if plugin.start():
|
||
plugin.status = PluginStatus.RUNNING
|
||
self.LOG.info(f"PluginManager:插件 {plugin_name} 状态变更为在运行")
|
||
return True
|
||
else:
|
||
plugin.status = PluginStatus.ERROR
|
||
self.LOG.info(f"PluginManager:插件 {plugin_name} 状态变更为异常")
|
||
return False
|
||
|
||
def stop_plugin(self, plugin_name: str) -> bool:
|
||
"""
|
||
停止插件
|
||
|
||
Args:
|
||
plugin_name: 插件名称
|
||
|
||
Returns:
|
||
停止是否成功
|
||
"""
|
||
if plugin_name not in self.plugins:
|
||
self.LOG.info(f"插件 {plugin_name} 未加载")
|
||
return False
|
||
|
||
plugin = self.plugins[plugin_name]
|
||
|
||
if plugin.status != PluginStatus.RUNNING:
|
||
self.LOG.info(f"插件 {plugin_name} 未在运行")
|
||
return True
|
||
|
||
if plugin.stop():
|
||
plugin.status = PluginStatus.STOPPED
|
||
self.LOG.info(f"插件 {plugin_name} 状态变更为已停止")
|
||
return True
|
||
else:
|
||
plugin.status = PluginStatus.ERROR
|
||
self.LOG.info(f"插件 {plugin_name} 状态变更为异常")
|
||
return False
|
||
|
||
def reload_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
|
||
"""
|
||
重新加载插件
|
||
|
||
Args:
|
||
plugin_name: 插件名称
|
||
|
||
Returns:
|
||
插件实例,重新加载失败返回None
|
||
"""
|
||
# 卸载插件
|
||
if plugin_name in self.plugins:
|
||
if not self.unload_plugin(plugin_name):
|
||
self.LOG.info(f"卸载插件 {plugin_name} 失败")
|
||
return None
|
||
|
||
# 重新导入模块
|
||
if plugin_name in self.plugin_modules:
|
||
try:
|
||
importlib.reload(self.plugin_modules[plugin_name])
|
||
except Exception as e:
|
||
self.LOG.info(f"重新导入插件模块 {plugin_name} 失败: {e}")
|
||
return None
|
||
|
||
# 加载插件
|
||
return self.load_plugin(plugin_name)
|
||
|
||
def shutdown_plugins(self) -> bool:
|
||
"""
|
||
卸载所有插件
|
||
|
||
Returns:
|
||
是否全部成功卸载
|
||
"""
|
||
success = True
|
||
# 创建插件名称的副本,因为在卸载过程中会修改self.plugins字典
|
||
plugin_names = list(self.plugins.keys())
|
||
|
||
for plugin_name in plugin_names:
|
||
if not self.unload_plugin(plugin_name):
|
||
self.LOG.error(f"卸载插件 {plugin_name} 失败")
|
||
success = False
|
||
|
||
# 清空插件模块字典
|
||
self.plugin_modules.clear()
|
||
|
||
# 确保插件字典为空
|
||
if self.plugins:
|
||
self.LOG.warning(f"插件卸载后仍有 {len(self.plugins)} 个插件残留")
|
||
success = False
|
||
|
||
return success
|