from dataclasses import dataclass
from typing import Optional, Dict, Any
from enum import Enum
import xml.etree.ElementTree as ET
import re


class MessageType(Enum):
    """Top-level message type codes, as looked up from the ``MsgType`` field."""

    UNKNOWN = 0  # unknown type
    TEXT = 1  # text message
    IMAGE = 3  # image message
    VOICE = 34  # voice message
    VERIFY_MSG = 37  # friend confirmation message
    POSSIBLE_FRIEND_MSG = 40  # friend recommendation message
    SHARE_CARD = 42  # contact card message
    VIDEO = 43  # video message
    EMOTICON = 47  # animated sticker
    LOCATION = 48  # location message
    APP = 49  # app message (link, music, mini program, ...)
    VOIP_MSG = 50  # VOIP message
    STATUS_NOTIFY = 51  # status notification
    SYSTEM = 10000  # system message
    SYSTEM_NOTIFY = 10002  # system notification
    # Same value as SYSTEM_NOTIFY, so Enum treats RECALLED as an alias of it:
    # MessageType(10002) is MessageType.SYSTEM_NOTIFY.
    RECALLED = 10002  # recalled message
    EMOJI = 1090519089  # large emoji


class AppMessageType(Enum):
    """Sub-type codes of an app message, read from ``<appmsg><type>``."""

    UNKNOWN = 0  # unknown type
    TEXT = 1  # text
    IMG = 2  # image
    AUDIO = 3  # audio
    VIDEO = 4  # video
    LINK = 5  # link message
    FILE = 6  # file
    QUOTE = 57  # quoted message
    EMOJI = 8  # emoji
    LOCATION = 17  # location
    APP_MSG = 33  # app message
    MINIPROGRAM = 36  # mini program
    TRANSFER = 2000  # money transfer
    RED_PACKET = 2001  # red packet
    CARD_TICKET = 2002  # card / coupon
    # Same value as LOCATION, so this name is an Enum alias of LOCATION.
    REAL_TIME_LOCATION_START = 17  # real-time location sharing started
    REAL_TIME_LOCATION_STOP = 18  # real-time location sharing stopped
    CARD = 42  # contact card
    VOICE_REMIND = 43  # voice reminder
    FILE_NOTICE = 74  # file notice
    CHANNELS = 51  # channels (video account) message


@dataclass
class MessageContent:
    """Message body in raw, sender-stripped and XML forms.

    ``clean_content`` and ``xml_content`` are derived from ``raw_content`` in
    ``__post_init__``; values passed for them to the constructor are
    overwritten (``xml_content`` only when the cleaned text looks like XML).
    """

    raw_content: str  # content exactly as received
    xml_content: str = ""  # XML payload, if the cleaned content looks like XML
    clean_content: str = ""  # content with the leading sender line removed
    sender: str = ""  # wxid of the sender

    def __post_init__(self):
        """Derive ``clean_content`` and, for XML payloads, ``xml_content``."""
        self.clean_content = self.clean_sender_info(self.raw_content)

        # NOTE(review): the exact prefix test of the original was lost when the
        # source was garbled (everything between ``startswith('`` and the next
        # ``->`` is missing). Any content starting with "<" is treated as XML
        # here, which should be a superset of the original check — confirm.
        if self.clean_content.startswith("<"):
            self.xml_content = self.clean_content

    def clean_sender_info(self, content: str) -> str:
        """Return ``content`` without a leading ``sender:`` line, stripped.

        Sender-specific prefixes are removed first (when ``self.sender`` is
        set), then generic prefix shapes. Every pattern is anchored at the
        start of the text and requires the prefix to end with a newline.
        Empty or ``None`` content yields ``""``.
        """
        if not content:
            return ""

        # NOTE(review): "[::]" lists the same colon twice; one of them was
        # presumably a full-width colon before the file was normalised.
        # Kept as found — confirm against the upstream file.
        if self.sender:
            sender = re.escape(self.sender)
            sender_patterns = [
                f"^{sender}[::]\\s*\\n",  # bare wxid
                f"^[^\\n]+?\\({sender}\\)[::]\\s*\\n",  # nickname(wxid)
                f"^[^\\n]+?<{sender}>[::]\\s*\\n",  # nickname<wxid>
            ]
            for pattern in sender_patterns:
                content = re.sub(pattern, '', content)

        # Generic rules for prefixes that do not mention the known sender.
        generic_patterns = [
            r'^wxid_[a-zA-Z0-9_]+[::]\s*\n',  # bare wxid
            r'^[^::\n]+\([^)]+\)[::]\s*\n',  # nickname(wxid)
            r'^[^::\n]+<[^>]+>[::]\s*\n',  # nickname<wxid>
            r'^[^::\n]+[::]\s*\n',  # any other "label:" first line
        ]
        for pattern in generic_patterns:
            content = re.sub(pattern, '', content)

        return content.strip()


@dataclass
class ImageContent:
    """Fields extracted from an image message."""

    aes_key: str
    url: str
    length: int
    md5: str
    thumb_base64: Optional[str] = None


