完善表情库中文语义解析与检索展示

- 解析表情 desc 和 emojiattr 字段,提取可读中文语义与别名
- 按 md5 聚合表情历史记录,合并发送参数、预览图与语义信息
- 后台表情库弹窗增加语义展示与按中文语义搜索能力
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import base64
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
@@ -22,6 +23,23 @@ contacts_refresh_lock = threading.Lock()
|
||||
contacts_refresh_running = False
|
||||
# Patterns and vocabularies shared by the emoji metadata / semantic helpers.

# md5 attribute: 16-64 hex characters wrapped in either quote style.
_EMOJI_MD5_RE = re.compile(
    r'md5\s*=\s*[\"\']([0-9a-fA-F]{16,64})[\"\']',
    re.IGNORECASE,
)

# Length attribute (totallen / total_len / len) holding a quoted decimal number.
_EMOJI_TOTALLEN_RE = re.compile(
    r'(?:totallen|total_len|len)\s*=\s*[\"\'](\d+)[\"\']',
    re.IGNORECASE,
)

# A string made up exclusively of characters from the standard base64 alphabet.
_EMOJI_BASE64_RE = re.compile(r"^[A-Za-z0-9+/=]+$")

# Locale identifiers that appear as keys inside emoji description payloads.
_EMOJI_LOCALE_KEYS = {"zh_cn", "zh_tw", "zh_hk", "default", "en", "ja", "ko"}

# Tokens that are never meaningful semantics on their own: every locale key
# plus media format / codec names.
_EMOJI_SEMANTIC_STOPWORDS = _EMOJI_LOCALE_KEYS | {
    "opus",
    "gif",
    "png",
    "jpg",
    "jpeg",
    "webp",
}
|
||||
|
||||
def get_or_create_loop():
|
||||
"""获取或创建共享的事件循环"""
|
||||
@@ -203,6 +221,244 @@ def _extract_emoji_meta(attachment_url: str, image_path: str):
|
||||
return md5, total_length
|
||||
|
||||
|
||||
def _read_protobuf_varint(payload: bytes, offset: int):
    """Decode a single protobuf varint that starts at ``offset``.

    Notes:
        1. WeChat emoji ``desc`` / ``emojiattr`` values are frequently
           base64-encoded protobuf fragments.
        2. No schema is involved: this is the minimal generic varint reader
           that the recursive string extraction is built on.
        3. Malformed input raises instead of returning a guess, so the caller
           can fall back rather than surface garbage semantics.

    Args:
        payload: Raw bytes to read from.
        offset: Index of the first byte of the varint.

    Returns:
        A ``(value, next_offset)`` tuple, where ``next_offset`` points just
        past the last byte consumed.

    Raises:
        ValueError: If the data ends before the varint terminates, or the
            varint is still unterminated after ten bytes.
    """
    value = 0
    cursor = offset
    # Ten 7-bit groups at most: shifts 0, 7, ..., 63.
    for shift in range(0, 64, 7):
        if cursor >= len(payload):
            break
        byte = payload[cursor]
        cursor += 1
        value |= (byte & 0x7F) << shift
        # A cleared high bit marks the final byte of the varint.
        if not (byte & 0x80):
            return value, cursor
    raise ValueError("protobuf varint 读取失败")
