# -*- coding: utf-8 -*-
import json
from typing import Any, Dict, List, Optional

from db.base import BaseDBOperator
from db.connection import DBConnectionManager
from utils.wechat.emoji_semantic_parser import (
    dedupe_emoji_semantic_candidates,
    extract_emoji_meta,
    extract_emoji_semantic_info,
    safe_text,
)


class EmojiAssetDB(BaseDBOperator):
    """Query and persistence helper for emoji (sticker) assets.

    Notes:
        1. Kept as a standalone query class so that the auto-reply plugin can
           read the emoji library without depending on the admin blueprint.
        2. The ``messages`` queries only return raw emoji message rows; semantic
           parsing is delegated to ``utils.wechat.emoji_semantic_parser`` and no
           match scoring happens here.
        3. Admin pages, auto-reply and other plugins are meant to share this
           single data source.
    """

    # Shared SELECT prefix for raw emoji rows in the ``messages`` table.
    # The three message_type codes are the ones the original queries filter on.
    _EMOJI_MESSAGE_SELECT = """
        SELECT message_id, group_id, sender, timestamp, message_type,
               attachment_url, image_path
        FROM messages
        WHERE message_type IN ('47', '1048625', '1090519089')
          AND attachment_url IS NOT NULL
          AND attachment_url <> ''
    """

    # Shared SELECT prefix for persisted rows in ``t_emoji_assets``.
    _ASSET_SELECT = """
        SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source,
               preview_url, sample_message_id, sample_group_id, sample_sender,
               first_seen_at, last_seen_at
        FROM t_emoji_assets
    """

    def __init__(self, db_manager: DBConnectionManager):
        """Bind the connection manager and make sure the asset table exists."""
        super().__init__(db_manager)
        self.init_tables()

    def init_tables(self) -> bool:
        """Create the emoji semantic asset table if it does not exist.

        Notes:
            1. The table stores structured data for sendable emojis so callers
               do not have to recompute semantics from ``messages`` every time.
            2. ``md5`` is the primary key; a given emoji is enriched
               incrementally (send parameters first, preview image later,
               manual corrections afterwards).
            3. ``semantic_aliases`` is stored as a JSON array string.

        Returns:
            Whatever ``execute_update`` returns for the DDL statement.
        """
        return self.execute_update(
            """
            CREATE TABLE IF NOT EXISTS t_emoji_assets (
                md5 VARCHAR(64) PRIMARY KEY,
                total_length INT NOT NULL DEFAULT 0,
                semantic_text VARCHAR(255) DEFAULT '' COMMENT '主语义文本',
                semantic_aliases LONGTEXT NULL COMMENT '语义别名列表(JSON数组)',
                semantic_source VARCHAR(64) DEFAULT '' COMMENT '语义来源字段,如 desc/emojiattr',
                preview_url VARCHAR(255) DEFAULT '' COMMENT '本地预览图路径',
                sample_message_id VARCHAR(32) DEFAULT '' COMMENT '示例消息ID',
                sample_group_id VARCHAR(100) DEFAULT '' COMMENT '示例群ID',
                sample_sender VARCHAR(100) DEFAULT '' COMMENT '示例发送者',
                first_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                last_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX idx_emoji_last_seen (last_seen_at),
                INDEX idx_emoji_preview (preview_url)
            )
            """
        )

    @staticmethod
    def _coerce_limit(limit: Any) -> int:
        """Return ``limit`` as a non-negative int suitable for a LIMIT clause."""
        return max(0, int(limit))

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape LIKE wildcards in ``value`` using ``!`` as the escape character."""
        # The escape character itself must be handled first.
        return value.replace("!", "!!").replace("%", "!%").replace("_", "!_")

    def get_recent_emoji_assets(self, limit: int = 500) -> List[Dict]:
        """Return the most recent emoji message rows, newest first.

        Args:
            limit: Maximum number of rows; negative values are clamped to 0.

        Returns:
            Rows from ``messages`` (empty list when the query yields nothing).
        """
        sql = self._EMOJI_MESSAGE_SELECT + " ORDER BY timestamp DESC LIMIT %s"
        return self.execute_query(sql, (self._coerce_limit(limit),)) or []

    def list_emoji_assets(self, limit: int = 500, require_preview: bool = False) -> List[Dict]:
        """Return persisted emoji assets ordered by ``last_seen_at`` descending.

        Args:
            limit: Maximum number of rows; negative values are clamped to 0.
            require_preview: When true, only rows with a non-empty
                ``preview_url`` are returned.

        Returns:
            Normalized asset dicts (``semantic_aliases`` decoded to a list).
        """
        sql = self._ASSET_SELECT + " WHERE total_length > 0"
        if require_preview:
            sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
        sql += " ORDER BY last_seen_at DESC LIMIT %s"
        rows = self.execute_query(sql, (self._coerce_limit(limit),)) or []
        return [self._normalize_asset_row(row) for row in rows]

    def count_emoji_assets(self, require_preview: bool = False) -> int:
        """Count persisted emoji assets with a positive ``total_length``.

        Args:
            require_preview: When true, only rows with a non-empty
                ``preview_url`` are counted.
        """
        sql = "SELECT COUNT(*) AS total FROM t_emoji_assets WHERE total_length > 0"
        if require_preview:
            sql += " AND preview_url IS NOT NULL AND preview_url <> ''"
        row = self.execute_query(sql, (), fetch_one=True) or {}
        return int(row.get("total") or 0)

    def get_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
        """Return the latest raw emoji message whose payload mentions ``md5``.

        The lookup is a substring match on ``md5="<value>"`` inside
        ``attachment_url``.

        Args:
            md5: Emoji md5 to look for. Empty/``None`` yields ``None`` instead
                of matching rows whose md5 attribute happens to be empty.

        Returns:
            The newest matching ``messages`` row, or ``None``.
        """
        needle = safe_text(md5).strip()
        if not needle:
            return None
        # Escape LIKE wildcards so '%' / '_' in the argument match literally.
        # '!' is used as escape character because it behaves the same
        # regardless of the server's backslash-escape SQL mode.
        pattern = f'%md5="{self._escape_like(needle)}"%'
        sql = (
            self._EMOJI_MESSAGE_SELECT
            + " AND attachment_url LIKE %s ESCAPE '!'"
            + " ORDER BY timestamp DESC LIMIT 1"
        )
        return self.execute_query(sql, (pattern,), fetch_one=True)

    def get_persisted_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]:
        """Read one persisted asset by md5 (trimmed and lower-cased).

        Returns:
            The normalized asset dict, or ``None`` when no row exists.
        """
        row = self.execute_query(
            self._ASSET_SELECT + " WHERE md5 = %s LIMIT 1",
            (safe_text(md5).strip().lower(),),
            fetch_one=True,
        )
        return self._normalize_asset_row(row) if row else None

    def sync_recent_emoji_assets(self, limit: int = 500) -> int:
        """Backfill the asset table from recent history messages.

        Notes:
            1. Used both to fill in old data and as a safety net for records
               the online path missed.
            2. The real incremental updates are expected to come from the
               message-archiving / media-compensation flows; this is a lazy
               backfill.

        Args:
            limit: How many recent emoji messages to replay.

        Returns:
            Number of rows for which the upsert reported success.
        """
        rows = list(self.get_recent_emoji_assets(limit=limit))
        # Rows arrive newest-first, and upsert_asset lets the incoming
        # non-empty value win. Replay oldest-first so the newest message ends
        # up as the stored sample / preview, as the merge strategy intends.
        return sum(
            1 for row in reversed(rows) if self.upsert_asset_from_message_record(row)
        )

    def upsert_asset_from_message_record(self, row: Dict[str, Any]) -> bool:
        """Sync a single ``messages`` row into the emoji asset table.

        Args:
            row: A message row; ``None``/empty is tolerated and yields False.

        Returns:
            False when no usable md5 / length can be extracted from
            ``attachment_url``; otherwise the result of :meth:`upsert_asset`.
        """
        row = row or {}
        attachment_url = safe_text(row.get("attachment_url")).strip()
        md5, total_length = extract_emoji_meta(attachment_url)
        if not md5 or (total_length or 0) <= 0:
            return False

        semantic_info = extract_emoji_semantic_info(attachment_url) or {}
        return self.upsert_asset(
            {
                "md5": md5,
                "total_length": total_length,
                "semantic_text": semantic_info.get("semantic_text", ""),
                "semantic_aliases": semantic_info.get("semantic_aliases", []) or [],
                "semantic_source": semantic_info.get("semantic_source", ""),
                "preview_url": safe_text(
                    row.get("image_path") or row.get("preview_url")
                ).strip(),
                "sample_message_id": safe_text(row.get("message_id")).strip(),
                "sample_group_id": safe_text(row.get("group_id")).strip(),
                "sample_sender": safe_text(row.get("sender")).strip(),
            }
        )

    def upsert_asset(self, asset: Dict[str, Any]) -> bool:
        """Insert or update one emoji asset.

        Merge strategy:
            1. Text fields (``semantic_text``, ``semantic_source``,
               ``preview_url``, ``sample_*``): the incoming value wins when it
               is non-empty, otherwise the stored value is kept.
            2. Aliases are always unioned with the stored ones so later samples
               cannot wipe earlier semantics.

        Args:
            asset: Dict with at least ``md5`` and a positive ``total_length``.

        Returns:
            False when ``md5`` is empty or ``total_length`` is missing,
            non-numeric or not positive; otherwise the result of
            ``execute_update``.
        """
        asset = asset or {}
        normalized_md5 = safe_text(asset.get("md5")).strip().lower()
        try:
            total_length = int(asset.get("total_length") or 0)
        except (TypeError, ValueError):
            # A malformed length is treated like a missing one.
            return False
        if not normalized_md5 or total_length <= 0:
            return False

        current = self.get_persisted_emoji_asset_by_md5(normalized_md5) or {}

        incoming_aliases = asset.get("semantic_aliases") or []
        if isinstance(incoming_aliases, str):
            # A single alias passed as a plain string must not be split
            # into individual characters by list().
            incoming_aliases = [incoming_aliases]
        merged_aliases = dedupe_emoji_semantic_candidates(
            list(current.get("semantic_aliases") or []) + list(incoming_aliases)
        )

        def pick(field: str) -> str:
            """Return the incoming value if non-empty, else the stored one."""
            return (
                safe_text(asset.get(field)).strip()
                or safe_text(current.get(field)).strip()
            )

        return self.execute_update(
            """
            INSERT INTO t_emoji_assets (
                md5, total_length, semantic_text, semantic_aliases, semantic_source,
                preview_url, sample_message_id, sample_group_id, sample_sender
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                total_length = VALUES(total_length),
                semantic_text = VALUES(semantic_text),
                semantic_aliases = VALUES(semantic_aliases),
                semantic_source = VALUES(semantic_source),
                preview_url = VALUES(preview_url),
                sample_message_id = VALUES(sample_message_id),
                sample_group_id = VALUES(sample_group_id),
                sample_sender = VALUES(sample_sender),
                last_seen_at = CURRENT_TIMESTAMP
            """,
            (
                normalized_md5,
                total_length,
                pick("semantic_text"),
                json.dumps(merged_aliases, ensure_ascii=False),
                pick("semantic_source"),
                pick("preview_url"),
                pick("sample_message_id"),
                pick("sample_group_id"),
                pick("sample_sender"),
            ),
        )

    @staticmethod
    def _normalize_asset_row(row: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Turn a DB row into a structure that is easier for callers to consume.

        ``semantic_aliases`` is always returned as a list: JSON text is decoded,
        and anything that is missing, undecodable or not a JSON array becomes
        an empty list.

        Args:
            row: Raw row dict, or ``None``.

        Returns:
            A shallow copy of ``row`` with normalized aliases; ``{}`` for an
            empty/``None`` row.
        """
        if not row:
            return {}

        normalized = dict(row)
        aliases = normalized.get("semantic_aliases")
        if isinstance(aliases, (str, bytes, bytearray)):
            try:
                aliases = json.loads(aliases)
            except (TypeError, ValueError):
                # JSONDecodeError / UnicodeDecodeError are ValueError subclasses.
                aliases = []
        # Valid JSON that is not an array (string, number, object) would break
        # the list concatenation in upsert_asset, so collapse it to [].
        normalized["semantic_aliases"] = (
            list(aliases) if isinstance(aliases, (list, tuple)) else []
        )
        return normalized