from dataclasses import dataclass from typing import Optional, Dict, Any from enum import Enum import xml.etree.ElementTree as ET import json class MessageType(Enum): """消息类型枚举""" TEXT = 1 # 文本消息 IMAGE = 3 # 图片消息 VOICE = 34 # 语音消息 VIDEO = 43 # 视频消息 EMOJI = 47 # emoji表情 LOCATION = 48 # 地理位置 APP = 49 # 应用消息(链接、文件、小程序等) SYSTEM = 10000 # 系统消息 SYSTEM_NOTIFY = 10002 # 系统通知 class AppMessageType(Enum): """应用消息类型枚举""" LINK = 5 # 链接消息 FILE = 6 # 文件消息 FILE_NOTICE = 74 # 文件上传通知 MINIPROGRAM = 33 # 小程序消息 QUOTE = 57 # 引用消息 TRANSFER = 2000 # 转账消息 RED_PACKET = 2001 # 红包消息 CHANNELS = 51 # 视频号消息 @dataclass class MessageContent: """消息内容""" raw_content: str # 原始内容 xml_content: Optional[ET.Element] = None # XML内容(如果有) def __post_init__(self): """处理XML内容""" if self.raw_content.startswith(' 'WxMessage': """从JSON数据创建消息对象""" data = json_data.get("Data", {}) to_user = data.get("ToUserName", {}).get("string", "") return cls( type_name=json_data.get("TypeName", ""), appid=json_data.get("Appid", ""), wxid=json_data.get("Wxid", ""), msg_id=data.get("MsgId", 0), sender=data.get("FromUserName", {}).get("string", ""), to_user=to_user, roomid=to_user if to_user.endswith("@chatroom") else "", # 设置room_id msg_type=MessageType(data.get("MsgType", 0)), content=MessageContent(data.get("Content", {}).get("string", "")), create_time=data.get("CreateTime", 0), push_content=data.get("PushContent"), new_msg_id=data.get("NewMsgId", 0), msg_seq=data.get("MsgSeq", 0), msg_source=data.get("MsgSource", ""), raw_data=json_data ) def __str__(self) -> str: """返回消息的字符串表示,用于打印和日志""" # 获取消息类型的名称 msg_type_name = self.msg_type.name if self.msg_type else "UNKNOWN" # 处理不同类型的消息内容 content_str = "" if self.msg_type == MessageType.TEXT: # 文本消息直接显示内容 content_str = self.content.raw_content elif self.msg_type == MessageType.IMAGE: # 图片消息显示图片信息 img_content = self.get_image_content() if img_content: content_str = f"[图片] 大小: {img_content.length}字节, MD5: {img_content.md5}" else: content_str = "[图片]" elif self.msg_type == MessageType.VOICE: # 语音消息显示语音信息 voice_content = self.get_voice_content() if voice_content: content_str = f"[语音] 长度: {voice_content.voice_length}ms" else: content_str = "[语音]" elif self.msg_type == MessageType.VIDEO: # 视频消息显示视频信息 video_content = self.get_video_content() if video_content: content_str = f"[视频] 长度: {video_content.play_length}ms, 大小: {video_content.length}字节" else: content_str = "[视频]" elif self.msg_type == MessageType.LOCATION: # 位置消息显示位置信息 location_content = self.get_location_content() if location_content: content_str = f"[位置] {location_content.label}" else: content_str = "[位置]" elif self.msg_type == MessageType.APP: # 应用消息显示应用类型 app_type = self.get_app_message_type() if app_type: content_str = f"[应用消息] 类型: {app_type.name}" else: content_str = "[应用消息]" elif self.msg_type == MessageType.EMOJI: content_str = "[表情]" elif self.msg_type == MessageType.SYSTEM: content_str = f"[系统消息] {self.content.raw_content}" elif self.msg_type == MessageType.SYSTEM_NOTIFY: content_str = f"[系统通知] {self.content.raw_content}" else: # 其他类型消息 content_str = f"[未知类型消息] {self.content.raw_content[:30]}..." # 限制内容长度,避免过长 if len(content_str) > 100: content_str = content_str[:97] + "..." # 构建基本信息 from_info = f"发送者: {self.sender}" to_info = f"接收者: {self.to_user}" # 如果是群消息,添加群信息 group_info = "" if self.from_group(): group_info = f"群聊: {self.roomid}, " # 构建完整的消息字符串 return (f"WxMessage[ID: {self.msg_id}, 类型: {msg_type_name}, " f"{group_info}{from_info}, {to_info}, " f"内容: {content_str}]") def __repr__(self) -> str: """返回消息的详细表示,用于调试""" return self.__str__() def from_self(self) -> bool: """判断是否是自己发送的消息""" return self.sender == self.wxid def from_group(self) -> bool: return self.to_user.endswith("@chatroom") def get_app_message_type(self) -> Optional[AppMessageType]: """获取应用消息类型""" if self.msg_type != MessageType.APP or not self.content.xml_content: return None try: appmsg = self.content.xml_content.find('.//appmsg') if appmsg is not None: type_value = int(appmsg.find('type').text) return AppMessageType(type_value) except (AttributeError, ValueError): pass return None def get_image_content(self) -> Optional[ImageContent]: """获取图片消息内容""" if self.msg_type != MessageType.IMAGE or not self.content.xml_content: return None try: img = self.content.xml_content.find('img') if img is not None: return ImageContent( aes_key=img.get('aeskey', ''), url=img.get('cdnthumburl', ''), length=int(img.get('length', 0)), md5=img.get('md5', ''), thumb_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer") ) except (AttributeError, ValueError): pass return None def get_voice_content(self) -> Optional[VoiceContent]: """获取语音消息内容""" if self.msg_type != MessageType.VOICE or not self.content.xml_content: return None try: voice = self.content.xml_content.find('.//voicemsg') if voice is not None: return VoiceContent( voice_length=int(voice.get('voicelength', 0)), aes_key=voice.get('aeskey', ''), url=voice.get('voiceurl', ''), voice_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer") ) except (AttributeError, ValueError): pass return None def get_video_content(self) -> Optional[VideoContent]: """获取视频消息内容""" if self.msg_type != MessageType.VIDEO or not self.content.xml_content: return None try: video = self.content.xml_content.find('.//videomsg') if video is not None: return VideoContent( aes_key=video.get('aeskey', ''), video_url=video.get('cdnvideourl', ''), thumb_url=video.get('cdnthumburl', ''), length=int(video.get('length', 0)), play_length=int(video.get('playlength', 0)) ) except (AttributeError, ValueError): pass return None def get_location_content(self) -> Optional[LocationContent]: """获取地理位置内容""" if self.msg_type != MessageType.LOCATION or not self.content.xml_content: return None try: location = self.content.xml_content.find('location') if location is not None: return LocationContent( x=float(location.get('x', 0)), y=float(location.get('y', 0)), label=location.get('label', ''), poi_name=location.get('poiname') ) except (AttributeError, ValueError): pass return None