Files
abot/plugin_common/plugin_manager.py
2025-03-19 10:22:20 +08:00

342 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import importlib
import inspect
import logging
import os
import sys
from typing import Dict, List, Any, Optional, Type
from plugin_common.plugin_interface import PluginInterface, PluginStatus
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.scheduled_plugin_interface import ScheduledPluginInterface
from plugin_common.plugin_registry import PluginRegistry
from plugin_common.event_system import EventSystem, EventType
class PluginManager:
"""插件管理器,负责插件的加载、初始化、启动、停止和卸载"""
def __init__(self, plugin_dir: str = "plugins"):
"""
初始化插件管理器
Args:
plugin_dir: 插件目录
"""
self.plugin_dir = plugin_dir
self.plugins: Dict[str, PluginInterface] = {} # 插件实例字典
self.plugin_modules = {} # 插件模块字典
self.system_context = {} # 系统上下文
self.LOG = logging.getLogger(__name__)
# 确保插件目录存在
if not os.path.exists(self.plugin_dir):
os.makedirs(self.plugin_dir)
# 将插件目录添加到Python路径
if self.plugin_dir not in sys.path:
sys.path.insert(0, self.plugin_dir)
def set_system_context(self, context: Dict[str, Any]):
"""
设置系统上下文
Args:
context: 系统上下文
"""
self.system_context = context
def discover_plugins(self) -> List[str]:
"""
发现可用插件
Returns:
插件模块名称列表
"""
plugin_modules = []
# 遍历插件目录
for item in os.listdir(self.plugin_dir):
if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"):
# 检查是否有__init__.py文件
if os.path.exists(os.path.join(self.plugin_dir, item, "__init__.py")):
plugin_modules.append(item)
elif item.endswith(".py") and not item.startswith("__"):
# 单文件插件
plugin_modules.append(item[:-3])
self.LOG.info(f"plugin_modules:{plugin_modules}")
return plugin_modules
def load_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
"""
加载插件
Args:
plugin_name: 插件名称
Returns:
插件实例加载失败返回None
"""
try:
# 如果插件已加载,直接返回
if plugin_name in self.plugins:
return self.plugins[plugin_name]
# 确定插件路径
if os.path.isdir(os.path.join(self.plugin_dir, plugin_name)):
plugin_path = os.path.join(self.plugin_dir, plugin_name)
# 直接从main.py加载插件不再尝试从__init__.py加载
main_module_path = f"{plugin_name}.main"
if os.path.exists(os.path.join(plugin_path, "main.py")):
try:
module = importlib.import_module(main_module_path)
self.plugin_modules[plugin_name] = module
except ImportError as e:
self.LOG.error(f"导入插件模块 {main_module_path} 失败: {e}")
return None
else:
self.LOG.error(f"插件 {plugin_name} 缺少 main.py 文件")
return None
else:
# 单文件插件
plugin_path = self.plugin_dir
module = importlib.import_module(plugin_name)
self.plugin_modules[plugin_name] = module
# 查找插件类
plugin_class = None
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, PluginInterface) and
obj != PluginInterface and
obj != MessagePluginInterface and
obj != ScheduledPluginInterface):
plugin_class = obj
break
# 如果插件类为空尝试查找get_plugin函数
if plugin_class is None:
get_plugin_func = getattr(module, "get_plugin", None)
if callable(get_plugin_func):
plugin = get_plugin_func()
if isinstance(plugin, PluginInterface):
# 设置插件路径
plugin.set_plugin_path(plugin_path)
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"插件 {plugin_name} 加载配置失败")
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"插件 {plugin_name} 初始化失败")
return None
# 注册插件
PluginRegistry().register(plugin)
# 存储插件实例
self.plugins[plugin.name] = plugin
# 发布插件加载事件
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
return plugin
else:
self.LOG.error(f"插件 {plugin_name} 的 get_plugin() 返回的不是有效的插件实例")
else:
self.LOG.error(f"插件 {plugin_name} 中未找到有效的插件类或 get_plugin 函数")
return None
# 实例化插件
plugin = plugin_class()
plugin.status = PluginStatus.LOADED
# 设置插件路径
plugin.set_plugin_path(plugin_path)
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"插件 {plugin_name} 加载配置失败")
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"插件 {plugin_name} 初始化失败")
return None
# 注册插件
PluginRegistry().register(plugin)
# 存储插件实例
self.plugins[plugin.name] = plugin
# 发布插件加载事件
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
return plugin
except Exception as e:
self.LOG.error(f"加载插件 {plugin_name} 失败: {e}")
return None
def load_all_plugins(self) -> Dict[str, PluginInterface]:
"""
加载所有插件
Returns:
插件实例字典
"""
plugin_modules = self.discover_plugins()
for module_name in plugin_modules:
self.load_plugin(module_name)
return self.plugins
def unload_plugin(self, plugin_name: str) -> bool:
"""
卸载插件
Args:
plugin_name: 插件名称
Returns:
卸载是否成功
"""
if plugin_name not in self.plugins:
self.LOG.info(f"插件 {plugin_name} 未加载")
return False
plugin = self.plugins[plugin_name]
# 停止插件
if plugin.status == PluginStatus.RUNNING:
if not plugin.stop():
self.LOG.info(f"停止插件 {plugin_name} 失败")
return False
# 清理插件资源
if not plugin.cleanup():
self.LOG.info(f"清理插件 {plugin_name} 资源失败")
return False
# 注销插件
PluginRegistry().unregister(plugin_name)
# 移除插件实例
del self.plugins[plugin_name]
# 发布插件卸载事件
EventSystem().publish(EventType.PLUGIN_UNLOADED, {"plugin_name": plugin_name})
return True
def start_plugin(self, plugin_name: str) -> bool:
"""
启动插件
Args:
plugin_name: 插件名称
Returns:
启动是否成功
"""
if plugin_name not in self.plugins:
self.LOG.info(f"插件 {plugin_name} 未加载")
return False
plugin = self.plugins[plugin_name]
if plugin.status == PluginStatus.RUNNING:
self.LOG.info(f"插件 {plugin_name} 已经在运行")
return True
if plugin.start():
plugin.status = PluginStatus.RUNNING
return True
else:
plugin.status = PluginStatus.ERROR
return False
def stop_plugin(self, plugin_name: str) -> bool:
"""
停止插件
Args:
plugin_name: 插件名称
Returns:
停止是否成功
"""
if plugin_name not in self.plugins:
self.LOG.info(f"插件 {plugin_name} 未加载")
return False
plugin = self.plugins[plugin_name]
if plugin.status != PluginStatus.RUNNING:
self.LOG.info(f"插件 {plugin_name} 未在运行")
return True
if plugin.stop():
plugin.status = PluginStatus.STOPPED
return True
else:
plugin.status = PluginStatus.ERROR
return False
def reload_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
"""
重新加载插件
Args:
plugin_name: 插件名称
Returns:
插件实例重新加载失败返回None
"""
# 卸载插件
if plugin_name in self.plugins:
if not self.unload_plugin(plugin_name):
self.LOG.info(f"卸载插件 {plugin_name} 失败")
return None
# 重新导入模块
if plugin_name in self.plugin_modules:
try:
importlib.reload(self.plugin_modules[plugin_name])
except Exception as e:
self.LOG.info(f"重新导入插件模块 {plugin_name} 失败: {e}")
return None
# 加载插件
return self.load_plugin(plugin_name)
def shutdown_plugins(self) -> bool:
"""
卸载所有插件
Returns:
是否全部成功卸载
"""
success = True
# 创建插件名称的副本因为在卸载过程中会修改self.plugins字典
plugin_names = list(self.plugins.keys())
for plugin_name in plugin_names:
if not self.unload_plugin(plugin_name):
self.LOG.error(f"卸载插件 {plugin_name} 失败")
success = False
# 清空插件模块字典
self.plugin_modules.clear()
# 确保插件字典为空
if self.plugins:
self.LOG.warning(f"插件卸载后仍有 {len(self.plugins)} 个插件残留")
success = False
return success