270 lines
9.1 KiB
Python
270 lines
9.1 KiB
Python
"""
|
||
MCP 管理插件
|
||
|
||
管理 MCP (Model Context Protocol) 服务器,将 MCP 工具自动注册到 ToolRegistry,
|
||
使 AI 可以调用各种 MCP 服务器提供的工具。
|
||
|
||
功能:
|
||
- 自动连接配置的 MCP 服务器
|
||
- 将 MCP 工具转换为 OpenAI 格式并注册
|
||
- 支持热重载(禁用/启用插件时自动管理连接)
|
||
- 提供管理命令(查看状态、重连等)
|
||
"""
|
||
|
||
import tomllib
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List
|
||
from loguru import logger
|
||
|
||
from utils.plugin_base import PluginBase
|
||
from utils.decorators import on_text_message
|
||
from utils.tool_registry import get_tool_registry
|
||
|
||
from .mcp_client import MCPManager, MCPServerConfig
|
||
|
||
|
||
class MCPManagerPlugin(PluginBase):
|
||
"""MCP 管理插件"""
|
||
|
||
description = "MCP 服务器管理,自动注册 MCP 工具到 AI"
|
||
author = "ShiHao"
|
||
version = "1.0.0"
|
||
|
||
# 高优先级加载,确保在其他插件之前注册工具
|
||
load_priority = 90
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.config = None
|
||
self.mcp_manager: MCPManager = None
|
||
self._registered_tools: List[str] = [] # 已注册的工具名列表
|
||
|
||
async def async_init(self):
|
||
"""插件异步初始化"""
|
||
# 读取配置
|
||
config_path = Path(__file__).parent / "config.toml"
|
||
with open(config_path, "rb") as f:
|
||
self.config = tomllib.load(f)
|
||
|
||
mcp_config = self.config.get("mcp", {})
|
||
|
||
if not mcp_config.get("enabled", True):
|
||
logger.info("MCPManager: MCP 功能已禁用")
|
||
return
|
||
|
||
# 初始化 MCP 管理器
|
||
self.mcp_manager = MCPManager(
|
||
tool_timeout=mcp_config.get("tool_timeout", 60),
|
||
server_start_timeout=mcp_config.get("server_start_timeout", 30)
|
||
)
|
||
|
||
# 连接所有配置的服务器
|
||
servers = mcp_config.get("servers", [])
|
||
if not servers:
|
||
logger.info("MCPManager: 未配置任何 MCP 服务器")
|
||
return
|
||
|
||
if mcp_config.get("auto_connect", True):
|
||
await self._connect_all_servers(servers)
|
||
|
||
logger.success(f"MCPManager 插件已加载,已连接 {len(self.mcp_manager.clients)} 个 MCP 服务器")
|
||
|
||
async def _connect_all_servers(self, servers: List[Dict]):
|
||
"""连接所有配置的服务器"""
|
||
for server_data in servers:
|
||
try:
|
||
config = MCPServerConfig.from_dict(server_data)
|
||
success = await self.mcp_manager.add_server(config)
|
||
|
||
if success:
|
||
# 注册工具到 ToolRegistry
|
||
await self._register_server_tools(config.name)
|
||
|
||
except Exception as e:
|
||
logger.error(f"MCPManager: 连接服务器 {server_data.get('name', '未知')} 失败: {e}")
|
||
|
||
async def _register_server_tools(self, server_name: str):
|
||
"""将服务器的工具注册到 ToolRegistry"""
|
||
client = self.mcp_manager.clients.get(server_name)
|
||
if not client:
|
||
return
|
||
|
||
registry = get_tool_registry()
|
||
prefix = client.config.tool_prefix
|
||
|
||
for tool in client.tools.values():
|
||
schema = tool.to_openai_schema(prefix)
|
||
tool_name = schema["function"]["name"]
|
||
|
||
# 创建工具执行器
|
||
async def executor(name: str, arguments: Dict, bot, from_wxid: str, _tn=tool_name) -> Dict:
|
||
return await self.mcp_manager.call_tool(_tn, arguments)
|
||
|
||
# 注册到 ToolRegistry
|
||
success = registry.register(
|
||
name=tool_name,
|
||
plugin_name=f"MCP:{server_name}",
|
||
schema=schema,
|
||
executor=executor,
|
||
timeout=self.mcp_manager.tool_timeout,
|
||
priority=40 # 比普通插件优先级稍低
|
||
)
|
||
|
||
if success:
|
||
self._registered_tools.append(tool_name)
|
||
logger.debug(f"MCPManager: 注册工具 {tool_name}")
|
||
|
||
logger.info(f"MCPManager: 从 {server_name} 注册了 {len(client.tools)} 个工具")
|
||
|
||
async def on_disable(self):
|
||
"""插件禁用时清理"""
|
||
await super().on_disable()
|
||
|
||
# 注销所有已注册的工具
|
||
registry = get_tool_registry()
|
||
for tool_name in self._registered_tools:
|
||
registry.unregister(tool_name)
|
||
self._registered_tools.clear()
|
||
|
||
# 关闭所有 MCP 服务器连接
|
||
if self.mcp_manager:
|
||
await self.mcp_manager.shutdown()
|
||
self.mcp_manager = None
|
||
|
||
logger.info("MCPManager: 已清理所有 MCP 连接和工具注册")
|
||
|
||
# ==================== 管理命令 ====================
|
||
|
||
@on_text_message(priority=80)
|
||
async def handle_admin_commands(self, bot, message: dict):
|
||
"""处理管理命令"""
|
||
content = message.get("Content", "").strip()
|
||
|
||
# 只响应管理员的命令
|
||
# TODO: 从主配置读取管理员列表
|
||
if not content.startswith("/mcp"):
|
||
return True
|
||
|
||
parts = content.split()
|
||
if len(parts) < 2:
|
||
return True
|
||
|
||
cmd = parts[1].lower()
|
||
reply_to = message.get("FromWxid", "")
|
||
|
||
if cmd == "status":
|
||
await self._cmd_status(bot, reply_to)
|
||
return False
|
||
|
||
elif cmd == "list":
|
||
await self._cmd_list_tools(bot, reply_to)
|
||
return False
|
||
|
||
elif cmd == "reload":
|
||
await self._cmd_reload(bot, reply_to)
|
||
return False
|
||
|
||
return True
|
||
|
||
async def _cmd_status(self, bot, reply_to: str):
|
||
"""查看 MCP 服务器状态"""
|
||
if not self.mcp_manager:
|
||
await bot.send_text(reply_to, "MCP 功能未启用")
|
||
return
|
||
|
||
servers = self.mcp_manager.list_servers()
|
||
if not servers:
|
||
await bot.send_text(reply_to, "没有已连接的 MCP 服务器")
|
||
return
|
||
|
||
lines = ["📡 MCP 服务器状态:"]
|
||
for s in servers:
|
||
status = "✅" if s["connected"] else "❌"
|
||
lines.append(f"{status} {s['name']}: {s['tools_count']} 个工具")
|
||
|
||
await bot.send_text(reply_to, "\n".join(lines))
|
||
|
||
async def _cmd_list_tools(self, bot, reply_to: str):
|
||
"""列出所有 MCP 工具"""
|
||
if not self.mcp_manager:
|
||
await bot.send_text(reply_to, "MCP 功能未启用")
|
||
return
|
||
|
||
servers = self.mcp_manager.list_servers()
|
||
if not servers:
|
||
await bot.send_text(reply_to, "没有已连接的 MCP 服务器")
|
||
return
|
||
|
||
lines = ["🔧 MCP 工具列表:"]
|
||
for s in servers:
|
||
if s["tools"]:
|
||
lines.append(f"\n【{s['name']}】")
|
||
for tool in s["tools"][:10]: # 限制显示数量
|
||
lines.append(f" • {tool}")
|
||
if len(s["tools"]) > 10:
|
||
lines.append(f" ... 还有 {len(s['tools']) - 10} 个")
|
||
|
||
await bot.send_text(reply_to, "\n".join(lines))
|
||
|
||
async def _cmd_reload(self, bot, reply_to: str):
|
||
"""重新加载 MCP 服务器"""
|
||
await bot.send_text(reply_to, "正在重新加载 MCP 服务器...")
|
||
|
||
# 清理现有连接
|
||
if self.mcp_manager:
|
||
registry = get_tool_registry()
|
||
for tool_name in self._registered_tools:
|
||
registry.unregister(tool_name)
|
||
self._registered_tools.clear()
|
||
await self.mcp_manager.shutdown()
|
||
|
||
# 重新读取配置
|
||
config_path = Path(__file__).parent / "config.toml"
|
||
with open(config_path, "rb") as f:
|
||
self.config = tomllib.load(f)
|
||
|
||
mcp_config = self.config.get("mcp", {})
|
||
|
||
# 重新初始化
|
||
self.mcp_manager = MCPManager(
|
||
tool_timeout=mcp_config.get("tool_timeout", 60),
|
||
server_start_timeout=mcp_config.get("server_start_timeout", 30)
|
||
)
|
||
|
||
servers = mcp_config.get("servers", [])
|
||
await self._connect_all_servers(servers)
|
||
|
||
await bot.send_text(
|
||
reply_to,
|
||
f"MCP 重新加载完成,已连接 {len(self.mcp_manager.clients)} 个服务器"
|
||
)
|
||
|
||
# ==================== LLM 工具接口(备用) ====================
|
||
|
||
def get_llm_tools(self) -> List[Dict]:
|
||
"""
|
||
返回 MCP 工具列表(备用接口)
|
||
|
||
注意:工具已通过 ToolRegistry 注册,此方法仅供参考
|
||
"""
|
||
if not self.mcp_manager:
|
||
return []
|
||
return self.mcp_manager.get_all_tools()
|
||
|
||
async def execute_llm_tool(
|
||
self,
|
||
tool_name: str,
|
||
arguments: Dict[str, Any],
|
||
bot,
|
||
from_wxid: str
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
执行 MCP 工具(备用接口)
|
||
|
||
注意:工具已通过 ToolRegistry 注册,此方法仅供备用
|
||
"""
|
||
if not self.mcp_manager:
|
||
return {"success": False, "error": "MCP 功能未启用"}
|
||
|
||
return await self.mcp_manager.call_tool(tool_name, arguments)
|