Files
abot/gewechat/call_back_message/message.py
2025-04-22 14:13:17 +08:00

387 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from dataclasses import dataclass
from typing import Optional, Dict, Any, re
from enum import Enum
import xml.etree.ElementTree as ET
import json
import main
class MessageType(Enum):
"""消息类型枚举"""
UNKNOWN = 0 # 未知类型
TEXT = 1 # 文本消息
IMAGE = 3 # 图片消息
VOICE = 34 # 语音消息
VERIFY_MSG = 37 # 好友确认消息
POSSIBLE_FRIEND_MSG = 40 # 好友推荐消息
SHARE_CARD = 42 # 名片消息
VIDEO = 43 # 视频消息
EMOTICON = 47 # 动画表情
LOCATION = 48 # 位置消息
APP = 49 # 应用消息(链接、音乐、小程序等)
VOIP_MSG = 50 # VOIP消息
STATUS_NOTIFY = 51 # 状态通知
SYSTEM = 10000 # 系统消息
SYSTEM_NOTIFY = 10002 # 系统通知
RECALLED = 10002 # 撤回消息
EMOJI = 1090519089 # 大表情
class AppMessageType(Enum):
"""应用消息类型枚举"""
UNKNOWN = 0 # 未知类型
TEXT = 1 # 文本
IMG = 2 # 图片
AUDIO = 3 # 音频
VIDEO = 4 # 视频
LINK = 5 # 链接消息
FILE = 6 # 文件
QUOTE = 57 # 引用
EMOJI = 8 # 表情
LOCATION = 17 # 位置
APP_MSG = 33 # APP消息
MINIPROGRAM = 36 # 小程序
TRANSFER = 2000 # 转账
RED_PACKET = 2001 # 红包
CARD_TICKET = 2002 # 卡券
REAL_TIME_LOCATION_START = 17 # 实时位置共享开始
REAL_TIME_LOCATION_STOP = 18 # 实时位置共享结束
CARD = 42 # 名片
VOICE_REMIND = 43 # 语音提醒
FILE_NOTICE = 74 # 文件通知
CHANNELS = 51 # 视频号消息
@dataclass
class MessageContent:
"""消息内容"""
raw_content: str # 原始内容
xml_content: str = "" # XML内容(如果有)
clean_content: str = "" # 清理后的内容(去除发信人信息)
sender: str = "" # 发信人wxid
def __post_init__(self):
"""处理XML内容和清理发信人信息"""
# 处理XML内容
# 清理发信人信息
self.clean_content = self.clean_sender_info(self.raw_content)
if self.clean_content.startswith('<?xml') or self.clean_content.startswith('<msg'):
try:
self.xml_content = self.clean_content
except ET.ParseError:
self.xml_content = ""
def clean_sender_info(self, content: str) -> str:
"""清理内容中的发信人信息"""
if not content:
return ""
import re
# 如果有发信人信息,优先使用发信人信息进行清理
if self.sender:
# 尝试移除发信人前缀包括昵称和wxid两种情况
patterns = [
f"^{self.sender}[:]\\s*\\n", # wxid格式
f"^[^\\n]+?\\({self.sender}\\)[:]\\s*\\n", # 昵称(wxid)格式
f"^[^\\n]+?<{self.sender}>[:]\\s*\\n", # 昵称<wxid>格式
]
for pattern in patterns:
content = re.sub(pattern, '', content)
# 通用清理规则(用于处理其他可能的格式)
patterns = [
r'^wxid_[a-zA-Z0-9_]+[:]\s*\n', # wxid格式
r'^[^:\n]+\([^)]+\)[:]\s*\n', # 昵称(wxid)格式
r'^[^:\n]+<[^>]+>[:]\s*\n', # 昵称<wxid>格式
r'^[^:\n]+[:]\s*\n', # 其他格式
]
for pattern in patterns:
content = re.sub(pattern, '', content)
return content.strip()
@dataclass
class ImageContent:
"""图片消息特定内容"""
aes_key: str
url: str
length: int
md5: str
thumb_base64: Optional[str] = None
@dataclass
class VoiceContent:
"""语音消息特定内容"""
voice_length: int
aes_key: str
url: str
voice_base64: Optional[str] = None
@dataclass
class VideoContent:
"""视频消息特定内容"""
aes_key: str
video_url: str
thumb_url: str
length: int
play_length: int
@dataclass
class LocationContent:
"""地理位置特定内容"""
x: float # 纬度
y: float # 经度
label: str # 地址标签
poi_name: Optional[str] = None # 地点名称
@dataclass
class WxMessage:
"""消息基础类"""
type_name: str
appid: str
wxid: str
msg_id: int
sender: str
to_user: str
roomid: str # 新增room_id属性
msg_type: MessageType
content: MessageContent
create_time: int
push_content: Optional[str]
new_msg_id: int
msg_seq: int
msg_source: str
raw_data: Dict[str, Any] # 原始JSON数据
@classmethod
def from_json(cls, json_data: Dict[str, Any]) -> 'WxMessage':
"""从JSON数据创建消息对象"""
data = json_data.get("Data", {})
to_user = data.get("ToUserName", {}).get("string", "")
# 获取原始内容和发信人
content_str = data.get("Content", {}).get("string", "")
sender = data.get("FromUserName", {}).get("string", "")
# 创建MessageContent对象时传入发信人信息
message_content = MessageContent(content_str, sender=sender)
return cls(
type_name=json_data.get("TypeName", ""),
appid=json_data.get("Appid", ""),
wxid=json_data.get("Wxid", ""),
msg_id=data.get("MsgId", 0),
sender=sender,
to_user=to_user,
roomid=to_user if to_user.endswith("@chatroom") else "",
msg_type=MessageType(data.get("MsgType", 0)),
content=message_content, # 使用包含发信人信息的MessageContent
create_time=data.get("CreateTime", 0),
push_content=data.get("PushContent"),
new_msg_id=data.get("NewMsgId", 0),
msg_seq=data.get("MsgSeq", 0),
msg_source=data.get("MsgSource", ""),
raw_data=json_data
)
def __str__(self) -> str:
"""返回消息的字符串表示,用于打印和日志"""
# 获取消息类型的名称
msg_type_name = self.msg_type.name if self.msg_type else "UNKNOWN"
# 处理不同类型的消息内容
content_str = ""
if self.msg_type == MessageType.TEXT:
# 文本消息直接显示清理后的内容
content_str = self.content.clean_content
elif self.msg_type == MessageType.IMAGE:
# 图片消息显示图片信息
img_content = self.get_image_content()
if img_content:
content_str = f"[图片] 大小: {img_content.length}字节, MD5: {img_content.md5}"
else:
content_str = "[图片]"
elif self.msg_type == MessageType.VOICE:
# 语音消息显示语音信息
voice_content = self.get_voice_content()
if voice_content:
content_str = f"[语音] 长度: {voice_content.voice_length}ms"
else:
content_str = "[语音]"
elif self.msg_type == MessageType.VIDEO:
# 视频消息显示视频信息
video_content = self.get_video_content()
if video_content:
content_str = f"[视频] 长度: {video_content.play_length}ms, 大小: {video_content.length}字节"
else:
content_str = "[视频]"
elif self.msg_type == MessageType.LOCATION:
# 位置消息显示位置信息
location_content = self.get_location_content()
if location_content:
content_str = f"[位置] {location_content.label}"
else:
content_str = "[位置]"
elif self.msg_type == MessageType.APP:
# 应用消息显示应用类型
app_type = self.get_app_message_type()
if app_type:
content_str = f"[应用消息] 类型: {app_type.name}"
else:
content_str = "[应用消息]"
elif self.msg_type == MessageType.EMOJI:
content_str = "[表情]"
elif self.msg_type == MessageType.SYSTEM:
content_str = f"[系统消息] {self.content.raw_content}"
elif self.msg_type == MessageType.SYSTEM_NOTIFY:
content_str = f"[系统通知] {self.content.raw_content}"
else:
# 其他类型消息
content_str = f"[未知类型消息] {self.content.raw_content[:30]}..."
# 限制内容长度,避免过长
if len(content_str) > 100:
content_str = content_str[:97] + "..."
# 构建基本信息
from_info = f"发送者: {self.sender}"
to_info = f"接收者: {self.to_user}"
# 如果是群消息,添加群信息
group_info = ""
if self.from_group():
group_info = f"群聊: {self.roomid}, "
# 构建完整的消息字符串
return (f"WxMessage[ID: {self.msg_id}, 类型: {msg_type_name}, "
f"{group_info}{from_info}, {to_info}, "
f"内容: {content_str}]")
def __repr__(self) -> str:
"""返回消息的详细表示,用于调试"""
return self.__str__()
def from_self(self) -> bool:
"""判断是否是自己发送的消息"""
return self.sender == self.wxid
def from_group(self) -> bool:
return self.to_user.endswith("@chatroom")
def is_at(self, wxid) -> bool:
"""是否被 @:群消息,在 @ 名单里,并且不是 @ 所有人"""
if not self.from_group():
return False # 只有群消息才能 @
if not re.findall(f"<atuserlist>[\s|\S]*({wxid})[\s|\S]*</atuserlist>", self.msg_source):
return False # 不在 @ 清单里
if re.findall(r"@(?:所有人|all|All)", self.content):
return False # 排除 @ 所有人
return True
def get_app_message_type(self) -> Optional[AppMessageType]:
"""获取应用消息类型"""
if self.msg_type != MessageType.APP or not self.content.xml_content:
return None
try:
appmsg = self.content.xml_content.find('.//appmsg')
if appmsg is not None:
type_value = int(appmsg.find('type').text)
return AppMessageType(type_value)
except (AttributeError, ValueError):
pass
return None
def get_image_content(self) -> Optional[ImageContent]:
"""获取图片消息内容"""
if self.msg_type != MessageType.IMAGE or not self.content.xml_content:
return None
try:
img = self.content.xml_content.find('img')
if img is not None:
return ImageContent(
aes_key=img.get('aeskey', ''),
url=img.get('cdnthumburl', ''),
length=int(img.get('length', 0)),
md5=img.get('md5', ''),
thumb_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer")
)
except (AttributeError, ValueError):
pass
return None
def get_voice_content(self) -> Optional[VoiceContent]:
"""获取语音消息内容"""
if self.msg_type != MessageType.VOICE or not self.content.xml_content:
return None
try:
voice = self.content.xml_content.find('.//voicemsg')
if voice is not None:
return VoiceContent(
voice_length=int(voice.get('voicelength', 0)),
aes_key=voice.get('aeskey', ''),
url=voice.get('voiceurl', ''),
voice_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer")
)
except (AttributeError, ValueError):
pass
return None
def get_video_content(self) -> Optional[VideoContent]:
"""获取视频消息内容"""
if self.msg_type != MessageType.VIDEO or not self.content.xml_content:
return None
try:
video = self.content.xml_content.find('.//videomsg')
if video is not None:
return VideoContent(
aes_key=video.get('aeskey', ''),
video_url=video.get('cdnvideourl', ''),
thumb_url=video.get('cdnthumburl', ''),
length=int(video.get('length', 0)),
play_length=int(video.get('playlength', 0))
)
except (AttributeError, ValueError):
pass
return None
def get_location_content(self) -> Optional[LocationContent]:
"""获取地理位置内容"""
if self.msg_type != MessageType.LOCATION or not self.content.xml_content:
return None
try:
location = self.content.xml_content.find('location')
if location is not None:
return LocationContent(
x=float(location.get('x', 0)),
y=float(location.get('y', 0)),
label=location.get('label', ''),
poi_name=location.get('poiname')
)
except (AttributeError, ValueError):
pass
return None
if __name__ == '__main__':
content_str = """wxid_g6vc38ifs1an22:\n1"""
content = MessageContent(content_str, sender="Jyunere")
print(content.raw_content)
print(content.xml_content)
print(content.clean_content)