Files
abot/plugin_common/plugin_manager.py
2025-03-20 11:06:42 +08:00

442 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import importlib
import inspect
import logging
import os
import sys
from typing import Dict, List, Any, Optional, Type, Tuple
from plugin_common.plugin_interface import PluginInterface, PluginStatus
from plugin_common.message_plugin_interface import MessagePluginInterface
from plugin_common.scheduled_plugin_interface import ScheduledPluginInterface
from plugin_common.plugin_registry import PluginRegistry
from plugin_common.event_system import EventSystem, EventType
class PluginManager:
"""插件管理器,负责插件的加载、初始化、启动、停止和卸载"""
def __init__(self, plugin_dir: str = "plugins"):
"""
初始化插件管理器
Args:
plugin_dir: 插件目录
"""
self.plugin_dir = plugin_dir
self.plugins: Dict[str, PluginInterface] = {} # 插件实例字典
self.plugin_modules = {} # 插件模块字典
self.module_to_plugin = {} # 模块名到插件名的映射
self.system_context = {} # 系统上下文
self.LOG = logging.getLogger(__name__)
# 确保插件目录存在
if not os.path.exists(self.plugin_dir):
os.makedirs(self.plugin_dir)
# 将插件目录添加到Python路径
if self.plugin_dir not in sys.path:
sys.path.insert(0, self.plugin_dir)
def set_system_context(self, context: Dict[str, Any]):
"""
设置系统上下文
Args:
context: 系统上下文
"""
self.system_context = context
def discover_plugins(self) -> List[str]:
"""
发现可用插件
Returns:
插件模块名称列表
"""
plugin_modules = []
# 遍历插件目录
for item in os.listdir(self.plugin_dir):
if os.path.isdir(os.path.join(self.plugin_dir, item)) and not item.startswith("__"):
# 检查是否有main.py文件
if os.path.exists(os.path.join(self.plugin_dir, item, "main.py")):
plugin_modules.append(item)
elif item.endswith(".py") and not item.startswith("__"):
# 单文件插件
plugin_modules.append(item[:-3])
self.LOG.info(f"PluginManager发现插件模块: {plugin_modules}")
return plugin_modules
def load_all_plugins(self) -> Dict[str, PluginInterface]:
"""
加载所有插件
Returns:
插件实例字典
"""
plugin_modules = self.discover_plugins()
loaded_plugins = []
for plugin_name in plugin_modules:
try:
plugin = self.load_plugin(plugin_name)
if plugin:
loaded_plugins.append(plugin_name)
# 自动启动插件
self.start_plugin(plugin.name)
except Exception as e:
self.LOG.error(f"PluginManager加载插件 {plugin_name} 时发生错误: {str(e)}", exc_info=True)
self.LOG.info(f"PluginManager成功加载插件: {loaded_plugins}")
self.LOG.info(f"PluginManagerself.plugins: {self.plugins}")
self.LOG.info(f"PluginManagerself.module_to_plugin: {self.module_to_plugin}")
return self.plugins
def load_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
"""
加载插件
Args:
plugin_name: 插件名称(模块名)
Returns:
插件实例加载失败返回None
"""
try:
# 检查是否已有同名模块的插件加载
for name, plugin in self.plugins.items():
module_name = plugin.__class__.__module__.split('.')[-2]
if module_name == plugin_name:
self.LOG.info(f"PluginManager插件模块 {plugin_name} 已加载为 {name}")
return plugin
# 如果插件已加载,直接返回
if plugin_name in self.plugins:
return self.plugins[plugin_name]
# 确定插件路径和模块路径
plugin_path = os.path.join(self.plugin_dir, plugin_name)
# 加载模块
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, "main.py")):
# 目录插件从main.py加载
module_path = f"plugins.{plugin_name}.main"
try:
module = importlib.import_module(module_path)
self.plugin_modules[plugin_name] = module
except ImportError as e:
self.LOG.error(f"PluginManager导入插件模块 {module_path} 失败: {e}")
return None
else:
# 单文件插件
plugin_path = self.plugin_dir
try:
module = importlib.import_module(plugin_name)
self.plugin_modules[plugin_name] = module
except ImportError as e:
self.LOG.error(f"PluginManager导入单文件插件 {plugin_name} 失败: {e}")
return None
# 查找插件类
plugin_class = None
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, PluginInterface) and
obj != PluginInterface and
obj != MessagePluginInterface and
obj != ScheduledPluginInterface):
plugin_class = obj
break
# 如果没有找到插件类尝试查找get_plugin函数
if plugin_class is None:
get_plugin_func = getattr(module, "get_plugin", None)
if callable(get_plugin_func):
plugin = get_plugin_func()
if isinstance(plugin, PluginInterface):
# 设置插件路径
plugin.set_plugin_path(plugin_path)
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"PluginManager插件 {plugin_name} 加载配置失败")
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"PluginManager插件 {plugin_name} 初始化失败")
return None
# 注册插件
PluginRegistry().register(plugin)
# 存储插件实例
self.plugins[plugin.name] = plugin
# 发布插件加载事件
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
return plugin
else:
self.LOG.error(f"PluginManager插件 {plugin_name} 的 get_plugin() 返回的不是有效的插件实例")
else:
self.LOG.error(f"PluginManager插件 {plugin_name} 中未找到有效的插件类或 get_plugin 函数")
return None
# 实例化插件
plugin = plugin_class()
plugin.status = PluginStatus.LOADED
# 设置插件路径
plugin.set_plugin_path(plugin_path)
# 加载插件配置
if not plugin.load_config():
self.LOG.error(f"PluginManager插件 {plugin_name} 加载配置失败")
return None
# 初始化插件
if not plugin.initialize(self.system_context):
self.LOG.error(f"PluginManager插件 {plugin_name} 初始化失败")
return None
# 注册插件
PluginRegistry().register(plugin)
# 在 load_plugin 方法中,修改存储插件实例的部分
# 存储插件实例
self.plugins[plugin.name] = plugin
# 添加模块名到插件名的映射
try:
module_name = plugin.__class__.__module__.split('.')[-2]
self.module_to_plugin[module_name] = plugin.name
except (IndexError, AttributeError):
self.LOG.warning(f"无法为插件 {plugin.name} 获取有效的模块名")
# 发布插件加载事件
EventSystem().publish(EventType.PLUGIN_LOADED, {"plugin": plugin})
return plugin
except Exception as e:
self.LOG.error(f"PluginManager加载插件 {plugin_name} 失败: {e}", exc_info=True)
return None
def unload_plugin(self, plugin_name: str) -> bool:
"""
卸载插件
Args:
plugin_name: 插件名称(可以是模块名或显示名称)
Returns:
卸载是否成功
"""
# 查找插件
display_name, plugin = self.find_plugin_by_name(plugin_name)
if not plugin:
self.LOG.info(f"PluginManager插件 {plugin_name} 未加载")
return False
# 停止插件
if plugin.status == PluginStatus.RUNNING:
if not plugin.stop():
self.LOG.info(f"PluginManager停止插件 {display_name} 失败")
return False
plugin.status = PluginStatus.STOPPED # 确保状态更新
# 清理插件资源
if not plugin.cleanup():
self.LOG.info(f"PluginManager清理插件 {display_name} 资源失败")
return False
# 设置状态为未加载
plugin.status = PluginStatus.UNLOADED
# 注销插件
PluginRegistry().unregister(display_name)
# 获取模块名,用于清理映射
try:
module_name = plugin.__class__.__module__.split('.')[-2]
# 清理模块名到插件名的映射
if module_name in self.module_to_plugin:
del self.module_to_plugin[module_name]
except (IndexError, AttributeError):
pass
# 移除插件实例
del self.plugins[display_name]
# 发布插件卸载事件
EventSystem().publish(EventType.PLUGIN_UNLOADED, {"plugin_name": display_name})
return True
def reload_plugin(self, plugin_name: str) -> Optional[PluginInterface]:
"""
重新加载插件
Args:
plugin_name: 插件名称(可以是模块名或显示名称)
Returns:
插件实例重新加载失败返回None
"""
# 查找插件
display_name, plugin = self.find_plugin_by_name(plugin_name)
if not plugin:
self.LOG.info(f"PluginManager插件 {plugin_name} 未加载,无法重载")
return None
# 记录原插件状态和模块名
was_running = plugin.status == PluginStatus.RUNNING
try:
module_name = plugin.__class__.__module__.split('.')[-2]
except (IndexError, AttributeError):
self.LOG.error(f"无法获取插件 {display_name} 的模块名,重载失败")
return None
# 卸载插件
if not self.unload_plugin(display_name):
self.LOG.info(f"卸载插件 {display_name} 失败,无法重载")
return None
# 重新导入模块
if module_name in self.plugin_modules:
try:
importlib.reload(self.plugin_modules[module_name])
except Exception as e:
self.LOG.info(f"重新导入插件模块 {module_name} 失败: {e}")
return None
# 加载插件
plugin = self.load_plugin(module_name)
# 如果原来是运行状态,则重新启动
if plugin and was_running:
self.start_plugin(plugin.name)
return plugin
def start_plugin(self, plugin_name: str) -> bool:
"""
启动插件
Args:
plugin_name: 插件名称(可以是模块名或显示名称)
Returns:
启动是否成功
"""
# 查找插件
display_name, plugin = self.find_plugin_by_name(plugin_name)
if not plugin:
self.LOG.info(f"PluginManager插件 {plugin_name} 未加载")
return False
if plugin.status == PluginStatus.RUNNING:
self.LOG.info(f"PluginManager插件 {display_name} 已经在运行")
return True
if plugin.start():
plugin.status = PluginStatus.RUNNING
self.LOG.info(f"PluginManager插件 {display_name} 状态变更为在运行")
return True
else:
plugin.status = PluginStatus.ERROR
self.LOG.info(f"PluginManager插件 {display_name} 状态变更为异常")
return False
def stop_plugin(self, plugin_name: str) -> bool:
"""
停止插件
Args:
plugin_name: 插件名称(可以是模块名或显示名称)
Returns:
停止是否成功
"""
# 查找插件
display_name, plugin = self.find_plugin_by_name(plugin_name)
if not plugin:
self.LOG.info(f"插件 {plugin_name} 未加载")
return False
if plugin.status != PluginStatus.RUNNING:
self.LOG.info(f"插件 {display_name} 未在运行")
return True
if plugin.stop():
plugin.status = PluginStatus.STOPPED
self.LOG.info(f"插件 {display_name} 状态变更为已停止")
return True
else:
plugin.status = PluginStatus.ERROR
self.LOG.info(f"插件 {display_name} 状态变更为异常")
return False
def shutdown_plugins(self) -> bool:
"""
卸载所有插件
Returns:
是否全部成功卸载
"""
success = True
# 创建插件名称的副本因为在卸载过程中会修改self.plugins字典
plugin_names = list(self.plugins.keys())
for plugin_name in plugin_names:
if not self.unload_plugin(plugin_name):
self.LOG.error(f"卸载插件 {plugin_name} 失败")
success = False
# 清空插件模块字典
self.plugin_modules.clear()
# 确保插件字典为空
if self.plugins:
self.LOG.warning(f"插件卸载后仍有 {len(self.plugins)} 个插件残留")
success = False
return success
def find_plugin_by_name(self, plugin_name: str) -> Tuple[Optional[str], Optional[PluginInterface]]:
"""
根据插件名称或模块名查找插件
Args:
plugin_name: 插件名称或模块名
Returns:
(插件显示名称, 插件实例) 元组,未找到返回 (None, None)
"""
# 直接通过显示名称查找
if plugin_name in self.plugins:
return plugin_name, self.plugins[plugin_name]
# 通过模块名查找
if plugin_name in self.module_to_plugin:
display_name = self.module_to_plugin[plugin_name]
return display_name, self.plugins.get(display_name)
# 遍历所有插件查找匹配的模块名
for name, plugin in self.plugins.items():
try:
module_name = plugin.__class__.__module__.split('.')[-2]
if module_name == plugin_name:
return name, plugin
except (IndexError, AttributeError):
continue
return None, None