import base64
import re
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional, Tuple

# Notes:
# 1. The semantic fields of a WeChat emoji message are not stable: sometimes
#    they are plain text, sometimes base64-encoded protobuf.
# 2. "Send-parameter parsing" and "semantic text extraction" are gathered here
#    as standalone helpers so the emoji library backend and the AI auto-reply
#    can share them.
# 3. The module contains pure parsing logic only (no Flask / DB dependency),
#    so it can be reused anywhere.

# Regex fallbacks used when the XML is not well-formed.  The leading ``\b``
# keeps them from matching inside longer attribute names such as
# ``androidmd5`` / ``androidlen``, consistent with the exact attribute lookup
# done on the well-formed path.
_EMOJI_MD5_RE = re.compile(
    r'\bmd5\s*=\s*[\"\']([0-9a-fA-F]{16,64})[\"\']', re.IGNORECASE
)
_EMOJI_TOTALLEN_RE = re.compile(
    r'\b(?:totallen|total_len|len)\s*=\s*[\"\'](\d+)[\"\']', re.IGNORECASE
)
_EMOJI_BASE64_RE = re.compile(r"^[A-Za-z0-9+/=]+$")
_EMOJI_HEX_DIGEST_RE = re.compile(r"[0-9a-f]{16,64}")
_EMOJI_READABLE_CHAR_RE = re.compile(r"[\u4e00-\u9fffA-Za-z]")
_EMOJI_CJK_CHAR_RE = re.compile(r"[\u4e00-\u9fff]")
_EMOJI_WHITESPACE_RE = re.compile(r"\s+")

_EMOJI_LOCALE_KEYS = {"zh_cn", "zh_tw", "zh_hk", "default", "en", "ja", "ko"}
_EMOJI_SEMANTIC_STOPWORDS = {
    "default",
    "zh_cn",
    "zh_tw",
    "zh_hk",
    "en",
    "ja",
    "ko",
    "opus",
    "gif",
    "png",
    "jpg",
    "jpeg",
    "webp",
}

# Attribute names (in priority order) that may carry the emoji byte length.
_EMOJI_LENGTH_ATTRS = ("totallen", "total_len", "totalLen", "len")

# Attributes of <emoji> that may carry human-readable semantics, with the
# precompiled regex used to recover each one from malformed XML.
_EMOJI_SEMANTIC_FIELDS = ("desc", "attachedtext", "emojiattr")
_EMOJI_SEMANTIC_FIELD_RES = tuple(
    (
        field_name,
        re.compile(
            rf'\b{field_name}\s*=\s*[\"\']([^\"\']+)[\"\']', re.IGNORECASE
        ),
    )
    for field_name in _EMOJI_SEMANTIC_FIELDS
)

# Punctuation stripped by normalize_emoji_match_text().  Non-ASCII characters
# are written as escapes so they survive editors/encodings unchanged, and each
# character appears once (a doubled "~~" inside a set triggers a FutureWarning
# in the ``re`` module).
_EMOJI_MATCH_PUNCT_RE = re.compile(
    "["
    # ASCII: , . ! ? ; : - ~ ` ' " ( ) [ ] < > / \ | _
    r",.!?;:\-~`'\"()\[\]<>/\\|_"
    # CJK: ideographic full stop/comma, curly quotes, lenticular and
    # double-angle brackets.
    "\u3002\u3001\u201c\u201d\u2018\u2019\u3010\u3011\u300a\u300b"
    # Full-width forms of , ! ? ; : ~ ( )
    "\uff0c\uff01\uff1f\uff1b\uff1a\uff5e\uff08\uff09"
    "]+"
)


def safe_text(value) -> str:
    """Return ``str(value)``, mapping ``None`` to the empty string."""
    return "" if value is None else str(value)


