From a6f3eb968805a0502144484e7b35ab324b9c00ea Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 29 Apr 2026 09:58:50 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84MaiBot=E9=80=82=E9=85=8D?= =?UTF-8?q?=E5=99=A8=E4=B8=BA=E5=AE=98=E6=96=B9API=20Server=E9=95=BF?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E9=87=87=E9=9B=86=E6=A8=A1=E5=BC=8F=E5=B9=B6?= =?UTF-8?q?=E8=A1=A5=E5=85=A8Dashboard=E8=AF=B4=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../maibot_adapter/DEPLOY_192.168.2.240.md | 219 ++-- plugins/maibot_adapter/README.md | 61 +- plugins/maibot_adapter/config.toml | 129 ++- plugins/maibot_adapter/main.py | 975 ++++++++++++------ 4 files changed, 871 insertions(+), 513 deletions(-) diff --git a/plugins/maibot_adapter/DEPLOY_192.168.2.240.md b/plugins/maibot_adapter/DEPLOY_192.168.2.240.md index 30c2500..15bc3d0 100644 --- a/plugins/maibot_adapter/DEPLOY_192.168.2.240.md +++ b/plugins/maibot_adapter/DEPLOY_192.168.2.240.md @@ -1,91 +1,50 @@ -# 192.168.2.240 部署 MaiBot 清单 +# 192.168.2.240 上的 MaiBot 部署说明 -这份文档用于把 `MaiBot` 部署到 `192.168.2.240`,供 `plugins/maibot_adapter` 调用。 +这份文档记录当前 `MaiBot 官方镜像` 在 `192.168.2.240` 的实际部署状态,供 `plugins/maibot_adapter` 对接使用。 -## 当前状态 +## 当前结论 -我已经在 `abot` 里新增了 `maibot_adapter` 插件,但当前还没有这台服务器的 SSH 凭据。 -如果你把 SSH 用户名/密码,或者私钥登录方式给我,我就可以直接按这份清单远程执行。 +1. `MaiBot` 使用官方镜像 `sengokucola/maibot:latest` 部署。 +2. 没有改动 MaiBot 源码,dashboard 通过官方 `maibot-dashboard==1.0.0` 静态资源挂载方式补齐。 +3. `abot` 侧不再走 WebUI 聊天页,而是改走 `官方 API Server`。 +4. 现在的推荐接入地址如下: + - WebUI / Dashboard:`http://192.168.2.240:18001/` + - WebUI Health:`http://192.168.2.240:18001/api/webui/health` + - 官方 API Server WS:`ws://192.168.2.240:18009/ws` -## 目标 +## 当前 docker-compose 关键点 -部署完成后,`abot` 将使用以下接口访问 MaiBot: +容器名: -1. `POST /api/webui/auth/verify` -2. `GET /api/webui/ws-token` -3. `WS /ws?token=...` - -因此最终需要保证: - -1. `http://192.168.2.240:8001/api/webui/health` 可访问 -2. 你手里有一个可用的 `MaiBot WebUI token` -3. `plugins/maibot_adapter/config.toml` 中填入同一个 `server_url` 与 `access_token` - -## 推荐部署方式 - -当前优先建议“宿主机 Python 直跑”: - -1. 部署简单,方便先打通插件联调 -2. 日志更容易直接看 -3. 后面稳定后再考虑改成 Docker Compose - -## 服务器执行步骤 - -以下命令默认按 Linux 服务器写。 - -### 1. 安装基础依赖 - -```bash -sudo apt update -sudo apt install -y git python3 python3-venv python3-pip +```text +maibot-core-lite ``` -### 2. 拉取 MaiBot +镜像: -```bash -cd /opt -sudo git clone https://github.com/Mai-with-u/MaiBot.git -sudo chown -R $USER:$USER /opt/MaiBot -cd /opt/MaiBot +```text +sengokucola/maibot:latest ``` -### 3. 创建虚拟环境并安装依赖 +核心端口映射: -```bash -python3 -m venv .venv -source .venv/bin/activate -python -m pip install --upgrade pip -pip install -e . +```text +18001 -> 8001 # WebUI / Dashboard / WebUI API +18009 -> 8090 # 官方 API Server ``` -如果 `pip install -e .` 遇到单个依赖下载慢,可以切换镜像源再执行。 +## Dashboard 访问地址 -### 4. 首次启动 +当前已经补全完成,可直接访问: -```bash -source /opt/MaiBot/.venv/bin/activate -cd /opt/MaiBot -python bot.py +```text +http://192.168.2.240:18001/ ``` -说明: +如果只想看健康状态: -1. `bot.py` 是 MaiBot 主入口 -2. 首次启动时通常需要你按它自己的配置流程完成基础设置 -3. WebUI 默认监听端口通常是 `8001` - -### 5. 验证健康状态 - -本机执行: - -```bash -curl http://127.0.0.1:8001/api/webui/health -``` - -局域网执行: - -```bash -curl http://192.168.2.240:8001/api/webui/health +```text +http://192.168.2.240:18001/api/webui/health ``` 期待返回: @@ -94,87 +53,61 @@ curl http://192.168.2.240:8001/api/webui/health {"status":"healthy","service":"MaiBot WebUI"} ``` -### 6. 获取 WebUI token +## embedding 模型现状 -这一步有两种方式: +当前远端已切到 Ollama 向量服务: -1. 按 MaiBot 首次配置流程,在 WebUI 中设置 token -2. 如果已经有现成 token,直接记下来给 `maibot_adapter` 使用 +```text +ollama_base_url = "http://192.168.2.50:11434/v1" +embedding_model = "bge-m3:latest" +``` -最终你需要把 token 填到: +这套配置已经验证可用,MaiBot 日志里能看到 embedding 维度识别成功。 -`plugins/maibot_adapter/config.toml` +## 官方 API Server 接入方式 + +`maibot_adapter` 现在用的是官方 API Server 协议,不再使用: + +1. `POST /api/webui/auth/verify` +2. `GET /api/webui/ws-token` +3. `WS /api/webui/ws?token=...` + +现在实际使用的是: + +1. `WS ws://192.168.2.240:18009/ws` +2. 握手头: + - `x-uuid` + - `x-apikey` + - `x-platform` +3. 消息包类型: + - `sys_std` +4. 负载结构: + - `APIMessageBase` + +## abot 配置建议 + +当前 `plugins/maibot_adapter/config.toml` 推荐值: ```toml -server_url = "http://192.168.2.240:8001" -access_token = "你的MaiBotToken" +[MaiBotAdapter] +enable = true +collect_group_messages = true +collect_private_messages = true +enable_reply_output = true +reply_group_messages = true +reply_private_messages = true +respect_group_feature_switch = true +mention_user_on_group_reply = false +api_server_ws_url = "ws://192.168.2.240:18009/ws" +api_key = "abot-maibot" +platform_name = "abot-maibot" ``` -### 7. 建议做成 systemd 服务 +## 设计边界 -创建 `/etc/systemd/system/maibot.service` +当前这套接法遵循你已经确认过的原则: -```ini -[Unit] -Description=MaiBot Service -After=network.target - -[Service] -Type=simple -User=root -WorkingDirectory=/opt/MaiBot -ExecStart=/opt/MaiBot/.venv/bin/python /opt/MaiBot/bot.py -Restart=always -RestartSec=5 -Environment=PYTHONUNBUFFERED=1 - -[Install] -WantedBy=multi-user.target -``` - -启用并启动: - -```bash -sudo systemctl daemon-reload -sudo systemctl enable maibot -sudo systemctl start maibot -sudo systemctl status maibot -``` - -看日志: - -```bash -sudo journalctl -u maibot -f -``` - -## 与 abot 对接 - -部署好后,回到 `abot`: - -1. 编辑 `plugins/maibot_adapter/config.toml` -2. 填入: - - `server_url = "http://192.168.2.240:8001"` - - `access_token = "你的MaiBotToken"` -3. 重启 `abot` 或热加载插件 - -测试指令: - -```text -麦麦 你好 -``` - -或者在群里直接: - -```text -@机器人 你好 -``` - -## 备注 - -如果你后面希望: - -1. `ai_auto_response` 也复用 MaiBot -2. 做自动插话而不是只做命令对话 -3. 做会话长驻、减少每次重新认证和建连成本 - -下一步就可以把 `maibot_adapter` 再往“共享长连接 + 自动回复桥”方向升级。 +1. 消息默认发送给 MaiBot 做采集。 +2. 是否真的回复,由开关和群功能权限控制。 +3. 不重新开发 MaiBot 对话/记忆本体能力。 +4. 尽量把“参与聊天”的判断留在 MaiBot 侧,而不是在 abot 里再做一遍复杂前置策略。 diff --git a/plugins/maibot_adapter/README.md b/plugins/maibot_adapter/README.md index 95fa8bb..d09ba61 100644 --- a/plugins/maibot_adapter/README.md +++ b/plugins/maibot_adapter/README.md @@ -1,35 +1,50 @@ # MaiBot Adapter 插件 -这个插件用于把外部独立部署的 `MaiBot WebUI` 对接到 `abot` 的插件体系里。 +这个插件现在已经改成对接 `MaiBot 官方 API Server`,不再依赖 `WebUI 聊天页` 那套 token + ws-token + `/api/webui/ws` 流程。 ## 当前能力 -1. 支持命令触发: - - `麦麦 你好` - - `maibot 你是谁` -2. 支持群聊里 `@机器人` 后把文本转发给 MaiBot。 -3. 插件内部会自动执行: - - `POST /api/webui/auth/verify` - - `GET /api/webui/ws-token` - - `WS /ws?token=...` -4. 每次消息会走一个独立的 WebSocket 逻辑会话,并等待 `bot_message` 事件作为最终回复。 +1. 默认把 abot 收到的群聊/私聊消息旁路发送给 MaiBot,作为上下文与记忆输入。 +2. 默认不阻断其它插件链路,`ai_auto_response`、`Dify`、命令插件仍可继续照常处理。 +3. 是否真的把 MaiBot 的回复发回微信,由 `enable_reply_output`、`reply_group_messages`、`reply_private_messages` 以及群功能开关共同控制。 +4. 使用官方 API Server 协议: + - WebSocket 握手头:`x-uuid` / `x-apikey` / `x-platform` + - 消息包类型:`sys_std` + - 负载结构:`APIMessageBase` +5. 支持把图片、语音、视频、位置、应用消息转换成文本占位符发给 MaiBot,方便它做长期上下文理解。 -## 配置说明 +## 推荐定位 + +这套插件更适合做两件事: + +1. 让 MaiBot 成为 `ai_auto_response` 之外的一条“长期对话与记忆”能力链路。 +2. 先把全量消息采集给 MaiBot,等效果稳定后,再逐步提高它的主动参与度。 + +## 配置重点 见 `config.toml`: -1. `server_url` - - MaiBot WebUI 地址,例如 `http://192.168.2.240:8001` -2. `access_token` - - MaiBot WebUI 登录 token -3. `session_scope` - - `room`:同群共享语境 - - `sender`:同群内每个用户独立语境 +1. `api_server_ws_url` + - 例如 `ws://192.168.2.240:18009/ws` +2. `api_key` + - 用于官方 API Server 路由 +3. `enable_reply_output` + - 控制“MaiBot 能不能实际开口说话” +4. `respect_group_feature_switch` + - 控制群里是否还要继续受 abot 功能权限开关约束 -## 部署建议 +## 现阶段建议 -如果你后面要让 `ai_auto_response` 也改走 MaiBot,建议顺序如下: +如果你打算逐步替代 `ai_auto_response`,建议按这个顺序推进: -1. 先用这个插件验证 MaiBot 对话质量与延迟。 -2. 确认稳定后,再把 `ai_auto_response` 的“决策/生成”改成调用同一套桥接逻辑。 -3. 最后再决定是否完全下线 `ai_auto_response` 原本的 LLM 直连链路。 +1. 先保持 `collect_group_messages=true`、`enable_reply_output=false`,只观察 MaiBot 记忆与理解效果。 +2. 然后打开 `enable_reply_output=true`,但先让少量测试群开启 `MAIBOT_CHAT` 功能权限。 +3. 确认稳定后,再决定是让 MaiBot 成为独立对话插件,还是进一步吸收进 `ai_auto_response` 主流程。 + +## Dashboard / WebUI + +当前远端 `192.168.2.240` 已经补好官方 dashboard 静态资源,访问地址如下: + +1. WebUI 首页:`http://192.168.2.240:18001/` +2. 健康检查:`http://192.168.2.240:18001/api/webui/health` +3. 官方 API Server WebSocket:`ws://192.168.2.240:18009/ws` diff --git a/plugins/maibot_adapter/config.toml b/plugins/maibot_adapter/config.toml index f4f87d5..a6b7aff 100644 --- a/plugins/maibot_adapter/config.toml +++ b/plugins/maibot_adapter/config.toml @@ -1,45 +1,104 @@ [MaiBotAdapter] enable = true -# 命令触发词: -# 1. 不需要前缀,直接使用 “麦麦 你好” 这种形式即可; -# 2. 群聊里也支持 @机器人 后直接转发给 MaiBot; -# 3. 如果你后面希望只保留自动插话入口,可以把命令缩减到一个内部调试词。 -commands = ["麦麦", "maibot", "mai"] -command-tip = """ -🤖MaiBot 对话指令: -麦麦 你好 -""" +# 是否默认采集群消息到 MaiBot: +# 1. 这是“把消息送给 MaiBot 做上下文与记忆”的总开关; +# 2. 关闭后,群消息不会再进入 MaiBot; +# 3. 这个开关只影响采集,不影响 MaiBot 已经生成中的历史任务。 +collect_group_messages = true -# 是否允许群聊里通过 @机器人 触发。 -allow_group_at = true +# 是否默认采集私聊消息到 MaiBot: +# 1. 打开后,私聊也会进入 MaiBot 的上下文链路; +# 2. 如果你暂时只想把 MaiBot 当群聊人格,就可以先关掉; +# 3. 当前按你的要求默认开启“全量采集”能力。 +collect_private_messages = true -# MaiBot WebUI 服务根地址: -# 1. 不要带末尾斜杠; -# 2. 插件会基于这个地址自动请求: -# - /api/webui/auth/verify -# - /api/webui/ws-token -# - /ws -server_url = "http://192.168.2.240:8001" +# 是否允许把 MaiBot 的回复真正发回微信: +# 1. true 表示 MaiBot 不仅接收消息,也可以实际参与说话; +# 2. false 表示只做上下文采集与记忆沉淀,不做回声输出; +# 3. 这是你要求的“采集默认开启,回复单独用开关控制”的核心开关。 +enable_reply_output = true -# MaiBot WebUI 的访问令牌: -# 1. 这是你在 MaiBot WebUI 登录页里使用的 token; -# 2. 插件会先调用 auth/verify 写入 Cookie,再换取一次性 ws-token; -# 3. 建议部署完成后改成你自己的正式 token。 -access_token = "" +# 是否允许 MaiBot 往群里回复: +# 1. 只影响群输出,不影响群消息采集; +# 2. 如果你希望先观察 MaiBot 记忆效果、不让它插话,可以改成 false; +# 3. 与 enable_reply_output 共同生效。 +reply_group_messages = true -# WebSocket 聊天的超时配置(秒): -# 1. connect_timeout 控制 HTTP/WS 建连超时; -# 2. reply_timeout 控制发送问题后等待 MaiBot 回复的最长时间。 -connect_timeout = 15 -reply_timeout = 90 +# 是否允许 MaiBot 往私聊里回复: +# 1. 只影响私聊输出,不影响私聊消息采集; +# 2. 如果你担心它先在私聊里误触发,可以单独关闭; +# 3. 与 enable_reply_output 共同生效。 +reply_private_messages = true -# 会话维度: -# 1. room 表示同一个群共享一个 MaiBot 身份语境; -# 2. sender 表示每个发言人各自独立会话; -# 3. 当前默认 room,更适合群聊人格连续性。 -session_scope = "room" +# 群功能开关是否参与“能不能往群里说话”的判断: +# 1. true:继续复用 abot 现有的群功能权限体系; +# 2. false:只要 enable_reply_output/reply_group_messages 打开,就允许 MaiBot 在群里说话; +# 3. 无论 true 还是 false,消息都会照常采集并发送给 MaiBot。 +respect_group_feature_switch = true -# 是否校验 HTTPS 证书。 -# 如果你后面给 MaiBot 挂了自签名证书,可以临时改成 false。 +# 群回复时是否自动 @ 触发者: +# 1. false:直接发普通文本,更像群里自然插话; +# 2. true:会尝试 @ 原发言人,更适合点对点回应; +# 3. 当前默认 false,更贴近“参与聊天”的体验。 +mention_user_on_group_reply = false + +# MaiBot 官方 API Server 的 WebSocket 地址: +# 1. 这里必须填官方 API Server 暴露出来的 /ws; +# 2. 当前远端部署已经映射为 18009 -> 8090; +# 3. 不再使用 WebUI 的 /api/webui/ws-token 和 /api/webui/ws。 +api_server_ws_url = "ws://192.168.2.240:18009/ws" + +# MaiBot API Server 的 api_key: +# 1. 这个值会进入握手头 x-apikey 和消息维度 message_dim.api_key; +# 2. MaiBot 服务端会基于它做用户路由与会话归属; +# 3. 当前远端为了联调方便允许空列表校验,但客户端这里依旧建议明确填入一个稳定值。 +api_key = "abot-maibot" + +# 平台名: +# 1. 会进入握手头 x-platform; +# 2. 也会进入 message_info.platform 与 message_dim.platform; +# 3. 建议保持稳定,后续日志、路由和排障都会更清晰。 +platform_name = "abot-maibot" + +# 是否校验 SSL 证书: +# 1. 当前局域网 http/ws 直连场景保持 true 也不会有问题; +# 2. 如果以后切成 wss 且用自签名证书,可临时改成 false; +# 3. 生产上仍建议优先使用有效证书。 verify_ssl = true + +# 建连超时(秒): +# 1. 控制首次连接和重连时的握手超时; +# 2. 局域网场景通常 10~15 秒足够; +# 3. 太短容易误判网络抖动,太长则故障恢复慢。 +connect_timeout = 15 + +# 接收超时(秒): +# 1. 长连接空闲时不代表异常,因此这里要比 connect_timeout 大很多; +# 2. aiohttp 会结合 heartbeat 做活性检测; +# 3. 当前默认 120 秒,适合持续在线聊天场景。 +receive_timeout = 120 + +# 心跳间隔(秒): +# 1. 用于保持官方 API Server 长连接活性; +# 2. 太短会增加无意义流量,太长会让断线发现变慢; +# 3. 当前默认 20 秒,与 maim_message 默认配置接近。 +heartbeat_interval = 20 + +# 重连间隔(秒): +# 1. 长连接断开后,这里控制多久再尝试重连; +# 2. 当前默认 5 秒,适合局域网服务; +# 3. 如果后面网络更不稳定,可以适当调大。 +reconnect_delay = 5 + +# 本地待发送消息队列长度: +# 1. 这是 abot -> MaiBot 的缓冲区; +# 2. 队列满了以后,新消息会直接丢弃并打印详细日志; +# 3. 默认 500,足够覆盖绝大多数群聊高峰。 +queue_maxsize = 500 + +# 单条消息发送到 MaiBot 的最大重试次数: +# 1. 只处理瞬时断线这类临时故障; +# 2. 超过次数后会落日志并丢弃,避免消息永久堆积; +# 3. 当前默认 3 次,兼顾稳定性与可控性。 +max_send_retries = 3 diff --git a/plugins/maibot_adapter/main.py b/plugins/maibot_adapter/main.py index 716e390..3f288ad 100644 --- a/plugins/maibot_adapter/main.py +++ b/plugins/maibot_adapter/main.py @@ -1,5 +1,5 @@ +import asyncio import json -import re import time import uuid from typing import Any, Dict, List, Optional, Tuple @@ -8,17 +8,16 @@ import aiohttp from base.plugin_common.message_plugin_interface import MessagePluginInterface from base.plugin_common.plugin_interface import PluginStatus -from utils.decorator.plugin_decorators import plugin_stats_decorator -from utils.decorator.rate_limit_decorator import group_feature_rate_limit from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from wechat_ipad import WechatAPIClient +from wechat_ipad.models.message import AppMessageType, MessageType, WxMessage class MaiBotAdapterPlugin(MessagePluginInterface): - """将外部部署的 MaiBot WebUI 聊天能力桥接到 abot 的消息插件。""" + """将 abot 的微信消息桥接到 MaiBot 官方 API Server。""" FEATURE_KEY = "MAIBOT_CHAT" - FEATURE_DESCRIPTION = "🤖 MaiBot 对话桥接 [麦麦, maibot, mai]" + FEATURE_DESCRIPTION = "🤖 MaiBot 对话桥接 [官方API Server 接入]" @property def name(self) -> str: @@ -26,11 +25,11 @@ class MaiBotAdapterPlugin(MessagePluginInterface): @property def version(self) -> str: - return "1.0.0" + return "2.0.0" @property def description(self) -> str: - return "通过 MaiBot WebUI 的统一 WebSocket 协议接入外部 MaiBot 对话能力" + return "默认采集群聊/私聊消息到 MaiBot,并按开关决定是否输出 MaiBot 回复" @property def author(self) -> str: @@ -38,12 +37,15 @@ class MaiBotAdapterPlugin(MessagePluginInterface): @property def command_prefix(self) -> Optional[str]: - """命令插件沿用现有空前缀约定,直接匹配第一个词。""" - return "" + # 这个插件已经不是命令式插件: + # 1. 现在它的职责是“消息旁路采集 + 可选回复输出”; + # 2. 触发入口不再依赖“麦麦 xxx”这种命令; + # 3. 因此这里返回 None,让插件管理器不要按命令插件路径理解它。 + return None @property def commands(self) -> List[str]: - return self._commands + return [] @property def feature_key(self) -> Optional[str]: @@ -55,392 +57,741 @@ class MaiBotAdapterPlugin(MessagePluginInterface): def __init__(self): super().__init__() - # 注册权限特征,便于按群开关此插件。 self.feature = self.register_feature() - self._commands: List[str] = ["麦麦", "maibot", "mai"] + + # 基础开关。 self._enabled = True - self._allow_group_at = True - self._command_tip = "麦麦 你好" - self._server_url = "" - self._access_token = "" - self._connect_timeout = 15 - self._reply_timeout = 90 - self._session_scope = "room" + self._collect_group_messages = True + self._collect_private_messages = True + self._enable_reply_output = True + self._reply_group_messages = True + self._reply_private_messages = True + self._respect_group_feature_switch = True + self._mention_user_on_group_reply = False + + # 协议与连接配置。 + self._api_server_ws_url = "" + self._api_key = "" + self._platform_name = "abot-maibot" self._verify_ssl = True + self._connect_timeout = 15 + self._receive_timeout = 120 + self._heartbeat_interval = 20 + self._reconnect_delay = 5 + self._queue_maxsize = 500 + self._max_send_retries = 3 + + # 运行时状态。 self._config_ready = False + self._runtime_started = False + self._runtime_lock: Optional[asyncio.Lock] = None + self._outbound_queue: Optional[asyncio.Queue] = None + self._connected_event: Optional[asyncio.Event] = None + self._tasks: List[asyncio.Task] = [] + self._client_session: Optional[aiohttp.ClientSession] = None + self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None + self._connection_uuid = "" + self._last_bot: Optional[WechatAPIClient] = None def initialize(self, context: Dict[str, Any]) -> bool: - """初始化插件配置与上下文引用。""" + """加载插件配置。""" self.LOG.debug(f"正在初始化 {self.name} 插件...") - maibot_config = self._config.get("MaiBotAdapter", {}) or {} - self._commands = [str(item).strip() for item in maibot_config.get("commands", self._commands) if str(item).strip()] - self._enabled = bool(maibot_config.get("enable", True)) - self._allow_group_at = bool(maibot_config.get("allow_group_at", True)) - self._command_tip = str(maibot_config.get("command-tip", self._command_tip)).strip() - self._server_url = str(maibot_config.get("server_url", "") or "").rstrip("/") - self._access_token = str(maibot_config.get("access_token", "") or "").strip() - self._connect_timeout = max(5, int(maibot_config.get("connect_timeout", 15) or 15)) - self._reply_timeout = max(10, int(maibot_config.get("reply_timeout", 90) or 90)) - self._session_scope = str(maibot_config.get("session_scope", "room") or "room").strip().lower() - self._verify_ssl = bool(maibot_config.get("verify_ssl", True)) + plugin_config = self._config.get("MaiBotAdapter", {}) or {} + self._enabled = bool(plugin_config.get("enable", True)) + self._collect_group_messages = bool(plugin_config.get("collect_group_messages", True)) + self._collect_private_messages = bool(plugin_config.get("collect_private_messages", True)) + self._enable_reply_output = bool(plugin_config.get("enable_reply_output", True)) + self._reply_group_messages = bool(plugin_config.get("reply_group_messages", True)) + self._reply_private_messages = bool(plugin_config.get("reply_private_messages", True)) + self._respect_group_feature_switch = bool(plugin_config.get("respect_group_feature_switch", True)) + self._mention_user_on_group_reply = bool(plugin_config.get("mention_user_on_group_reply", False)) - # 这里不因为配置缺失而让插件初始化失败: - # 1. 这样插件可以先被系统正常加载,后续热更新 TOML 即可生效; - # 2. 真正处理消息时会再次检查配置完备性,并打印更清晰的日志。 - self._config_ready = bool(self._server_url and self._access_token) + self._api_server_ws_url = str(plugin_config.get("api_server_ws_url", "") or "").strip() + self._api_key = str(plugin_config.get("api_key", "") or "").strip() + self._platform_name = str(plugin_config.get("platform_name", "abot-maibot") or "abot-maibot").strip() + self._verify_ssl = bool(plugin_config.get("verify_ssl", True)) + self._connect_timeout = max(5, int(plugin_config.get("connect_timeout", 15) or 15)) + self._receive_timeout = max(30, int(plugin_config.get("receive_timeout", 120) or 120)) + self._heartbeat_interval = max(10, int(plugin_config.get("heartbeat_interval", 20) or 20)) + self._reconnect_delay = max(1, int(plugin_config.get("reconnect_delay", 5) or 5)) + self._queue_maxsize = max(50, int(plugin_config.get("queue_maxsize", 500) or 500)) + self._max_send_retries = max(1, int(plugin_config.get("max_send_retries", 3) or 3)) + + # 官方 API Server 至少需要 ws 地址与 api_key: + # 1. 地址用于建立长期 WebSocket; + # 2. api_key 会进入握手头和消息维度,MaiBot 用它做用户路由; + # 3. 两者任一缺失,都不适合继续放消息进队列。 + self._config_ready = bool(self._api_server_ws_url and self._api_key) if not self._config_ready: - self.LOG.warning( - f"[{self.name}] 当前 server_url/access_token 未配置完整,插件会加载成功但不会实际处理消息" - ) + self.LOG.warning(f"[{self.name}] api_server_ws_url/api_key 未配置完整,插件会加载成功但不会转发消息") - self.LOG.debug( - f"[{self.name}] 初始化完成: commands={self._commands}, " - f"allow_group_at={self._allow_group_at}, server_url={self._server_url}, " - f"session_scope={self._session_scope}, verify_ssl={self._verify_ssl}" + self.LOG.info( + f"[{self.name}] 初始化完成: enabled={self._enabled}, " + f"collect_group_messages={self._collect_group_messages}, collect_private_messages={self._collect_private_messages}, " + f"enable_reply_output={self._enable_reply_output}, reply_group_messages={self._reply_group_messages}, " + f"reply_private_messages={self._reply_private_messages}, api_server_ws_url={self._api_server_ws_url}, " + f"platform_name={self._platform_name}, queue_maxsize={self._queue_maxsize}" ) return True def start(self) -> bool: self.status = PluginStatus.RUNNING - self.LOG.debug(f"[{self.name}] 插件已启动") + self.LOG.info(f"[{self.name}] 插件已启动,等待首条消息后再懒启动长连接") return True def stop(self) -> bool: + # stop 是同步接口,不能直接 await 清理: + # 1. 这里先把状态切为 STOPPED,让后台协程自行退出; + # 2. 再取消当前任务,尽量让连接尽快释放; + # 3. 真正的 session/ws 关闭逻辑放在异步清理函数里做“尽力而为”处理。 self.status = PluginStatus.STOPPED + for task in list(self._tasks): + if not task.done(): + task.cancel() + self._tasks = [] + self._runtime_started = False self.LOG.info(f"[{self.name}] 插件已停止") return True def can_process(self, message: Dict[str, Any]) -> bool: - """判断当前消息是否该由 MaiBot 对话插件接管。""" - if not self._enabled: + """只要消息允许被采集,就让插件旁路处理一次。""" + if not self._enabled or not self._config_ready: return False - if not self._config_ready: + if not self._is_supported_message(message): return False - content = str(message.get("content", "") or "").strip() - if not content: + full_msg = message.get("full_wx_msg") + if isinstance(full_msg, WxMessage) and full_msg.from_self(): return False - first_token = content.split(" ", 1)[0] - if first_token in self._commands: - return True + roomid = str(message.get("roomid", "") or "").strip() + if roomid: + return self._collect_group_messages + return self._collect_private_messages - if self._allow_group_at and bool(message.get("is_at", False)) and str(message.get("roomid", "") or "").strip(): - return True - - return False - - @plugin_stats_decorator(plugin_name="MaiBot对话") - @group_feature_rate_limit(max_per_minute=5, feature_key=FEATURE_KEY) async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: - """处理命令对话或群聊 @ 对话,并将问题转发到远端 MaiBot。""" - content = str(message.get("content", "") or "").strip() - sender = str(message.get("sender", "") or "").strip() - roomid = str(message.get("roomid", "") or "").strip() - target = roomid if roomid else sender - gbm: GroupBotManager = message.get("gbm") + """将消息写入 MaiBot 采集队列,但默认不阻断后续插件。""" + await self._ensure_runtime_started() + bot: WechatAPIClient = message.get("bot") + if bot is not None: + self._last_bot = bot - self.LOG.info( - f"[{self.name}] 收到消息: sender={sender}, roomid={roomid}, " - f"is_at={message.get('is_at', False)}, content_preview={content[:120]}" - ) + outbound_queue = self._outbound_queue + if outbound_queue is None: + self.LOG.warning(f"[{self.name}] outbound_queue 尚未就绪,忽略本次消息") + return False, "runtime_not_ready" - if roomid and gbm and self.feature and gbm.get_group_permission(target, self.feature) == PermissionStatus.DISABLED: - self.LOG.info(f"[{self.name}] 群 {target} 未启用功能权限,跳过处理") - return False, "没有权限" - - query = self._extract_query(message) - if not query: - if bot: - await bot.send_text_message(target, self._command_tip, sender if roomid else "") - return False, "没有提供问题内容" - - session_key = self._build_session_key(roomid=roomid, sender=sender) - self.LOG.info( - f"[{self.name}] 准备请求 MaiBot: session_key={session_key}, " - f"query_len={len(query)}, query_preview={query[:120]}" - ) + normalized_content = self._normalize_message_content(message) + if not normalized_content: + self.LOG.debug(f"[{self.name}] 消息无有效文本表示,跳过转发") + return False, "empty_content" + payload = self._build_outbound_payload(message=message, normalized_content=normalized_content) try: - response = await self._query_maibot(session_key=session_key, query=query) - response = self._normalize_response_text(response) - if not response: - self.LOG.warning(f"[{self.name}] MaiBot 返回空响应: session_key={session_key}") - return False, "MaiBot 返回空响应" - - if bot: - await bot.send_text_message(target, response, sender if roomid else "") + outbound_queue.put_nowait(payload) self.LOG.info( - f"[{self.name}] MaiBot 回复成功: session_key={session_key}, " - f"reply_len={len(response)}, reply_preview={response[:120]}" + f"[{self.name}] 消息已入队: roomid={payload['roomid']}, sender={payload['sender']}, " + f"msg_type={payload['message_type']}, queue_size={outbound_queue.qsize()}, " + f"content_preview={normalized_content[:120]}" ) - return True, "发送成功" - except Exception as exc: - self.LOG.exception(f"[{self.name}] 请求 MaiBot 失败: {exc}") - if bot: - await bot.send_text_message(target, f"❌MaiBot 对话失败:{exc}", sender if roomid else "") - return False, f"MaiBot 对话失败: {exc}" + except asyncio.QueueFull: + self.LOG.warning( + f"[{self.name}] 消息队列已满,丢弃本次转发: roomid={payload['roomid']}, sender={payload['sender']}, " + f"msg_type={payload['message_type']}, content_preview={normalized_content[:120]}" + ) + return False, "queue_full" - def _extract_query(self, message: Dict[str, Any]) -> str: - """从命令消息或 @ 消息中提取真正发给 MaiBot 的文本。""" - content = str(message.get("content", "") or "").strip() - roomid = str(message.get("roomid", "") or "").strip() - is_at = bool(message.get("is_at", False)) - first_token = content.split(" ", 1)[0] if content else "" + # 这里故意返回 False: + # 1. 该插件的主职责是“默认把消息送给 MaiBot 做长期上下文采集”; + # 2. 它不应该吞掉本地其它插件,例如 ai_auto_response、Dify、命令插件; + # 3. 是否真的在微信里说话,由 MaiBot 返回消息和本插件的 reply 开关共同决定。 + return False, "queued" - if first_token in self._commands: - parts = content.split(" ", 1) - return parts[1].strip() if len(parts) > 1 else "" + async def _ensure_runtime_started(self) -> None: + """在首条消息到来时懒启动后台协程。""" + if self._runtime_started: + return - if is_at and roomid: - # 兼容微信里常见的 “@机器人[空白]内容” 形式,去掉最前面的 @ 提及部分。 - return re.sub(r"^@.*?[\u2005|\s]+", "", content).strip() + if self._runtime_lock is None: + self._runtime_lock = asyncio.Lock() - return "" + async with self._runtime_lock: + if self._runtime_started: + return - def _build_session_key(self, roomid: str, sender: str) -> str: - """按配置生成 MaiBot 会话键,决定上下文是按群共享还是按人隔离。""" - normalized_roomid = roomid or "private" - normalized_sender = sender or "unknown" + self._outbound_queue = asyncio.Queue(maxsize=self._queue_maxsize) + self._connected_event = asyncio.Event() + self._connection_uuid = f"abot_{uuid.uuid4().hex}" + self._tasks = [ + asyncio.create_task(self._connection_loop(), name="maibot_adapter_connection_loop"), + asyncio.create_task(self._sender_loop(), name="maibot_adapter_sender_loop"), + ] + self._runtime_started = True + self.LOG.info( + f"[{self.name}] 后台运行时已启动: connection_uuid={self._connection_uuid}, " + f"queue_maxsize={self._queue_maxsize}, heartbeat_interval={self._heartbeat_interval}" + ) - if self._session_scope == "sender": - return f"abot:{normalized_roomid}:{normalized_sender}" - return f"abot:{normalized_roomid}" + async def _connection_loop(self) -> None: + """常驻维护官方 API Server 长连接,并负责接收 MaiBot 回包。""" + while self.status == PluginStatus.RUNNING: + try: + await self._connect_websocket() + await self._receive_loop() + except asyncio.CancelledError: + raise + except Exception as exc: + self.LOG.exception(f"[{self.name}] 连接循环异常,稍后重连: {exc}") + finally: + if self._connected_event is not None: + self._connected_event.clear() + await self._close_websocket_only() - @staticmethod - def _normalize_response_text(response: str) -> str: - """简单清理回复文本,避免把 WS 结构层遗留空白直接发回群里。""" - normalized_text = str(response or "").replace("\r\n", "\n").strip() - normalized_text = re.sub(r"\n{3,}", "\n\n", normalized_text) - return normalized_text + if self.status != PluginStatus.RUNNING: + break + await asyncio.sleep(self._reconnect_delay) - async def _query_maibot(self, session_key: str, query: str) -> str: - """执行一次完整的 MaiBot HTTP 认证 + WS 会话对话流程。""" - client_timeout = aiohttp.ClientTimeout(total=self._reply_timeout + self._connect_timeout + 10) - cookie_jar = aiohttp.CookieJar(unsafe=True) + await self._close_session() + + async def _connect_websocket(self) -> None: + """按 MaiBot 官方 API Server 协议建立 WebSocket。""" + if self._client_session is None or self._client_session.closed: + timeout = aiohttp.ClientTimeout(total=None, sock_connect=self._connect_timeout, sock_read=None) + self._client_session = aiohttp.ClientSession(timeout=timeout) + + headers = { + "x-uuid": self._connection_uuid or f"abot_{uuid.uuid4().hex}", + "x-apikey": self._api_key, + "x-platform": self._platform_name, + } ssl_option = None if self._verify_ssl else False - async with aiohttp.ClientSession(timeout=client_timeout, cookie_jar=cookie_jar) as session: - await self._login_webui(session=session, ssl_option=ssl_option) - ws_token = await self._fetch_ws_token(session=session, ssl_option=ssl_option) - ws_url = self._build_ws_url(ws_token) + self.LOG.info( + f"[{self.name}] 正在连接 MaiBot API Server: url={self._api_server_ws_url}, " + f"platform={self._platform_name}, connection_uuid={headers['x-uuid']}" + ) - self.LOG.info(f"[{self.name}] 正在连接 MaiBot WebSocket: ws_url={ws_url}") - async with session.ws_connect( - ws_url, - ssl=ssl_option, - heartbeat=30, - receive_timeout=self._reply_timeout + 10, - timeout=self._connect_timeout, - ) as websocket: - client_session_id = f"{session_key}:{uuid.uuid4().hex[:8]}" + self._websocket = await self._client_session.ws_connect( + self._api_server_ws_url, + headers=headers, + heartbeat=self._heartbeat_interval, + timeout=self._connect_timeout, + receive_timeout=self._receive_timeout, + ssl=ssl_option, + ) - # 这里显式打开逻辑聊天会话,确保后续回复都能按 session 维度关联回来。 - open_request_id = f"open_{uuid.uuid4().hex}" - await websocket.send_json( - { - "op": "call", - "id": open_request_id, - "domain": "chat", - "method": "session.open", - "session": client_session_id, - "data": { - "restore": True, - "user_id": session_key, - "user_name": "ABotBridge", - }, - } - ) - await self._wait_for_call_ok( - websocket=websocket, - request_id=open_request_id, - expected_session=client_session_id, - ) + if self._connected_event is not None: + self._connected_event.set() - send_request_id = f"send_{uuid.uuid4().hex}" - await websocket.send_json( - { - "op": "call", - "id": send_request_id, - "domain": "chat", - "method": "message.send", - "session": client_session_id, - "data": { - "content": query, - "user_name": "ABotBridge", - }, - } - ) - await self._wait_for_call_ok( - websocket=websocket, - request_id=send_request_id, - expected_session=client_session_id, - ) + self.LOG.info( + f"[{self.name}] MaiBot API Server 连接成功: url={self._api_server_ws_url}, " + f"connection_uuid={headers['x-uuid']}" + ) - reply_text = await self._wait_for_bot_message( - websocket=websocket, - expected_session=client_session_id, - ) + async def _receive_loop(self) -> None: + """接收 MaiBot 官方 API Server 返回的消息。""" + websocket = self._websocket + if websocket is None: + raise RuntimeError("WebSocket 尚未连接") - # 关闭逻辑会话是“尽力而为”动作: - # 1. 即使关闭失败,也不影响当前已经拿到的回复; - # 2. 因为 WebSocket 断开后服务端也会清理连接,所以这里不把异常上抛。 - try: - close_request_id = f"close_{uuid.uuid4().hex}" - await websocket.send_json( - { - "op": "call", - "id": close_request_id, - "domain": "chat", - "method": "session.close", - "session": client_session_id, - "data": {}, - } - ) - except Exception as close_exc: - self.LOG.warning(f"[{self.name}] 关闭 MaiBot 逻辑会话失败: {close_exc}") + while self.status == PluginStatus.RUNNING: + message = await websocket.receive() - return reply_text + if message.type == aiohttp.WSMsgType.TEXT: + raw_text = str(message.data or "") + self.LOG.info(f"[{self.name}] 收到 MaiBot 原始消息: {raw_text[:500]}") + payload = self._parse_json_message(raw_text) + if payload is None: + continue + await self._handle_incoming_package(payload) + continue - async def _login_webui(self, session: aiohttp.ClientSession, ssl_option: Any) -> None: - """使用 MaiBot WebUI token 登录,以便后续换取一次性 ws-token。""" - verify_url = f"{self._server_url}/api/webui/auth/verify" - payload = {"token": self._access_token} - self.LOG.info(f"[{self.name}] 正在校验 MaiBot token: verify_url={verify_url}") + if message.type == aiohttp.WSMsgType.CLOSED: + raise RuntimeError("MaiBot WebSocket 已关闭") - async with session.post(verify_url, json=payload, ssl=ssl_option) as response: - response_text = await response.text() - if response.status != 200: - raise RuntimeError(f"MaiBot token 校验失败,HTTP {response.status}: {response_text[:200]}") + if message.type == aiohttp.WSMsgType.ERROR: + raise RuntimeError(f"MaiBot WebSocket 异常: {websocket.exception()}") + self.LOG.debug(f"[{self.name}] 忽略非文本消息类型: {message.type}") + + async def _sender_loop(self) -> None: + """消费出队消息并发送给 MaiBot。""" + while self.status == PluginStatus.RUNNING: + if self._outbound_queue is None: + await asyncio.sleep(0.5) + continue + + payload = await self._outbound_queue.get() try: - response_data = json.loads(response_text) - except json.JSONDecodeError as exc: - raise RuntimeError(f"MaiBot token 校验响应不是合法 JSON: {response_text[:200]}") from exc + await self._send_outbound_payload(payload) + except asyncio.CancelledError: + raise + except Exception as exc: + await self._handle_send_failure(payload, exc) + finally: + self._outbound_queue.task_done() - if not bool(response_data.get("valid")): - raise RuntimeError(f"MaiBot token 无效: {response_data.get('message') or response_text[:200]}") + async def _send_outbound_payload(self, payload: Dict[str, Any]) -> None: + """发送一条标准 sys_std 消息到 MaiBot。""" + if self._connected_event is None: + raise RuntimeError("connected_event 尚未初始化") - async def _fetch_ws_token(self, session: aiohttp.ClientSession, ssl_option: Any) -> str: - """通过已登录的 Cookie 换取一次性 WebSocket 临时 token。""" - token_url = f"{self._server_url}/api/webui/ws-token" - self.LOG.info(f"[{self.name}] 正在申请 MaiBot ws-token: url={token_url}") + await self._connected_event.wait() + websocket = self._websocket + if websocket is None: + raise RuntimeError("WebSocket 尚未连接") - async with session.get(token_url, ssl=ssl_option) as response: - response_text = await response.text() - if response.status != 200: - raise RuntimeError(f"MaiBot ws-token 获取失败,HTTP {response.status}: {response_text[:200]}") + package = { + "ver": 1, + "msg_id": f"msg_{uuid.uuid4().hex[:12]}_{int(time.time())}", + "type": "sys_std", + "meta": { + "sender_user": self._api_key, + "platform": self._platform_name, + "timestamp": time.time(), + }, + "payload": payload["api_message"], + } - try: - response_data = json.loads(response_text) - except json.JSONDecodeError as exc: - raise RuntimeError(f"MaiBot ws-token 响应不是合法 JSON: {response_text[:200]}") from exc + await websocket.send_json(package) + self.LOG.info( + f"[{self.name}] 已发送到 MaiBot: roomid={payload['roomid']}, sender={payload['sender']}, " + f"msg_type={payload['message_type']}, package_id={package['msg_id']}, " + f"content_preview={payload['normalized_content'][:120]}" + ) - if not bool(response_data.get("success")): - raise RuntimeError(f"MaiBot ws-token 获取失败: {response_data.get('message') or response_text[:200]}") + async def _handle_send_failure(self, payload: Dict[str, Any], exc: Exception) -> None: + """发送失败时尝试重试,避免瞬时断线导致消息直接丢失。""" + retry_count = int(payload.get("retry_count", 0) or 0) + 1 + payload["retry_count"] = retry_count - ws_token = str(response_data.get("token", "") or "").strip() - if not ws_token: - raise RuntimeError("MaiBot ws-token 为空") - return ws_token + self.LOG.warning( + f"[{self.name}] 发送到 MaiBot 失败: retry_count={retry_count}/{self._max_send_retries}, " + f"roomid={payload['roomid']}, sender={payload['sender']}, error={exc}" + ) - def _build_ws_url(self, ws_token: str) -> str: - """根据 server_url 自动转换成统一 WebSocket 地址。""" - if self._server_url.startswith("https://"): - base_ws_url = "wss://" + self._server_url[len("https://"):] - elif self._server_url.startswith("http://"): - base_ws_url = "ws://" + self._server_url[len("http://"):] + if self._connected_event is not None: + self._connected_event.clear() + await self._close_websocket_only() + + if retry_count >= self._max_send_retries: + self.LOG.error( + f"[{self.name}] 消息达到最大重试次数,最终丢弃: roomid={payload['roomid']}, " + f"sender={payload['sender']}, content_preview={payload['normalized_content'][:120]}" + ) + return + + if self._outbound_queue is not None: + await self._outbound_queue.put(payload) + + async def _handle_incoming_package(self, package: Dict[str, Any]) -> None: + """处理 MaiBot 返回给 abot 的消息包。""" + package_type = str(package.get("type", "") or "") + package_id = str(package.get("msg_id", "") or "") + + if package_type == "sys_ack": + acked_msg_id = str(((package.get("meta") or {}).get("acked_msg_id")) or "") + self.LOG.info(f"[{self.name}] 收到 MaiBot ACK: package_id={package_id}, acked_msg_id={acked_msg_id}") + return + + if package_type != "sys_std": + self.LOG.info(f"[{self.name}] 忽略非 sys_std 消息: package_type={package_type}, package_id={package_id}") + return + + api_message = package.get("payload") or {} + message_info = api_message.get("message_info") or {} + message_dim = api_message.get("message_dim") or {} + message_segment = api_message.get("message_segment") or {} + + reply_text = self._extract_segment_text(message_segment).strip() + if not reply_text: + self.LOG.info( + f"[{self.name}] MaiBot 返回了空文本或非文本片段,忽略发送: package_id={package_id}, " + f"segment_type={message_segment.get('type')}" + ) + return + + route = self._resolve_reply_route(message_info) + if route is None: + self.LOG.warning( + f"[{self.name}] 无法从 MaiBot 返回消息中解析路由,忽略发送: package_id={package_id}, " + f"message_info={message_info}" + ) + return + + self.LOG.info( + f"[{self.name}] 收到 MaiBot 回复: package_id={package_id}, route_type={route['route_type']}, " + f"target={route['target']}, at_target={route.get('at_target', '')}, " + f"platform={message_dim.get('platform')}, reply_preview={reply_text[:120]}" + ) + + await self._emit_reply(route=route, reply_text=reply_text, api_message=api_message) + + async def _emit_reply(self, route: Dict[str, str], reply_text: str, api_message: Dict[str, Any]) -> None: + """按配置决定是否把 MaiBot 回复真正发回微信。""" + if not self._enable_reply_output: + self.LOG.info(f"[{self.name}] enable_reply_output=false,仅采集不发回微信") + return + + bot = self._last_bot or getattr(self, "bot", None) + if bot is None: + self.LOG.warning(f"[{self.name}] 当前没有可用的 bot 实例,无法发送 MaiBot 回复") + return + + target = str(route.get("target", "") or "") + route_type = str(route.get("route_type", "") or "") + at_target = str(route.get("at_target", "") or "") + + if route_type == "group": + if not self._reply_group_messages: + self.LOG.info(f"[{self.name}] reply_group_messages=false,群回复已跳过: target={target}") + return + if self._respect_group_feature_switch and not self._group_reply_allowed(target): + self.LOG.info(f"[{self.name}] 群功能开关未启用,仅采集不回群: target={target}") + return + if self._mention_user_on_group_reply and at_target: + await bot.send_at_message(target, reply_text, [at_target]) + else: + await bot.send_text_message(target, reply_text, at_target if at_target else "") + self.LOG.info( + f"[{self.name}] 已发出 MaiBot 群回复: target={target}, at_target={at_target}, " + f"reply_len={len(reply_text)}" + ) + return + + if route_type == "private": + if not self._reply_private_messages: + self.LOG.info(f"[{self.name}] reply_private_messages=false,私聊回复已跳过: target={target}") + return + await bot.send_text_message(target, reply_text, "") + self.LOG.info(f"[{self.name}] 已发出 MaiBot 私聊回复: target={target}, reply_len={len(reply_text)}") + return + + self.LOG.warning(f"[{self.name}] 未知路由类型,无法发送 MaiBot 回复: route={route}, api_message={api_message}") + + def _group_reply_allowed(self, roomid: str) -> bool: + """群开关只影响“说不说”,不影响“收不收”。""" + if not roomid or not self.feature: + return True + + try: + permission = GroupBotManager.get_group_permission(roomid, self.feature) + return permission != PermissionStatus.DISABLED + except Exception: + # 这里故意保守放行: + # 1. 如果权限系统异常,我们不希望把功能直接打成完全失效; + # 2. 同时上层已有总开关 enable_reply_output 可兜底; + # 3. 真出问题时,日志里会保留异常栈便于后续修复。 + self.LOG.exception(f"[{self.name}] 读取群功能权限失败,默认允许发群回复: roomid={roomid}") + return True + + def _build_outbound_payload(self, message: Dict[str, Any], normalized_content: str) -> Dict[str, Any]: + """把微信消息包装成 MaiBot 官方 API Server 的 APIMessageBase 结构。""" + full_msg = message.get("full_wx_msg") + roomid = str(message.get("roomid", "") or "").strip() + sender = str(message.get("sender", "") or "").strip() + msg_type = self._resolve_message_type(message) + timestamp = self._resolve_message_timestamp(message, full_msg) + message_id = self._resolve_message_id(message, full_msg) + + sender_name = self._resolve_sender_name(message, sender) + group_name = self._resolve_group_name(message, roomid) + + sender_info: Dict[str, Any] = { + "user_info": { + "platform": self._platform_name, + "user_id": sender, + "user_nickname": sender_name, + } + } + receiver_info: Dict[str, Any] = {} + + if roomid: + receiver_info["group_info"] = { + "platform": self._platform_name, + "group_id": roomid, + "group_name": group_name, + } + # 群聊里额外把 user_info 也带上,目的是给 MaiBot 提供“当前这句话到底是谁说的”: + # 1. sender_info 是纯发送者视角; + # 2. receiver_info.group_info 是群路由视角; + # 3. receiver_info.user_info 则能帮助服务端在需要时直接知道“被路由回群时应 @ 谁”。 + receiver_info["user_info"] = { + "platform": self._platform_name, + "user_id": sender, + "user_nickname": sender_name, + } else: - raise RuntimeError(f"不支持的 MaiBot server_url: {self._server_url}") - return f"{base_ws_url}/ws?token={ws_token}" + receiver_info["user_info"] = { + "platform": self._platform_name, + "user_id": sender, + "user_nickname": sender_name, + } - async def _wait_for_call_ok( - self, - websocket: aiohttp.ClientWebSocketResponse, - request_id: str, - expected_session: str, - ) -> Dict[str, Any]: - """等待某次 WebSocket call 的确认响应。""" - deadline = time.time() + self._reply_timeout - while time.time() < deadline: - message = await self._receive_ws_json(websocket) + api_message = { + "message_info": { + "platform": self._platform_name, + "message_id": message_id, + "time": timestamp, + "format_info": { + "content_format": ["text"], + "accept_format": ["text"], + }, + "additional_config": { + "source": "abot_maibot_adapter", + "is_at": bool(message.get("is_at", False)), + "wx_message_type": msg_type, + "collect_only": True, + }, + "sender_info": sender_info, + "receiver_info": receiver_info, + }, + "message_segment": { + "type": "text", + "data": normalized_content, + }, + "message_dim": { + "api_key": self._api_key, + "platform": self._platform_name, + }, + } - # 统一 WebSocket 的准备事件不参与业务判断,直接忽略。 - if message.get("domain") == "system" and message.get("event") == "ready": - continue + return { + "roomid": roomid, + "sender": sender, + "message_type": msg_type, + "normalized_content": normalized_content, + "retry_count": 0, + "api_message": api_message, + } - if message.get("op") != "response": - continue + def _resolve_reply_route(self, message_info: Dict[str, Any]) -> Optional[Dict[str, str]]: + """从 MaiBot 返回的 message_info 中解析微信路由。""" + receiver_info = message_info.get("receiver_info") or {} + sender_info = message_info.get("sender_info") or {} - if str(message.get("id") or "") != request_id: - continue + receiver_group_info = receiver_info.get("group_info") or {} + receiver_user_info = receiver_info.get("user_info") or {} + sender_user_info = sender_info.get("user_info") or {} - if not bool(message.get("ok")): - error_info = message.get("error") or {} - raise RuntimeError(f"MaiBot 调用失败: {error_info}") + group_id = str(receiver_group_info.get("group_id", "") or "").strip() + receiver_user_id = str(receiver_user_info.get("user_id", "") or "").strip() + sender_user_id = str(sender_user_info.get("user_id", "") or "").strip() - data = message.get("data") or {} - if expected_session and str(data.get("session") or expected_session) != expected_session: - self.LOG.warning( - f"[{self.name}] 收到 session 不匹配的响应: expected={expected_session}, actual={data.get('session')}" - ) - return message + if group_id: + return { + "route_type": "group", + "target": group_id, + "at_target": receiver_user_id or sender_user_id, + } - raise TimeoutError(f"等待 MaiBot 请求确认超时: request_id={request_id}") + if receiver_user_id: + return { + "route_type": "private", + "target": receiver_user_id, + "at_target": "", + } - async def _wait_for_bot_message( - self, - websocket: aiohttp.ClientWebSocketResponse, - expected_session: str, - ) -> str: - """等待目标会话真正的机器人消息事件。""" - deadline = time.time() + self._reply_timeout - while time.time() < deadline: - message = await self._receive_ws_json(websocket) + if sender_user_id: + return { + "route_type": "private", + "target": sender_user_id, + "at_target": "", + } - if message.get("op") != "event": - continue + return None - if str(message.get("domain") or "") != "chat": - continue + def _normalize_message_content(self, message: Dict[str, Any]) -> str: + """将不同微信消息类型规整成适合 MaiBot 理解的文本。""" + full_msg = message.get("full_wx_msg") + msg_type = self._resolve_message_type(message) - if str(message.get("session") or "") != expected_session: - continue + if isinstance(full_msg, WxMessage): + if full_msg.msg_type == MessageType.TEXT: + return str(full_msg.content.clean_content or "").strip() - event_name = str(message.get("event") or "").strip() - data = message.get("data") or {} - data_type = str(data.get("type") or "").strip() + if full_msg.msg_type == MessageType.IMAGE: + image_content = full_msg.get_image_content() + if image_content: + return f"[图片] md5={image_content.md5} size={image_content.length}" + return "[图片]" - # typing / history / system / user_message 都属于过程事件,继续等待即可。 - if event_name == "bot_message" and data_type == "bot_message": - reply_text = str(data.get("content", "") or "").strip() - if reply_text: - return reply_text + if full_msg.msg_type == MessageType.VOICE: + voice_content = full_msg.get_voice_content() + if voice_content: + return f"[语音] 时长={voice_content.voice_length}ms" + return "[语音]" - if event_name == "error" or data_type == "error": - raise RuntimeError(str(data.get("content") or "MaiBot 返回错误事件")) + if full_msg.msg_type == MessageType.VIDEO: + video_content = full_msg.get_video_content() + if video_content: + return f"[视频] 时长={video_content.play_length}ms size={video_content.length}" + return "[视频]" - raise TimeoutError(f"等待 MaiBot 回复超时: session={expected_session}") + if full_msg.msg_type == MessageType.LOCATION: + location_content = full_msg.get_location_content() + if location_content: + return f"[位置] {location_content.label}" + return "[位置]" - async def _receive_ws_json(self, websocket: aiohttp.ClientWebSocketResponse) -> Dict[str, Any]: - """从 WebSocket 中读取一条 JSON 消息,并统一处理异常场景。""" - message = await websocket.receive() - if message.type == aiohttp.WSMsgType.TEXT: + if full_msg.msg_type == MessageType.APP: + app_type = full_msg.get_app_message_type() + if app_type == AppMessageType.LINK: + return f"[链接分享] {full_msg.content.clean_content or ''}".strip() + if app_type == AppMessageType.MINIPROGRAM: + return "[小程序]" + if app_type == AppMessageType.FILE: + return "[文件]" + if app_type == AppMessageType.QUOTE: + return f"[引用消息] {full_msg.content.clean_content or ''}".strip() + return f"[应用消息] {app_type.name if app_type else msg_type}" + + if full_msg.msg_type in (MessageType.EMOTICON, MessageType.EMOJI): + return "[表情]" + + if full_msg.msg_type in (MessageType.SYSTEM, MessageType.SYSTEM_NOTIFY, MessageType.RECALLED): + content = str(full_msg.content.clean_content or full_msg.content.raw_content or "").strip() + return f"[系统消息] {content}".strip() + + raw_content = str(message.get("content", "") or "").strip() + if raw_content: + return raw_content + return f"[{msg_type}]" + + def _resolve_message_type(self, message: Dict[str, Any]) -> str: + """统一导出消息类型名称,方便日志与 additional_config 使用。""" + full_msg = message.get("full_wx_msg") + if isinstance(full_msg, WxMessage) and full_msg.msg_type: + return str(full_msg.msg_type.name or "UNKNOWN") + + msg_type = message.get("type") + if isinstance(msg_type, MessageType): + return str(msg_type.name or "UNKNOWN") + return str(msg_type or "UNKNOWN") + + def _resolve_message_timestamp(self, message: Dict[str, Any], full_msg: Any) -> float: + """优先使用微信原始消息时间,缺失时再回落到当前时间。""" + if isinstance(full_msg, WxMessage) and getattr(full_msg, "create_time", 0): try: - payload = json.loads(message.data) - except json.JSONDecodeError as exc: - raise RuntimeError(f"MaiBot WebSocket 返回了非法 JSON: {message.data[:200]}") from exc - if isinstance(payload, dict): - return payload - raise RuntimeError(f"MaiBot WebSocket 返回了非对象 JSON: {payload}") + return float(full_msg.create_time) + except (TypeError, ValueError): + pass - if message.type == aiohttp.WSMsgType.CLOSED: - raise RuntimeError("MaiBot WebSocket 已关闭") + raw_timestamp = message.get("timestamp") + if raw_timestamp not in (None, ""): + try: + return float(raw_timestamp) + except (TypeError, ValueError): + pass - if message.type == aiohttp.WSMsgType.ERROR: - raise RuntimeError(f"MaiBot WebSocket 出错: {websocket.exception()}") + return time.time() + + def _resolve_message_id(self, message: Dict[str, Any], full_msg: Any) -> str: + """尽量保留微信原始 msg_id,便于服务端做去重与日志排查。""" + if isinstance(full_msg, WxMessage) and getattr(full_msg, "msg_id", None): + return str(full_msg.msg_id) + + roomid = str(message.get("roomid", "") or "") + sender = str(message.get("sender", "") or "") + return f"abot_{roomid}_{sender}_{uuid.uuid4().hex[:12]}" + + def _resolve_sender_name(self, message: Dict[str, Any], sender: str) -> str: + """从现有上下文里尽量拿到更友好的显示名。""" + all_contacts = message.get("all_contacts", {}) or {} + sender_name = str(all_contacts.get(sender, "") or "").strip() + if sender_name: + return sender_name + return sender or "unknown" + + def _resolve_group_name(self, message: Dict[str, Any], roomid: str) -> str: + """群名称仅作为增强上下文,不影响业务路由。""" + if not roomid: + return "" + all_contacts = message.get("all_contacts", {}) or {} + return str(all_contacts.get(roomid, "") or roomid) + + def _extract_segment_text(self, segment: Dict[str, Any]) -> str: + """把 MaiBot 返回的消息片段递归拼成文本。""" + segment_type = str(segment.get("type", "") or "") + data = segment.get("data") + + if segment_type == "text": + return str(data or "") + + if segment_type == "seglist" and isinstance(data, list): + parts: List[str] = [] + for item in data: + if isinstance(item, dict): + parts.append(self._extract_segment_text(item)) + return "".join(parts) + + if segment_type == "image": + return "[图片]" + + if segment_type == "emoji": + return "[表情]" + + return str(data or "") + + def _parse_json_message(self, raw_text: str) -> Optional[Dict[str, Any]]: + """统一处理 JSON 解析异常,避免接收循环被坏包打断。""" + try: + payload = json.loads(raw_text) + except json.JSONDecodeError: + self.LOG.warning(f"[{self.name}] 收到非法 JSON,已忽略: {raw_text[:300]}") + return None + + if not isinstance(payload, dict): + self.LOG.warning(f"[{self.name}] 收到非对象 JSON,已忽略: {payload}") + return None + return payload + + def _is_supported_message(self, message: Dict[str, Any]) -> bool: + """仅接收 MaiBot 当前最适合做上下文理解的消息类型。""" + full_msg = message.get("full_wx_msg") + if isinstance(full_msg, WxMessage): + return full_msg.msg_type in { + MessageType.TEXT, + MessageType.IMAGE, + MessageType.VOICE, + MessageType.VIDEO, + MessageType.LOCATION, + MessageType.APP, + MessageType.EMOTICON, + MessageType.EMOJI, + MessageType.SYSTEM, + MessageType.SYSTEM_NOTIFY, + MessageType.RECALLED, + } + + msg_type = message.get("type") + if isinstance(msg_type, MessageType): + return True + return bool(message.get("content")) + + async def _close_websocket_only(self) -> None: + """关闭当前 websocket,但保留 session 供重连复用。""" + if self._websocket is None: + return + try: + await self._websocket.close() + except Exception as exc: + self.LOG.warning(f"[{self.name}] 关闭 WebSocket 失败: {exc}") + finally: + self._websocket = None + + async def _close_session(self) -> None: + """关闭 aiohttp session。""" + await self._close_websocket_only() + if self._client_session is None: + return + try: + await self._client_session.close() + except Exception as exc: + self.LOG.warning(f"[{self.name}] 关闭 ClientSession 失败: {exc}") + finally: + self._client_session = None - raise RuntimeError(f"MaiBot WebSocket 返回了不支持的消息类型: {message.type}")