|
||||
|
||||
|
||||
def _extract_protobuf_strings(payload: bytes, depth: int = 0):
    """Recursively collect UTF-8 text from protobuf length-delimited fields.

    The payload is walked as schema-less protobuf. Varint fields (wire type 0)
    and fixed-width fields (wire types 1 and 5) are skipped. Length-delimited
    fields (wire type 2) are decoded as UTF-8 and, while ``depth < 2``, also
    re-parsed as an embedded message. Any other wire type, an invalid tag, or
    truncated data ends the scan; whatever was collected so far is returned.

    Notes:
        1. The goal is not full deserialization, only to fish readable text
           out of an unknown structure as reliably as possible.
        2. ``desc`` is commonly a nested locale -> text structure, so two
           levels of recursion are enough.
        3. A field that is plain text normally fails to parse as a nested
           message, so the recursion stops on its own.

    Args:
        payload: Raw bytes, expected to be a protobuf message or fragment.
        depth: Current nesting level; callers normally leave it at 0.

    Returns:
        A list of decoded strings in encounter order (may contain duplicates
        and strings with control characters; callers are expected to filter).
    """
    if not payload:
        return []

    results = []
    total = len(payload)
    index = 0
    while index < total:
        try:
            tag, index = _read_protobuf_varint(payload, index)
        except ValueError:
            break

        # Protobuf field numbers start at 1; a zero field number means these
        # bytes are not a real tag, so stop instead of inventing fields.
        if tag >> 3 == 0:
            break

        wire_type = tag & 0x07
        if wire_type == 0:
            try:
                _, index = _read_protobuf_varint(payload, index)
            except ValueError:
                break
            continue

        if wire_type in (1, 5):
            # Fixed 64-bit / 32-bit value: skip it, but treat a truncated
            # value as the end of usable data.
            index += 8 if wire_type == 1 else 4
            if index > total:
                break
            continue

        if wire_type != 2:
            # Groups (3/4) and reserved wire types are not handled.
            break

        try:
            length, index = _read_protobuf_varint(payload, index)
        except ValueError:
            break
        if index + length > total:
            break

        chunk = payload[index:index + length]
        index += length
        if not chunk:
            continue

        try:
            decoded = chunk.decode("utf-8")
        except UnicodeDecodeError:
            decoded = ""
        if decoded:
            results.append(decoded)

        if depth < 2:
            results.extend(_extract_protobuf_strings(chunk, depth + 1))
    return results
|
||||
|
||||
|
||||
def _sanitize_emoji_semantic_text(value: str):
    """Clean a candidate semantic string.

    Non-printable characters are dropped, runs of whitespace are collapsed to
    a single space, and leading/trailing whitespace is removed.
    """
    printable_only = "".join(filter(str.isprintable, _safe_text(value)))
    collapsed = re.sub(r"\s+", " ", printable_only.strip())
    return collapsed.strip()
|
||||
|
||||
|
||||
def _is_emoji_semantic_candidate(value: str):
    """Decide whether a string looks like human-readable emoji semantics.

    Notes:
        1. Metadata such as locale keys, file extensions and product IDs is
           rejected.
        2. Only short text (at most 40 characters) containing a CJK ideograph
           or an ASCII letter is accepted, so long links, hashes and protocol
           fields are not mistaken for semantics.
        3. Single-character semantics are allowed on purpose.
        4. The locale check is a substring match, so any text containing a
           locale key (including the two-letter ones "en", "ja", "ko") is
           rejected as well.

    Returns:
        ``True`` when the sanitized text passes every filter, else ``False``.
    """
    text = _sanitize_emoji_semantic_text(value)
    if not text:
        return False

    lowered = text.lower()
    is_metadata = (
        lowered in _EMOJI_LOCALE_KEYS
        or lowered in _EMOJI_SEMANTIC_STOPWORDS
        or any(key in lowered for key in _EMOJI_LOCALE_KEYS)
        or lowered.startswith(("com.tencent.", "finder:"))
        # Bare hex digest (md5 and similar).
        or re.fullmatch(r"[0-9a-f]{16,64}", lowered) is not None
    )
    if is_metadata:
        return False

    # Eight or more pure base64-alphabet characters is treated as an encoded blob.
    looks_like_base64 = len(text) >= 8 and _EMOJI_BASE64_RE.fullmatch(text) is not None
    if looks_like_base64 or len(text) > 40:
        return False

    return re.search(r"[\u4e00-\u9fffA-Za-z]", text) is not None
|
||||
|
||||
|
||||
def _dedupe_emoji_semantic_candidates(values):
    """Return the valid semantic candidates, de-duplicated in first-seen order.

    Each item is sanitized, dropped if it is not a semantic candidate, and
    compared case-insensitively; the first spelling encountered is the one kept.
    ``None`` or an empty input yields an empty list.
    """
    first_seen = {}
    for raw in values or []:
        cleaned = _sanitize_emoji_semantic_text(raw)
        if _is_emoji_semantic_candidate(cleaned):
            # setdefault keeps the earliest spelling for a given lowercase key.
            first_seen.setdefault(cleaned.lower(), cleaned)
    return list(first_seen.values())
