变更项:\n1. 新增 _sanitize_reply_text 方法,对回复文本做发送前净化。\n2. 增加规则:清理回复开头的长数字前缀(>=6位)及其后随标点/空白,避免 message_id 等内部标识外泄到群聊。\n3. 增加前导标点二次修整,去除数字剥离后残留的逗号/冒号等噪音字符。\n4. 在 _handle_incoming_package 中接入净化流程,保证实际发送前统一生效。
925 lines
41 KiB
Python
925 lines
41 KiB
Python
import asyncio
|
||
import json
|
||
import re
|
||
import time
|
||
import uuid
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import aiohttp
|
||
|
||
from base.plugin_common.message_plugin_interface import MessagePluginInterface
|
||
from base.plugin_common.plugin_interface import PluginStatus
|
||
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
|
||
from wechat_ipad import WechatAPIClient
|
||
from wechat_ipad.models.message import AppMessageType, MessageType, WxMessage
|
||
|
||
|
||
class MaiBotAdapterPlugin(MessagePluginInterface):
|
||
"""将 abot 的微信消息桥接到 MaiBot 官方 API Server。"""
|
||
|
||
FEATURE_KEY = "MAIBOT_CHAT"
|
||
FEATURE_DESCRIPTION = "🤖 MaiBot 对话桥接 [官方API Server 接入]"
|
||
|
||
@property
|
||
def name(self) -> str:
|
||
return "MaiBot对话"
|
||
|
||
@property
|
||
def version(self) -> str:
|
||
return "2.0.0"
|
||
|
||
@property
|
||
def description(self) -> str:
|
||
return "默认采集群聊/私聊消息到 MaiBot,并按开关决定是否输出 MaiBot 回复"
|
||
|
||
@property
|
||
def author(self) -> str:
|
||
return "Codex"
|
||
|
||
@property
|
||
def command_prefix(self) -> Optional[str]:
|
||
# 这个插件已经不是命令式插件:
|
||
# 1. 现在它的职责是“消息旁路采集 + 可选回复输出”;
|
||
# 2. 触发入口不再依赖“麦麦 xxx”这种命令;
|
||
# 3. 因此这里返回 None,让插件管理器不要按命令插件路径理解它。
|
||
return None
|
||
|
||
@property
|
||
def commands(self) -> List[str]:
|
||
return []
|
||
|
||
@property
|
||
def feature_key(self) -> Optional[str]:
|
||
return self.FEATURE_KEY
|
||
|
||
@property
|
||
def feature_description(self) -> Optional[str]:
|
||
return self.FEATURE_DESCRIPTION
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.feature = self.register_feature()
|
||
|
||
# 基础开关。
|
||
self._enabled = True
|
||
self._collect_group_messages = True
|
||
self._collect_private_messages = True
|
||
self._enable_reply_output = True
|
||
self._reply_group_messages = True
|
||
self._reply_private_messages = True
|
||
self._respect_group_feature_switch = True
|
||
self._mention_user_on_group_reply = False
|
||
|
||
# 协议与连接配置。
|
||
self._api_server_ws_url = ""
|
||
self._api_key = ""
|
||
self._platform_name = "abot-maibot"
|
||
self._verify_ssl = True
|
||
self._connect_timeout = 15
|
||
self._receive_timeout = 120
|
||
self._heartbeat_interval = 20
|
||
self._reconnect_delay = 5
|
||
self._queue_maxsize = 500
|
||
self._max_send_retries = 3
|
||
self._log_level = "INFO"
|
||
|
||
# 运行时状态。
|
||
self._config_ready = False
|
||
self._runtime_started = False
|
||
self._runtime_lock: Optional[asyncio.Lock] = None
|
||
self._outbound_queue: Optional[asyncio.Queue] = None
|
||
self._connected_event: Optional[asyncio.Event] = None
|
||
self._tasks: List[asyncio.Task] = []
|
||
self._client_session: Optional[aiohttp.ClientSession] = None
|
||
self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
|
||
self._connection_uuid = ""
|
||
self._last_bot: Optional[WechatAPIClient] = None
|
||
|
||
def initialize(self, context: Dict[str, Any]) -> bool:
|
||
"""加载插件配置。"""
|
||
self.LOG.debug(f"正在初始化 {self.name} 插件...")
|
||
|
||
plugin_config = self._config.get("MaiBotAdapter", {}) or {}
|
||
self._enabled = bool(plugin_config.get("enable", True))
|
||
self._collect_group_messages = bool(plugin_config.get("collect_group_messages", True))
|
||
self._collect_private_messages = bool(plugin_config.get("collect_private_messages", True))
|
||
self._enable_reply_output = bool(plugin_config.get("enable_reply_output", True))
|
||
self._reply_group_messages = bool(plugin_config.get("reply_group_messages", True))
|
||
self._reply_private_messages = bool(plugin_config.get("reply_private_messages", True))
|
||
self._respect_group_feature_switch = bool(plugin_config.get("respect_group_feature_switch", True))
|
||
self._mention_user_on_group_reply = bool(plugin_config.get("mention_user_on_group_reply", False))
|
||
|
||
self._api_server_ws_url = str(plugin_config.get("api_server_ws_url", "") or "").strip()
|
||
self._api_key = str(plugin_config.get("api_key", "") or "").strip()
|
||
self._platform_name = str(plugin_config.get("platform_name", "abot-maibot") or "abot-maibot").strip()
|
||
self._verify_ssl = bool(plugin_config.get("verify_ssl", True))
|
||
self._connect_timeout = max(5, int(plugin_config.get("connect_timeout", 15) or 15))
|
||
self._receive_timeout = max(30, int(plugin_config.get("receive_timeout", 120) or 120))
|
||
self._heartbeat_interval = max(10, int(plugin_config.get("heartbeat_interval", 20) or 20))
|
||
self._reconnect_delay = max(1, int(plugin_config.get("reconnect_delay", 5) or 5))
|
||
self._queue_maxsize = max(50, int(plugin_config.get("queue_maxsize", 500) or 500))
|
||
self._max_send_retries = max(1, int(plugin_config.get("max_send_retries", 3) or 3))
|
||
self._log_level = str(plugin_config.get("log_level", "INFO") or "INFO").strip().upper()
|
||
if self._log_level not in {"DEBUG", "INFO"}:
|
||
self._log_level = "INFO"
|
||
|
||
# 官方 API Server 至少需要 ws 地址与 api_key:
|
||
# 1. 地址用于建立长期 WebSocket;
|
||
# 2. api_key 会进入握手头和消息维度,MaiBot 用它做用户路由;
|
||
# 3. 两者任一缺失,都不适合继续放消息进队列。
|
||
self._config_ready = bool(self._api_server_ws_url and self._api_key)
|
||
if not self._config_ready:
|
||
self.LOG.warning(f"[{self.name}] api_server_ws_url/api_key 未配置完整,插件会加载成功但不会转发消息")
|
||
|
||
self._log_runtime(
|
||
f"[{self.name}] 初始化完成: enabled={self._enabled}, "
|
||
f"collect_group_messages={self._collect_group_messages}, collect_private_messages={self._collect_private_messages}, "
|
||
f"enable_reply_output={self._enable_reply_output}, reply_group_messages={self._reply_group_messages}, "
|
||
f"reply_private_messages={self._reply_private_messages}, api_server_ws_url={self._api_server_ws_url}, "
|
||
f"platform_name={self._platform_name}, queue_maxsize={self._queue_maxsize}"
|
||
)
|
||
return True
|
||
|
||
def start(self) -> bool:
|
||
self.status = PluginStatus.RUNNING
|
||
self._log_runtime(f"[{self.name}] 插件已启动,等待首条消息后再懒启动长连接")
|
||
return True
|
||
|
||
def stop(self) -> bool:
|
||
# stop 是同步接口,不能直接 await 清理:
|
||
# 1. 这里先把状态切为 STOPPED,让后台协程自行退出;
|
||
# 2. 再取消当前任务,尽量让连接尽快释放;
|
||
# 3. 真正的 session/ws 关闭逻辑放在异步清理函数里做“尽力而为”处理。
|
||
self.status = PluginStatus.STOPPED
|
||
for task in list(self._tasks):
|
||
if not task.done():
|
||
task.cancel()
|
||
self._tasks = []
|
||
self._runtime_started = False
|
||
self.LOG.info(f"[{self.name}] 插件已停止")
|
||
return True
|
||
|
||
def can_process(self, message: Dict[str, Any]) -> bool:
|
||
"""只要消息允许被采集,就让插件旁路处理一次。"""
|
||
if not self._enabled or not self._config_ready:
|
||
return False
|
||
|
||
if not self._is_supported_message(message):
|
||
return False
|
||
|
||
full_msg = message.get("full_wx_msg")
|
||
if isinstance(full_msg, WxMessage) and full_msg.from_self():
|
||
return False
|
||
|
||
chat_route = self._resolve_chat_route(message)
|
||
if chat_route["is_group"]:
|
||
return self._collect_group_messages
|
||
return self._collect_private_messages
|
||
|
||
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
|
||
"""将消息写入 MaiBot 采集队列,但默认不阻断后续插件。"""
|
||
await self._ensure_runtime_started()
|
||
|
||
bot: WechatAPIClient = message.get("bot")
|
||
if bot is not None:
|
||
self._last_bot = bot
|
||
|
||
outbound_queue = self._outbound_queue
|
||
if outbound_queue is None:
|
||
self.LOG.warning(f"[{self.name}] outbound_queue 尚未就绪,忽略本次消息")
|
||
return False, "runtime_not_ready"
|
||
|
||
normalized_content = self._normalize_message_content(message)
|
||
if not normalized_content:
|
||
self.LOG.debug(f"[{self.name}] 消息无有效文本表示,跳过转发")
|
||
return False, "empty_content"
|
||
|
||
payload = self._build_outbound_payload(message=message, normalized_content=normalized_content)
|
||
try:
|
||
outbound_queue.put_nowait(payload)
|
||
self._log_runtime(
|
||
f"[{self.name}] 消息已入队: route_type={payload['route_type']}, roomid={payload['roomid']}, "
|
||
f"route_source={payload['route_source']}, sender={payload['sender']}, "
|
||
f"msg_type={payload['message_type']}, queue_size={outbound_queue.qsize()}, "
|
||
f"content_preview={normalized_content[:120]}"
|
||
)
|
||
except asyncio.QueueFull:
|
||
self.LOG.warning(
|
||
f"[{self.name}] 消息队列已满,丢弃本次转发: roomid={payload['roomid']}, sender={payload['sender']}, "
|
||
f"msg_type={payload['message_type']}, content_preview={normalized_content[:120]}"
|
||
)
|
||
return False, "queue_full"
|
||
|
||
# 这里故意返回 False:
|
||
# 1. 该插件的主职责是“默认把消息送给 MaiBot 做长期上下文采集”;
|
||
# 2. 它不应该吞掉本地其它插件,例如 ai_auto_response、Dify、命令插件;
|
||
# 3. 是否真的在微信里说话,由 MaiBot 返回消息和本插件的 reply 开关共同决定。
|
||
return False, "queued"
|
||
|
||
async def _ensure_runtime_started(self) -> None:
|
||
"""在首条消息到来时懒启动后台协程。"""
|
||
if self._runtime_started:
|
||
return
|
||
|
||
if self._runtime_lock is None:
|
||
self._runtime_lock = asyncio.Lock()
|
||
|
||
async with self._runtime_lock:
|
||
if self._runtime_started:
|
||
return
|
||
|
||
self._outbound_queue = asyncio.Queue(maxsize=self._queue_maxsize)
|
||
self._connected_event = asyncio.Event()
|
||
self._connection_uuid = f"abot_{uuid.uuid4().hex}"
|
||
self._tasks = [
|
||
asyncio.create_task(self._connection_loop(), name="maibot_adapter_connection_loop"),
|
||
asyncio.create_task(self._sender_loop(), name="maibot_adapter_sender_loop"),
|
||
]
|
||
self._runtime_started = True
|
||
self._log_runtime(
|
||
f"[{self.name}] 后台运行时已启动: connection_uuid={self._connection_uuid}, "
|
||
f"queue_maxsize={self._queue_maxsize}, heartbeat_interval={self._heartbeat_interval}"
|
||
)
|
||
|
||
async def _connection_loop(self) -> None:
|
||
"""常驻维护官方 API Server 长连接,并负责接收 MaiBot 回包。"""
|
||
while self.status == PluginStatus.RUNNING:
|
||
try:
|
||
await self._connect_websocket()
|
||
await self._receive_loop()
|
||
except asyncio.CancelledError:
|
||
raise
|
||
except Exception as exc:
|
||
self.LOG.exception(f"[{self.name}] 连接循环异常,稍后重连: {exc}")
|
||
finally:
|
||
if self._connected_event is not None:
|
||
self._connected_event.clear()
|
||
await self._close_websocket_only()
|
||
|
||
if self.status != PluginStatus.RUNNING:
|
||
break
|
||
await asyncio.sleep(self._reconnect_delay)
|
||
|
||
await self._close_session()
|
||
|
||
async def _connect_websocket(self) -> None:
|
||
"""按 MaiBot 官方 API Server 协议建立 WebSocket。"""
|
||
if self._client_session is None or self._client_session.closed:
|
||
timeout = aiohttp.ClientTimeout(total=None, sock_connect=self._connect_timeout, sock_read=None)
|
||
self._client_session = aiohttp.ClientSession(timeout=timeout)
|
||
|
||
headers = {
|
||
"x-uuid": self._connection_uuid or f"abot_{uuid.uuid4().hex}",
|
||
"x-apikey": self._api_key,
|
||
"x-platform": self._platform_name,
|
||
}
|
||
ssl_option = None if self._verify_ssl else False
|
||
|
||
self._log_runtime(
|
||
f"[{self.name}] 正在连接 MaiBot API Server: url={self._api_server_ws_url}, "
|
||
f"platform={self._platform_name}, connection_uuid={headers['x-uuid']}"
|
||
)
|
||
|
||
self._websocket = await self._client_session.ws_connect(
|
||
self._api_server_ws_url,
|
||
headers=headers,
|
||
heartbeat=self._heartbeat_interval,
|
||
timeout=self._connect_timeout,
|
||
receive_timeout=self._receive_timeout,
|
||
ssl=ssl_option,
|
||
)
|
||
|
||
if self._connected_event is not None:
|
||
self._connected_event.set()
|
||
|
||
self._log_runtime(
|
||
f"[{self.name}] MaiBot API Server 连接成功: url={self._api_server_ws_url}, "
|
||
f"connection_uuid={headers['x-uuid']}"
|
||
)
|
||
|
||
async def _receive_loop(self) -> None:
|
||
"""接收 MaiBot 官方 API Server 返回的消息。"""
|
||
websocket = self._websocket
|
||
if websocket is None:
|
||
raise RuntimeError("WebSocket 尚未连接")
|
||
|
||
while self.status == PluginStatus.RUNNING:
|
||
message = await websocket.receive()
|
||
|
||
if message.type == aiohttp.WSMsgType.TEXT:
|
||
raw_text = str(message.data or "")
|
||
self._log_runtime(f"[{self.name}] 收到 MaiBot 原始消息: {raw_text[:500]}")
|
||
payload = self._parse_json_message(raw_text)
|
||
if payload is None:
|
||
continue
|
||
await self._handle_incoming_package(payload)
|
||
continue
|
||
|
||
if message.type == aiohttp.WSMsgType.CLOSED:
|
||
raise RuntimeError("MaiBot WebSocket 已关闭")
|
||
|
||
if message.type == aiohttp.WSMsgType.ERROR:
|
||
raise RuntimeError(f"MaiBot WebSocket 异常: {websocket.exception()}")
|
||
|
||
self.LOG.debug(f"[{self.name}] 忽略非文本消息类型: {message.type}")
|
||
|
||
async def _sender_loop(self) -> None:
|
||
"""消费出队消息并发送给 MaiBot。"""
|
||
while self.status == PluginStatus.RUNNING:
|
||
if self._outbound_queue is None:
|
||
await asyncio.sleep(0.5)
|
||
continue
|
||
|
||
payload = await self._outbound_queue.get()
|
||
try:
|
||
await self._send_outbound_payload(payload)
|
||
except asyncio.CancelledError:
|
||
raise
|
||
except Exception as exc:
|
||
await self._handle_send_failure(payload, exc)
|
||
finally:
|
||
self._outbound_queue.task_done()
|
||
|
||
async def _send_outbound_payload(self, payload: Dict[str, Any]) -> None:
|
||
"""发送一条标准 sys_std 消息到 MaiBot。"""
|
||
if self._connected_event is None:
|
||
raise RuntimeError("connected_event 尚未初始化")
|
||
|
||
await self._connected_event.wait()
|
||
websocket = self._websocket
|
||
if websocket is None:
|
||
raise RuntimeError("WebSocket 尚未连接")
|
||
|
||
package = {
|
||
"ver": 1,
|
||
"msg_id": f"msg_{uuid.uuid4().hex[:12]}_{int(time.time())}",
|
||
"type": "sys_std",
|
||
"meta": {
|
||
"sender_user": self._api_key,
|
||
"platform": self._platform_name,
|
||
"timestamp": time.time(),
|
||
},
|
||
"payload": payload["api_message"],
|
||
}
|
||
|
||
# 发送前打印 message_info 快照:
|
||
# 1. 当前“MaiBot 侧仍显示私聊”的排障核心是确认 group_info 是否真实入包;
|
||
# 2. 仅打印 message_info,不输出 message_segment 正文,避免无意义扩大日志;
|
||
# 3. 保留 ensure_ascii=False 便于直接查看中文群名/昵称。
|
||
try:
|
||
message_info_snapshot = json.dumps(
|
||
(payload.get("api_message") or {}).get("message_info") or {},
|
||
ensure_ascii=False,
|
||
)
|
||
self._log_runtime(f"[{self.name}] 发送前 message_info={message_info_snapshot}")
|
||
except Exception as exc:
|
||
self.LOG.warning(f"[{self.name}] message_info 序列化失败,跳过快照日志: {exc}")
|
||
|
||
await websocket.send_json(package)
|
||
self._log_runtime(
|
||
f"[{self.name}] 已发送到 MaiBot: roomid={payload['roomid']}, sender={payload['sender']}, "
|
||
f"msg_type={payload['message_type']}, package_id={package['msg_id']}, "
|
||
f"content_preview={payload['normalized_content'][:120]}"
|
||
)
|
||
|
||
async def _handle_send_failure(self, payload: Dict[str, Any], exc: Exception) -> None:
|
||
"""发送失败时尝试重试,避免瞬时断线导致消息直接丢失。"""
|
||
retry_count = int(payload.get("retry_count", 0) or 0) + 1
|
||
payload["retry_count"] = retry_count
|
||
|
||
self.LOG.warning(
|
||
f"[{self.name}] 发送到 MaiBot 失败: retry_count={retry_count}/{self._max_send_retries}, "
|
||
f"roomid={payload['roomid']}, sender={payload['sender']}, error={exc}"
|
||
)
|
||
|
||
if self._connected_event is not None:
|
||
self._connected_event.clear()
|
||
await self._close_websocket_only()
|
||
|
||
if retry_count >= self._max_send_retries:
|
||
self.LOG.error(
|
||
f"[{self.name}] 消息达到最大重试次数,最终丢弃: roomid={payload['roomid']}, "
|
||
f"sender={payload['sender']}, content_preview={payload['normalized_content'][:120]}"
|
||
)
|
||
return
|
||
|
||
if self._outbound_queue is not None:
|
||
await self._outbound_queue.put(payload)
|
||
|
||
async def _handle_incoming_package(self, package: Dict[str, Any]) -> None:
|
||
"""处理 MaiBot 返回给 abot 的消息包。"""
|
||
package_type = str(package.get("type", "") or "")
|
||
package_id = str(package.get("msg_id", "") or "")
|
||
|
||
if package_type == "sys_ack":
|
||
acked_msg_id = str(((package.get("meta") or {}).get("acked_msg_id")) or "")
|
||
self._log_runtime(f"[{self.name}] 收到 MaiBot ACK: package_id={package_id}, acked_msg_id={acked_msg_id}")
|
||
return
|
||
|
||
if package_type != "sys_std":
|
||
self._log_runtime(f"[{self.name}] 忽略非 sys_std 消息: package_type={package_type}, package_id={package_id}")
|
||
return
|
||
|
||
api_message = package.get("payload") or {}
|
||
message_info = api_message.get("message_info") or {}
|
||
message_dim = api_message.get("message_dim") or {}
|
||
message_segment = api_message.get("message_segment") or {}
|
||
|
||
reply_text = self._extract_segment_text(message_segment).strip()
|
||
reply_text = self._sanitize_reply_text(reply_text)
|
||
if not reply_text:
|
||
self._log_runtime(
|
||
f"[{self.name}] MaiBot 返回了空文本或非文本片段,忽略发送: package_id={package_id}, "
|
||
f"segment_type={message_segment.get('type')}"
|
||
)
|
||
return
|
||
|
||
route = self._resolve_reply_route(message_info)
|
||
if route is None:
|
||
self.LOG.warning(
|
||
f"[{self.name}] 无法从 MaiBot 返回消息中解析路由,忽略发送: package_id={package_id}, "
|
||
f"message_info={message_info}"
|
||
)
|
||
return
|
||
|
||
self._log_runtime(
|
||
f"[{self.name}] 收到 MaiBot 回复: package_id={package_id}, route_type={route['route_type']}, "
|
||
f"target={route['target']}, at_target={route.get('at_target', '')}, "
|
||
f"platform={message_dim.get('platform')}, reply_preview={reply_text[:120]}"
|
||
)
|
||
|
||
await self._emit_reply(route=route, reply_text=reply_text, api_message=api_message)
|
||
|
||
async def _emit_reply(self, route: Dict[str, str], reply_text: str, api_message: Dict[str, Any]) -> None:
|
||
"""按配置决定是否把 MaiBot 回复真正发回微信。"""
|
||
if not self._enable_reply_output:
|
||
self._log_runtime(f"[{self.name}] enable_reply_output=false,仅采集不发回微信")
|
||
return
|
||
|
||
bot = self._last_bot or getattr(self, "bot", None)
|
||
if bot is None:
|
||
self.LOG.warning(f"[{self.name}] 当前没有可用的 bot 实例,无法发送 MaiBot 回复")
|
||
return
|
||
|
||
target = str(route.get("target", "") or "")
|
||
route_type = str(route.get("route_type", "") or "")
|
||
at_target = str(route.get("at_target", "") or "")
|
||
|
||
if route_type == "group":
|
||
if not self._reply_group_messages:
|
||
self._log_runtime(f"[{self.name}] reply_group_messages=false,群回复已跳过: target={target}")
|
||
return
|
||
if self._respect_group_feature_switch and not self._group_reply_allowed(target):
|
||
self._log_runtime(f"[{self.name}] 群功能开关未启用,仅采集不回群: target={target}")
|
||
return
|
||
if self._mention_user_on_group_reply and at_target:
|
||
await bot.send_at_message(target, reply_text, [at_target])
|
||
else:
|
||
await bot.send_text_message(target, reply_text, at_target if at_target else "")
|
||
self._log_runtime(
|
||
f"[{self.name}] 已发出 MaiBot 群回复: target={target}, at_target={at_target}, "
|
||
f"reply_len={len(reply_text)}"
|
||
)
|
||
return
|
||
|
||
if route_type == "private":
|
||
if not self._reply_private_messages:
|
||
self._log_runtime(f"[{self.name}] reply_private_messages=false,私聊回复已跳过: target={target}")
|
||
return
|
||
await bot.send_text_message(target, reply_text, "")
|
||
self._log_runtime(f"[{self.name}] 已发出 MaiBot 私聊回复: target={target}, reply_len={len(reply_text)}")
|
||
return
|
||
|
||
self.LOG.warning(f"[{self.name}] 未知路由类型,无法发送 MaiBot 回复: route={route}, api_message={api_message}")
|
||
|
||
def _group_reply_allowed(self, roomid: str) -> bool:
|
||
"""群开关只影响“说不说”,不影响“收不收”。"""
|
||
if not roomid or not self.feature:
|
||
return True
|
||
|
||
try:
|
||
permission = GroupBotManager.get_group_permission(roomid, self.feature)
|
||
return permission != PermissionStatus.DISABLED
|
||
except Exception:
|
||
# 这里故意保守放行:
|
||
# 1. 如果权限系统异常,我们不希望把功能直接打成完全失效;
|
||
# 2. 同时上层已有总开关 enable_reply_output 可兜底;
|
||
# 3. 真出问题时,日志里会保留异常栈便于后续修复。
|
||
self.LOG.exception(f"[{self.name}] 读取群功能权限失败,默认允许发群回复: roomid={roomid}")
|
||
return True
|
||
|
||
def _build_outbound_payload(self, message: Dict[str, Any], normalized_content: str) -> Dict[str, Any]:
|
||
"""把微信消息包装成 MaiBot 官方 API Server 的 APIMessageBase 结构。"""
|
||
full_msg = message.get("full_wx_msg")
|
||
# 会话路由统一在这里解析,避免“can_process 一套、转发又一套”导致判定漂移。
|
||
chat_route = self._resolve_chat_route(message)
|
||
roomid = chat_route["roomid"]
|
||
route_type = chat_route["route_type"]
|
||
route_source = chat_route["route_source"]
|
||
sender = str(message.get("sender", "") or "").strip()
|
||
msg_type = self._resolve_message_type(message)
|
||
timestamp = self._resolve_message_timestamp(message, full_msg)
|
||
message_id = self._resolve_message_id(message, full_msg)
|
||
|
||
sender_name = self._resolve_sender_name(message, sender)
|
||
group_name = self._resolve_group_name(message, roomid)
|
||
|
||
# 路由诊断日志:这里把关键字段一次性打全,便于定位“群消息为何被识别成私聊”。
|
||
self._log_runtime(
|
||
f"[{self.name}] 路由判定: route_type={route_type}, route_source={route_source}, "
|
||
f"plugin_roomid={str(message.get('roomid', '') or '').strip()}, "
|
||
f"wx_roomid={str(getattr(full_msg, 'roomid', '') or '').strip() if isinstance(full_msg, WxMessage) else ''}, "
|
||
f"wx_to_user={str(getattr(full_msg, 'to_user', '') or '').strip() if isinstance(full_msg, WxMessage) else ''}, "
|
||
f"sender={sender}, message_id={message_id}"
|
||
)
|
||
|
||
# 按 MaiBot 当前桥接实现做“新旧兼容双写”:
|
||
# 1. 顶层 user_info/group_info:兼容新结构消费者;
|
||
# 2. sender_info.user_info/group_info:兼容当前 API Server 内部 from_api_receive 逻辑;
|
||
# 3. 通过 sender_info.group_info 显式传群维度,避免被判成私聊。
|
||
user_info: Dict[str, Any] = {
|
||
"platform": self._platform_name,
|
||
"user_id": sender,
|
||
"user_nickname": sender_name,
|
||
}
|
||
group_info: Optional[Dict[str, Any]] = None
|
||
if roomid:
|
||
group_info = {
|
||
"platform": self._platform_name,
|
||
"group_id": roomid,
|
||
"group_name": group_name,
|
||
}
|
||
sender_info: Dict[str, Any] = {
|
||
"user_info": user_info,
|
||
"group_info": group_info,
|
||
}
|
||
|
||
api_message = {
|
||
"message_info": {
|
||
"platform": self._platform_name,
|
||
"message_id": message_id,
|
||
"time": timestamp,
|
||
"format_info": {
|
||
"content_format": ["text"],
|
||
"accept_format": ["text"],
|
||
},
|
||
"additional_config": {
|
||
"source": "abot_maibot_adapter",
|
||
"is_at": bool(message.get("is_at", False)),
|
||
# 兼容 MaiBot 当前提及检测逻辑:
|
||
# 1. is_mentioned_bot_in_message 会优先读取 additional_config.at_bot / is_mentioned;
|
||
# 2. 之前仅传 is_at,MaiBot 不会把它当作“提及强信号”;
|
||
# 3. 这里同步双写,确保群里 @ 机器人时能够稳定提升回复概率并触发后续动作。
|
||
"at_bot": bool(message.get("is_at", False)),
|
||
"is_mentioned": bool(message.get("is_at", False)),
|
||
"wx_message_type": msg_type,
|
||
"collect_only": True,
|
||
# 额外把本地路由判定结果透传给 MaiBot,便于服务端/日志排查“为何被识别成私聊”。
|
||
"abot_route_type": route_type,
|
||
"abot_route_source": route_source,
|
||
},
|
||
"user_info": user_info,
|
||
"group_info": group_info,
|
||
"sender_info": sender_info,
|
||
},
|
||
"message_segment": {
|
||
"type": "text",
|
||
"data": normalized_content,
|
||
},
|
||
"message_dim": {
|
||
"api_key": self._api_key,
|
||
"platform": self._platform_name,
|
||
},
|
||
}
|
||
|
||
return {
|
||
"route_type": route_type,
|
||
"route_source": route_source,
|
||
"roomid": roomid,
|
||
"sender": sender,
|
||
"message_type": msg_type,
|
||
"normalized_content": normalized_content,
|
||
"retry_count": 0,
|
||
"api_message": api_message,
|
||
}
|
||
|
||
def _resolve_chat_route(self, message: Dict[str, Any]) -> Dict[str, str]:
|
||
"""解析消息的群聊/私聊路由,兼容上游字段缺失场景。"""
|
||
full_msg = message.get("full_wx_msg")
|
||
|
||
# 先看插件消息里已经带好的 roomid,这是最直接、最便宜的一跳。
|
||
# 按你现在的业务约定:只要 roomid 非空,就视为群聊。
|
||
# 这样可以避免某些平台包体里 roomid 不是 @chatroom 结尾时被误判成私聊。
|
||
plugin_roomid = str(message.get("roomid", "") or "").strip()
|
||
if plugin_roomid:
|
||
return {"is_group": True, "route_type": "group", "roomid": plugin_roomid, "route_source": "plugin_roomid_non_empty"}
|
||
|
||
# 再看 WxMessage 里计算过的 roomid。
|
||
wx_roomid = ""
|
||
wx_to_user = ""
|
||
wx_from_group = False
|
||
if isinstance(full_msg, WxMessage):
|
||
wx_roomid = str(getattr(full_msg, "roomid", "") or "").strip()
|
||
wx_to_user = str(getattr(full_msg, "to_user", "") or "").strip()
|
||
try:
|
||
wx_from_group = bool(full_msg.from_group())
|
||
except Exception:
|
||
wx_from_group = False
|
||
|
||
if wx_roomid.endswith("@chatroom"):
|
||
return {"is_group": True, "route_type": "group", "roomid": wx_roomid, "route_source": "wx_roomid"}
|
||
|
||
# 某些边界消息里,roomid 可能没落下来,但 to_user 仍是 chatroom。
|
||
if wx_to_user.endswith("@chatroom"):
|
||
return {"is_group": True, "route_type": "group", "roomid": wx_to_user, "route_source": "wx_to_user"}
|
||
|
||
# 最后兜底原始报文,避免上游字段偶发缺失时把群消息误判成私聊。
|
||
raw_data = getattr(full_msg, "raw_data", {}) if isinstance(full_msg, WxMessage) else {}
|
||
raw_from = ""
|
||
raw_to = ""
|
||
if isinstance(raw_data, dict):
|
||
raw_from = str(((raw_data.get("FromUserName") or {}).get("string")) or "").strip()
|
||
raw_to = str(((raw_data.get("ToUserName") or {}).get("string")) or "").strip()
|
||
|
||
if raw_from.endswith("@chatroom"):
|
||
return {"is_group": True, "route_type": "group", "roomid": raw_from, "route_source": "raw_from_user"}
|
||
if raw_to.endswith("@chatroom"):
|
||
return {"is_group": True, "route_type": "group", "roomid": raw_to, "route_source": "raw_to_user"}
|
||
|
||
# 到这里仍不是群聊,就按私聊处理,但保留详细来源,方便日志反查。
|
||
private_source = "wx_from_group_flag" if wx_from_group else "private_default"
|
||
return {"is_group": False, "route_type": "private", "roomid": "", "route_source": private_source}
|
||
|
||
def _resolve_reply_route(self, message_info: Dict[str, Any]) -> Optional[Dict[str, str]]:
|
||
"""从 MaiBot 返回的 message_info 中解析微信路由。"""
|
||
# 优先按官方当前结构解析:
|
||
# 1. group_info 存在 => 群回复;
|
||
# 2. user_info 存在 => 私聊回复。
|
||
root_group_info = message_info.get("group_info") or {}
|
||
root_user_info = message_info.get("user_info") or {}
|
||
group_id = str(root_group_info.get("group_id", "") or "").strip()
|
||
root_user_id = str(root_user_info.get("user_id", "") or "").strip()
|
||
|
||
if group_id:
|
||
return {
|
||
"route_type": "group",
|
||
"target": group_id,
|
||
"at_target": root_user_id,
|
||
}
|
||
|
||
if root_user_id:
|
||
return {
|
||
"route_type": "private",
|
||
"target": root_user_id,
|
||
"at_target": "",
|
||
}
|
||
|
||
# 兼容历史 sender_info/receiver_info 结构,避免升级期间回包偶发失配。
|
||
receiver_info = message_info.get("receiver_info") or {}
|
||
sender_info = message_info.get("sender_info") or {}
|
||
receiver_group_info = receiver_info.get("group_info") or {}
|
||
receiver_user_info = receiver_info.get("user_info") or {}
|
||
sender_user_info = sender_info.get("user_info") or {}
|
||
|
||
group_id = str(receiver_group_info.get("group_id", "") or "").strip()
|
||
receiver_user_id = str(receiver_user_info.get("user_id", "") or "").strip()
|
||
sender_user_id = str(sender_user_info.get("user_id", "") or "").strip()
|
||
|
||
if group_id:
|
||
return {
|
||
"route_type": "group",
|
||
"target": group_id,
|
||
"at_target": receiver_user_id or sender_user_id,
|
||
}
|
||
|
||
if receiver_user_id:
|
||
return {
|
||
"route_type": "private",
|
||
"target": receiver_user_id,
|
||
"at_target": "",
|
||
}
|
||
|
||
if sender_user_id:
|
||
return {
|
||
"route_type": "private",
|
||
"target": sender_user_id,
|
||
"at_target": "",
|
||
}
|
||
|
||
return None
|
||
|
||
def _normalize_message_content(self, message: Dict[str, Any]) -> str:
|
||
"""将不同微信消息类型规整成适合 MaiBot 理解的文本。"""
|
||
full_msg = message.get("full_wx_msg")
|
||
msg_type = self._resolve_message_type(message)
|
||
|
||
if isinstance(full_msg, WxMessage):
|
||
if full_msg.msg_type == MessageType.TEXT:
|
||
return str(full_msg.content.clean_content or "").strip()
|
||
|
||
if full_msg.msg_type == MessageType.IMAGE:
|
||
image_content = full_msg.get_image_content()
|
||
if image_content:
|
||
return f"[图片] md5={image_content.md5} size={image_content.length}"
|
||
return "[图片]"
|
||
|
||
if full_msg.msg_type == MessageType.VOICE:
|
||
voice_content = full_msg.get_voice_content()
|
||
if voice_content:
|
||
return f"[语音] 时长={voice_content.voice_length}ms"
|
||
return "[语音]"
|
||
|
||
if full_msg.msg_type == MessageType.VIDEO:
|
||
video_content = full_msg.get_video_content()
|
||
if video_content:
|
||
return f"[视频] 时长={video_content.play_length}ms size={video_content.length}"
|
||
return "[视频]"
|
||
|
||
if full_msg.msg_type == MessageType.LOCATION:
|
||
location_content = full_msg.get_location_content()
|
||
if location_content:
|
||
return f"[位置] {location_content.label}"
|
||
return "[位置]"
|
||
|
||
if full_msg.msg_type == MessageType.APP:
|
||
app_type = full_msg.get_app_message_type()
|
||
if app_type == AppMessageType.LINK:
|
||
return f"[链接分享] {full_msg.content.clean_content or ''}".strip()
|
||
if app_type == AppMessageType.MINIPROGRAM:
|
||
return "[小程序]"
|
||
if app_type == AppMessageType.FILE:
|
||
return "[文件]"
|
||
if app_type == AppMessageType.QUOTE:
|
||
return f"[引用消息] {full_msg.content.clean_content or ''}".strip()
|
||
return f"[应用消息] {app_type.name if app_type else msg_type}"
|
||
|
||
if full_msg.msg_type in (MessageType.EMOTICON, MessageType.EMOJI):
|
||
return "[表情]"
|
||
|
||
if full_msg.msg_type in (MessageType.SYSTEM, MessageType.SYSTEM_NOTIFY, MessageType.RECALLED):
|
||
content = str(full_msg.content.clean_content or full_msg.content.raw_content or "").strip()
|
||
return f"[系统消息] {content}".strip()
|
||
|
||
raw_content = str(message.get("content", "") or "").strip()
|
||
if raw_content:
|
||
return raw_content
|
||
return f"[{msg_type}]"
|
||
|
||
def _resolve_message_type(self, message: Dict[str, Any]) -> str:
|
||
"""统一导出消息类型名称,方便日志与 additional_config 使用。"""
|
||
full_msg = message.get("full_wx_msg")
|
||
if isinstance(full_msg, WxMessage) and full_msg.msg_type:
|
||
return str(full_msg.msg_type.name or "UNKNOWN")
|
||
|
||
msg_type = message.get("type")
|
||
if isinstance(msg_type, MessageType):
|
||
return str(msg_type.name or "UNKNOWN")
|
||
return str(msg_type or "UNKNOWN")
|
||
|
||
def _resolve_message_timestamp(self, message: Dict[str, Any], full_msg: Any) -> float:
|
||
"""优先使用微信原始消息时间,缺失时再回落到当前时间。"""
|
||
if isinstance(full_msg, WxMessage) and getattr(full_msg, "create_time", 0):
|
||
try:
|
||
return float(full_msg.create_time)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
raw_timestamp = message.get("timestamp")
|
||
if raw_timestamp not in (None, ""):
|
||
try:
|
||
return float(raw_timestamp)
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
return time.time()
|
||
|
||
def _resolve_message_id(self, message: Dict[str, Any], full_msg: Any) -> str:
|
||
"""尽量保留微信原始 msg_id,便于服务端做去重与日志排查。"""
|
||
if isinstance(full_msg, WxMessage) and getattr(full_msg, "msg_id", None):
|
||
return str(full_msg.msg_id)
|
||
|
||
roomid = str(message.get("roomid", "") or "")
|
||
sender = str(message.get("sender", "") or "")
|
||
return f"abot_{roomid}_{sender}_{uuid.uuid4().hex[:12]}"
|
||
|
||
def _resolve_sender_name(self, message: Dict[str, Any], sender: str) -> str:
|
||
"""从现有上下文里尽量拿到更友好的显示名。"""
|
||
all_contacts = message.get("all_contacts", {}) or {}
|
||
sender_name = str(all_contacts.get(sender, "") or "").strip()
|
||
if sender_name:
|
||
return sender_name
|
||
return sender or "unknown"
|
||
|
||
def _resolve_group_name(self, message: Dict[str, Any], roomid: str) -> str:
|
||
"""群名称仅作为增强上下文,不影响业务路由。"""
|
||
if not roomid:
|
||
return ""
|
||
all_contacts = message.get("all_contacts", {}) or {}
|
||
return str(all_contacts.get(roomid, "") or roomid)
|
||
|
||
def _extract_segment_text(self, segment: Dict[str, Any]) -> str:
|
||
"""把 MaiBot 返回的消息片段递归拼成文本。"""
|
||
segment_type = str(segment.get("type", "") or "")
|
||
data = segment.get("data")
|
||
|
||
if segment_type == "text":
|
||
return str(data or "")
|
||
|
||
if segment_type == "seglist" and isinstance(data, list):
|
||
parts: List[str] = []
|
||
for item in data:
|
||
if isinstance(item, dict):
|
||
parts.append(self._extract_segment_text(item))
|
||
return "".join(parts)
|
||
|
||
if segment_type == "image":
|
||
return "[图片]"
|
||
|
||
if segment_type == "emoji":
|
||
return "[表情]"
|
||
|
||
return str(data or "")
|
||
|
||
def _sanitize_reply_text(self, reply_text: str) -> str:
|
||
"""清理 MaiBot 回复里的路由噪音,避免把 message_id 等内部标识发到群里。"""
|
||
text = str(reply_text or "").strip()
|
||
if not text:
|
||
return ""
|
||
|
||
# 清理“纯数字前缀 + 标点/空白”的噪音:
|
||
# 1. 某些轮次里模型会把 message_id 或会话标识误拼到回复最前面;
|
||
# 2. 这些数字通常长度较长(>=6),且紧跟中文/英文标点或空白;
|
||
# 3. 只在“前缀位置”清理,避免误伤正文里的正常数字内容。
|
||
text = re.sub(r"^\s*\d{6,}\s*[,,::;;\-—_~\s]*", "", text)
|
||
|
||
# 若清理后前部还有一层“孤立标点”,再做一次轻量修整,避免出现“,在呢怎么了”这种残留。
|
||
text = re.sub(r"^\s*[,,::;;\-—_~]+\s*", "", text)
|
||
|
||
return text.strip()
|
||
|
||
def _parse_json_message(self, raw_text: str) -> Optional[Dict[str, Any]]:
|
||
"""统一处理 JSON 解析异常,避免接收循环被坏包打断。"""
|
||
try:
|
||
payload = json.loads(raw_text)
|
||
except json.JSONDecodeError:
|
||
self.LOG.warning(f"[{self.name}] 收到非法 JSON,已忽略: {raw_text[:300]}")
|
||
return None
|
||
|
||
if not isinstance(payload, dict):
|
||
self.LOG.warning(f"[{self.name}] 收到非对象 JSON,已忽略: {payload}")
|
||
return None
|
||
return payload
|
||
|
||
def _log_runtime(self, message: str) -> None:
|
||
"""根据插件配置动态输出运行日志级别。"""
|
||
if self._log_level == "DEBUG":
|
||
self.LOG.debug(message)
|
||
return
|
||
self.LOG.info(message)
|
||
|
||
def _is_supported_message(self, message: Dict[str, Any]) -> bool:
|
||
"""仅接收 MaiBot 当前最适合做上下文理解的消息类型。"""
|
||
full_msg = message.get("full_wx_msg")
|
||
if isinstance(full_msg, WxMessage):
|
||
return full_msg.msg_type in {
|
||
MessageType.TEXT,
|
||
MessageType.IMAGE,
|
||
MessageType.VOICE,
|
||
MessageType.VIDEO,
|
||
MessageType.LOCATION,
|
||
MessageType.APP,
|
||
MessageType.EMOTICON,
|
||
MessageType.EMOJI,
|
||
MessageType.SYSTEM,
|
||
MessageType.SYSTEM_NOTIFY,
|
||
MessageType.RECALLED,
|
||
}
|
||
|
||
msg_type = message.get("type")
|
||
if isinstance(msg_type, MessageType):
|
||
return True
|
||
return bool(message.get("content"))
|
||
|
||
async def _close_websocket_only(self) -> None:
|
||
"""关闭当前 websocket,但保留 session 供重连复用。"""
|
||
if self._websocket is None:
|
||
return
|
||
try:
|
||
await self._websocket.close()
|
||
except Exception as exc:
|
||
self.LOG.warning(f"[{self.name}] 关闭 WebSocket 失败: {exc}")
|
||
finally:
|
||
self._websocket = None
|
||
|
||
async def _close_session(self) -> None:
|
||
"""关闭 aiohttp session。"""
|
||
await self._close_websocket_only()
|
||
if self._client_session is None:
|
||
return
|
||
try:
|
||
await self._client_session.close()
|
||
except Exception as exc:
|
||
self.LOG.warning(f"[{self.name}] 关闭 ClientSession 失败: {exc}")
|
||
finally:
|
||
self._client_session = None
|