from dataclasses import dataclass from typing import Optional, Dict, Any from enum import Enum import xml.etree.ElementTree as ET import json class MessageType(Enum): """消息类型枚举""" UNKNOWN = 0 # 未知类型 TEXT = 1 # 文本消息 IMAGE = 3 # 图片消息 VOICE = 34 # 语音消息 VERIFY_MSG = 37 # 好友确认消息 POSSIBLE_FRIEND_MSG = 40 # 好友推荐消息 SHARE_CARD = 42 # 名片消息 VIDEO = 43 # 视频消息 EMOTICON = 47 # 动画表情 LOCATION = 48 # 位置消息 APP = 49 # 应用消息(链接、音乐、小程序等) VOIP_MSG = 50 # VOIP消息 STATUS_NOTIFY = 51 # 状态通知 SYSTEM = 10000 # 系统消息 SYSTEM_NOTIFY = 10002 # 系统通知 RECALLED = 10002 # 撤回消息 EMOJI = 1090519089 # 大表情 class AppMessageType(Enum): """应用消息类型枚举""" UNKNOWN = 0 # 未知类型 TEXT = 1 # 文本 IMG = 2 # 图片 AUDIO = 3 # 音频 VIDEO = 4 # 视频 LINK = 5 # 链接消息 FILE = 6 # 文件 QUOTE = 57 # 引用 EMOJI = 8 # 表情 LOCATION = 17 # 位置 APP_MSG = 33 # APP消息 MINIPROGRAM = 36 # 小程序 TRANSFER = 2000 # 转账 RED_PACKET = 2001 # 红包 CARD_TICKET = 2002 # 卡券 REAL_TIME_LOCATION_START = 17 # 实时位置共享开始 REAL_TIME_LOCATION_STOP = 18 # 实时位置共享结束 CARD = 42 # 名片 VOICE_REMIND = 43 # 语音提醒 FILE_NOTICE = 74 # 文件通知 CHANNELS = 51 # 视频号消息 @dataclass class MessageContent: """消息内容""" raw_content: str # 原始内容 xml_content: Optional[ET.Element] = None # XML内容(如果有) def __post_init__(self): """处理XML内容""" if self.raw_content.startswith(' 'WxMessage': """从JSON数据创建消息对象""" data = json_data.get("Data", {}) to_user = data.get("ToUserName", {}).get("string", "") return cls( type_name=json_data.get("TypeName", ""), appid=json_data.get("Appid", ""), wxid=json_data.get("Wxid", ""), msg_id=data.get("MsgId", 0), sender=data.get("FromUserName", {}).get("string", ""), to_user=to_user, roomid=to_user if to_user.endswith("@chatroom") else "", # 设置room_id msg_type=MessageType(data.get("MsgType", 0)), content=MessageContent(data.get("Content", {}).get("string", "")), create_time=data.get("CreateTime", 0), push_content=data.get("PushContent"), new_msg_id=data.get("NewMsgId", 0), msg_seq=data.get("MsgSeq", 0), msg_source=data.get("MsgSource", ""), raw_data=json_data ) def __str__(self) -> str: """返回消息的字符串表示,用于打印和日志""" # 获取消息类型的名称 msg_type_name = self.msg_type.name if self.msg_type else "UNKNOWN" # 处理不同类型的消息内容 content_str = "" if self.msg_type == MessageType.TEXT: # 文本消息直接显示内容 content_str = self.content.raw_content elif self.msg_type == MessageType.IMAGE: # 图片消息显示图片信息 img_content = self.get_image_content() if img_content: content_str = f"[图片] 大小: {img_content.length}字节, MD5: {img_content.md5}" else: content_str = "[图片]" elif self.msg_type == MessageType.VOICE: # 语音消息显示语音信息 voice_content = self.get_voice_content() if voice_content: content_str = f"[语音] 长度: {voice_content.voice_length}ms" else: content_str = "[语音]" elif self.msg_type == MessageType.VIDEO: # 视频消息显示视频信息 video_content = self.get_video_content() if video_content: content_str = f"[视频] 长度: {video_content.play_length}ms, 大小: {video_content.length}字节" else: content_str = "[视频]" elif self.msg_type == MessageType.LOCATION: # 位置消息显示位置信息 location_content = self.get_location_content() if location_content: content_str = f"[位置] {location_content.label}" else: content_str = "[位置]" elif self.msg_type == MessageType.APP: # 应用消息显示应用类型 app_type = self.get_app_message_type() if app_type: content_str = f"[应用消息] 类型: {app_type.name}" else: content_str = "[应用消息]" elif self.msg_type == MessageType.EMOJI: content_str = "[表情]" elif self.msg_type == MessageType.SYSTEM: content_str = f"[系统消息] {self.content.raw_content}" elif self.msg_type == MessageType.SYSTEM_NOTIFY: content_str = f"[系统通知] {self.content.raw_content}" else: # 其他类型消息 content_str = f"[未知类型消息] {self.content.raw_content[:30]}..." # 限制内容长度,避免过长 if len(content_str) > 100: content_str = content_str[:97] + "..." # 构建基本信息 from_info = f"发送者: {self.sender}" to_info = f"接收者: {self.to_user}" # 如果是群消息,添加群信息 group_info = "" if self.from_group(): group_info = f"群聊: {self.roomid}, " # 构建完整的消息字符串 return (f"WxMessage[ID: {self.msg_id}, 类型: {msg_type_name}, " f"{group_info}{from_info}, {to_info}, " f"内容: {content_str}]") def __repr__(self) -> str: """返回消息的详细表示,用于调试""" return self.__str__() def from_self(self) -> bool: """判断是否是自己发送的消息""" return self.sender == self.wxid def from_group(self) -> bool: return self.to_user.endswith("@chatroom") def is_at(self, wxid) -> bool: """是否被 @:群消息,在 @ 名单里,并且不是 @ 所有人""" if not self.from_group(): return False # 只有群消息才能 @ if not re.findall(f"[\s|\S]*({wxid})[\s|\S]*", self.msg_source): return False # 不在 @ 清单里 if re.findall(r"@(?:所有人|all|All)", self.content): return False # 排除 @ 所有人 return True def get_app_message_type(self) -> Optional[AppMessageType]: """获取应用消息类型""" if self.msg_type != MessageType.APP or not self.content.xml_content: return None try: appmsg = self.content.xml_content.find('.//appmsg') if appmsg is not None: type_value = int(appmsg.find('type').text) return AppMessageType(type_value) except (AttributeError, ValueError): pass return None def get_image_content(self) -> Optional[ImageContent]: """获取图片消息内容""" if self.msg_type != MessageType.IMAGE or not self.content.xml_content: return None try: img = self.content.xml_content.find('img') if img is not None: return ImageContent( aes_key=img.get('aeskey', ''), url=img.get('cdnthumburl', ''), length=int(img.get('length', 0)), md5=img.get('md5', ''), thumb_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer") ) except (AttributeError, ValueError): pass return None def get_voice_content(self) -> Optional[VoiceContent]: """获取语音消息内容""" if self.msg_type != MessageType.VOICE or not self.content.xml_content: return None try: voice = self.content.xml_content.find('.//voicemsg') if voice is not None: return VoiceContent( voice_length=int(voice.get('voicelength', 0)), aes_key=voice.get('aeskey', ''), url=voice.get('voiceurl', ''), voice_base64=self.raw_data.get("Data", {}).get("ImgBuf", {}).get("buffer") ) except (AttributeError, ValueError): pass return None def get_video_content(self) -> Optional[VideoContent]: """获取视频消息内容""" if self.msg_type != MessageType.VIDEO or not self.content.xml_content: return None try: video = self.content.xml_content.find('.//videomsg') if video is not None: return VideoContent( aes_key=video.get('aeskey', ''), video_url=video.get('cdnvideourl', ''), thumb_url=video.get('cdnthumburl', ''), length=int(video.get('length', 0)), play_length=int(video.get('playlength', 0)) ) except (AttributeError, ValueError): pass return None def get_location_content(self) -> Optional[LocationContent]: """获取地理位置内容""" if self.msg_type != MessageType.LOCATION or not self.content.xml_content: return None try: location = self.content.xml_content.find('location') if location is not None: return LocationContent( x=float(location.get('x', 0)), y=float(location.get('y', 0)), label=location.get('label', ''), poi_name=location.get('poiname') ) except (AttributeError, ValueError): pass return None