|
||||
|
||||
|
||||
def _maybe_decode_base64_payload(value: str):
    """Best-effort base64 decode of a field value; returns ``b""`` on failure.

    Notes:
        1. WeChat ``desc`` / ``emojiattr`` values are not always plain text;
           many are protobuf wrapped in base64.
        2. The value is screened against the base64 alphabet first, so
           ordinary Chinese text is never fed to the decoder.
        3. Missing padding is tolerated: ``=`` is appended up to a multiple
           of four characters.
    """
    compact = re.sub(r"\s+", "", _safe_text(value))
    if len(compact) >= 4 and _EMOJI_BASE64_RE.fullmatch(compact):
        missing_padding = -len(compact) % 4
        try:
            return base64.b64decode(compact + "=" * missing_padding, validate=False)
        except Exception:
            return b""
    return b""
|
||||
|
||||
|
||||
def _decode_emoji_semantic_value(value: str):
    """Turn one emoji semantic field into a list of candidate texts.

    Notes:
        1. A field that is already readable plain text is kept as-is.
        2. A field that decodes as base64 is scanned for strings embedded in
           protobuf; the whole payload is decoded as UTF-8 only when nothing
           usable has been found up to that point, which keeps concatenated
           locale blobs out of the result.
        3. The result is always de-duplicated and filtered, so locale keys
           never reach the front end.

    Returns:
        A de-duplicated list of candidate strings (possibly empty).
    """
    raw_text = _safe_text(value).strip()
    if not raw_text:
        return []

    candidates = [raw_text] if _is_emoji_semantic_candidate(raw_text) else []

    decoded_bytes = _maybe_decode_base64_payload(raw_text)
    if decoded_bytes:
        candidates.extend(_extract_protobuf_strings(decoded_bytes))

        # Some emojiattr values are "base64 of plain text" rather than protobuf.
        if not _dedupe_emoji_semantic_candidates(candidates):
            try:
                whole_text = decoded_bytes.decode("utf-8")
            except Exception:
                whole_text = ""
            if whole_text:
                candidates.append(whole_text)

    return _dedupe_emoji_semantic_candidates(candidates)
|
||||
|
||||
|
||||
def _extract_emoji_semantic_info(attachment_url: str):
    """Extract readable semantics from an emoji XML snippet.

    Notes:
        1. The emoji library otherwise only carries md5/len, which is hard to
           use for AI-driven matching.
        2. The ``desc``, ``attachedtext`` and ``emojiattr`` attributes of the
           ``<emoji>`` element are inspected, in that order.
        3. If the XML cannot be parsed, the same attributes are pulled out
           with a regex as a best-effort fallback.

    Args:
        attachment_url: Message attachment text; only values starting with
            ``<`` are treated as XML.

    Returns:
        A dict with ``semantic_text`` (primary semantic, preferring the first
        alias that contains a CJK ideograph), ``semantic_aliases`` (all
        de-duplicated candidates) and ``semantic_source`` (comma-joined names
        of the attributes that yielded candidates). All three are empty when
        nothing is found.
    """
    text = _safe_text(attachment_url).strip()
    if not text.startswith("<"):
        return {
            "semantic_text": "",
            "semantic_aliases": [],
            "semantic_source": "",
        }

    field_names = ("desc", "attachedtext", "emojiattr")
    field_values = []
    try:
        # NOTE(review): if ET is the stdlib ElementTree, it does not guard
        # against entity-expansion attacks; confirm this XML is trusted.
        root = ET.fromstring(text)
        # ".//emoji" only searches descendants, so a document whose root
        # element is itself <emoji> has to be handled explicitly.
        emoji_node = root if root.tag == "emoji" else root.find(".//emoji")
        if emoji_node is not None:
            for field_name in field_names:
                field_values.append((field_name, _safe_text(emoji_node.attrib.get(field_name, "")).strip()))
    except Exception:
        # Deliberate best effort: any parse failure falls back to regex.
        field_values = []
        for field_name in field_names:
            # The lookbehind stops "desc" from matching the tail of a longer
            # attribute name; the backreference requires the closing quote to
            # match the opening one so the other quote may appear in the value.
            match = re.search(
                rf'(?<![\w-]){field_name}\s*=\s*(["\'])(.*?)\1',
                text,
                re.IGNORECASE | re.DOTALL,
            )
            field_values.append((field_name, _safe_text(match.group(2) if match else "").strip()))

    aliases = []
    sources = []
    for field_name, field_value in field_values:
        decoded_candidates = _decode_emoji_semantic_value(field_value)
        if not decoded_candidates:
            continue
        sources.append(field_name)
        aliases.extend(decoded_candidates)

    semantic_aliases = _dedupe_emoji_semantic_candidates(aliases)
    semantic_text = ""
    if semantic_aliases:
        # Prefer the first alias containing a CJK ideograph so the most
        # intuitive semantic is shown first.
        chinese_first = [item for item in semantic_aliases if re.search(r"[\u4e00-\u9fff]", item)]
        semantic_text = chinese_first[0] if chinese_first else semantic_aliases[0]

    return {
        "semantic_text": semantic_text,
        "semantic_aliases": semantic_aliases,
        "semantic_source": ",".join(sources),
    }