def _find_emoji_node(root: ET.Element) -> Optional[ET.Element]:
    """Return the first ``<emoji>`` element, including the root itself.

    ``Element.find(".//emoji")`` only searches descendants, so a document whose
    root *is* ``<emoji>`` has to be checked explicitly.
    """
    if root.tag == "emoji":
        return root
    return root.find(".//emoji")


def _extract_emoji_meta_by_regex(text: str) -> Tuple[str, int]:
    """Best-effort ``(md5, total_length)`` extraction for malformed XML."""
    md5 = ""
    total_length = 0

    md5_match = _EMOJI_MD5_RE.search(text)
    if md5_match:
        md5 = md5_match.group(1).lower()

    len_match = _EMOJI_TOTALLEN_RE.search(text)
    if len_match:
        try:
            total_length = int(len_match.group(1))
        except ValueError:
            total_length = 0
    return md5, total_length


def extract_emoji_meta(attachment_url: str) -> Tuple[str, int]:
    """Extract the ``md5`` and total length needed to send an emoji.

    Args:
        attachment_url: The emoji XML text (``None`` is tolerated).

    Returns:
        ``(md5, total_length)``.  ``md5`` is lower-cased; both fall back to
        ``""`` / ``0`` when the value is missing.  Input that does not start
        with ``<`` yields ``("", 0)``.  Well-formed XML without an ``<emoji>``
        element also yields ``("", 0)``; malformed XML is scanned with regexes
        instead.
    """
    text = safe_text(attachment_url).strip()
    if not text.startswith("<"):
        return "", 0

    try:
        root = ET.fromstring(text)
    except Exception:
        # Deliberately broad: any parser failure switches to the regex path
        # rather than propagating to the caller.
        return _extract_emoji_meta_by_regex(text)

    emoji_node = _find_emoji_node(root)
    if emoji_node is None:
        return "", 0

    md5 = safe_text(emoji_node.attrib.get("md5", "")).strip().lower()
    total_length = 0
    for key in _EMOJI_LENGTH_ATTRS:
        value = safe_text(emoji_node.attrib.get(key, "")).strip()
        if not value.isdigit():
            continue
        try:
            total_length = int(value)
        except ValueError:
            # str.isdigit() accepts characters (e.g. superscripts) that int()
            # rejects; try the next attribute instead of failing.
            continue
        break
    return md5, total_length


def _read_protobuf_varint(payload: bytes, offset: int) -> Tuple[int, int]:
    """Read one protobuf varint from ``payload`` starting at ``offset``.

    Returns:
        ``(value, next_offset)``.

    Raises:
        ValueError: If the payload ends before the varint terminates or the
            varint is longer than 10 bytes.
    """
    result = 0
    shift = 0
    index = offset
    while index < len(payload) and shift <= 63:
        current = payload[index]
        index += 1
        result |= (current & 0x7F) << shift
        if not (current & 0x80):
            return result, index
        shift += 7
    raise ValueError("protobuf varint 读取失败")


def _extract_protobuf_strings(payload: bytes, depth: int = 0) -> List[str]:
    """Recursively collect UTF-8 text from length-delimited protobuf fields.

    Parsing is schema-less and best-effort: it stops silently at the first
    byte sequence that cannot be interpreted.

    Args:
        payload: Raw bytes assumed to be a protobuf message.
        depth: Current recursion depth (callers normally omit it).

    Returns:
        Decoded strings in encounter order; nested results follow the chunk
        they were found in.
    """
    if not payload:
        return []

    results: List[str] = []
    index = 0
    while index < len(payload):
        try:
            tag, index = _read_protobuf_varint(payload, index)
        except ValueError:
            break
        if tag <= 0:
            break

        wire_type = tag & 0x07
        if wire_type == 0:  # varint: read and discard
            try:
                _, index = _read_protobuf_varint(payload, index)
            except ValueError:
                break
            continue
        if wire_type == 1:  # fixed 64-bit: skip
            index += 8
            continue
        if wire_type == 5:  # fixed 32-bit: skip
            index += 4
            continue
        if wire_type != 2:  # anything else is unsupported: stop parsing
            break

        try:
            length, index = _read_protobuf_varint(payload, index)
        except ValueError:
            break
        if length < 0 or index + length > len(payload):
            break

        chunk = payload[index:index + length]
        index += length
        if not chunk:
            continue

        try:
            decoded = chunk.decode("utf-8")
        except UnicodeDecodeError:
            decoded = ""
        if decoded:
            results.append(decoded)

        # ``desc`` is commonly a nested language-pack structure; recursing two
        # levels is enough to cover most historical data.
        if depth < 2:
            results.extend(_extract_protobuf_strings(chunk, depth + 1))
    return results


