384 lines
13 KiB
Python
384 lines
13 KiB
Python
from dataclasses import dataclass
|
||
from typing import Optional, Dict, Any
|
||
from enum import Enum
|
||
import xml.etree.ElementTree as ET
|
||
import re
|
||
|
||
class MessageType(Enum):
    """Top-level message type codes.

    ``RECALLED`` has the same value as ``SYSTEM_NOTIFY`` (10002), so it is
    an Enum alias: ``MessageType(10002)`` yields ``SYSTEM_NOTIFY`` and
    ``MessageType.RECALLED is MessageType.SYSTEM_NOTIFY``.
    """

    UNKNOWN = 0  # unknown type

    # Ordinary chat content
    TEXT = 1  # text message
    IMAGE = 3  # image message
    VOICE = 34  # voice message
    VERIFY_MSG = 37  # friend verification message
    POSSIBLE_FRIEND_MSG = 40  # friend recommendation message
    SHARE_CARD = 42  # contact card message
    VIDEO = 43  # video message
    EMOTICON = 47  # animated sticker
    LOCATION = 48  # location message
    APP = 49  # app message (link, music, mini program, ...)
    VOIP_MSG = 50  # VOIP message
    STATUS_NOTIFY = 51  # status notification

    # System-generated messages
    SYSTEM = 10000  # system message
    SYSTEM_NOTIFY = 10002  # system notification
    RECALLED = 10002  # recalled message (alias of SYSTEM_NOTIFY)

    EMOJI = 1090519089  # large emoji
|
||
|
||
|
||
class AppMessageType(Enum):
    """Sub-type codes carried by app messages.

    ``REAL_TIME_LOCATION_START`` has the same value as ``LOCATION`` (17),
    so it is an Enum alias: ``AppMessageType(17)`` yields ``LOCATION``.
    Members are intentionally kept in their original declaration order,
    which determines iteration order and which name wins for aliases.
    """

    UNKNOWN = 0  # unknown type
    TEXT = 1  # text
    IMG = 2  # image
    AUDIO = 3  # audio
    VIDEO = 4  # video
    LINK = 5  # link message
    FILE = 6  # file
    QUOTE = 57  # quoted reply
    EMOJI = 8  # emoji
    LOCATION = 17  # location
    APP_MSG = 33  # app message
    MINIPROGRAM = 36  # mini program

    TRANSFER = 2000  # money transfer
    RED_PACKET = 2001  # red packet
    CARD_TICKET = 2002  # card / coupon

    REAL_TIME_LOCATION_START = 17  # live location sharing started (alias of LOCATION)
    REAL_TIME_LOCATION_STOP = 18  # live location sharing stopped
    CARD = 42  # contact card
    VOICE_REMIND = 43  # voice reminder
    FILE_NOTICE = 74  # file notice
    CHANNELS = 51  # Channels (video account) message
|
||
|
||
|
||
@dataclass
class MessageContent:
    """Message body with the leading sender prefix removed.

    ``clean_content`` and ``xml_content`` are derived from ``raw_content``
    in ``__post_init__``; any value passed for ``clean_content`` is
    overwritten, and ``xml_content`` is overwritten whenever the cleaned
    text looks like XML.
    """

    raw_content: str  # content exactly as received
    xml_content: str = ""  # cleaned content when it starts like XML, else as passed in
    clean_content: str = ""  # content with the sender prefix stripped
    sender: str = ""  # sender wxid, used to recognise the prefix precisely

    # Fallback prefixes tried when no sender-specific pattern matched.
    # NOTE(review): each class lists the same colon twice ("[::]"); presumably
    # one of them was meant to be the full-width colon - confirm upstream.
    _GENERIC_PREFIX_PATTERNS = (
        r'^wxid_[a-zA-Z0-9_]+[::]\s*\n',  # "wxid:" form
        r'^[^::\n]+\([^)]+\)[::]\s*\n',  # "nickname(wxid):" form
        r'^[^::\n]+<[^>]+>[::]\s*\n',  # "nickname<wxid>:" form
        r'^[^::\n]+[::]\s*\n',  # any other "something:" first line
    )

    def __post_init__(self):
        """Derive ``clean_content`` and ``xml_content`` from ``raw_content``."""
        self.clean_content = self.clean_sender_info(self.raw_content)

        # Only a cheap prefix check is done here; the text is not parsed, so
        # xml_content may still hold malformed XML.
        if self.clean_content.startswith(('<?xml', '<msg')):
            self.xml_content = self.clean_content

    def clean_sender_info(self, content: str) -> str:
        """Strip one leading ``sender:`` line from *content*.

        Patterns built from ``self.sender`` are tried first, then the generic
        fallbacks. Only the first matching prefix is removed, so a message
        body that itself starts with a ``"label:"`` line is left intact once
        the real sender prefix has been stripped.

        Args:
            content: Raw message text; may be empty or ``None``.

        Returns:
            The text without the sender prefix, with surrounding whitespace
            stripped. ``""`` for empty input.
        """
        if not content:
            return ""

        patterns = []
        if self.sender:
            # Escape the wxid: it is data, not a regex, and may contain
            # metacharacters such as "." or "(".
            sender = re.escape(self.sender)
            patterns.extend([
                rf"^{sender}[::]\s*\n",  # "wxid:" form
                rf"^[^\n]+?\({sender}\)[::]\s*\n",  # "nickname(wxid):" form
                rf"^[^\n]+?<{sender}>[::]\s*\n",  # "nickname<wxid>:" form
            ])
        patterns.extend(self._GENERIC_PREFIX_PATTERNS)

        for pattern in patterns:
            stripped, hits = re.subn(pattern, '', content, count=1)
            if hits:
                content = stripped
                break  # a message carries at most one sender prefix

        return content.strip()
