236 lines
11 KiB
Python
236 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
import json
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from db.base import BaseDBOperator
|
|
from db.connection import DBConnectionManager
|
|
from utils.wechat.emoji_semantic_parser import (
|
|
dedupe_emoji_semantic_candidates,
|
|
extract_emoji_meta,
|
|
extract_emoji_semantic_info,
|
|
safe_text,
|
|
)
|
|
|
|
|
|
class EmojiAssetDB(BaseDBOperator):
|
|
"""表情资产查询。
|
|
|
|
说明:
|
|
1. 这里单独抽出查询类,避免自动回复插件为了拿表情库再去依赖后台蓝图;
|
|
2. 查询只关心消息表里的原始表情记录,不负责语义解析和匹配打分;
|
|
3. 后续无论后台页面、自动回复还是其他插件,都可以复用同一份表情资产数据源。
|
|
"""
|
|
|
|
def __init__(self, db_manager: DBConnectionManager):
|
|
super().__init__(db_manager)
|
|
self.init_tables()
|
|
|
|
def init_tables(self) -> bool:
|
|
"""初始化表情语义资产表。
|
|
|
|
说明:
|
|
1. 这张表专门保存“可发送表情”的结构化资产,避免每次都去 messages 现算语义;
|
|
2. md5 作为主键,同一个表情后续只做增量补充:先有发送参数,后补预览图,再补人工修正都方便;
|
|
3. semantic_aliases 用 JSON 字符串保存,便于后续扩展同义词和人工校准。
|
|
"""
|
|
return self.execute_update(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS t_emoji_assets (
|
|
md5 VARCHAR(64) PRIMARY KEY,
|
|
total_length INT NOT NULL DEFAULT 0,
|
|
semantic_text VARCHAR(255) DEFAULT '' COMMENT '主语义文本',
|
|
semantic_aliases LONGTEXT NULL COMMENT '语义别名列表(JSON数组)',
|
|
semantic_source VARCHAR(64) DEFAULT '' COMMENT '语义来源字段,如 desc/emojiattr',
|
|
preview_url VARCHAR(255) DEFAULT '' COMMENT '本地预览图路径',
|
|
sample_message_id VARCHAR(32) DEFAULT '' COMMENT '示例消息ID',
|
|
sample_group_id VARCHAR(100) DEFAULT '' COMMENT '示例群ID',
|
|
sample_sender VARCHAR(100) DEFAULT '' COMMENT '示例发送者',
|
|
first_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
last_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
INDEX idx_emoji_last_seen (last_seen_at),
|
|
INDEX idx_emoji_preview (preview_url)
|
|
)
|
|
"""
|
|
)
|
|
|
|
def get_recent_emoji_assets(self, limit: int = 500) -> List[Dict]:
|
|
"""获取近期表情消息记录。"""
|
|
sql = """
|
|
SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
|
|
FROM messages
|
|
WHERE message_type IN ('47', '1048625', '1090519089')
|
|
AND attachment_url IS NOT NULL
|
|
AND attachment_url <> ''
|
|
ORDER BY timestamp DESC
|
|
LIMIT %s
|
|
"""
|
|
return self.execute_query(sql, (limit,)) or []
|
|
|
|
def list_emoji_assets(self, limit: int = 500, require_preview: bool = False) -> List[Dict]:
|
|
"""获取持久化后的表情资产列表。"""
|
|
sql = """
|
|
SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source,
|
|
preview_url, sample_message_id, sample_group_id, sample_sender,
|
|
first_seen_at, last_seen_at
|
|
FROM t_emoji_assets
|
|
WHERE total_length > 0
|
|
"""
|
|
params: List[Any] = []
|
|
if require_preview:
|
|
sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
|
|
sql += " ORDER BY last_seen_at DESC LIMIT %s"
|
|
params.append(int(limit))
|
|
rows = self.execute_query(sql, tuple(params)) or []
|
|
return [self._normalize_asset_row(row) for row in rows]
|
|
|
|
def count_emoji_assets(self, require_preview: bool = False) -> int:
|
|
"""统计当前持久化表情资产数量。"""
|
|
sql = "SELECT COUNT(*) AS total FROM t_emoji_assets WHERE total_length > 0"
|
|
params: List[Any] = []
|
|
if require_preview:
|
|
sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
|
|
row = self.execute_query(sql, tuple(params), fetch_one=True) or {}
|
|
return int(row.get("total") or 0)
|
|
|
|
def get_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
|
|
"""根据 md5 获取最近一条表情记录。"""
|
|
sql = """
|
|
SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
|
|
FROM messages
|
|
WHERE message_type IN ('47', '1048625', '1090519089')
|
|
AND attachment_url IS NOT NULL
|
|
AND attachment_url <> ''
|
|
AND attachment_url LIKE %s
|
|
ORDER BY timestamp DESC
|
|
LIMIT 1
|
|
"""
|
|
return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True)
|
|
|
|
def get_persisted_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
|
|
"""按 md5 读取持久化表情资产。"""
|
|
row = self.execute_query(
|
|
"""
|
|
SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source,
|
|
preview_url, sample_message_id, sample_group_id, sample_sender,
|
|
first_seen_at, last_seen_at
|
|
FROM t_emoji_assets
|
|
WHERE md5 = %s
|
|
LIMIT 1
|
|
""",
|
|
(safe_text(md5).strip().lower(),),
|
|
fetch_one=True,
|
|
)
|
|
return self._normalize_asset_row(row) if row else None
|
|
|
|
def sync_recent_emoji_assets(self, limit: int = 500) -> int:
|
|
"""从最近历史消息回填表情资产表。
|
|
|
|
说明:
|
|
1. 这一步既用于老数据补齐,也用于兜底线上漏网之鱼;
|
|
2. 真正的“自动增量”仍由消息归档和媒体补偿流程触发,这里主要做懒回填;
|
|
3. 返回成功同步的条数,便于上层做日志和观察。
|
|
"""
|
|
synced = 0
|
|
for row in self.get_recent_emoji_assets(limit=limit):
|
|
if self.upsert_asset_from_message_record(row):
|
|
synced += 1
|
|
return synced
|
|
|
|
def upsert_asset_from_message_record(self, row: Dict[str, Any]) -> bool:
|
|
"""把一条 messages 表记录同步到表情资产表。"""
|
|
attachment_url = safe_text((row or {}).get("attachment_url")).strip()
|
|
md5, total_length = extract_emoji_meta(attachment_url)
|
|
if not md5 or total_length <= 0:
|
|
return False
|
|
|
|
semantic_info = extract_emoji_semantic_info(attachment_url)
|
|
asset = {
|
|
"md5": md5,
|
|
"total_length": total_length,
|
|
"semantic_text": semantic_info.get("semantic_text", ""),
|
|
"semantic_aliases": semantic_info.get("semantic_aliases", []) or [],
|
|
"semantic_source": semantic_info.get("semantic_source", ""),
|
|
"preview_url": safe_text(row.get("image_path") or row.get("preview_url")).strip(),
|
|
"sample_message_id": safe_text(row.get("message_id")).strip(),
|
|
"sample_group_id": safe_text(row.get("group_id")).strip(),
|
|
"sample_sender": safe_text(row.get("sender")).strip(),
|
|
}
|
|
return self.upsert_asset(asset)
|
|
|
|
def upsert_asset(self, asset: Dict[str, Any]) -> bool:
|
|
"""写入或更新一条表情资产。"""
|
|
normalized_md5 = safe_text(asset.get("md5")).strip().lower()
|
|
total_length = int(asset.get("total_length") or 0)
|
|
if not normalized_md5 or total_length <= 0:
|
|
return False
|
|
|
|
current = self.get_persisted_emoji_asset_by_md5(normalized_md5) or {}
|
|
merged_aliases = dedupe_emoji_semantic_candidates(
|
|
list(current.get("semantic_aliases") or []) + list(asset.get("semantic_aliases") or [])
|
|
)
|
|
|
|
# 合并策略:
|
|
# 1. preview_url、semantic_text 这类字段优先保留“新值非空,否则沿用旧值”;
|
|
# 2. 别名列表总是做并集,避免后来的历史样本把早先语义覆盖掉;
|
|
# 3. sample_* 记录最近一条有用样本,方便后续人工回查原消息来源。
|
|
payload = {
|
|
"md5": normalized_md5,
|
|
"total_length": total_length if total_length > 0 else int(current.get("total_length") or 0),
|
|
"semantic_text": safe_text(asset.get("semantic_text")).strip() or safe_text(current.get("semantic_text")).strip(),
|
|
"semantic_aliases": merged_aliases,
|
|
"semantic_source": safe_text(asset.get("semantic_source")).strip() or safe_text(current.get("semantic_source")).strip(),
|
|
"preview_url": safe_text(asset.get("preview_url")).strip() or safe_text(current.get("preview_url")).strip(),
|
|
"sample_message_id": safe_text(asset.get("sample_message_id")).strip() or safe_text(current.get("sample_message_id")).strip(),
|
|
"sample_group_id": safe_text(asset.get("sample_group_id")).strip() or safe_text(current.get("sample_group_id")).strip(),
|
|
"sample_sender": safe_text(asset.get("sample_sender")).strip() or safe_text(current.get("sample_sender")).strip(),
|
|
}
|
|
|
|
return self.execute_update(
|
|
"""
|
|
INSERT INTO t_emoji_assets (
|
|
md5, total_length, semantic_text, semantic_aliases, semantic_source,
|
|
preview_url, sample_message_id, sample_group_id, sample_sender
|
|
)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
ON DUPLICATE KEY UPDATE
|
|
total_length = VALUES(total_length),
|
|
semantic_text = VALUES(semantic_text),
|
|
semantic_aliases = VALUES(semantic_aliases),
|
|
semantic_source = VALUES(semantic_source),
|
|
preview_url = VALUES(preview_url),
|
|
sample_message_id = VALUES(sample_message_id),
|
|
sample_group_id = VALUES(sample_group_id),
|
|
sample_sender = VALUES(sample_sender),
|
|
last_seen_at = CURRENT_TIMESTAMP
|
|
""",
|
|
(
|
|
payload["md5"],
|
|
payload["total_length"],
|
|
payload["semantic_text"],
|
|
json.dumps(payload["semantic_aliases"], ensure_ascii=False),
|
|
payload["semantic_source"],
|
|
payload["preview_url"],
|
|
payload["sample_message_id"],
|
|
payload["sample_group_id"],
|
|
payload["sample_sender"],
|
|
),
|
|
)
|
|
|
|
@staticmethod
|
|
def _normalize_asset_row(row: Optional[Dict[str, Any]]) -> Dict[str, Any]:
|
|
"""把数据库行统一整理成业务侧更好消费的结构。"""
|
|
if not row:
|
|
return {}
|
|
normalized = dict(row)
|
|
aliases = normalized.get("semantic_aliases")
|
|
if isinstance(aliases, str):
|
|
try:
|
|
normalized["semantic_aliases"] = json.loads(aliases)
|
|
except Exception:
|
|
normalized["semantic_aliases"] = []
|
|
elif aliases is None:
|
|
normalized["semantic_aliases"] = []
|
|
return normalized
|