Files
abot/plugins/maibot_adapter/main.py
liuwei 736ac05d98 增加MaiBot发包message_info快照日志用于私聊误判排障
变更项:\n1. 在 _send_outbound_payload 发送前新增 message_info 快照日志,完整输出实际入包的 message_info JSON。\n2. 日志仅输出 message_info,避免正文 segment 造成噪音与隐私扩散。\n3. 增加序列化异常保护,避免日志构建失败影响正常发包。\n4. 为定位‘群消息仍被识别为私聊’提供第一手协议证据。
2026-04-29 11:00:46 +08:00

895 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional, Tuple
import aiohttp
from base.plugin_common.message_plugin_interface import MessagePluginInterface
from base.plugin_common.plugin_interface import PluginStatus
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import AppMessageType, MessageType, WxMessage
class MaiBotAdapterPlugin(MessagePluginInterface):
    """Bridge abot's WeChat messages to the official MaiBot API Server."""

    # Feature key; returned by the ``feature_key`` property.
    FEATURE_KEY = "MAIBOT_CHAT"
    # Human-readable feature description; returned by ``feature_description``.
    FEATURE_DESCRIPTION = "🤖 MaiBot 对话桥接 [官方API Server 接入]"
@property
def name(self) -> str:
    """Return the plugin's display name."""
    display_name = "MaiBot对话"
    return display_name
@property
def version(self) -> str:
    """Return the plugin's version string."""
    release = "2.0.0"
    return release
@property
def description(self) -> str:
    """Return the one-line summary of what the plugin does."""
    summary = "默认采集群聊/私聊消息到 MaiBot并按开关决定是否输出 MaiBot 回复"
    return summary
@property
def author(self) -> str:
    """Return the plugin author's name."""
    author_name = "Codex"
    return author_name
@property
def command_prefix(self) -> Optional[str]:
    """Return ``None``: this plugin is not driven by a command prefix.

    The plugin collects messages on the side and optionally outputs replies,
    so it does not rely on a "麦麦 xxx"-style command as its trigger.
    """
    no_prefix: Optional[str] = None
    return no_prefix
@property
def commands(self) -> List[str]:
    """Return the (empty) list of commands; a new list on every access."""
    return list()
@property
def feature_key(self) -> Optional[str]:
    """Return the feature key stored in ``FEATURE_KEY``."""
    key = self.FEATURE_KEY
    return key
@property
def feature_description(self) -> Optional[str]:
    """Return the feature description stored in ``FEATURE_DESCRIPTION``."""
    text = self.FEATURE_DESCRIPTION
    return text
def __init__(self):
    """Register the feature and reset every setting and runtime field.

    ``initialize`` later overwrites the settings from the plugin config; the
    queue, event, tasks and connection id stay empty until
    ``_ensure_runtime_started`` creates them.
    """
    super().__init__()
    self.feature = self.register_feature()

    # Runtime state: nothing is connected or queued yet.
    self._config_ready = self._runtime_started = False
    self._runtime_lock: Optional[asyncio.Lock] = None
    self._outbound_queue: Optional[asyncio.Queue] = None
    self._connected_event: Optional[asyncio.Event] = None
    self._tasks: List[asyncio.Task] = []
    self._client_session: Optional[aiohttp.ClientSession] = None
    self._websocket: Optional[aiohttp.ClientWebSocketResponse] = None
    self._connection_uuid = ""
    self._last_bot: Optional[WechatAPIClient] = None

    # Protocol and connection settings.
    self._api_server_ws_url = self._api_key = ""
    self._platform_name = "abot-maibot"
    self._verify_ssl = True
    self._connect_timeout = 15
    self._receive_timeout = 120
    self._heartbeat_interval = 20
    self._reconnect_delay = 5
    self._queue_maxsize = 500
    self._max_send_retries = 3
    self._log_level = "INFO"

    # Feature switches: everything on except @-mentioning in group replies.
    self._enabled = True
    self._collect_group_messages = self._collect_private_messages = True
    self._enable_reply_output = True
    self._reply_group_messages = self._reply_private_messages = True
    self._respect_group_feature_switch = True
    self._mention_user_on_group_reply = False