|
||
|
||
|
||
@dataclass
class ImageContent:
    """Fields specific to an image message."""

    aes_key: str  # AES key attribute of the image
    url: str  # image URL
    length: int  # image size
    md5: str  # MD5 of the image
    thumb_base64: Optional[str] = None  # thumbnail data, when available
|
||
|
||
|
||
@dataclass
class VoiceContent:
    """Fields specific to a voice message."""

    voice_length: int  # voice length
    aes_key: str  # AES key attribute of the voice clip
    url: str  # voice URL
    voice_base64: Optional[str] = None  # voice data, when available
|
||
|
||
|
||
@dataclass
class VideoContent:
    """Fields specific to a video message."""

    aes_key: str  # AES key attribute of the video
    video_url: str  # video URL
    thumb_url: str  # thumbnail URL
    length: int  # video size
    play_length: int  # playback length
|
||
|
||
|
||
@dataclass
class LocationContent:
    """Fields specific to a location message."""

    x: float  # latitude
    y: float  # longitude
    label: str  # address label
    poi_name: Optional[str] = None  # place name, when available
|
||
|
||
|
||
@dataclass
class WxMessage:
    """A single message built from a callback JSON payload.

    Besides the plain fields, the class offers typed accessors
    (``get_image_content`` etc.) that parse ``content.xml_content`` on
    demand and return ``None`` when the message is of another type or the
    XML is missing or malformed.
    """

    type_name: str
    appid: str
    wxid: str
    msg_id: int
    sender: str
    to_user: str
    roomid: str  # chatroom id, or "" when to_user is not a chatroom
    msg_type: MessageType
    content: MessageContent
    create_time: int
    push_content: Optional[str]
    new_msg_id: int
    msg_seq: int
    msg_source: str
    raw_data: Dict[str, Any]  # original JSON payload

    @classmethod
    def from_json(cls, json_data: Dict[str, Any]) -> 'WxMessage':
        """Build a message from the raw JSON payload.

        Missing or ``null`` nested objects are treated as empty, and a
        ``MsgType`` code that ``MessageType`` does not define maps to
        ``MessageType.UNKNOWN`` instead of raising ``ValueError``.

        Args:
            json_data: Payload with top-level ``TypeName``/``Appid``/``Wxid``
                keys and a ``Data`` object holding the message fields.

        Returns:
            The populated ``WxMessage``; ``raw_data`` keeps *json_data*.
        """
        data = json_data.get("Data") or {}

        def nested_string(key: str) -> str:
            # Fields such as ToUserName are wrapped as {"string": "..."}.
            return (data.get(key) or {}).get("string", "") or ""

        to_user = nested_string("ToUserName")
        content_str = nested_string("Content")
        sender = nested_string("FromUserName")

        # Pass the sender so its prefix can be stripped from the content.
        message_content = MessageContent(content_str, sender=sender)

        try:
            msg_type = MessageType(data.get("MsgType", 0))
        except ValueError:
            # New/unmapped type codes must not make the whole message unusable.
            msg_type = MessageType.UNKNOWN

        return cls(
            type_name=json_data.get("TypeName", ""),
            appid=json_data.get("Appid", ""),
            wxid=json_data.get("Wxid", ""),
            msg_id=data.get("MsgId", 0),
            sender=sender,
            to_user=to_user,
            roomid=to_user if to_user.endswith("@chatroom") else "",
            msg_type=msg_type,
            content=message_content,
            create_time=data.get("CreateTime", 0),
            push_content=data.get("PushContent"),
            new_msg_id=data.get("NewMsgId", 0),
            msg_seq=data.get("MsgSeq", 0),
            msg_source=data.get("MsgSource", ""),
            raw_data=json_data,
        )

    def __str__(self) -> str:
        """Return a short one-line summary for printing and logging."""
        msg_type_name = self.msg_type.name if self.msg_type else "UNKNOWN"

        # Build a type-specific description of the content.
        content_str = ""
        if self.msg_type == MessageType.TEXT:
            content_str = self.content.clean_content
        elif self.msg_type == MessageType.IMAGE:
            img_content = self.get_image_content()
            if img_content:
                content_str = f"[图片] 大小: {img_content.length}字节, MD5: {img_content.md5}"
            else:
                content_str = "[图片]"
        elif self.msg_type == MessageType.VOICE:
            voice_content = self.get_voice_content()
            if voice_content:
                content_str = f"[语音] 长度: {voice_content.voice_length}ms"
            else:
                content_str = "[语音]"
        elif self.msg_type == MessageType.VIDEO:
            video_content = self.get_video_content()
            if video_content:
                content_str = f"[视频] 长度: {video_content.play_length}ms, 大小: {video_content.length}字节"
            else:
                content_str = "[视频]"
        elif self.msg_type == MessageType.LOCATION:
            location_content = self.get_location_content()
            if location_content:
                content_str = f"[位置] {location_content.label}"
            else:
                content_str = "[位置]"
        elif self.msg_type == MessageType.APP:
            app_type = self.get_app_message_type()
            if app_type:
                content_str = f"[应用消息] 类型: {app_type.name}"
            else:
                content_str = "[应用消息]"
        elif self.msg_type == MessageType.EMOJI:
            content_str = "[表情]"
        elif self.msg_type == MessageType.SYSTEM:
            content_str = f"[系统消息] {self.content.raw_content}"
        elif self.msg_type == MessageType.SYSTEM_NOTIFY:
            content_str = f"[系统通知] {self.content.raw_content}"
        else:
            content_str = f"[未知类型消息] {self.content.raw_content[:30]}..."

        # Cap the summary at 100 characters.
        if len(content_str) > 100:
            content_str = content_str[:97] + "..."

        from_info = f"发送者: {self.sender}"
        to_info = f"接收者: {self.to_user}"

        # Group messages additionally show the chatroom id.
        group_info = ""
        if self.from_group():
            group_info = f"群聊: {self.roomid}, "

        return (f"WxMessage[ID: {self.msg_id}, 类型: {msg_type_name}, "
                f"{group_info}{from_info}, {to_info}, "
                f"内容: {content_str}]")

    def __repr__(self) -> str:
        """Return the same summary as ``__str__`` (used when debugging)."""
        return self.__str__()

    def from_self(self) -> bool:
        """Return True when the sender is the account's own wxid."""
        return self.sender == self.wxid

    def from_group(self) -> bool:
        """Return True when ``to_user`` is a chatroom id.

        NOTE(review): only ``to_user`` is inspected; if incoming group
        messages carry the chatroom id in ``sender`` instead, this returns
        False for them - confirm against real payloads.
        """
        return self.to_user.endswith("@chatroom")

    def is_at(self, wxid) -> bool:
        """Return True when *wxid* was @-mentioned individually.

        The message must be a group message, *wxid* must appear inside the
        ``<atuserlist>`` element of ``msg_source``, and the text must not
        be an "@everyone" mention.
        """
        if not self.from_group():
            return False  # only group messages can @ someone

        # wxid is escaped because it is data, not a regex fragment.
        at_pattern = rf"<atuserlist>[\s\S]*{re.escape(wxid)}[\s\S]*</atuserlist>"
        if not re.search(at_pattern, self.msg_source):
            return False  # not in the @ list

        # Search the message text, not the MessageContent object itself.
        if re.search(r"@(?:所有人|all|All)", self.content.clean_content):
            return False  # exclude @everyone

        return True

    def _xml_root(self) -> Optional[ET.Element]:
        """Parse ``content.xml_content`` and return its root, or None.

        NOTE(review): the XML comes from message payloads; the standard
        library parser is documented as not hardened against maliciously
        constructed data - consider ``defusedxml`` if that is a concern.
        """
        xml_text = self.content.xml_content
        if not xml_text:
            return None
        try:
            return ET.fromstring(xml_text)
        except ET.ParseError:
            return None

    def _img_buffer(self) -> Optional[str]:
        """Return ``Data.ImgBuf.buffer`` from the raw payload, if present."""
        data = self.raw_data.get("Data") or {}
        return (data.get("ImgBuf") or {}).get("buffer")

    def get_app_message_type(self) -> Optional[AppMessageType]:
        """Return the app-message sub-type, or None when unavailable.

        None is returned for non-APP messages, missing/malformed XML, a
        missing or non-numeric ``<type>``, or a code ``AppMessageType``
        does not define.
        """
        if self.msg_type != MessageType.APP:
            return None
        root = self._xml_root()
        if root is None:
            return None

        appmsg = root.find('.//appmsg')
        if appmsg is None:
            return None
        type_text = appmsg.findtext('type')
        if type_text is None:
            return None
        try:
            return AppMessageType(int(type_text))
        except ValueError:
            return None

    def get_image_content(self) -> Optional[ImageContent]:
        """Return image details from the ``<img>`` element, or None."""
        if self.msg_type != MessageType.IMAGE:
            return None
        root = self._xml_root()
        if root is None:
            return None

        img = root.find('img')
        if img is None:
            return None
        try:
            return ImageContent(
                aes_key=img.get('aeskey', ''),
                url=img.get('cdnthumburl', ''),
                length=int(img.get('length', 0)),
                md5=img.get('md5', ''),
                thumb_base64=self._img_buffer(),
            )
        except ValueError:
            return None  # non-numeric length attribute

    def get_voice_content(self) -> Optional[VoiceContent]:
        """Return voice details from the ``<voicemsg>`` element, or None."""
        if self.msg_type != MessageType.VOICE:
            return None
        root = self._xml_root()
        if root is None:
            return None

        voice = root.find('.//voicemsg')
        if voice is None:
            return None
        try:
            return VoiceContent(
                voice_length=int(voice.get('voicelength', 0)),
                aes_key=voice.get('aeskey', ''),
                url=voice.get('voiceurl', ''),
                voice_base64=self._img_buffer(),
            )
        except ValueError:
            return None  # non-numeric voicelength attribute

    def get_video_content(self) -> Optional[VideoContent]:
        """Return video details from the ``<videomsg>`` element, or None."""
        if self.msg_type != MessageType.VIDEO:
            return None
        root = self._xml_root()
        if root is None:
            return None

        video = root.find('.//videomsg')
        if video is None:
            return None
        try:
            return VideoContent(
                aes_key=video.get('aeskey', ''),
                video_url=video.get('cdnvideourl', ''),
                thumb_url=video.get('cdnthumburl', ''),
                length=int(video.get('length', 0)),
                play_length=int(video.get('playlength', 0)),
            )
        except ValueError:
            return None  # non-numeric length/playlength attribute

    def get_location_content(self) -> Optional[LocationContent]:
        """Return location details from the ``<location>`` element, or None."""
        if self.msg_type != MessageType.LOCATION:
            return None
        root = self._xml_root()
        if root is None:
            return None

        location = root.find('location')
        if location is None:
            return None
        try:
            return LocationContent(
                x=float(location.get('x', 0)),
                y=float(location.get('y', 0)),
                label=location.get('label', ''),
                poi_name=location.get('poiname'),
            )
        except ValueError:
            return None  # non-numeric coordinates
|
||
|
||
|
||
if __name__ == '__main__':
    # Quick manual check: strip a "wxid:" prefix and print each derived field.
    content_str = """wxid_g6vc38ifs1an22:\n1"""
    content = MessageContent(content_str, sender="Jyunere")
    for field_name in ("raw_content", "xml_content", "clean_content"):
        print(getattr(content, field_name))
|