Files
abot/db/emoji_asset_db.py

236 lines
11 KiB
Python

# -*- coding: utf-8 -*-
import json
from typing import Any, Dict, List, Optional
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
from utils.wechat.emoji_semantic_parser import (
dedupe_emoji_semantic_candidates,
extract_emoji_meta,
extract_emoji_semantic_info,
safe_text,
)
class EmojiAssetDB(BaseDBOperator):
"""表情资产查询。
说明:
1. 这里单独抽出查询类,避免自动回复插件为了拿表情库再去依赖后台蓝图;
2. 查询只关心消息表里的原始表情记录,不负责语义解析和匹配打分;
3. 后续无论后台页面、自动回复还是其他插件,都可以复用同一份表情资产数据源。
"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
self.init_tables()
def init_tables(self) -> bool:
"""初始化表情语义资产表。
说明:
1. 这张表专门保存“可发送表情”的结构化资产,避免每次都去 messages 现算语义;
2. md5 作为主键,同一个表情后续只做增量补充:先有发送参数,后补预览图,再补人工修正都方便;
3. semantic_aliases 用 JSON 字符串保存,便于后续扩展同义词和人工校准。
"""
return self.execute_update(
"""
CREATE TABLE IF NOT EXISTS t_emoji_assets (
md5 VARCHAR(64) PRIMARY KEY,
total_length INT NOT NULL DEFAULT 0,
semantic_text VARCHAR(255) DEFAULT '' COMMENT '主语义文本',
semantic_aliases LONGTEXT NULL COMMENT '语义别名列表(JSON数组)',
semantic_source VARCHAR(64) DEFAULT '' COMMENT '语义来源字段,如 desc/emojiattr',
preview_url VARCHAR(255) DEFAULT '' COMMENT '本地预览图路径',
sample_message_id VARCHAR(32) DEFAULT '' COMMENT '示例消息ID',
sample_group_id VARCHAR(100) DEFAULT '' COMMENT '示例群ID',
sample_sender VARCHAR(100) DEFAULT '' COMMENT '示例发送者',
first_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_emoji_last_seen (last_seen_at),
INDEX idx_emoji_preview (preview_url)
)
"""
)
def get_recent_emoji_assets(self, limit: int = 500) -> List[Dict]:
"""获取近期表情消息记录。"""
sql = """
SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
FROM messages
WHERE message_type IN ('47', '1048625', '1090519089')
AND attachment_url IS NOT NULL
AND attachment_url <> ''
ORDER BY timestamp DESC
LIMIT %s
"""
return self.execute_query(sql, (limit,)) or []
def list_emoji_assets(self, limit: int = 500, require_preview: bool = False) -> List[Dict]:
"""获取持久化后的表情资产列表。"""
sql = """
SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source,
preview_url, sample_message_id, sample_group_id, sample_sender,
first_seen_at, last_seen_at
FROM t_emoji_assets
WHERE total_length > 0
"""
params: List[Any] = []
if require_preview:
sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
sql += " ORDER BY last_seen_at DESC LIMIT %s"
params.append(int(limit))
rows = self.execute_query(sql, tuple(params)) or []
return [self._normalize_asset_row(row) for row in rows]
def count_emoji_assets(self, require_preview: bool = False) -> int:
"""统计当前持久化表情资产数量。"""
sql = "SELECT COUNT(*) AS total FROM t_emoji_assets WHERE total_length > 0"
params: List[Any] = []
if require_preview:
sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
row = self.execute_query(sql, tuple(params), fetch_one=True) or {}
return int(row.get("total") or 0)
def get_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
"""根据 md5 获取最近一条表情记录。"""
sql = """
SELECT message_id, group_id, sender, timestamp, message_type, attachment_url, image_path
FROM messages
WHERE message_type IN ('47', '1048625', '1090519089')
AND attachment_url IS NOT NULL
AND attachment_url <> ''
AND attachment_url LIKE %s
ORDER BY timestamp DESC
LIMIT 1
"""
return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True)
def get_persisted_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
"""按 md5 读取持久化表情资产。"""
row = self.execute_query(
"""
SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source,
preview_url, sample_message_id, sample_group_id, sample_sender,
first_seen_at, last_seen_at
FROM t_emoji_assets
WHERE md5 = %s
LIMIT 1
""",
(safe_text(md5).strip().lower(),),
fetch_one=True,
)
return self._normalize_asset_row(row) if row else None
def sync_recent_emoji_assets(self, limit: int = 500) -> int:
"""从最近历史消息回填表情资产表。
说明:
1. 这一步既用于老数据补齐,也用于兜底线上漏网之鱼;
2. 真正的“自动增量”仍由消息归档和媒体补偿流程触发,这里主要做懒回填;
3. 返回成功同步的条数,便于上层做日志和观察。
"""
synced = 0
for row in self.get_recent_emoji_assets(limit=limit):
if self.upsert_asset_from_message_record(row):
synced += 1
return synced
def upsert_asset_from_message_record(self, row: Dict[str, Any]) -> bool:
"""把一条 messages 表记录同步到表情资产表。"""
attachment_url = safe_text((row or {}).get("attachment_url")).strip()
md5, total_length = extract_emoji_meta(attachment_url)
if not md5 or total_length <= 0:
return False
semantic_info = extract_emoji_semantic_info(attachment_url)
asset = {
"md5": md5,
"total_length": total_length,
"semantic_text": semantic_info.get("semantic_text", ""),
"semantic_aliases": semantic_info.get("semantic_aliases", []) or [],
"semantic_source": semantic_info.get("semantic_source", ""),
"preview_url": safe_text(row.get("image_path") or row.get("preview_url")).strip(),
"sample_message_id": safe_text(row.get("message_id")).strip(),
"sample_group_id": safe_text(row.get("group_id")).strip(),
"sample_sender": safe_text(row.get("sender")).strip(),
}
return self.upsert_asset(asset)
def upsert_asset(self, asset: Dict[str, Any]) -> bool:
"""写入或更新一条表情资产。"""
normalized_md5 = safe_text(asset.get("md5")).strip().lower()
total_length = int(asset.get("total_length") or 0)
if not normalized_md5 or total_length <= 0:
return False
current = self.get_persisted_emoji_asset_by_md5(normalized_md5) or {}
merged_aliases = dedupe_emoji_semantic_candidates(
list(current.get("semantic_aliases") or []) + list(asset.get("semantic_aliases") or [])
)
# 合并策略:
# 1. preview_url、semantic_text 这类字段优先保留“新值非空,否则沿用旧值”;
# 2. 别名列表总是做并集,避免后来的历史样本把早先语义覆盖掉;
# 3. sample_* 记录最近一条有用样本,方便后续人工回查原消息来源。
payload = {
"md5": normalized_md5,
"total_length": total_length if total_length > 0 else int(current.get("total_length") or 0),
"semantic_text": safe_text(asset.get("semantic_text")).strip() or safe_text(current.get("semantic_text")).strip(),
"semantic_aliases": merged_aliases,
"semantic_source": safe_text(asset.get("semantic_source")).strip() or safe_text(current.get("semantic_source")).strip(),
"preview_url": safe_text(asset.get("preview_url")).strip() or safe_text(current.get("preview_url")).strip(),
"sample_message_id": safe_text(asset.get("sample_message_id")).strip() or safe_text(current.get("sample_message_id")).strip(),
"sample_group_id": safe_text(asset.get("sample_group_id")).strip() or safe_text(current.get("sample_group_id")).strip(),
"sample_sender": safe_text(asset.get("sample_sender")).strip() or safe_text(current.get("sample_sender")).strip(),
}
return self.execute_update(
"""
INSERT INTO t_emoji_assets (
md5, total_length, semantic_text, semantic_aliases, semantic_source,
preview_url, sample_message_id, sample_group_id, sample_sender
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
total_length = VALUES(total_length),
semantic_text = VALUES(semantic_text),
semantic_aliases = VALUES(semantic_aliases),
semantic_source = VALUES(semantic_source),
preview_url = VALUES(preview_url),
sample_message_id = VALUES(sample_message_id),
sample_group_id = VALUES(sample_group_id),
sample_sender = VALUES(sample_sender),
last_seen_at = CURRENT_TIMESTAMP
""",
(
payload["md5"],
payload["total_length"],
payload["semantic_text"],
json.dumps(payload["semantic_aliases"], ensure_ascii=False),
payload["semantic_source"],
payload["preview_url"],
payload["sample_message_id"],
payload["sample_group_id"],
payload["sample_sender"],
),
)
@staticmethod
def _normalize_asset_row(row: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""把数据库行统一整理成业务侧更好消费的结构。"""
if not row:
return {}
normalized = dict(row)
aliases = normalized.get("semantic_aliases")
if isinstance(aliases, str):
try:
normalized["semantic_aliases"] = json.loads(aliases)
except Exception:
normalized["semantic_aliases"] = []
elif aliases is None:
normalized["semantic_aliases"] = []
return normalized