持久化表情中文语义资产\n\n- 新增 t_emoji_assets 表及迁移脚本,持久化保存表情发送参数、中文语义与预览图路径\n- 在消息归档与媒体补偿流程中自动回填表情资产,实现收到表情即落语义、补图后回填预览\n- 后台表情库与自动回复优先读取持久化表情资产,仅在空表场景下小范围回补历史数据

This commit is contained in:
liuwei
2026-04-27 11:52:31 +08:00
parent 623ca505d4
commit 62e6f67836
7 changed files with 334 additions and 172 deletions

View File

@@ -1,8 +1,15 @@
# -*- coding: utf-8 -*-
from typing import Dict, List, Optional
import json
from typing import Any, Dict, List, Optional
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
from utils.wechat.emoji_semantic_parser import (
dedupe_emoji_semantic_candidates,
extract_emoji_meta,
extract_emoji_semantic_info,
safe_text,
)
class EmojiAssetDB(BaseDBOperator):
@@ -16,6 +23,37 @@ class EmojiAssetDB(BaseDBOperator):
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
self.init_tables()
def init_tables(self) -> bool:
"""初始化表情语义资产表。
说明:
1. 这张表专门保存“可发送表情”的结构化资产,避免每次都去 messages 现算语义;
2. md5 作为主键,同一个表情后续只做增量补充:先有发送参数,后补预览图,再补人工修正都方便;
3. semantic_aliases 用 JSON 字符串保存,便于后续扩展同义词和人工校准。
"""
return self.execute_update(
"""
CREATE TABLE IF NOT EXISTS t_emoji_assets (
md5 VARCHAR(64) PRIMARY KEY,
total_length INT NOT NULL DEFAULT 0,
semantic_text VARCHAR(255) DEFAULT '' COMMENT '主语义文本',
semantic_aliases LONGTEXT NULL COMMENT '语义别名列表(JSON数组)',
semantic_source VARCHAR(64) DEFAULT '' COMMENT '语义来源字段,如 desc/emojiattr',
preview_url VARCHAR(255) DEFAULT '' COMMENT '本地预览图路径',
sample_message_id VARCHAR(32) DEFAULT '' COMMENT '示例消息ID',
sample_group_id VARCHAR(100) DEFAULT '' COMMENT '示例群ID',
sample_sender VARCHAR(100) DEFAULT '' COMMENT '示例发送者',
first_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_emoji_last_seen (last_seen_at),
INDEX idx_emoji_preview (preview_url)
)
"""
)
def get_recent_emoji_assets(self, limit: int = 500) -> List[Dict]:
"""获取近期表情消息记录。"""
@@ -30,6 +68,32 @@ class EmojiAssetDB(BaseDBOperator):
"""
return self.execute_query(sql, (limit,)) or []
def list_emoji_assets(self, limit: int = 500, require_preview: bool = False) -> List[Dict]:
"""获取持久化后的表情资产列表。"""
sql = """
SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source,
preview_url, sample_message_id, sample_group_id, sample_sender,
first_seen_at, last_seen_at
FROM t_emoji_assets
WHERE total_length > 0
"""
params: List[Any] = []
if require_preview:
sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
sql += " ORDER BY last_seen_at DESC LIMIT %s"
params.append(int(limit))
rows = self.execute_query(sql, tuple(params)) or []
return [self._normalize_asset_row(row) for row in rows]
def count_emoji_assets(self, require_preview: bool = False) -> int:
"""统计当前持久化表情资产数量。"""
sql = "SELECT COUNT(*) AS total FROM t_emoji_assets WHERE total_length > 0"
params: List[Any] = []
if require_preview:
sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
row = self.execute_query(sql, tuple(params), fetch_one=True) or {}
return int(row.get("total") or 0)
def get_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
"""根据 md5 获取最近一条表情记录。"""
sql = """
@@ -43,3 +107,129 @@ class EmojiAssetDB(BaseDBOperator):
LIMIT 1
"""
return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True)
def get_persisted_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
"""按 md5 读取持久化表情资产。"""
row = self.execute_query(
"""
SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source,
preview_url, sample_message_id, sample_group_id, sample_sender,
first_seen_at, last_seen_at
FROM t_emoji_assets
WHERE md5 = %s
LIMIT 1
""",
(safe_text(md5).strip().lower(),),
fetch_one=True,
)
return self._normalize_asset_row(row) if row else None
def sync_recent_emoji_assets(self, limit: int = 500) -> int:
"""从最近历史消息回填表情资产表。
说明:
1. 这一步既用于老数据补齐,也用于兜底线上漏网之鱼;
2. 真正的“自动增量”仍由消息归档和媒体补偿流程触发,这里主要做懒回填;
3. 返回成功同步的条数,便于上层做日志和观察。
"""
synced = 0
for row in self.get_recent_emoji_assets(limit=limit):
if self.upsert_asset_from_message_record(row):
synced += 1
return synced
def upsert_asset_from_message_record(self, row: Dict[str, Any]) -> bool:
"""把一条 messages 表记录同步到表情资产表。"""
attachment_url = safe_text((row or {}).get("attachment_url")).strip()
md5, total_length = extract_emoji_meta(attachment_url)
if not md5 or total_length <= 0:
return False
semantic_info = extract_emoji_semantic_info(attachment_url)
asset = {
"md5": md5,
"total_length": total_length,
"semantic_text": semantic_info.get("semantic_text", ""),
"semantic_aliases": semantic_info.get("semantic_aliases", []) or [],
"semantic_source": semantic_info.get("semantic_source", ""),
"preview_url": safe_text(row.get("image_path") or row.get("preview_url")).strip(),
"sample_message_id": safe_text(row.get("message_id")).strip(),
"sample_group_id": safe_text(row.get("group_id")).strip(),
"sample_sender": safe_text(row.get("sender")).strip(),
}
return self.upsert_asset(asset)
def upsert_asset(self, asset: Dict[str, Any]) -> bool:
"""写入或更新一条表情资产。"""
normalized_md5 = safe_text(asset.get("md5")).strip().lower()
total_length = int(asset.get("total_length") or 0)
if not normalized_md5 or total_length <= 0:
return False
current = self.get_persisted_emoji_asset_by_md5(normalized_md5) or {}
merged_aliases = dedupe_emoji_semantic_candidates(
list(current.get("semantic_aliases") or []) + list(asset.get("semantic_aliases") or [])
)
# 合并策略:
# 1. preview_url、semantic_text 这类字段优先保留“新值非空,否则沿用旧值”;
# 2. 别名列表总是做并集,避免后来的历史样本把早先语义覆盖掉;
# 3. sample_* 记录最近一条有用样本,方便后续人工回查原消息来源。
payload = {
"md5": normalized_md5,
"total_length": total_length if total_length > 0 else int(current.get("total_length") or 0),
"semantic_text": safe_text(asset.get("semantic_text")).strip() or safe_text(current.get("semantic_text")).strip(),
"semantic_aliases": merged_aliases,
"semantic_source": safe_text(asset.get("semantic_source")).strip() or safe_text(current.get("semantic_source")).strip(),
"preview_url": safe_text(asset.get("preview_url")).strip() or safe_text(current.get("preview_url")).strip(),
"sample_message_id": safe_text(asset.get("sample_message_id")).strip() or safe_text(current.get("sample_message_id")).strip(),
"sample_group_id": safe_text(asset.get("sample_group_id")).strip() or safe_text(current.get("sample_group_id")).strip(),
"sample_sender": safe_text(asset.get("sample_sender")).strip() or safe_text(current.get("sample_sender")).strip(),
}
return self.execute_update(
"""
INSERT INTO t_emoji_assets (
md5, total_length, semantic_text, semantic_aliases, semantic_source,
preview_url, sample_message_id, sample_group_id, sample_sender
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
total_length = VALUES(total_length),
semantic_text = VALUES(semantic_text),
semantic_aliases = VALUES(semantic_aliases),
semantic_source = VALUES(semantic_source),
preview_url = VALUES(preview_url),
sample_message_id = VALUES(sample_message_id),
sample_group_id = VALUES(sample_group_id),
sample_sender = VALUES(sample_sender),
last_seen_at = CURRENT_TIMESTAMP
""",
(
payload["md5"],
payload["total_length"],
payload["semantic_text"],
json.dumps(payload["semantic_aliases"], ensure_ascii=False),
payload["semantic_source"],
payload["preview_url"],
payload["sample_message_id"],
payload["sample_group_id"],
payload["sample_sender"],
),
)
@staticmethod
def _normalize_asset_row(row: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""把数据库行统一整理成业务侧更好消费的结构。"""
if not row:
return {}
normalized = dict(row)
aliases = normalized.get("semantic_aliases")
if isinstance(aliases, str):
try:
normalized["semantic_aliases"] = json.loads(aliases)
except Exception:
normalized["semantic_aliases"] = []
elif aliases is None:
normalized["semantic_aliases"] = []
return normalized

View File

@@ -61,6 +61,31 @@ create or replace index idx_message_type
create or replace index messages_message_id_index
on message_archive.messages (message_id);
create or replace table message_archive.t_emoji_assets
(
md5 varchar(64) not null comment '表情MD5'
primary key,
total_length int not null default 0 comment '表情总长度',
semantic_text varchar(255) null comment '主语义文本',
semantic_aliases longtext null comment '语义别名列表(JSON数组)',
semantic_source varchar(64) null comment '语义来源字段如desc/emojiattr',
preview_url varchar(255) null comment '本地预览图路径',
sample_message_id varchar(32) null comment '样例消息ID',
sample_group_id varchar(100) null comment '样例群ID',
sample_sender varchar(100) null comment '样例发送者',
first_seen_at datetime default current_timestamp() null comment '首次看到时间',
last_seen_at datetime default current_timestamp() null on update current_timestamp() comment '最近看到时间',
created_at datetime default current_timestamp() null comment '创建时间',
updated_at datetime default current_timestamp() null on update current_timestamp() comment '更新时间'
)
comment '表情语义资产表';
create or replace index idx_emoji_last_seen
on message_archive.t_emoji_assets (last_seen_at);
create or replace index idx_emoji_preview
on message_archive.t_emoji_assets (preview_url);
create or replace table message_archive.t_message_mentions
(
id bigint auto_increment

View File

@@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS message_archive.t_emoji_assets (
md5 VARCHAR(64) PRIMARY KEY COMMENT '表情MD5',
total_length INT NOT NULL DEFAULT 0 COMMENT '表情总长度',
semantic_text VARCHAR(255) DEFAULT '' COMMENT '主语义文本',
semantic_aliases LONGTEXT NULL COMMENT '语义别名列表(JSON数组)',
semantic_source VARCHAR(64) DEFAULT '' COMMENT '语义来源字段如desc/emojiattr',
preview_url VARCHAR(255) DEFAULT '' COMMENT '本地预览图路径',
sample_message_id VARCHAR(32) DEFAULT '' COMMENT '样例消息ID',
sample_group_id VARCHAR(100) DEFAULT '' COMMENT '样例群ID',
sample_sender VARCHAR(100) DEFAULT '' COMMENT '样例发送者',
first_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '首次看到时间',
last_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近看到时间',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_emoji_last_seen (last_seen_at),
INDEX idx_emoji_preview (preview_url)
);