@dataclass
class VoiceContent:
    """Fields extracted from a voice message."""

    voice_length: int
    aes_key: str
    url: str
    voice_base64: Optional[str] = None


@dataclass
class VideoContent:
    """Fields extracted from a video message."""

    aes_key: str
    video_url: str
    thumb_url: str
    length: int
    play_length: int


@dataclass
class LocationContent:
    """Fields extracted from a location message."""

    x: float  # latitude
    y: float  # longitude
    label: str  # address label
    poi_name: Optional[str] = None  # place name


def _string_field(data: Dict[str, Any], key: str) -> str:
    """Return ``data[key]["string"]`` or ``""`` when any level is missing or null."""
    wrapper = data.get(key)
    if not isinstance(wrapper, dict):
        return ""
    return wrapper.get("string") or ""


@dataclass
class WxMessage:
    """A single received message built from the callback JSON."""

    type_name: str
    appid: str
    wxid: str
    msg_id: int
    sender: str
    to_user: str
    roomid: str  # group chat id, "" for non-group messages
    msg_type: MessageType
    content: MessageContent
    create_time: int
    push_content: Optional[str]
    new_msg_id: int
    msg_seq: int
    msg_source: str
    raw_data: Dict[str, Any]  # original JSON payload

    @classmethod
    def from_json(cls, json_data: Dict[str, Any]) -> 'WxMessage':
        """Build a message from the callback JSON.

        When ``FromUserName`` ends with ``@chatroom`` the message is treated
        as a group message: the real sender is taken from a leading
        ``sender:`` line of the content (if present), and both ``to_user``
        and ``roomid`` become the chatroom id.

        A ``MsgType`` value that is not a ``MessageType`` member maps to
        ``MessageType.UNKNOWN`` instead of raising ``ValueError``.
        """
        data = json_data.get("Data") or {}
        to_user = _string_field(data, "ToUserName")
        from_user = _string_field(data, "FromUserName")
        content_str = _string_field(data, "Content")

        is_group_chat = from_user.endswith("@chatroom")

        actual_sender = from_user
        if is_group_chat:
            # Only a first line of the form "sender:" counts as a sender
            # prefix; splitting on the first ':' anywhere would turn XML or
            # URL text into a bogus sender for prefix-less payloads.
            prefix = re.match(r"([^\n:]+):\s*\n", content_str)
            if prefix:
                candidate = prefix.group(1).strip()
                if candidate:
                    actual_sender = candidate
            # For group messages the receiver is the chatroom itself.
            to_user = from_user

        try:
            msg_type = MessageType(data.get("MsgType", 0))
        except ValueError:
            msg_type = MessageType.UNKNOWN

        return cls(
            type_name=json_data.get("TypeName", ""),
            appid=json_data.get("Appid", ""),
            wxid=json_data.get("Wxid", ""),
            msg_id=data.get("MsgId", 0),
            sender=actual_sender,
            to_user=to_user,
            roomid=from_user if is_group_chat else "",
            msg_type=msg_type,
            content=MessageContent(content_str, sender=actual_sender),
            create_time=data.get("CreateTime", 0),
            push_content=data.get("PushContent"),
            new_msg_id=data.get("NewMsgId", 0),
            msg_seq=data.get("MsgSeq", 0),
            msg_source=data.get("MsgSource", ""),
            raw_data=json_data
        )

    def __str__(self) -> str:
        """Return a one-line summary suitable for printing and logging."""
        msg_type_name = self.msg_type.name if self.msg_type else "UNKNOWN"

        # Build a short, type-specific description of the content.
        content_str = ""
        if self.msg_type == MessageType.TEXT:
            content_str = self.content.clean_content
        elif self.msg_type == MessageType.IMAGE:
            img_content = self.get_image_content()
            if img_content:
                content_str = f"[图片] 大小: {img_content.length}字节, MD5: {img_content.md5}"
            else:
                content_str = "[图片]"
        elif self.msg_type == MessageType.VOICE:
            voice_content = self.get_voice_content()
            if voice_content:
                content_str = f"[语音] 长度: {voice_content.voice_length}ms"
            else:
                content_str = "[语音]"
        elif self.msg_type == MessageType.VIDEO:
            video_content = self.get_video_content()
            if video_content:
                content_str = f"[视频] 长度: {video_content.play_length}ms, 大小: {video_content.length}字节"
            else:
                content_str = "[视频]"
        elif self.msg_type == MessageType.LOCATION:
            location_content = self.get_location_content()
            if location_content:
                content_str = f"[位置] {location_content.label}"
            else:
                content_str = "[位置]"
        elif self.msg_type == MessageType.APP:
            app_type = self.get_app_message_type()
            if app_type:
                content_str = f"[应用消息] 类型: {app_type.name}"
            else:
                content_str = "[应用消息]"
        elif self.msg_type == MessageType.EMOJI:
            content_str = "[表情]"
        elif self.msg_type == MessageType.SYSTEM:
            content_str = f"[系统消息] {self.content.raw_content}"
        elif self.msg_type == MessageType.SYSTEM_NOTIFY:
            content_str = f"[系统通知] {self.content.raw_content}"
        else:
            content_str = f"[未知类型消息] {self.content.raw_content[:30]}..."

        # Cap the description at 100 characters.
        if len(content_str) > 100:
            content_str = content_str[:97] + "..."

        from_info = f"发送者: {self.sender}"
        to_info = f"接收者: {self.to_user}"

        # Group messages additionally show the chatroom id.
        group_info = ""
        if self.from_group():
            group_info = f"群聊: {self.roomid}, "

        return (f"WxMessage[ID: {self.msg_id}, 类型: {msg_type_name}, "
                f"{group_info}{from_info}, {to_info}, "
                f"内容: {content_str}]")

    def __repr__(self) -> str:
        """Return the same text as ``__str__``."""
        return self.__str__()

    def from_self(self) -> bool:
        """Return True when the sender is the account's own wxid."""
        return self.sender == self.wxid

    def from_group(self) -> bool:
        """Return True when the receiver is a chatroom id."""
        return self.to_user.endswith("@chatroom")

    def is_at(self, wxid: str) -> bool:
        """Return True when ``wxid`` is @-mentioned in a group message.

        Requires a group message whose ``msg_source`` contains ``wxid``;
        messages whose text @-mentions everyone are excluded.
        """
        if not self.from_group():
            return False  # only group messages can @ someone
        # Plain substring test: equivalent to the former regex for literal
        # ids, without treating regex metacharacters in ``wxid`` as syntax.
        if wxid not in (self.msg_source or ""):
            return False  # not in the @ list
        if re.search(r"@(?:所有人|all|All)", self.content.clean_content):
            return False  # exclude "@ everyone"
        return True

    def get_app_message_type(self) -> Optional[AppMessageType]:
        """Return the app message sub-type, or None when unavailable.

        None is returned for non-APP messages, missing or unparsable XML, a
        missing, empty or non-numeric ``<type>``, and codes that are not
        ``AppMessageType`` members.
        """
        if self.msg_type != MessageType.APP or not self.content.xml_content:
            return None
        try:
            appmsg = ET.fromstring(self.content.xml_content).find('.//appmsg')
            if appmsg is not None:
                type_value = int(appmsg.find('type').text)
                return AppMessageType(type_value)
        except (AttributeError, TypeError, ValueError, ET.ParseError):
            # TypeError covers an empty <type/> element, whose text is None.
            pass
        return None

    def get_image_content(self) -> Optional[ImageContent]:
        """Return the image details, or None when they cannot be extracted."""
        if self.msg_type != MessageType.IMAGE or not self.content.xml_content:
            return None
        try:
            img = ET.fromstring(self.content.xml_content).find('img')
            if img is not None:
                return ImageContent(
                    aes_key=img.get('aeskey', ''),
                    url=img.get('cdnthumburl', ''),
                    length=int(img.get('length', 0)),
                    md5=img.get('md5', ''),
                    thumb_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer")
                )
        except (AttributeError, ValueError, ET.ParseError):
            pass
        return None

    def get_voice_content(self) -> Optional[VoiceContent]:
        """Return the voice details, or None when they cannot be extracted."""
        if self.msg_type != MessageType.VOICE or not self.content.xml_content:
            return None
        try:
            voice = ET.fromstring(self.content.xml_content).find('.//voicemsg')
            if voice is not None:
                return VoiceContent(
                    voice_length=int(voice.get('voicelength', 0)),
                    aes_key=voice.get('aeskey', ''),
                    url=voice.get('voiceurl', ''),
                    voice_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer")
                )
        except (AttributeError, ValueError, ET.ParseError):
            pass
        return None

    def get_video_content(self) -> Optional[VideoContent]:
        """Return the video details, or None when they cannot be extracted."""
        if self.msg_type != MessageType.VIDEO or not self.content.xml_content:
            return None
        try:
            video = ET.fromstring(self.content.xml_content).find('.//videomsg')
            if video is not None:
                return VideoContent(
                    aes_key=video.get('aeskey', ''),
                    video_url=video.get('cdnvideourl', ''),
                    thumb_url=video.get('cdnthumburl', ''),
                    length=int(video.get('length', 0)),
                    play_length=int(video.get('playlength', 0))
                )
        except (AttributeError, ValueError, ET.ParseError):
            pass
        return None

    def get_location_content(self) -> Optional[LocationContent]:
        """Return the location details, or None when they cannot be extracted."""
        if self.msg_type != MessageType.LOCATION or not self.content.xml_content:
            return None
        try:
            location = ET.fromstring(self.content.xml_content).find('location')
            if location is not None:
                return LocationContent(
                    x=float(location.get('x', 0)),
                    y=float(location.get('y', 0)),
                    label=location.get('label', ''),
                    poi_name=location.get('poiname')
                )
        except (AttributeError, ValueError, ET.ParseError):
            pass
        return None


if __name__ == '__main__':
    content_str = """wxid_g6vc38ifs1an22:\n1"""
    content = MessageContent(content_str, sender="Jyunere")
    print(content.raw_content)
    print(content.xml_content)
    print(content.clean_content)