import asyncio
import json
import re
import time
import uuid
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote

import aiohttp

from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.decorator.rate_limit_decorator import group_feature_rate_limit
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from wechat_ipad import WechatAPIClient


class MaiBotAdapterPlugin(MessagePluginInterface):
    """Message plugin that bridges an externally deployed MaiBot WebUI chat into abot.

    For every accepted message the plugin verifies the configured access token
    over HTTP, requests a ws-token, opens the MaiBot WebSocket, sends the
    user's text in a logical chat session and relays the first non-empty
    ``bot_message`` event back to the originating chat.
    """

    FEATURE_KEY = "MAIBOT_CHAT"
    FEATURE_DESCRIPTION = "🤖 MaiBot 对话桥接 [麦麦, maibot, mai]"

    # Leading "@mention" followed by whitespace. U+2005 (FOUR-PER-EM SPACE) is
    # spelled out because it is the separator this pattern was written for.
    # There is deliberately no "|" inside the character class: within [...] it
    # would be a literal pipe, not an alternation.
    _AT_MENTION_PREFIX = re.compile(r"^@.*?[\u2005\s]+")

    @property
    def name(self) -> str:
        return "MaiBot对话"

    @property
    def version(self) -> str:
        return "1.0.0"

    @property
    def description(self) -> str:
        return "通过 MaiBot WebUI 的统一 WebSocket 协议接入外部 MaiBot 对话能力"

    @property
    def author(self) -> str:
        return "Codex"

    @property
    def command_prefix(self) -> Optional[str]:
        """Keep the existing empty-prefix convention: the first word is matched directly."""
        return ""

    @property
    def commands(self) -> List[str]:
        return self._commands

    @property
    def feature_key(self) -> Optional[str]:
        return self.FEATURE_KEY

    @property
    def feature_description(self) -> Optional[str]:
        return self.FEATURE_DESCRIPTION

    def __init__(self):
        """Set defaults; the real values are loaded from config in ``initialize``."""
        super().__init__()
        # Register the permission feature so the plugin can be toggled per group.
        self.feature = self.register_feature()
        self._commands: List[str] = ["麦麦", "maibot", "mai"]
        self._enabled = True
        self._allow_group_at = True
        self._command_tip = "麦麦 你好"
        self._server_url = ""
        self._access_token = ""
        self._connect_timeout = 15
        self._reply_timeout = 90
        self._session_scope = "room"
        self._verify_ssl = True
        self._config_ready = False

    def initialize(self, context: Dict[str, Any]) -> bool:
        """Load the ``MaiBotAdapter`` config section into instance state.

        Args:
            context: Plugin context supplied by the host; not read here.

        Returns:
            Always ``True``. An incomplete configuration only disables message
            handling (see ``can_process``); it does not fail initialization.
        """
        self.LOG.debug(f"正在初始化 {self.name} 插件...")
        maibot_config = self._config.get("MaiBotAdapter", {}) or {}

        # Drop blank entries so an empty string never matches as a command.
        self._commands = [
            str(item).strip()
            for item in maibot_config.get("commands", self._commands)
            if str(item).strip()
        ]
        self._enabled = bool(maibot_config.get("enable", True))
        self._allow_group_at = bool(maibot_config.get("allow_group_at", True))
        self._command_tip = str(maibot_config.get("command-tip", self._command_tip)).strip()
        self._server_url = str(maibot_config.get("server_url", "") or "").rstrip("/")
        self._access_token = str(maibot_config.get("access_token", "") or "").strip()
        # Timeouts are clamped to a sane minimum (5s connect, 10s reply).
        self._connect_timeout = max(5, int(maibot_config.get("connect_timeout", 15) or 15))
        self._reply_timeout = max(10, int(maibot_config.get("reply_timeout", 90) or 90))
        self._session_scope = str(maibot_config.get("session_scope", "room") or "room").strip().lower()
        self._verify_ssl = bool(maibot_config.get("verify_ssl", True))

        # Missing configuration deliberately does not fail initialization:
        # 1. the plugin can be loaded normally and pick up a later TOML hot reload;
        # 2. completeness is checked again when a message is actually handled.
        self._config_ready = bool(self._server_url and self._access_token)
        if not self._config_ready:
            self.LOG.warning(
                f"[{self.name}] 当前 server_url/access_token 未配置完整,插件会加载成功但不会实际处理消息"
            )

        self.LOG.debug(
            f"[{self.name}] 初始化完成: commands={self._commands}, "
            f"allow_group_at={self._allow_group_at}, server_url={self._server_url}, "
            f"session_scope={self._session_scope}, verify_ssl={self._verify_ssl}"
        )
        return True

    def start(self) -> bool:
        """Mark the plugin as running."""
        self.status = PluginStatus.RUNNING
        self.LOG.debug(f"[{self.name}] 插件已启动")
        return True

    def stop(self) -> bool:
        """Mark the plugin as stopped."""
        self.status = PluginStatus.STOPPED
        self.LOG.info(f"[{self.name}] 插件已停止")
        return True

    def can_process(self, message: Dict[str, Any]) -> bool:
        """Decide whether this message should be taken over by the MaiBot plugin.

        A message qualifies when the plugin is enabled and configured, its
        content is non-empty, and either its first space-separated token is a
        configured command or it is a group message that @-mentions the bot
        (and ``allow_group_at`` is on).
        """
        if not self._enabled:
            return False
        if not self._config_ready:
            return False

        content = str(message.get("content", "") or "").strip()
        if not content:
            return False

        first_token = content.split(" ", 1)[0]
        if first_token in self._commands:
            return True

        if self._allow_group_at and bool(message.get("is_at", False)) and str(message.get("roomid", "") or "").strip():
            return True
        return False

    @plugin_stats_decorator(plugin_name="MaiBot对话")
    @group_feature_rate_limit(max_per_minute=5, feature_key=FEATURE_KEY)
    async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
        """Handle a command or group @-mention and forward the question to MaiBot.

        Args:
            message: Incoming message dict. Reads ``content``, ``sender``,
                ``roomid``, ``is_at``, ``gbm`` and ``bot``.

        Returns:
            ``(success, status_text)``. Failures talking to MaiBot are caught,
            logged, reported to the chat (when ``bot`` is present) and returned
            as ``(False, ...)`` rather than raised.
        """
        content = str(message.get("content", "") or "").strip()
        sender = str(message.get("sender", "") or "").strip()
        roomid = str(message.get("roomid", "") or "").strip()
        # Reply to the group when there is one, otherwise to the sender.
        target = roomid if roomid else sender
        gbm: GroupBotManager = message.get("gbm")
        bot: WechatAPIClient = message.get("bot")

        self.LOG.info(
            f"[{self.name}] 收到消息: sender={sender}, roomid={roomid}, "
            f"is_at={message.get('is_at', False)}, content_preview={content[:120]}"
        )

        if roomid and gbm and self.feature and gbm.get_group_permission(target, self.feature) == PermissionStatus.DISABLED:
            self.LOG.info(f"[{self.name}] 群 {target} 未启用功能权限,跳过处理")
            return False, "没有权限"

        query = self._extract_query(message)
        if not query:
            # Nothing to ask: show the usage tip instead.
            if bot:
                await bot.send_text_message(target, self._command_tip, sender if roomid else "")
            return False, "没有提供问题内容"

        session_key = self._build_session_key(roomid=roomid, sender=sender)
        self.LOG.info(
            f"[{self.name}] 准备请求 MaiBot: session_key={session_key}, "
            f"query_len={len(query)}, query_preview={query[:120]}"
        )

        try:
            response = await self._query_maibot(session_key=session_key, query=query)
            response = self._normalize_response_text(response)
            if not response:
                self.LOG.warning(f"[{self.name}] MaiBot 返回空响应: session_key={session_key}")
                return False, "MaiBot 返回空响应"

            if bot:
                await bot.send_text_message(target, response, sender if roomid else "")
            self.LOG.info(
                f"[{self.name}] MaiBot 回复成功: session_key={session_key}, "
                f"reply_len={len(response)}, reply_preview={response[:120]}"
            )
            return True, "发送成功"
        except Exception as exc:
            # Top-level boundary for the remote call: log with traceback and
            # surface the failure to the chat instead of propagating.
            self.LOG.exception(f"[{self.name}] 请求 MaiBot 失败: {exc}")
            if bot:
                await bot.send_text_message(target, f"❌MaiBot 对话失败:{exc}", sender if roomid else "")
            return False, f"MaiBot 对话失败: {exc}"

    def _extract_query(self, message: Dict[str, Any]) -> str:
        """Extract the text that should actually be sent to MaiBot.

        For a command message this is everything after the first token. For a
        group @-mention it is the content with the leading ``@name<space>``
        removed. Any other message yields an empty string.
        """
        content = str(message.get("content", "") or "").strip()
        roomid = str(message.get("roomid", "") or "").strip()
        is_at = bool(message.get("is_at", False))

        first_token = content.split(" ", 1)[0] if content else ""
        if first_token in self._commands:
            parts = content.split(" ", 1)
            return parts[1].strip() if len(parts) > 1 else ""

        if is_at and roomid:
            # Handles the "@bot[whitespace]text" form by stripping the leading mention.
            return self._AT_MENTION_PREFIX.sub("", content).strip()
        return ""

    def _build_session_key(self, roomid: str, sender: str) -> str:
        """Build the MaiBot session key according to ``session_scope``.

        With scope ``"sender"`` the key includes the sender, so context is
        per person; any other scope yields one key per room (or ``private``).
        """
        normalized_roomid = roomid or "private"
        normalized_sender = sender or "unknown"
        if self._session_scope == "sender":
            return f"abot:{normalized_roomid}:{normalized_sender}"
        return f"abot:{normalized_roomid}"

    @staticmethod
    def _normalize_response_text(response: str) -> str:
        """Tidy reply text: normalize CRLF, trim, and collapse 3+ newlines to 2."""
        normalized_text = str(response or "").replace("\r\n", "\n").strip()
        normalized_text = re.sub(r"\n{3,}", "\n\n", normalized_text)
        return normalized_text

    async def _query_maibot(self, session_key: str, query: str) -> str:
        """Run one complete MaiBot round trip: HTTP auth, then a WS chat session.

        Args:
            session_key: Key from ``_build_session_key``; sent as ``user_id``
                and used as the prefix of the per-call session id.
            query: Text to send to MaiBot.

        Returns:
            The content of the first non-empty ``bot_message`` event.

        Raises:
            RuntimeError: On HTTP/WS protocol failures or an error event.
            TimeoutError: When an acknowledgement or the reply does not arrive
                within ``reply_timeout`` seconds.
        """
        client_timeout = aiohttp.ClientTimeout(total=self._reply_timeout + self._connect_timeout + 10)
        # unsafe=True is aiohttp's switch for accepting cookies from IP-address hosts.
        cookie_jar = aiohttp.CookieJar(unsafe=True)
        ssl_option = None if self._verify_ssl else False

        async with aiohttp.ClientSession(timeout=client_timeout, cookie_jar=cookie_jar) as session:
            await self._login_webui(session=session, ssl_option=ssl_option)
            ws_token = await self._fetch_ws_token(session=session, ssl_option=ssl_option)
            ws_url = self._build_ws_url(ws_token)
            # Log the endpoint only: the query string carries the ws token and
            # must not end up in log files.
            safe_ws_url = ws_url.split("?", 1)[0]
            self.LOG.info(f"[{self.name}] 正在连接 MaiBot WebSocket: ws_url={safe_ws_url}")

            # NOTE(review): per the aiohttp docs, a float ``timeout`` passed to
            # ws_connect is the close timeout, not a connect timeout. Confirm
            # whether that is the intent here.
            async with session.ws_connect(
                ws_url,
                ssl=ssl_option,
                heartbeat=30,
                receive_timeout=self._reply_timeout + 10,
                timeout=self._connect_timeout,
            ) as websocket:
                client_session_id = f"{session_key}:{uuid.uuid4().hex[:8]}"

                # Open the logical chat session explicitly so later replies can
                # be correlated by session.
                open_request_id = f"open_{uuid.uuid4().hex}"
                await websocket.send_json(
                    {
                        "op": "call",
                        "id": open_request_id,
                        "domain": "chat",
                        "method": "session.open",
                        "session": client_session_id,
                        "data": {
                            "restore": True,
                            "user_id": session_key,
                            "user_name": "ABotBridge",
                        },
                    }
                )
                await self._wait_for_call_ok(
                    websocket=websocket,
                    request_id=open_request_id,
                    expected_session=client_session_id,
                )

                send_request_id = f"send_{uuid.uuid4().hex}"
                await websocket.send_json(
                    {
                        "op": "call",
                        "id": send_request_id,
                        "domain": "chat",
                        "method": "message.send",
                        "session": client_session_id,
                        "data": {
                            "content": query,
                            "user_name": "ABotBridge",
                        },
                    }
                )
                await self._wait_for_call_ok(
                    websocket=websocket,
                    request_id=send_request_id,
                    expected_session=client_session_id,
                )

                reply_text = await self._wait_for_bot_message(
                    websocket=websocket,
                    expected_session=client_session_id,
                )

                # Closing the logical session is best-effort:
                # 1. a failure here does not affect the reply already received;
                # 2. the WebSocket is closed right after, so the error is only logged.
                try:
                    close_request_id = f"close_{uuid.uuid4().hex}"
                    await websocket.send_json(
                        {
                            "op": "call",
                            "id": close_request_id,
                            "domain": "chat",
                            "method": "session.close",
                            "session": client_session_id,
                            "data": {},
                        }
                    )
                except Exception as close_exc:
                    self.LOG.warning(f"[{self.name}] 关闭 MaiBot 逻辑会话失败: {close_exc}")

                return reply_text

    async def _login_webui(self, session: aiohttp.ClientSession, ssl_option: Any) -> None:
        """Verify the MaiBot WebUI token so a ws-token can be requested afterwards.

        Raises:
            RuntimeError: On a non-200 status, a non-JSON body, or a response
                whose ``valid`` field is falsy.
        """
        verify_url = f"{self._server_url}/api/webui/auth/verify"
        payload = {"token": self._access_token}
        self.LOG.info(f"[{self.name}] 正在校验 MaiBot token: verify_url={verify_url}")

        async with session.post(verify_url, json=payload, ssl=ssl_option) as response:
            response_text = await response.text()
            if response.status != 200:
                raise RuntimeError(f"MaiBot token 校验失败,HTTP {response.status}: {response_text[:200]}")

            try:
                response_data = json.loads(response_text)
            except json.JSONDecodeError as exc:
                raise RuntimeError(f"MaiBot token 校验响应不是合法 JSON: {response_text[:200]}") from exc

            if not bool(response_data.get("valid")):
                raise RuntimeError(f"MaiBot token 无效: {response_data.get('message') or response_text[:200]}")

    async def _fetch_ws_token(self, session: aiohttp.ClientSession, ssl_option: Any) -> str:
        """Request a temporary WebSocket token using the same client session.

        Returns:
            The non-empty, stripped ``token`` field of the response.

        Raises:
            RuntimeError: On a non-200 status, a non-JSON body, a falsy
                ``success`` field, or an empty token.
        """
        token_url = f"{self._server_url}/api/webui/ws-token"
        self.LOG.info(f"[{self.name}] 正在申请 MaiBot ws-token: url={token_url}")

        async with session.get(token_url, ssl=ssl_option) as response:
            response_text = await response.text()
            if response.status != 200:
                raise RuntimeError(f"MaiBot ws-token 获取失败,HTTP {response.status}: {response_text[:200]}")

            try:
                response_data = json.loads(response_text)
            except json.JSONDecodeError as exc:
                raise RuntimeError(f"MaiBot ws-token 响应不是合法 JSON: {response_text[:200]}") from exc

            if not bool(response_data.get("success")):
                raise RuntimeError(f"MaiBot ws-token 获取失败: {response_data.get('message') or response_text[:200]}")

            ws_token = str(response_data.get("token", "") or "").strip()
            if not ws_token:
                raise RuntimeError("MaiBot ws-token 为空")
            return ws_token

    def _build_ws_url(self, ws_token: str) -> str:
        """Convert ``server_url`` into the WebSocket URL carrying the token.

        ``https://`` maps to ``wss://`` and ``http://`` to ``ws://``. The token
        is percent-encoded so characters such as ``&``, ``#`` or ``+`` cannot
        corrupt the query string.

        Raises:
            RuntimeError: If ``server_url`` uses neither http nor https.
        """
        if self._server_url.startswith("https://"):
            base_ws_url = "wss://" + self._server_url[len("https://"):]
        elif self._server_url.startswith("http://"):
            base_ws_url = "ws://" + self._server_url[len("http://"):]
        else:
            raise RuntimeError(f"不支持的 MaiBot server_url: {self._server_url}")
        return f"{base_ws_url}/ws?token={quote(ws_token, safe='')}"

    async def _receive_until(
        self,
        websocket: aiohttp.ClientWebSocketResponse,
        deadline: float,
    ) -> Optional[Dict[str, Any]]:
        """Read the next JSON message, or return ``None`` once ``deadline`` has passed.

        ``deadline`` is a ``time.monotonic()`` timestamp. Each read is bounded
        by the time still remaining, so a silent server cannot make a wait loop
        overshoot its budget.
        """
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        try:
            return await self._receive_ws_json(websocket, timeout=remaining)
        except asyncio.TimeoutError:
            return None

    async def _wait_for_call_ok(
        self,
        websocket: aiohttp.ClientWebSocketResponse,
        request_id: str,
        expected_session: str,
    ) -> Dict[str, Any]:
        """Wait for the response that acknowledges one WebSocket call.

        Messages that are not a ``response`` for ``request_id`` are skipped.

        Returns:
            The matching response message.

        Raises:
            RuntimeError: If the matching response has a falsy ``ok``.
            TimeoutError: If no matching response arrives within
                ``reply_timeout`` seconds.
        """
        deadline = time.monotonic() + self._reply_timeout
        while True:
            message = await self._receive_until(websocket, deadline)
            if message is None:
                raise TimeoutError(f"等待 MaiBot 请求确认超时: request_id={request_id}")

            # The system "ready" event carries no business meaning; ignore it.
            if message.get("domain") == "system" and message.get("event") == "ready":
                continue
            if message.get("op") != "response":
                continue
            if str(message.get("id") or "") != request_id:
                continue

            if not bool(message.get("ok")):
                error_info = message.get("error") or {}
                raise RuntimeError(f"MaiBot 调用失败: {error_info}")

            data = message.get("data") or {}
            # A mismatching session is only logged; the response is still
            # accepted because it matched on request id.
            if expected_session and str(data.get("session") or expected_session) != expected_session:
                self.LOG.warning(
                    f"[{self.name}] 收到 session 不匹配的响应: expected={expected_session}, actual={data.get('session')}"
                )
            return message

    async def _wait_for_bot_message(
        self,
        websocket: aiohttp.ClientWebSocketResponse,
        expected_session: str,
    ) -> str:
        """Wait for the actual bot message event of the target session.

        Returns:
            The stripped ``content`` of the first non-empty ``bot_message``.

        Raises:
            RuntimeError: If an error event arrives for the session.
            TimeoutError: If no reply arrives within ``reply_timeout`` seconds.
        """
        deadline = time.monotonic() + self._reply_timeout
        while True:
            message = await self._receive_until(websocket, deadline)
            if message is None:
                raise TimeoutError(f"等待 MaiBot 回复超时: session={expected_session}")

            if message.get("op") != "event":
                continue
            if str(message.get("domain") or "") != "chat":
                continue
            if str(message.get("session") or "") != expected_session:
                continue

            event_name = str(message.get("event") or "").strip()
            data = message.get("data") or {}
            data_type = str(data.get("type") or "").strip()

            # typing / history / system / user_message are progress events;
            # keep waiting when one of those arrives.
            if event_name == "bot_message" and data_type == "bot_message":
                reply_text = str(data.get("content", "") or "").strip()
                if reply_text:
                    return reply_text

            if event_name == "error" or data_type == "error":
                raise RuntimeError(str(data.get("content") or "MaiBot 返回错误事件"))

    async def _receive_ws_json(
        self,
        websocket: aiohttp.ClientWebSocketResponse,
        timeout: Optional[float] = None,
    ) -> Dict[str, Any]:
        """Read one JSON object from the WebSocket, normalizing failure cases.

        Args:
            websocket: The open client WebSocket.
            timeout: Optional per-read timeout in seconds, forwarded to
                ``websocket.receive``. ``None`` keeps the socket's own
                receive timeout.

        Returns:
            The decoded JSON object.

        Raises:
            RuntimeError: On invalid or non-object JSON, a closed or closing
                socket, a socket error, or an unsupported frame type.
            asyncio.TimeoutError: If the read times out.
        """
        message = await websocket.receive(timeout=timeout)

        if message.type == aiohttp.WSMsgType.TEXT:
            try:
                payload = json.loads(message.data)
            except json.JSONDecodeError as exc:
                raise RuntimeError(f"MaiBot WebSocket 返回了非法 JSON: {message.data[:200]}") from exc

            if isinstance(payload, dict):
                return payload
            raise RuntimeError(f"MaiBot WebSocket 返回了非对象 JSON: {payload}")

        # CLOSE and CLOSING mean the connection is going away just like CLOSED;
        # report them as such rather than as an "unsupported" frame.
        if message.type in (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSING,
            aiohttp.WSMsgType.CLOSED,
        ):
            raise RuntimeError("MaiBot WebSocket 已关闭")

        if message.type == aiohttp.WSMsgType.ERROR:
            raise RuntimeError(f"MaiBot WebSocket 出错: {websocket.exception()}")

        raise RuntimeError(f"MaiBot WebSocket 返回了不支持的消息类型: {message.type}")