def initialize(self, context: Dict[str, Any]) -> bool:
    """Load the plugin settings from the ``MaiBotAdapter`` config section.

    Integer settings are clamped to a minimum. A value that cannot be parsed
    as an integer is logged and replaced by its default instead of aborting
    initialization. A text setting that is empty or whitespace-only falls
    back to its default.

    Args:
        context: Plugin context supplied by the caller; not read here.

    Returns:
        Always ``True``. With an incomplete ``api_server_ws_url``/``api_key``
        the plugin still loads, but ``_config_ready`` is ``False``.
    """
    self.LOG.debug(f"正在初始化 {self.name} 插件...")
    plugin_config = self._config.get("MaiBotAdapter", {}) or {}

    def read_flag(key: str, default: bool) -> bool:
        return bool(plugin_config.get(key, default))

    def read_text(key: str, default: str = "") -> str:
        # Strip first, then fall back: a whitespace-only value must not
        # survive as an empty string.
        return str(plugin_config.get(key, default) or default).strip() or default

    def read_int(key: str, default: int, minimum: int) -> int:
        raw_value = plugin_config.get(key, default) or default
        try:
            parsed = int(raw_value)
        except (TypeError, ValueError, OverflowError):
            self.LOG.warning(
                f"[{self.name}] 配置项 {key}={raw_value!r} 不是合法整数,使用默认值 {default}"
            )
            parsed = default
        return max(minimum, parsed)

    self._enabled = read_flag("enable", True)
    self._collect_group_messages = read_flag("collect_group_messages", True)
    self._collect_private_messages = read_flag("collect_private_messages", True)
    self._enable_reply_output = read_flag("enable_reply_output", True)
    self._reply_group_messages = read_flag("reply_group_messages", True)
    self._reply_private_messages = read_flag("reply_private_messages", True)
    self._respect_group_feature_switch = read_flag("respect_group_feature_switch", True)
    self._mention_user_on_group_reply = read_flag("mention_user_on_group_reply", False)

    self._api_server_ws_url = read_text("api_server_ws_url")
    self._api_key = read_text("api_key")
    self._platform_name = read_text("platform_name", "abot-maibot")
    self._verify_ssl = read_flag("verify_ssl", True)

    self._connect_timeout = read_int("connect_timeout", 15, 5)
    self._receive_timeout = read_int("receive_timeout", 120, 30)
    self._heartbeat_interval = read_int("heartbeat_interval", 20, 10)
    self._reconnect_delay = read_int("reconnect_delay", 5, 1)
    self._queue_maxsize = read_int("queue_maxsize", 500, 50)
    self._max_send_retries = read_int("max_send_retries", 3, 1)

    self._log_level = read_text("log_level", "INFO").upper()
    if self._log_level not in {"DEBUG", "INFO"}:
        self._log_level = "INFO"

    # Both the WebSocket URL and the api_key are required: the URL opens the
    # long-lived connection, and the api_key goes into the handshake headers
    # and into each message. Without either, nothing should be queued.
    self._config_ready = bool(self._api_server_ws_url and self._api_key)
    if not self._config_ready:
        self.LOG.warning(f"[{self.name}] api_server_ws_url/api_key 未配置完整,插件会加载成功但不会转发消息")

    self._log_runtime(
        f"[{self.name}] 初始化完成: enabled={self._enabled}, "
        f"collect_group_messages={self._collect_group_messages}, collect_private_messages={self._collect_private_messages}, "
        f"enable_reply_output={self._enable_reply_output}, reply_group_messages={self._reply_group_messages}, "
        f"reply_private_messages={self._reply_private_messages}, api_server_ws_url={self._api_server_ws_url}, "
        f"platform_name={self._platform_name}, queue_maxsize={self._queue_maxsize}"
    )
    return True
def start(self) -> bool:
    """Mark the plugin as running and log it; always returns ``True``."""
    self.status = PluginStatus.RUNNING
    startup_note = f"[{self.name}] 插件已启动,等待首条消息后再懒启动长连接"
    self._log_runtime(startup_note)
    return True
def stop(self) -> bool:
    """Stop the plugin synchronously; always returns ``True``.

    ``stop`` is a synchronous interface and cannot await cleanup, so it only
    flips the status to STOPPED (the background loops test it), cancels the
    tasks that are still running, and resets the bookkeeping. Closing the
    session and WebSocket is left to the async cleanup code, best effort.
    """
    self.status = PluginStatus.STOPPED
    still_running = [task for task in self._tasks if not task.done()]
    for task in still_running:
        task.cancel()
    self._tasks = []
    self._runtime_started = False
    self.LOG.info(f"[{self.name}] 插件已停止")
    return True
def can_process(self, message: Dict[str, Any]) -> bool:
    """Tell whether this message should be collected by the plugin.

    Rejected when the plugin is disabled or not fully configured, when the
    message type is unsupported, or when the message was sent by the bot
    itself. Otherwise the group or private collect switch decides.
    """
    if not (self._enabled and self._config_ready):
        return False
    if not self._is_supported_message(message):
        return False

    wx_message = message.get("full_wx_msg")
    sent_by_self = isinstance(wx_message, WxMessage) and wx_message.from_self()
    if sent_by_self:
        return False

    is_group = self._resolve_chat_route(message)["is_group"]
    return self._collect_group_messages if is_group else self._collect_private_messages