def sanitize_emoji_semantic_text(value: str) -> str:
    """Drop non-printable characters and collapse runs of whitespace."""
    text = "".join(ch for ch in safe_text(value) if ch.isprintable()).strip()
    text = _EMOJI_WHITESPACE_RE.sub(" ", text)
    return text.strip()


def is_emoji_semantic_candidate(value: str) -> bool:
    """Return whether ``value`` looks like readable emoji semantics.

    Rejected: empty text, locale keys / stopwords, any text *containing* a
    locale key, ``com.tencent.`` / ``finder:`` identifiers, hex digests,
    base64-looking strings of 8+ characters, text longer than 40 characters,
    and text with no CJK ideograph or ASCII letter.
    """
    text = sanitize_emoji_semantic_text(value)
    if not text:
        return False

    lowered = text.lower()
    if lowered in _EMOJI_LOCALE_KEYS or lowered in _EMOJI_SEMANTIC_STOPWORDS:
        return False
    # Substring match on purpose: sanitizing a raw language-pack wrapper glues
    # the locale key onto the text (e.g. "en" + "Hello"), and those shells
    # must be dropped.  NOTE(review): this also rejects ordinary Latin words
    # that contain "en" / "ja" / "ko".
    if any(locale_key in lowered for locale_key in _EMOJI_LOCALE_KEYS):
        return False
    if lowered.startswith(("com.tencent.", "finder:")):
        return False
    if _EMOJI_HEX_DIGEST_RE.fullmatch(lowered):
        return False
    if len(text) >= 8 and _EMOJI_BASE64_RE.fullmatch(text):
        return False
    if len(text) > 40:
        return False
    return bool(_EMOJI_READABLE_CHAR_RE.search(text))


def dedupe_emoji_semantic_candidates(values: List[str]) -> List[str]:
    """Sanitize, filter and case-insensitively dedupe candidates in order."""
    seen = set()
    results: List[str] = []
    for item in values or []:
        text = sanitize_emoji_semantic_text(item)
        if not is_emoji_semantic_candidate(text):
            continue
        key = text.lower()
        if key in seen:
            continue
        seen.add(key)
        results.append(text)
    return results


def _maybe_decode_base64_payload(value: str) -> bytes:
    """Decode ``value`` as base64, returning ``b""`` when it is not base64.

    Whitespace is removed and missing ``=`` padding is added before decoding.
    """
    normalized = _EMOJI_WHITESPACE_RE.sub("", safe_text(value))
    if len(normalized) < 4 or not _EMOJI_BASE64_RE.fullmatch(normalized):
        return b""

    normalized += "=" * (-len(normalized) % 4)
    try:
        return base64.b64decode(normalized, validate=False)
    except ValueError:  # binascii.Error is a ValueError subclass
        return b""


