Files
WeChatHookBot/WechatHook/message_types.py

432 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
消息类型定义和映射
定义新协议 API 的消息类型常量,以及到内部事件的映射关系
"""
import json
import xml.etree.ElementTree as ET
class MessageType:
"""消息类型常量(基于新协议)"""
# 内部消息类型码(与旧协议兼容)
MT_DEBUG_LOG = 11024 # 调试日志
MT_USER_LOGIN = 11025 # 用户登录
MT_USER_LOGOUT = 11026 # 用户登出
MT_GET_LOGIN_INFO = 11028 # 获取登录信息
# 消息通知类型
MT_TEXT = 11046 # 文本消息
MT_IMAGE = 11047 # 图片消息
MT_VOICE = 11048 # 语音消息
MT_VIDEO = 11051 # 视频消息
MT_EMOJI = 11052 # 表情消息
MT_REVOKE = 11057 # 撤回消息
MT_SYSTEM = 11058 # 系统消息
MT_QUOTE = 11061 # 引用消息
MT_FRIEND_REQUEST = 11056 # 好友请求
MT_LOCATION = 11053 # 位置消息
MT_LINK = 11054 # 链接消息
MT_FILE = 11055 # 文件消息
MT_CARD = 11062 # 名片消息
MT_MINIAPP = 11063 # 小程序消息
MT_UNKNOWN = 11999 # 未知消息
# 群聊通知类型
MT_CHATROOM_MEMBER_ADD = 11098 # 群成员新增
MT_CHATROOM_MEMBER_REMOVE = 11099 # 群成员删除
MT_CHATROOM_INFO_CHANGE = 11100 # 群信息变化
MT_CHATROOM_MEMBER_NICKNAME_CHANGE = 11101 # 群成员昵称修改
# 发送消息类型
MT_SEND_TEXT = 11036 # 发送文本
# 微信原始消息类型到内部类型的映射
WECHAT_MSG_TYPE_MAP = {
"1": MessageType.MT_TEXT, # 文本
"3": MessageType.MT_IMAGE, # 图片
"34": MessageType.MT_VOICE, # 语音
"37": MessageType.MT_FRIEND_REQUEST, # 好友请求
"42": MessageType.MT_CARD, # 名片
"43": MessageType.MT_VIDEO, # 视频
"47": MessageType.MT_EMOJI, # 表情
"48": MessageType.MT_LOCATION, # 位置
"49": MessageType.MT_LINK, # AppMsg需结合 XML 细分
"10000": MessageType.MT_SYSTEM, # 系统消息
"10002": MessageType.MT_REVOKE, # 撤回消息
}
# 消息类型到事件名称的映射
MESSAGE_TYPE_MAP = {
MessageType.MT_TEXT: "text_message",
MessageType.MT_IMAGE: "image_message",
MessageType.MT_VOICE: "voice_message",
MessageType.MT_VIDEO: "video_message",
MessageType.MT_EMOJI: "emoji_message",
MessageType.MT_REVOKE: "revoke_message",
MessageType.MT_SYSTEM: "system_message",
MessageType.MT_FRIEND_REQUEST: "friend_request",
MessageType.MT_QUOTE: "quote_message",
MessageType.MT_CHATROOM_MEMBER_ADD: "chatroom_member_add",
MessageType.MT_CHATROOM_MEMBER_REMOVE: "chatroom_member_remove",
MessageType.MT_CHATROOM_INFO_CHANGE: "chatroom_info_change",
MessageType.MT_CHATROOM_MEMBER_NICKNAME_CHANGE: "chatroom_member_nickname_change",
MessageType.MT_LOCATION: "location_message",
MessageType.MT_LINK: "link_message",
MessageType.MT_FILE: "file_message",
MessageType.MT_CARD: "card_message",
MessageType.MT_MINIAPP: "miniapp_message",
MessageType.MT_UNKNOWN: "other_message",
}
def _extract_string(value) -> str:
"""
提取字符串值
Args:
value: 可能是 dict 或 str
Returns:
字符串值
"""
if isinstance(value, dict):
return value.get("String", "")
return str(value) if value else ""
def _ensure_dict(value) -> dict:
"""确保返回 dict兼容字符串/空值)"""
if isinstance(value, dict):
return value
if isinstance(value, str):
text = value.strip()
if text.startswith("{") or text.startswith("["):
try:
parsed = json.loads(text)
if isinstance(parsed, dict):
return parsed
except Exception:
pass
return {}
def _strip_group_prefix(content: str) -> str:
"""去掉群聊消息里可能带的 `wxid:\n` 前缀。"""
if not content or not isinstance(content, str):
return ""
xml_start = content.find("<?xml")
if xml_start == -1:
xml_start = content.find("<msg")
if xml_start == -1:
xml_start = content.find("<appmsg")
if xml_start > 0:
return content[xml_start:]
return content
def _parse_appmsg_meta(content) -> dict:
"""解析 `msgType=49` 的 XML提取 appmsg 元信息。"""
xml_content = _strip_group_prefix(_extract_string(content))
if not xml_content or "<" not in xml_content:
return {}
try:
root = ET.fromstring(xml_content)
except Exception:
return {}
appmsg = root.find(".//appmsg")
if appmsg is None and root.tag == "appmsg":
appmsg = root
if appmsg is None:
return {}
appattach = appmsg.find("appattach")
title = (appmsg.findtext("title", "") or "").strip()
desc = (appmsg.findtext("des", "") or appmsg.findtext("description", "") or "").strip()
url = (appmsg.findtext("url", "") or "").strip()
thumb_url = (appmsg.findtext("thumburl", "") or "").strip()
appmsg_type = (appmsg.findtext("type", "") or "").strip()
file_name = ""
file_ext = ""
if appattach is not None:
file_name = (appattach.findtext("filename", "") or title).strip()
file_ext = (appattach.findtext("fileext", "") or "").strip()
return {
"xml": xml_content,
"appmsg_type": appmsg_type,
"title": title,
"desc": desc,
"url": url,
"thumb_url": thumb_url,
"has_weappinfo": appmsg.find("weappinfo") is not None,
"has_appattach": appattach is not None,
"file_name": file_name,
"file_ext": file_ext,
}
def _resolve_appmsg_internal_type(appmsg_meta: dict) -> int:
"""根据 appmsg 元信息判断 49 消息的真实内部类型。"""
appmsg_type = str(appmsg_meta.get("appmsg_type", "")).strip()
xml_content = appmsg_meta.get("xml", "")
if appmsg_type == "57":
return MessageType.MT_QUOTE
if appmsg_type in {"33", "36"} or appmsg_meta.get("has_weappinfo"):
return MessageType.MT_MINIAPP
if appmsg_type == "6" or appmsg_meta.get("has_appattach"):
return MessageType.MT_FILE
if appmsg_type in {"5", "19"}:
return MessageType.MT_LINK
if "<appattach" in xml_content:
return MessageType.MT_FILE
if "<weappinfo" in xml_content:
return MessageType.MT_MINIAPP
return MessageType.MT_LINK
def normalize_message(msg_type: int, data: dict) -> dict:
"""
将新协议的消息格式转换为统一的内部格式
Args:
msg_type: 内部消息类型码
data: 原始消息数据(来自 HTTP 回调)
Returns:
标准化的消息字典
"""
# 判断消息来源类型
message_type_field = data.get("messageType", "")
is_group = message_type_field == "群聊消息"
# 如果没有 messageType 字段,根据 fromUserName 判断
if not message_type_field:
from_user = _extract_string(data.get("fromUserName", {}))
is_group = from_user.endswith("@chatroom")
# 提取基础字段
from_user = _extract_string(data.get("fromUserName", {}))
to_user = _extract_string(data.get("toUserName", {}))
content = _extract_string(data.get("content", {}))
appmsg_meta = _parse_appmsg_meta(content) if str(data.get("msgType", "")) == "49" else {}
# 群聊消息的真实内容
real_content = data.get("real_content", "")
if is_group and not real_content:
# 从 content 中提取格式wxid:\n实际内容
if ":\n" in content:
parts = content.split(":\n", 1)
if len(parts) == 2:
real_content = parts[1]
else:
real_content = content
# 构建标准消息
message = {
"MsgType": msg_type,
"MsgId": data.get("newMsgId", "") or str(data.get("msgId", "")),
"FromWxid": from_user,
"ToWxid": to_user,
"Content": real_content if is_group else content,
"CreateTime": int(data.get("createTime", 0)),
"IsGroup": is_group,
"RawMsgType": str(data.get("msgType", "1")),
}
if appmsg_meta:
message["AppMsgType"] = appmsg_meta.get("appmsg_type", "")
# 群聊消息处理
if is_group:
message["RoomWxid"] = from_user
# 提取发送者信息 - 优先使用 room_sender_by 字段
room_sender_by = _extract_string(data.get("room_sender_by", ""))
member_info = _ensure_dict(data.get("member_info"))
if room_sender_by:
message["SenderWxid"] = room_sender_by
elif member_info:
message["SenderWxid"] = member_info.get("userName", "")
else:
# 从 content 中提取发送者 wxid
if ":\n" in content:
sender_wxid = content.split(":\n")[0]
message["SenderWxid"] = sender_wxid
else:
message["SenderWxid"] = ""
# 发送者昵称 - 从 newChatroomData 中查找
sender_profile = _ensure_dict(data.get("sender_profile"))
new_chatroom_data = _ensure_dict(sender_profile.get("newChatroomData"))
chatroom_members = new_chatroom_data.get("chatRoomMember") or []
if not isinstance(chatroom_members, list):
chatroom_members = []
sender_wxid = message.get("SenderWxid", "")
for member in chatroom_members:
if member.get("userName") == sender_wxid:
message["SenderNickname"] = member.get("nickName", "")
break
else:
if member_info:
message["SenderNickname"] = member_info.get("nickName", "")
else:
message["SenderNickname"] = ""
message["SenderAvatar"] = member_info.get("bigHeadImgUrl", "") if member_info else ""
else:
# 私聊消息
message["SenderWxid"] = from_user
message["RoomWxid"] = ""
# 提取发送者信息
sender_profile = _ensure_dict(data.get("sender_profile"))
if sender_profile:
message["SenderNickname"] = _extract_string(sender_profile.get("nickName", {}))
message["SenderAvatar"] = sender_profile.get("bigHeadImgUrl", "")
else:
message["SenderNickname"] = data.get("sender_nick", "")
message["SenderAvatar"] = ""
# @ 消息处理
content_to_check = real_content if is_group else content
if "@" in content_to_check:
message["IsAtMessage"] = True
# 解析 @ 列表(简单实现)
# TODO: 从 msgSource XML 中解析完整的 @ 列表
# 图片消息
if msg_type == MessageType.MT_IMAGE:
message["ImgStatus"] = data.get("imgStatus", 0)
img_buf = data.get("imgBuf", {})
if img_buf:
message["ImgLen"] = img_buf.get("iLen", 0)
# 文件消息
if msg_type == MessageType.MT_FILE:
message["Filename"] = (
appmsg_meta.get("file_name")
or _extract_string(data.get("filename", ""))
or appmsg_meta.get("title", "")
)
message["FileExtend"] = appmsg_meta.get("file_ext") or _extract_string(data.get("file_extend", ""))
# 语音消息
if msg_type == MessageType.MT_VOICE:
message["VoiceLength"] = data.get("voiceLength", 0)
# 视频消息
if msg_type == MessageType.MT_VIDEO:
message["VideoLength"] = data.get("playLength", 0)
# 引用消息
if msg_type == MessageType.MT_QUOTE:
# 尝试解析引用内容
try:
import xml.etree.ElementTree as ET
if content:
root = ET.fromstring(content)
title = root.find(".//title")
if title is not None and title.text:
message["QuoteTitle"] = title.text
refermsg = root.find(".//refermsg")
if refermsg is not None:
message["QuoteContent"] = refermsg.findtext("content", "")
message["QuoteSender"] = refermsg.findtext("fromusr", "")
except Exception:
pass
if msg_type == MessageType.MT_CARD:
message["CardWxid"] = _extract_string(data.get("recommend_wxid", ""))
message["CardNickname"] = (
_extract_string(data.get("recommend_nickname", ""))
or _extract_string(data.get("title", ""))
)
# 位置消息
if msg_type == MessageType.MT_LOCATION:
message["Latitude"] = data.get("latitude", 0)
message["Longitude"] = data.get("longitude", 0)
message["LocationTitle"] = data.get("title", "")
message["LocationAddress"] = data.get("address", "")
# 链接消息
if msg_type == MessageType.MT_LINK:
message["LinkTitle"] = appmsg_meta.get("title") or _extract_string(data.get("title", ""))
message["LinkDesc"] = appmsg_meta.get("desc") or _extract_string(data.get("desc", ""))
message["LinkUrl"] = appmsg_meta.get("url") or _extract_string(data.get("url", ""))
message["LinkThumb"] = appmsg_meta.get("thumb_url") or _extract_string(data.get("thumb_url", ""))
if msg_type == MessageType.MT_MINIAPP:
message["MiniAppTitle"] = appmsg_meta.get("title") or _extract_string(data.get("title", ""))
message["MiniAppDesc"] = appmsg_meta.get("desc") or _extract_string(data.get("desc", ""))
message["MiniAppUrl"] = appmsg_meta.get("url") or _extract_string(data.get("url", ""))
message["MiniAppThumb"] = appmsg_meta.get("thumb_url") or _extract_string(data.get("thumb_url", ""))
# 好友请求
if msg_type == MessageType.MT_FRIEND_REQUEST:
message["V3"] = data.get("v3", "")
message["V4"] = data.get("v4", "")
message["Scene"] = data.get("scene", 0)
# 系统消息
if msg_type == MessageType.MT_SYSTEM:
message["Content"] = content
# 保留原始数据
message["_raw"] = data
return message
def get_internal_msg_type(wechat_msg_type: str, data: dict = None) -> int:
"""
将微信消息类型转换为内部消息类型
Args:
wechat_msg_type: 微信消息类型(字符串)
Returns:
内部消息类型码
"""
wechat_msg_type = str(wechat_msg_type)
if wechat_msg_type == "49" and data is not None:
appmsg_meta = _parse_appmsg_meta(data.get("content", ""))
if appmsg_meta:
return _resolve_appmsg_internal_type(appmsg_meta)
return MessageType.MT_LINK
return WECHAT_MSG_TYPE_MAP.get(wechat_msg_type, MessageType.MT_UNKNOWN)
def normalize_from_callback(message_type: str, data: dict) -> dict:
"""
从 HTTP 回调数据标准化消息
Args:
message_type: 消息来源类型 (private_message/group_message)
data: 原始回调数据
Returns:
标准化的消息字典
"""
# 获取微信消息类型
wechat_msg_type = str(data.get("msgType", "1"))
# 转换为内部类型
internal_type = get_internal_msg_type(wechat_msg_type, data)
# 调用通用标准化函数
return normalize_message(internal_type, data)