async def process_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Queue the message for MaiBot without blocking other plugins.

    Returns:
        ``(False, reason)`` in every case, where ``reason`` is one of
        ``"runtime_not_ready"``, ``"empty_content"``, ``"queue_full"`` or
        ``"queued"``.
    """
    await self._ensure_runtime_started()

    incoming_bot = message.get("bot")
    if incoming_bot is not None:
        # Remember the latest bot client; _emit_reply uses it to send replies.
        self._last_bot = incoming_bot

    queue = self._outbound_queue
    if queue is None:
        self.LOG.warning(f"[{self.name}] outbound_queue 尚未就绪,忽略本次消息")
        return False, "runtime_not_ready"

    text = self._normalize_message_content(message)
    if not text:
        self.LOG.debug(f"[{self.name}] 消息无有效文本表示,跳过转发")
        return False, "empty_content"

    payload = self._build_outbound_payload(message=message, normalized_content=text)
    try:
        queue.put_nowait(payload)
    except asyncio.QueueFull:
        self.LOG.warning(
            f"[{self.name}] 消息队列已满,丢弃本次转发: roomid={payload['roomid']}, sender={payload['sender']}, "
            f"msg_type={payload['message_type']}, content_preview={text[:120]}"
        )
        return False, "queue_full"

    self._log_runtime(
        f"[{self.name}] 消息已入队: route_type={payload['route_type']}, roomid={payload['roomid']}, "
        f"route_source={payload['route_source']}, sender={payload['sender']}, "
        f"msg_type={payload['message_type']}, queue_size={queue.qsize()}, "
        f"content_preview={text[:120]}"
    )
    # The first element is False on purpose: this plugin only feeds MaiBot
    # with context and must not swallow the message for other local plugins
    # (ai_auto_response, Dify, command plugins). Whether anything is said in
    # WeChat is decided by MaiBot's reply plus this plugin's reply switches.
    return False, "queued"
async def _ensure_runtime_started(self) -> None:
"""在首条消息到来时懒启动后台协程。"""
if self._runtime_started:
return
if self._runtime_lock is None:
self._runtime_lock = asyncio.Lock()
async with self._runtime_lock:
if self._runtime_started:
return
self._outbound_queue = asyncio.Queue(maxsize=self._queue_maxsize)
self._connected_event = asyncio.Event()
self._connection_uuid = f"abot_{uuid.uuid4().hex}"
self._tasks = [
asyncio.create_task(self._connection_loop(), name="maibot_adapter_connection_loop"),
asyncio.create_task(self._sender_loop(), name="maibot_adapter_sender_loop"),
]
self._runtime_started = True
self._log_runtime(
f"[{self.name}] 后台运行时已启动: connection_uuid={self._connection_uuid}, "
f"queue_maxsize={self._queue_maxsize}, heartbeat_interval={self._heartbeat_interval}"
)
async def _connection_loop(self) -> None:
    """Keep the WebSocket to the MaiBot API Server alive and read replies.

    While the plugin status is RUNNING this connects, runs the receive loop,
    and on any non-cancellation error logs it, closes the socket and retries
    after ``_reconnect_delay`` seconds. The aiohttp session is closed when
    the loop ends for any reason.

    Raises:
        asyncio.CancelledError: Propagated when the task is cancelled, after
            the socket and the session have been closed.
    """
    try:
        while self.status == PluginStatus.RUNNING:
            try:
                await self._connect_websocket()
                await self._receive_loop()
            except asyncio.CancelledError:
                raise
            except Exception as exc:
                self.LOG.exception(f"[{self.name}] 连接循环异常,稍后重连: {exc}")
            finally:
                # Per-attempt cleanup only. No `break` and no sleep in here:
                # a `break` inside `finally` would swallow an in-flight
                # CancelledError, and a sleep would delay cancellation.
                if self._connected_event is not None:
                    self._connected_event.clear()
                await self._close_websocket_only()
            if self.status != PluginStatus.RUNNING:
                break
            await asyncio.sleep(self._reconnect_delay)
    finally:
        # Runs on normal exit and on cancellation, so the session is never
        # left open.
        await self._close_session()
async def _connect_websocket(self) -> None:
    """Open the WebSocket to the MaiBot API Server and mark it connected.

    Reuses the existing aiohttp session unless it is missing or closed. The
    handshake carries ``x-uuid``, ``x-apikey`` and ``x-platform`` headers.
    On success the socket is stored and ``_connected_event`` is set.
    """
    session = self._client_session
    if session is None or session.closed:
        session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(
                total=None, sock_connect=self._connect_timeout, sock_read=None
            )
        )
        self._client_session = session

    connection_uuid = self._connection_uuid or f"abot_{uuid.uuid4().hex}"
    handshake_headers = {
        "x-uuid": connection_uuid,
        "x-apikey": self._api_key,
        "x-platform": self._platform_name,
    }
    self._log_runtime(
        f"[{self.name}] 正在连接 MaiBot API Server: url={self._api_server_ws_url}, "
        f"platform={self._platform_name}, connection_uuid={connection_uuid}"
    )

    # NOTE(review): aiohttp appears to document ws_connect's `timeout` as the
    # close-handshake timeout rather than a connect timeout; confirm intent.
    self._websocket = await session.ws_connect(
        self._api_server_ws_url,
        headers=handshake_headers,
        heartbeat=self._heartbeat_interval,
        timeout=self._connect_timeout,
        receive_timeout=self._receive_timeout,
        # None keeps aiohttp's default verification; False disables it.
        ssl=None if self._verify_ssl else False,
    )

    if self._connected_event is not None:
        self._connected_event.set()
    self._log_runtime(
        f"[{self.name}] MaiBot API Server 连接成功: url={self._api_server_ws_url}, "
        f"connection_uuid={connection_uuid}"
    )
async def _receive_loop(self) -> None:
    """Read frames from the current WebSocket until it closes or errors.

    Text frames are parsed as JSON and passed to ``_handle_incoming_package``;
    frames that do not parse are skipped. Other frame types are logged and
    ignored.

    Raises:
        RuntimeError: If no socket is connected, or when a CLOSED or ERROR
            frame is received.
    """
    ws = self._websocket
    if ws is None:
        raise RuntimeError("WebSocket 尚未连接")

    while self.status == PluginStatus.RUNNING:
        frame = await ws.receive()
        frame_type = frame.type
        if frame_type == aiohttp.WSMsgType.TEXT:
            raw_text = str(frame.data or "")
            self._log_runtime(f"[{self.name}] 收到 MaiBot 原始消息: {raw_text[:500]}")
            package = self._parse_json_message(raw_text)
            if package is not None:
                await self._handle_incoming_package(package)
        elif frame_type == aiohttp.WSMsgType.CLOSED:
            raise RuntimeError("MaiBot WebSocket 已关闭")
        elif frame_type == aiohttp.WSMsgType.ERROR:
            raise RuntimeError(f"MaiBot WebSocket 异常: {ws.exception()}")
        else:
            self.LOG.debug(f"[{self.name}] 忽略非文本消息类型: {frame.type}")
async def _sender_loop(self) -> None:
    """Drain the outbound queue and send each payload to MaiBot.

    A failed send is handed to ``_handle_send_failure``; cancellation is
    re-raised. ``task_done`` is called for every item taken from the queue.
    """
    while self.status == PluginStatus.RUNNING:
        queue_ready = self._outbound_queue is not None
        if not queue_ready:
            await asyncio.sleep(0.5)
            continue

        item = await self._outbound_queue.get()
        try:
            await self._send_outbound_payload(item)
        except asyncio.CancelledError:
            raise
        except Exception as send_error:
            await self._handle_send_failure(item, send_error)
        finally:
            self._outbound_queue.task_done()
async def _send_outbound_payload(self, payload: Dict[str, Any]) -> None:
"""发送一条标准 sys_std 消息到 MaiBot。"""
if self._connected_event is None:
raise RuntimeError("connected_event 尚未初始化")
await self._connected_event.wait()
websocket = self._websocket
if websocket is None:
raise RuntimeError("WebSocket 尚未连接")
package = {
"ver": 1,
"msg_id": f"msg_{uuid.uuid4().hex[:12]}_{int(time.time())}",
"type": "sys_std",
"meta": {
"sender_user": self._api_key,
"platform": self._platform_name,
"timestamp": time.time(),
},
"payload": payload["api_message"],
}
# 发送前打印 message_info 快照:
# 1. 当前“MaiBot 侧仍显示私聊”的排障核心是确认 group_info 是否真实入包;
# 2. 仅打印 message_info不输出 message_segment 正文,避免无意义扩大日志;
# 3. 保留 ensure_ascii=False 便于直接查看中文群名/昵称。
try:
message_info_snapshot = json.dumps(
(payload.get("api_message") or {}).get("message_info") or {},
ensure_ascii=False,
)
self._log_runtime(f"[{self.name}] 发送前 message_info={message_info_snapshot}")
except Exception as exc:
self.LOG.warning(f"[{self.name}] message_info 序列化失败,跳过快照日志: {exc}")
await websocket.send_json(package)
self._log_runtime(
f"[{self.name}] 已发送到 MaiBot: roomid={payload['roomid']}, sender={payload['sender']}, "
f"msg_type={payload['message_type']}, package_id={package['msg_id']}, "
f"content_preview={payload['normalized_content'][:120]}"
)
async def _handle_send_failure(self, payload: Dict[str, Any], exc: Exception) -> None:
"""发送失败时尝试重试,避免瞬时断线导致消息直接丢失。"""
retry_count = int(payload.get("retry_count", 0) or 0) + 1
payload["retry_count"] = retry_count
self.LOG.warning(
f"[{self.name}] 发送到 MaiBot 失败: retry_count={retry_count}/{self._max_send_retries}, "
f"roomid={payload['roomid']}, sender={payload['sender']}, error={exc}"
)
if self._connected_event is not None:
self._connected_event.clear()
await self._close_websocket_only()
if retry_count >= self._max_send_retries:
self.LOG.error(
f"[{self.name}] 消息达到最大重试次数,最终丢弃: roomid={payload['roomid']}, "
f"sender={payload['sender']}, content_preview={payload['normalized_content'][:120]}"
)
return
if self._outbound_queue is not None:
await self._outbound_queue.put(payload)
async def _handle_incoming_package(self, package: Dict[str, Any]) -> None:
"""处理 MaiBot 返回给 abot 的消息包。"""
package_type = str(package.get("type", "") or "")
package_id = str(package.get("msg_id", "") or "")
if package_type == "sys_ack":
acked_msg_id = str(((package.get("meta") or {}).get("acked_msg_id")) or "")
self._log_runtime(f"[{self.name}] 收到 MaiBot ACK: package_id={package_id}, acked_msg_id={acked_msg_id}")
return
if package_type != "sys_std":
self._log_runtime(f"[{self.name}] 忽略非 sys_std 消息: package_type={package_type}, package_id={package_id}")
return
api_message = package.get("payload") or {}
message_info = api_message.get("message_info") or {}
message_dim = api_message.get("message_dim") or {}
message_segment = api_message.get("message_segment") or {}
reply_text = self._extract_segment_text(message_segment).strip()
if not reply_text:
self._log_runtime(
f"[{self.name}] MaiBot 返回了空文本或非文本片段,忽略发送: package_id={package_id}, "
f"segment_type={message_segment.get('type')}"
)
return
route = self._resolve_reply_route(message_info)
if route is None:
self.LOG.warning(
f"[{self.name}] 无法从 MaiBot 返回消息中解析路由,忽略发送: package_id={package_id}, "
f"message_info={message_info}"
)
return
self._log_runtime(
f"[{self.name}] 收到 MaiBot 回复: package_id={package_id}, route_type={route['route_type']}, "
f"target={route['target']}, at_target={route.get('at_target', '')}, "
f"platform={message_dim.get('platform')}, reply_preview={reply_text[:120]}"
)
await self._emit_reply(route=route, reply_text=reply_text, api_message=api_message)
async def _emit_reply(self, route: Dict[str, str], reply_text: str, api_message: Dict[str, Any]) -> None:
"""按配置决定是否把 MaiBot 回复真正发回微信。"""
if not self._enable_reply_output:
self._log_runtime(f"[{self.name}] enable_reply_output=false仅采集不发回微信")
return
bot = self._last_bot or getattr(self, "bot", None)
if bot is None:
self.LOG.warning(f"[{self.name}] 当前没有可用的 bot 实例,无法发送 MaiBot 回复")
return
target = str(route.get("target", "") or "")
route_type = str(route.get("route_type", "") or "")
at_target = str(route.get("at_target", "") or "")
if route_type == "group":
if not self._reply_group_messages:
self._log_runtime(f"[{self.name}] reply_group_messages=false群回复已跳过: target={target}")
return
if self._respect_group_feature_switch and not self._group_reply_allowed(target):
self._log_runtime(f"[{self.name}] 群功能开关未启用,仅采集不回群: target={target}")
return
if self._mention_user_on_group_reply and at_target:
await bot.send_at_message(target, reply_text, [at_target])
else:
await bot.send_text_message(target, reply_text, at_target if at_target else "")
self._log_runtime(
f"[{self.name}] 已发出 MaiBot 群回复: target={target}, at_target={at_target}, "
f"reply_len={len(reply_text)}"
)
return
if route_type == "private":
if not self._reply_private_messages:
self._log_runtime(f"[{self.name}] reply_private_messages=false私聊回复已跳过: target={target}")
return
await bot.send_text_message(target, reply_text, "")
self._log_runtime(f"[{self.name}] 已发出 MaiBot 私聊回复: target={target}, reply_len={len(reply_text)}")
return
self.LOG.warning(f"[{self.name}] 未知路由类型,无法发送 MaiBot 回复: route={route}, api_message={api_message}")
def _group_reply_allowed(self, roomid: str) -> bool:
"""群开关只影响“说不说”,不影响“收不收”。"""
if not roomid or not self.feature:
return True
try:
permission = GroupBotManager.get_group_permission(roomid, self.feature)
return permission != PermissionStatus.DISABLED
except Exception:
# 这里故意保守放行:
# 1. 如果权限系统异常,我们不希望把功能直接打成完全失效;
# 2. 同时上层已有总开关 enable_reply_output 可兜底;
# 3. 真出问题时,日志里会保留异常栈便于后续修复。
self.LOG.exception(f"[{self.name}] 读取群功能权限失败,默认允许发群回复: roomid={roomid}")
return True
def _build_outbound_payload(self, message: Dict[str, Any], normalized_content: str) -> Dict[str, Any]:
    """Build the queue item for one WeChat message.

    Returns:
        A dict with the routing and logging fields (``route_type``,
        ``route_source``, ``roomid``, ``sender``, ``message_type``,
        ``normalized_content``, ``retry_count``) plus ``api_message``, the
        structure that is sent to MaiBot as the package payload.
    """
    wx_message = message.get("full_wx_msg")
    # The route is resolved by the same helper as in can_process so the two
    # cannot disagree.
    route = self._resolve_chat_route(message)
    roomid = route["roomid"]
    route_type = route["route_type"]
    route_source = route["route_source"]

    sender = str(message.get("sender", "") or "").strip()
    msg_type = self._resolve_message_type(message)
    timestamp = self._resolve_message_timestamp(message, wx_message)
    message_id = self._resolve_message_id(message, wx_message)
    sender_name = self._resolve_sender_name(message, sender)
    group_name = self._resolve_group_name(message, roomid)

    # Routing diagnostics: log every field needed to see why a group message
    # was classified as private.
    plugin_roomid = str(message.get("roomid", "") or "").strip()
    if isinstance(wx_message, WxMessage):
        wx_roomid = str(getattr(wx_message, "roomid", "") or "").strip()
        wx_to_user = str(getattr(wx_message, "to_user", "") or "").strip()
    else:
        wx_roomid = wx_to_user = ""
    self._log_runtime(
        f"[{self.name}] 路由判定: route_type={route_type}, route_source={route_source}, "
        f"plugin_roomid={plugin_roomid}, "
        f"wx_roomid={wx_roomid}, "
        f"wx_to_user={wx_to_user}, "
        f"sender={sender}, message_id={message_id}"
    )

    # message_info layout: user_info describes the sender and is always
    # present; group_info is set only when there is a room id, else None.
    # The older sender_info/receiver_info fields are not sent.
    user_info: Dict[str, Any] = {
        "platform": self._platform_name,
        "user_id": sender,
        "user_nickname": sender_name,
    }
    group_info: Optional[Dict[str, Any]] = (
        {
            "platform": self._platform_name,
            "group_id": roomid,
            "group_name": group_name,
        }
        if roomid
        else None
    )
    additional_config = {
        "source": "abot_maibot_adapter",
        "is_at": bool(message.get("is_at", False)),
        "wx_message_type": msg_type,
        "collect_only": True,
        # The local routing decision is passed along for troubleshooting.
        "abot_route_type": route_type,
        "abot_route_source": route_source,
    }
    message_info = {
        "platform": self._platform_name,
        "message_id": message_id,
        "time": timestamp,
        "format_info": {
            "content_format": ["text"],
            "accept_format": ["text"],
        },
        "additional_config": additional_config,
        "user_info": user_info,
        "group_info": group_info,
    }
    api_message = {
        "message_info": message_info,
        "message_segment": {"type": "text", "data": normalized_content},
        "message_dim": {"api_key": self._api_key, "platform": self._platform_name},
    }
    return {
        "route_type": route_type,
        "route_source": route_source,
        "roomid": roomid,
        "sender": sender,
        "message_type": msg_type,
        "normalized_content": normalized_content,
        "retry_count": 0,
        "api_message": api_message,
    }
def _resolve_chat_route(self, message: Dict[str, Any]) -> Dict[str, str]:
"""解析消息的群聊/私聊路由,兼容上游字段缺失场景。"""
full_msg = message.get("full_wx_msg")
# 先看插件消息里已经带好的 roomid这是最直接、最便宜的一跳。
# 按你现在的业务约定:只要 roomid 非空,就视为群聊。
# 这样可以避免某些平台包体里 roomid 不是 @chatroom 结尾时被误判成私聊。
plugin_roomid = str(message.get("roomid", "") or "").strip()
if plugin_roomid:
return {"is_group": True, "route_type": "group", "roomid": plugin_roomid, "route_source": "plugin_roomid_non_empty"}
# 再看 WxMessage 里计算过的 roomid。
wx_roomid = ""
wx_to_user = ""
wx_from_group = False
if isinstance(full_msg, WxMessage):
wx_roomid = str(getattr(full_msg, "roomid", "") or "").strip()
wx_to_user = str(getattr(full_msg, "to_user", "") or "").strip()
try:
wx_from_group = bool(full_msg.from_group())
except Exception:
wx_from_group = False
if wx_roomid.endswith("@chatroom"):
return {"is_group": True, "route_type": "group", "roomid": wx_roomid, "route_source": "wx_roomid"}
# 某些边界消息里roomid 可能没落下来,但 to_user 仍是 chatroom。
if wx_to_user.endswith("@chatroom"):
return {"is_group": True, "route_type": "group", "roomid": wx_to_user, "route_source": "wx_to_user"}
# 最后兜底原始报文,避免上游字段偶发缺失时把群消息误判成私聊。
raw_data = getattr(full_msg, "raw_data", {}) if isinstance(full_msg, WxMessage) else {}
raw_from = ""
raw_to = ""
if isinstance(raw_data, dict):
raw_from = str(((raw_data.get("FromUserName") or {}).get("string")) or "").strip()
raw_to = str(((raw_data.get("ToUserName") or {}).get("string")) or "").strip()
if raw_from.endswith("@chatroom"):
return {"is_group": True, "route_type": "group", "roomid": raw_from, "route_source": "raw_from_user"}
if raw_to.endswith("@chatroom"):
return {"is_group": True, "route_type": "group", "roomid": raw_to, "route_source": "raw_to_user"}
# 到这里仍不是群聊,就按私聊处理,但保留详细来源,方便日志反查。
private_source = "wx_from_group_flag" if wx_from_group else "private_default"
return {"is_group": False, "route_type": "private", "roomid": "", "route_source": private_source}
def _resolve_reply_route(self, message_info: Dict[str, Any]) -> Optional[Dict[str, str]]:
"""从 MaiBot 返回的 message_info 中解析微信路由。"""
# 优先按官方当前结构解析:
# 1. group_info 存在 => 群回复;
# 2. user_info 存在 => 私聊回复。
root_group_info = message_info.get("group_info") or {}
root_user_info = message_info.get("user_info") or {}
group_id = str(root_group_info.get("group_id", "") or "").strip()
root_user_id = str(root_user_info.get("user_id", "") or "").strip()
if group_id:
return {
"route_type": "group",
"target": group_id,
"at_target": root_user_id,
}
if root_user_id:
return {
"route_type": "private",
"target": root_user_id,
"at_target": "",
}
# 兼容历史 sender_info/receiver_info 结构,避免升级期间回包偶发失配。
receiver_info = message_info.get("receiver_info") or {}
sender_info = message_info.get("sender_info") or {}
receiver_group_info = receiver_info.get("group_info") or {}
receiver_user_info = receiver_info.get("user_info") or {}
sender_user_info = sender_info.get("user_info") or {}
group_id = str(receiver_group_info.get("group_id", "") or "").strip()
receiver_user_id = str(receiver_user_info.get("user_id", "") or "").strip()
sender_user_id = str(sender_user_info.get("user_id", "") or "").strip()
if group_id:
return {
"route_type": "group",
"target": group_id,
"at_target": receiver_user_id or sender_user_id,
}
if receiver_user_id:
return {
"route_type": "private",
"target": receiver_user_id,
"at_target": "",
}
if sender_user_id:
return {
"route_type": "private",
"target": sender_user_id,
"at_target": "",
}
return None
def _normalize_message_content(self, message: Dict[str, Any]) -> str:
    """Turn a WeChat message into the text that is sent to MaiBot.

    Text messages yield their cleaned content; media, app, emoticon and
    system messages yield a bracketed placeholder with whatever details are
    available. Anything else falls back to the raw ``content`` field, or to
    ``[<type name>]`` when that is empty.
    """
    wx_message = message.get("full_wx_msg")
    type_label = self._resolve_message_type(message)

    if isinstance(wx_message, WxMessage):
        kind = wx_message.msg_type
        if kind == MessageType.TEXT:
            return str(wx_message.content.clean_content or "").strip()
        if kind == MessageType.IMAGE:
            image = wx_message.get_image_content()
            return f"[图片] md5={image.md5} size={image.length}" if image else "[图片]"
        if kind == MessageType.VOICE:
            voice = wx_message.get_voice_content()
            return f"[语音] 时长={voice.voice_length}ms" if voice else "[语音]"
        if kind == MessageType.VIDEO:
            video = wx_message.get_video_content()
            return f"[视频] 时长={video.play_length}ms size={video.length}" if video else "[视频]"
        if kind == MessageType.LOCATION:
            location = wx_message.get_location_content()
            return f"[位置] {location.label}" if location else "[位置]"
        if kind == MessageType.APP:
            app_kind = wx_message.get_app_message_type()
            if app_kind == AppMessageType.LINK:
                return f"[链接分享] {wx_message.content.clean_content or ''}".strip()
            if app_kind == AppMessageType.MINIPROGRAM:
                return "[小程序]"
            if app_kind == AppMessageType.FILE:
                return "[文件]"
            if app_kind == AppMessageType.QUOTE:
                return f"[引用消息] {wx_message.content.clean_content or ''}".strip()
            return f"[应用消息] {app_kind.name if app_kind else type_label}"
        if kind in (MessageType.EMOTICON, MessageType.EMOJI):
            return "[表情]"
        if kind in (MessageType.SYSTEM, MessageType.SYSTEM_NOTIFY, MessageType.RECALLED):
            body = wx_message.content
            system_text = str(body.clean_content or body.raw_content or "").strip()
            return f"[系统消息] {system_text}".strip()

    fallback = str(message.get("content", "") or "").strip()
    return fallback if fallback else f"[{type_label}]"
def _resolve_message_type(self, message: Dict[str, Any]) -> str:
    """Return the message type name for logs and ``additional_config``.

    Uses the ``WxMessage`` type when present, otherwise the ``type`` field of
    the message dict; ``"UNKNOWN"`` when neither yields a value.
    """
    wx_message = message.get("full_wx_msg")
    if isinstance(wx_message, WxMessage) and wx_message.msg_type:
        return str(wx_message.msg_type.name or "UNKNOWN")

    declared = message.get("type")
    label = declared.name if isinstance(declared, MessageType) else declared
    return str(label or "UNKNOWN")
def _resolve_message_timestamp(self, message: Dict[str, Any], full_msg: Any) -> float:
    """Return the message time as a float.

    Preference order: ``create_time`` of the ``WxMessage``, the ``timestamp``
    field of the message dict, then the current time. A value that cannot be
    converted to float is skipped.
    """
    has_wx_time = isinstance(full_msg, WxMessage) and getattr(full_msg, "create_time", 0)
    if has_wx_time:
        try:
            return float(full_msg.create_time)
        except (TypeError, ValueError):
            pass

    dict_time = message.get("timestamp")
    if not (dict_time in (None, "")):
        try:
            return float(dict_time)
        except (TypeError, ValueError):
            pass

    return time.time()
def _resolve_message_id(self, message: Dict[str, Any], full_msg: Any) -> str:
    """Return the WeChat ``msg_id`` when available, else a generated id.

    The generated id has the form ``abot_<roomid>_<sender>_<12 hex chars>``.
    """
    if isinstance(full_msg, WxMessage) and getattr(full_msg, "msg_id", None):
        return str(full_msg.msg_id)

    room_part = str(message.get("roomid", "") or "")
    sender_part = str(message.get("sender", "") or "")
    random_part = uuid.uuid4().hex[:12]
    return f"abot_{room_part}_{sender_part}_{random_part}"
def _resolve_sender_name(self, message: Dict[str, Any], sender: str) -> str:
"""从现有上下文里尽量拿到更友好的显示名。"""
all_contacts = message.get("all_contacts", {}) or {}
sender_name = str(all_contacts.get(sender, "") or "").strip()
if sender_name:
return sender_name
return sender or "unknown"
def _resolve_group_name(self, message: Dict[str, Any], roomid: str) -> str:
"""群名称仅作为增强上下文,不影响业务路由。"""
if not roomid:
return ""
all_contacts = message.get("all_contacts", {}) or {}
return str(all_contacts.get(roomid, "") or roomid)
def _extract_segment_text(self, segment: Dict[str, Any]) -> str:
"""把 MaiBot 返回的消息片段递归拼成文本。"""
segment_type = str(segment.get("type", "") or "")
data = segment.get("data")
if segment_type == "text":
return str(data or "")
if segment_type == "seglist" and isinstance(data, list):
parts: List[str] = []
for item in data:
if isinstance(item, dict):
parts.append(self._extract_segment_text(item))
return "".join(parts)
if segment_type == "image":
return "[图片]"
if segment_type == "emoji":
return "[表情]"
return str(data or "")
def _parse_json_message(self, raw_text: str) -> Optional[Dict[str, Any]]:
"""统一处理 JSON 解析异常,避免接收循环被坏包打断。"""
try:
payload = json.loads(raw_text)
except json.JSONDecodeError:
self.LOG.warning(f"[{self.name}] 收到非法 JSON已忽略: {raw_text[:300]}")
return None
if not isinstance(payload, dict):
self.LOG.warning(f"[{self.name}] 收到非对象 JSON已忽略: {payload}")
return None
return payload
def _log_runtime(self, message: str) -> None:
"""根据插件配置动态输出运行日志级别。"""
if self._log_level == "DEBUG":
self.LOG.debug(message)
return
self.LOG.info(message)
def _is_supported_message(self, message: Dict[str, Any]) -> bool:
    """Tell whether the message type is one the plugin forwards.

    A ``WxMessage`` must have one of the listed types. Without a
    ``WxMessage``, the message is accepted when its ``type`` is a
    ``MessageType`` or when it has non-empty ``content``.
    """
    wx_message = message.get("full_wx_msg")
    if not isinstance(wx_message, WxMessage):
        declared = message.get("type")
        return True if isinstance(declared, MessageType) else bool(message.get("content"))

    return wx_message.msg_type in {
        MessageType.TEXT,
        MessageType.IMAGE,
        MessageType.VOICE,
        MessageType.VIDEO,
        MessageType.LOCATION,
        MessageType.APP,
        MessageType.EMOTICON,
        MessageType.EMOJI,
        MessageType.SYSTEM,
        MessageType.SYSTEM_NOTIFY,
        MessageType.RECALLED,
    }
async def _close_websocket_only(self) -> None:
"""关闭当前 websocket但保留 session 供重连复用。"""
if self._websocket is None:
return
try:
await self._websocket.close()
except Exception as exc:
self.LOG.warning(f"[{self.name}] 关闭 WebSocket 失败: {exc}")
finally:
self._websocket = None
async def _close_session(self) -> None:
"""关闭 aiohttp session。"""
await self._close_websocket_only()
if self._client_session is None:
return
try:
await self._client_session.close()
except Exception as exc:
self.LOG.warning(f"[{self.name}] 关闭 ClientSession 失败: {exc}")
finally:
self._client_session = None