Files
abot/utils/wechat/emoji_semantic_parser.py

283 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import re
import xml.etree.ElementTree as ET
from typing import Dict, List, Tuple
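# Notes:
# 1. The semantic fields of a WeChat emoji message are not stable: sometimes
#    they are plain text, sometimes base64-encoded protobuf.
# 2. "Send-parameter parsing" and "Chinese semantic extraction" are gathered
#    here as standalone tools so that the admin emoji library and the AI
#    auto-reply can share them.
# 3. This module keeps pure parsing logic only and does not depend on
#    Flask / DB, so it can be reused directly in any context.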
# Regex fallbacks used when the emoji XML cannot be parsed.
# The leading ``\b`` anchors the attribute name so that look-alike attributes
# such as ``androidmd5=`` / ``externmd5=`` / ``androidlen=`` are not mistaken
# for the real ``md5`` / ``len`` attributes.
_EMOJI_MD5_RE = re.compile(
    r'\bmd5\s*=\s*[\"\']([0-9a-fA-F]{16,64})[\"\']',
    re.IGNORECASE,
)
_EMOJI_TOTALLEN_RE = re.compile(
    r'\b(?:totallen|total_len|len)\s*=\s*[\"\'](\d+)[\"\']',
    re.IGNORECASE,
)

# Matches strings made only of characters from the standard base64 alphabet
# (including ``=`` padding).
_EMOJI_BASE64_RE = re.compile(r"^[A-Za-z0-9+/=]+$")

# Language-pack keys that may show up next to the real semantic text.
_EMOJI_LOCALE_KEYS = {"zh_cn", "zh_tw", "zh_hk", "default", "en", "ja", "ko"}

# Tokens that are never a meaningful emoji description on their own:
# every locale key plus a few media-format names.
_EMOJI_SEMANTIC_STOPWORDS = _EMOJI_LOCALE_KEYS | {
    "opus",
    "gif",
    "png",
    "jpg",
    "jpeg",
    "webp",
}
def safe_text(value) -> str:
    """Convert *value* to ``str``, mapping ``None`` to the empty string.

    Keeps ``None`` from leaking into the parsing code as the literal
    text ``"None"``.
    """
    if value is None:
        return ""
    return str(value)
def extract_emoji_meta(attachment_url: str) -> Tuple[str, int]:
    """Extract the ``md5`` and total length needed to send an emoji.

    Args:
        attachment_url: The emoji XML payload. Anything that does not start
            with ``<`` after stripping is treated as "not XML".

    Returns:
        A ``(md5, total_length)`` tuple. ``md5`` is lower-cased; both fall
        back to ``""`` / ``0`` when the value cannot be found.
    """
    text = safe_text(attachment_url).strip()
    if not text.startswith("<"):
        return "", 0

    try:
        root = ET.fromstring(text)
    except (ET.ParseError, ValueError):
        # Malformed XML: fall back to a best-effort regex scan of the raw text.
        md5_match = _EMOJI_MD5_RE.search(text)
        md5 = md5_match.group(1).lower() if md5_match else ""
        total_length = 0
        len_match = _EMOJI_TOTALLEN_RE.search(text)
        if len_match:
            try:
                total_length = int(len_match.group(1))
            except ValueError:
                total_length = 0
        return md5, total_length

    # ``find(".//emoji")`` only looks at descendants, so a payload whose root
    # element is itself <emoji> has to be checked explicitly.
    emoji_node = root if root.tag == "emoji" else root.find(".//emoji")
    if emoji_node is None:
        return "", 0

    md5 = safe_text(emoji_node.attrib.get("md5", "")).strip().lower()
    total_length = 0
    # First attribute holding a decimal value wins.
    for key in ("totallen", "total_len", "totalLen", "len"):
        value = safe_text(emoji_node.attrib.get(key, "")).strip()
        if value.isdecimal():
            total_length = int(value)
            break
    return md5, total_length
def _read_protobuf_varint(payload: bytes, offset: int) -> Tuple[int, int]:
    """Read one protobuf varint from *payload* starting at *offset*.

    Returns:
        ``(value, next_offset)`` where ``next_offset`` is the index just
        past the last byte of the varint.

    Raises:
        ValueError: If the payload ends before a terminating byte is seen,
            or no terminating byte appears within 10 bytes.
    """
    value = 0
    # Shifts 0, 7, ..., 63 are allowed, i.e. at most 10 bytes are consumed.
    stop = min(len(payload), offset + 10)
    for position in range(offset, stop):
        byte = payload[position]
        value |= (byte & 0x7F) << (7 * (position - offset))
        if byte & 0x80 == 0:
            # Continuation bit clear: this was the last byte.
            return value, position + 1
    raise ValueError("protobuf varint 读取失败")
def _extract_protobuf_strings(payload: bytes, depth: int = 0) -> List[str]:
    """Recursively collect UTF-8 text from length-delimited protobuf fields.

    The payload is scanned as a sequence of ``tag`` / value pairs. Scanning
    stops silently at the first thing that does not parse (bad varint,
    unsupported wire type, length running past the end).

    Args:
        payload: Raw bytes to scan.
        depth: Current recursion level; chunks are re-scanned as nested
            messages while ``depth < 2``.

    Returns:
        Decoded strings in the order they were encountered.
    """
    texts: List[str] = []
    if not payload:
        return texts

    # Wire types with a fixed payload width: 1 -> 8 bytes, 5 -> 4 bytes.
    fixed_widths = {1: 8, 5: 4}
    size = len(payload)
    cursor = 0
    while cursor < size:
        try:
            tag, cursor = _read_protobuf_varint(payload, cursor)
        except Exception:
            break
        if tag <= 0:
            break

        wire_type = tag & 0x07
        if wire_type in fixed_widths:
            cursor += fixed_widths[wire_type]
            continue
        if wire_type not in (0, 2):
            break

        # Wire type 0 carries a varint value, wire type 2 a varint length.
        try:
            number, cursor = _read_protobuf_varint(payload, cursor)
        except Exception:
            break
        if wire_type == 0:
            continue

        end = cursor + number
        if number < 0 or end > size:
            break
        chunk = payload[cursor:end]
        cursor = end
        if not chunk:
            continue

        try:
            text = chunk.decode("utf-8")
        except Exception:
            text = ""
        if text:
            texts.append(text)

        # ``desc`` is commonly a nested language-pack structure; recursing
        # two levels is enough to cover most historical data.
        if depth < 2:
            texts.extend(_extract_protobuf_strings(chunk, depth + 1))
    return texts
def sanitize_emoji_semantic_text(value: str) -> str:
    """Clean a candidate semantic text.

    Drops every character for which ``str.isprintable()`` is false,
    collapses whitespace runs into a single space and trims both ends.
    """
    printable_only = "".join(filter(str.isprintable, safe_text(value)))
    collapsed = re.sub(r"\s+", " ", printable_only.strip())
    return collapsed.strip()
def is_emoji_semantic_candidate(value: str) -> bool:
    """Return True when *value* looks like a human-readable emoji meaning.

    The text is sanitized first, then rejected when it is empty, longer than
    40 characters, a stopword / locale key, contains a locale key, starts
    with a known technical prefix, looks like a hex digest, or looks like
    base64. What remains must contain at least one CJK ideograph
    (U+4E00-U+9FFF) or ASCII letter.
    """
    cleaned = sanitize_emoji_semantic_text(value)
    if not cleaned or len(cleaned) > 40:
        return False

    folded = cleaned.lower()
    if folded in _EMOJI_SEMANTIC_STOPWORDS or folded in _EMOJI_LOCALE_KEYS:
        return False
    # NOTE(review): this is a substring test, so ordinary words containing
    # "en" / "ja" / "ko" (e.g. "friend") are rejected too - confirm intended.
    for locale_key in _EMOJI_LOCALE_KEYS:
        if locale_key in folded:
            return False
    if folded.startswith(("com.tencent.", "finder:")):
        return False
    # Bare hex digests (e.g. an md5) are identifiers, not descriptions.
    if re.fullmatch(r"[0-9a-f]{16,64}", folded) is not None:
        return False
    # Longer strings made only of base64 characters are treated as payloads.
    if len(cleaned) >= 8 and _EMOJI_BASE64_RE.fullmatch(cleaned) is not None:
        return False
    return re.search(r"[\u4e00-\u9fffA-Za-z]", cleaned) is not None
def dedupe_emoji_semantic_candidates(values: List[str]) -> List[str]:
    """De-duplicate candidate semantic texts, keeping first-seen order.

    Each item is sanitized, dropped when it is not a valid candidate, and
    compared case-insensitively; the first spelling encountered is kept.
    ``None`` / empty input yields an empty list.
    """
    first_seen: Dict[str, str] = {}
    for raw in values or []:
        cleaned = sanitize_emoji_semantic_text(raw)
        if is_emoji_semantic_candidate(cleaned):
            # setdefault keeps the earliest spelling for a given lower-cased key.
            first_seen.setdefault(cleaned.lower(), cleaned)
    return list(first_seen.values())
def _maybe_decode_base64_payload(value: str) -> bytes:
    """Best-effort base64 decode of a field value.

    Whitespace is removed first. Returns ``b""`` when the remaining text is
    shorter than 4 characters, contains characters outside the base64
    alphabet, or fails to decode.
    """
    compact = re.sub(r"\s+", "", safe_text(value))
    looks_like_base64 = (
        len(compact) >= 4 and _EMOJI_BASE64_RE.fullmatch(compact) is not None
    )
    if not looks_like_base64:
        return b""

    # Pad up to a multiple of four characters before decoding.
    padded = compact + "=" * (-len(compact) % 4)
    try:
        decoded = base64.b64decode(padded, validate=False)
    except Exception:
        return b""
    return decoded
def decode_emoji_semantic_value(value: str) -> List[str]:
    """Decode one emoji semantic field into a list of candidate texts.

    The raw text is kept when it is already a valid candidate. If the field
    also decodes as base64, strings extracted from the bytes as protobuf are
    added. The result is sanitized and de-duplicated.
    """
    stripped = safe_text(value).strip()
    if not stripped:
        return []

    collected: List[str] = [stripped] if is_emoji_semantic_candidate(stripped) else []

    payload = _maybe_decode_base64_payload(stripped)
    if payload:
        collected.extend(_extract_protobuf_strings(payload))
        # Some fields are plain text that was base64-encoded, not protobuf.
        # Decode the whole payload as UTF-8 only when nothing usable has been
        # collected so far, to avoid gluing the language-pack wrapper into a
        # dirty value.
        if not dedupe_emoji_semantic_candidates(collected):
            try:
                plain = payload.decode("utf-8")
            except Exception:
                plain = ""
            if plain:
                collected.append(plain)

    return dedupe_emoji_semantic_candidates(collected)
def extract_emoji_semantic_info(attachment_url: str) -> Dict[str, object]:
    """Extract the primary meaning, aliases and source fields of an emoji.

    The ``desc``, ``attachedtext`` and ``emojiattr`` attributes of the
    ``<emoji>`` element are decoded in that order. When the XML cannot be
    parsed, the same attributes are looked up in the raw text with a regex.

    Args:
        attachment_url: The emoji XML payload. Anything that does not start
            with ``<`` after stripping yields an empty result.

    Returns:
        A dict with three keys:
        ``semantic_text`` - the chosen primary meaning (``""`` if none);
        ``semantic_aliases`` - all de-duplicated candidates, in order;
        ``semantic_source`` - comma-joined names of the fields that
        produced at least one candidate.
    """
    field_names = ("desc", "attachedtext", "emojiattr")
    text = safe_text(attachment_url).strip()
    if not text.startswith("<"):
        return {
            "semantic_text": "",
            "semantic_aliases": [],
            "semantic_source": "",
        }

    field_values: List[Tuple[str, str]] = []
    try:
        root = ET.fromstring(text)
    except (ET.ParseError, ValueError):
        # Malformed XML: best-effort attribute lookup on the raw text.
        for field_name in field_names:
            match = re.search(
                rf'{field_name}\s*=\s*[\"\']([^\"\']+)[\"\']',
                text,
                re.IGNORECASE,
            )
            field_values.append((field_name, match.group(1).strip() if match else ""))
    else:
        # ``find(".//emoji")`` only looks at descendants, so a payload whose
        # root element is itself <emoji> has to be checked explicitly.
        emoji_node = root if root.tag == "emoji" else root.find(".//emoji")
        if emoji_node is not None:
            for field_name in field_names:
                raw_value = safe_text(emoji_node.attrib.get(field_name, "")).strip()
                field_values.append((field_name, raw_value))

    aliases: List[str] = []
    sources: List[str] = []
    for field_name, field_value in field_values:
        decoded_candidates = decode_emoji_semantic_value(field_value)
        if not decoded_candidates:
            continue
        aliases.extend(decoded_candidates)
        sources.append(field_name)

    semantic_aliases = dedupe_emoji_semantic_candidates(aliases)
    semantic_text = ""
    if semantic_aliases:
        # Prefer the first alias that contains Chinese characters so the
        # result can be used directly for display and matching.
        semantic_text = next(
            (item for item in semantic_aliases if re.search(r"[\u4e00-\u9fff]", item)),
            semantic_aliases[0],
        )
    return {
        "semantic_text": semantic_text,
        "semantic_aliases": semantic_aliases,
        "semantic_source": ",".join(sources),
    }
def normalize_emoji_match_text(value: str) -> str:
    """Normalize reply text and emoji meanings the same way for local matching.

    Notes:
        1. Whitespace and most punctuation are removed and the text is
           lower-cased, so that e.g. "就离谱" and "就 离谱!" compare equal.
        2. Both ASCII punctuation and its full-width forms
           (U+FF0C, U+FF01, U+FF1F, U+FF1B, U+FF1A, U+FF08, U+FF09, U+FF5E)
           are stripped; the full-width forms are written as escapes so they
           survive editors / viewers that normalize look-alike characters.
        3. Only lightweight normalization is done here - no word
           segmentation or semantic expansion.
    """
    text = sanitize_emoji_semantic_text(value).lower()
    text = re.sub(
        r"[,。!?、;:.\-~`'\"“”‘’()\[\]【】<>《》/\\|_"
        r"\uff0c\uff01\uff1f\uff1b\uff1a\uff08\uff09\uff5e]+",
        "",
        text,
    )
    return re.sub(r"\s+", "", text)