|
||||
|
||||
|
||||
def _parse_positive_int(value):
|
||||
"""将任意输入尽量解析为正整数,失败时返回 0。
|
||||
|
||||
@@ -781,24 +1037,47 @@ def api_emoji_library():
|
||||
|
||||
dedup = {}
|
||||
for item in records:
|
||||
attachment_url = _safe_text(item.get("attachment_url"))
|
||||
image_path = _safe_text(item.get("image_path")).strip()
|
||||
if not image_path:
|
||||
continue
|
||||
md5, total_length = _extract_emoji_meta(_safe_text(item.get("attachment_url")), image_path)
|
||||
md5, total_length = _extract_emoji_meta(attachment_url, image_path)
|
||||
if not md5 or total_length <= 0:
|
||||
continue
|
||||
if md5 in dedup:
|
||||
continue
|
||||
dedup[md5] = {
|
||||
semantic_info = _extract_emoji_semantic_info(attachment_url)
|
||||
|
||||
# 同一个 md5 可能在多条历史里反复出现:
|
||||
# 1. 有的记录有预览图但没有语义;
|
||||
# 2. 有的记录有语义但图片还没落盘;
|
||||
# 3. 因此这里按 md5 聚合,尽量把“发送参数 + 预览图 + 中文语义”拼成一条完整资产。
|
||||
target = dedup.setdefault(md5, {
|
||||
"md5": md5,
|
||||
"total_length": total_length,
|
||||
"preview_url": image_path,
|
||||
"preview_url": "",
|
||||
"timestamp": _safe_text(item.get("timestamp")),
|
||||
"group_id": _safe_text(item.get("group_id")),
|
||||
"message_id": _safe_text(item.get("message_id")),
|
||||
}
|
||||
"semantic_text": "",
|
||||
"semantic_aliases": [],
|
||||
"semantic_source": "",
|
||||
})
|
||||
|
||||
emojis = list(dedup.values())
|
||||
if not target.get("preview_url") and image_path:
|
||||
target["preview_url"] = image_path
|
||||
if not target.get("total_length") and total_length > 0:
|
||||
target["total_length"] = total_length
|
||||
|
||||
target_aliases = target.get("semantic_aliases") or []
|
||||
merged_aliases = _dedupe_emoji_semantic_candidates(target_aliases + (semantic_info.get("semantic_aliases") or []))
|
||||
target["semantic_aliases"] = merged_aliases
|
||||
if not target.get("semantic_text") and semantic_info.get("semantic_text"):
|
||||
target["semantic_text"] = semantic_info.get("semantic_text")
|
||||
if not target.get("semantic_source") and semantic_info.get("semantic_source"):
|
||||
target["semantic_source"] = semantic_info.get("semantic_source")
|
||||
|
||||
# 只有带预览图的表情才回给前端弹窗:
|
||||
# 1. 目前弹窗主要承担“人工挑选并发送”的作用,没有缩略图会很难用;
|
||||
# 2. 语义可以从其他重复记录补过来,但最终仍要求至少有一条落盘图片;
|
||||
# 3. 后续若要纯语义离线匹配,可再单独开放无预览的内部接口。
|
||||
emojis = [item for item in dedup.values() if item.get("preview_url")]
|
||||
return jsonify({
|
||||
"success": True,
|
||||
"data": {
|
||||
|
||||
@@ -789,7 +789,7 @@
|
||||
|
||||
<el-dialog title="表情库" :visible.sync="emojiDialogVisible" width="52%">
|
||||
<div class="emoji-toolbar">
|
||||
<el-input v-model="emojiKeyword" clearable placeholder="搜索 md5..." size="small"></el-input>
|
||||
<el-input v-model="emojiKeyword" clearable placeholder="搜索 md5 / 中文语义..." size="small"></el-input>
|
||||
<el-button size="small" icon="el-icon-refresh" :loading="emojiLibraryLoading" @click="loadEmojiLibrary">刷新</el-button>
|
||||
</div>
|
||||
<div class="emoji-grid" v-loading="emojiLibraryLoading">
|
||||
@@ -797,6 +797,8 @@
|
||||
<div v-if="!filteredEmojiLibrary.length" class="emoji-empty">暂无可用表情,先在群里让媒体下载插件抓取几条表情。</div>
|
||||
<div v-for="item in filteredEmojiLibrary" :key="item.md5" class="emoji-card">
|
||||
<img class="emoji-thumb" :src="getChatMediaUrl(item.preview_url)" />
|
||||
<div v-if="item.semantic_text" class="emoji-semantic">{{ item.semantic_text }}</div>
|
||||
<div v-if="item.semantic_aliases && item.semantic_aliases.length > 1" class="emoji-aliases">{{ item.semantic_aliases.join(' / ') }}</div>
|
||||
<div class="emoji-md5">{{ item.md5 }}</div>
|
||||
<div class="emoji-actions">
|
||||
<el-button type="primary" size="mini" @click="sendEmojiItem(item)">发送</el-button>
|
||||
@@ -905,7 +907,14 @@
|
||||
filteredEmojiLibrary() {
|
||||
const keyword = (this.emojiKeyword || '').trim().toLowerCase();
|
||||
if (!keyword) return this.emojiLibrary;
|
||||
return this.emojiLibrary.filter(item => (item.md5 || '').toLowerCase().includes(keyword));
|
||||
return this.emojiLibrary.filter(item => {
|
||||
// 表情库后续要服务 AI 自动回复,因此这里除了 md5,也支持按主语义和别名检索。
|
||||
// 这样人工整理映射时,可以直接搜“哈哈/害/难道”之类语义词,不需要反复记 md5。
|
||||
const md5 = (item.md5 || '').toLowerCase();
|
||||
const semanticText = (item.semantic_text || '').toLowerCase();
|
||||
const aliases = Array.isArray(item.semantic_aliases) ? item.semantic_aliases.join(' ').toLowerCase() : '';
|
||||
return md5.includes(keyword) || semanticText.includes(keyword) || aliases.includes(keyword);
|
||||
});
|
||||
},
|
||||
previewGroupWelcomeText() {
|
||||
return this.renderWelcomeTemplate(this.groupWelcomeConfig.welcome_text_template);
|
||||
@@ -1856,6 +1865,8 @@
|
||||
}
|
||||
.preview-box p{margin:0 0 4px 0}
|
||||
.emoji-thumb { width: 72px; height: 72px; object-fit: contain; border-radius: 8px; background: rgba(148,163,184,0.08); }
|
||||
.emoji-semantic { font-size: 13px; font-weight: 600; color: #0f172a; text-align: center; min-height: 18px; }
|
||||
.emoji-aliases { font-size: 11px; color: #475569; text-align: center; line-height: 1.45; max-width: 100%; word-break: break-word; min-height: 16px; }
|
||||
.emoji-md5 { font-size: 11px; color: #64748b; word-break: break-all; text-align: center; min-height: 30px; }
|
||||
.emoji-actions { width: 100%; display: flex; justify-content: center; }
|
||||
.emoji-empty { color: #94a3b8; padding: 12px; }
|
||||
|
||||
Reference in New Issue
Block a user