def decode_emoji_semantic_value(value: str) -> List[str]:
    """Decode one semantic field into a list of candidate texts.

    The raw value is kept if it is already readable.  If it also decodes as
    base64, strings found by the protobuf scan are added; only when nothing
    usable has been found is the whole payload tried as plain UTF-8.
    """
    raw_text = safe_text(value).strip()
    if not raw_text:
        return []

    candidates: List[str] = []
    if is_emoji_semantic_candidate(raw_text):
        candidates.append(raw_text)

    decoded_bytes = _maybe_decode_base64_payload(raw_text)
    if not decoded_bytes:
        return dedupe_emoji_semantic_candidates(candidates)

    candidates.extend(_extract_protobuf_strings(decoded_bytes))

    # Some fields are "plain text after base64", not protobuf.  Fall back to
    # decoding the whole payload as UTF-8 only when the protobuf path found no
    # usable text, so language-pack wrappers are not glued into dirty values.
    if not dedupe_emoji_semantic_candidates(candidates):
        try:
            decoded_text = decoded_bytes.decode("utf-8")
        except UnicodeDecodeError:
            decoded_text = ""
        if decoded_text:
            candidates.append(decoded_text)

    return dedupe_emoji_semantic_candidates(candidates)


def extract_emoji_semantic_info(attachment_url: str) -> Dict[str, object]:
    """Extract primary semantics, aliases and source fields from emoji XML.

    Args:
        attachment_url: The emoji XML text (``None`` is tolerated).

    Returns:
        A dict with:
        ``semantic_text`` - the first alias containing a CJK ideograph, else
        the first alias, else ``""``;
        ``semantic_aliases`` - all deduplicated candidate texts;
        ``semantic_source`` - comma-joined names of the attributes
        (``desc``, ``attachedtext``, ``emojiattr``) that produced candidates.
    """
    text = safe_text(attachment_url).strip()
    if not text.startswith("<"):
        return {
            "semantic_text": "",
            "semantic_aliases": [],
            "semantic_source": "",
        }

    field_values: List[Tuple[str, str]] = []
    try:
        root = ET.fromstring(text)
    except Exception:
        # Deliberately broad: any parser failure switches to regex scanning.
        for field_name, pattern in _EMOJI_SEMANTIC_FIELD_RES:
            match = pattern.search(text)
            field_values.append(
                (field_name, safe_text(match.group(1) if match else "").strip())
            )
    else:
        emoji_node = _find_emoji_node(root)
        if emoji_node is not None:
            for field_name in _EMOJI_SEMANTIC_FIELDS:
                field_values.append(
                    (
                        field_name,
                        safe_text(emoji_node.attrib.get(field_name, "")).strip(),
                    )
                )

    aliases: List[str] = []
    sources: List[str] = []
    for field_name, field_value in field_values:
        decoded_candidates = decode_emoji_semantic_value(field_value)
        if not decoded_candidates:
            continue
        aliases.extend(decoded_candidates)
        sources.append(field_name)

    semantic_aliases = dedupe_emoji_semantic_candidates(aliases)
    semantic_text = ""
    if semantic_aliases:
        # Prefer text containing Chinese characters so the result can be used
        # directly for display and matching.
        chinese_first = [
            item for item in semantic_aliases if _EMOJI_CJK_CHAR_RE.search(item)
        ]
        semantic_text = chinese_first[0] if chinese_first else semantic_aliases[0]

    return {
        "semantic_text": semantic_text,
        "semantic_aliases": semantic_aliases,
        "semantic_source": ",".join(sources),
    }


def normalize_emoji_match_text(value: str) -> str:
    """Normalize reply text / emoji semantics for local matching.

    Notes:
        1. Whitespace and common ASCII, CJK and full-width punctuation are
           removed and the text is lower-cased, so variants that differ only
           in spacing or punctuation compare equal.
        2. Only light normalization is done (no tokenizing or semantic
           expansion) to avoid matching ordinary text to an emoji by accident.
        3. Length and score thresholds are expected to be applied by the
           caller; none are enforced here.
    """
    text = sanitize_emoji_semantic_text(value).lower()
    text = _EMOJI_MATCH_PUNCT_RE.sub("", text)
    text = _EMOJI_WHITESPACE_RE.sub("", text)
    return text.strip()