From 62e6f678366cf97d5d7ae4e9cae1dc585d2206d7 Mon Sep 17 00:00:00 2001 From: liuwei Date: Mon, 27 Apr 2026 11:52:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E8=A1=A8=E6=83=85?= =?UTF-8?q?=E4=B8=AD=E6=96=87=E8=AF=AD=E4=B9=89=E8=B5=84=E4=BA=A7\n\n-=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=20t=5Femoji=5Fassets=20=E8=A1=A8=E5=8F=8A?= =?UTF-8?q?=E8=BF=81=E7=A7=BB=E8=84=9A=E6=9C=AC=EF=BC=8C=E6=8C=81=E4=B9=85?= =?UTF-8?q?=E5=8C=96=E4=BF=9D=E5=AD=98=E8=A1=A8=E6=83=85=E5=8F=91=E9=80=81?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E3=80=81=E4=B8=AD=E6=96=87=E8=AF=AD=E4=B9=89?= =?UTF-8?q?=E4=B8=8E=E9=A2=84=E8=A7=88=E5=9B=BE=E8=B7=AF=E5=BE=84\n-=20?= =?UTF-8?q?=E5=9C=A8=E6=B6=88=E6=81=AF=E5=BD=92=E6=A1=A3=E4=B8=8E=E5=AA=92?= =?UTF-8?q?=E4=BD=93=E8=A1=A5=E5=81=BF=E6=B5=81=E7=A8=8B=E4=B8=AD=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E5=9B=9E=E5=A1=AB=E8=A1=A8=E6=83=85=E8=B5=84=E4=BA=A7?= =?UTF-8?q?=EF=BC=8C=E5=AE=9E=E7=8E=B0=E6=94=B6=E5=88=B0=E8=A1=A8=E6=83=85?= =?UTF-8?q?=E5=8D=B3=E8=90=BD=E8=AF=AD=E4=B9=89=E3=80=81=E8=A1=A5=E5=9B=BE?= =?UTF-8?q?=E5=90=8E=E5=9B=9E=E5=A1=AB=E9=A2=84=E8=A7=88\n-=20=E5=90=8E?= =?UTF-8?q?=E5=8F=B0=E8=A1=A8=E6=83=85=E5=BA=93=E4=B8=8E=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E5=9B=9E=E5=A4=8D=E4=BC=98=E5=85=88=E8=AF=BB=E5=8F=96=E6=8C=81?= =?UTF-8?q?=E4=B9=85=E5=8C=96=E8=A1=A8=E6=83=85=E8=B5=84=E4=BA=A7=EF=BC=8C?= =?UTF-8?q?=E4=BB=85=E5=9C=A8=E7=A9=BA=E8=A1=A8=E5=9C=BA=E6=99=AF=E4=B8=8B?= =?UTF-8?q?=E5=B0=8F=E8=8C=83=E5=9B=B4=E5=9B=9E=E8=A1=A5=E5=8E=86=E5=8F=B2?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- admin/dashboard/blueprints/contacts.py | 189 ++++------------- admin/dashboard/server.py | 2 + db/emoji_asset_db.py | 192 +++++++++++++++++- db/scripts/init.sql | 25 +++ .../20260427_add_emoji_assets_table.sql | 17 ++ plugins/ai_auto_response/core/emoji_reply.py | 41 ++-- utils/wechat/message_to_db.py | 40 ++++ 7 files changed, 334 insertions(+), 172 deletions(-) create mode 100644 db/scripts/migrations/20260427_add_emoji_assets_table.sql diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index 19909ed..842b925 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -9,6 +9,11 @@ from urllib.parse import quote from flask import Blueprint, render_template, jsonify, request, current_app, redirect, send_file from .auth import login_required from loguru import logger +from utils.wechat.emoji_semantic_parser import ( + dedupe_emoji_semantic_candidates as shared_dedupe_emoji_semantic_candidates, + extract_emoji_meta as shared_extract_emoji_meta, + extract_emoji_semantic_info as shared_extract_emoji_semantic_info, +) # 创建联系人管理蓝图 contacts_bp = Blueprint('contacts', __name__, url_prefix='/contacts') @@ -189,36 +194,8 @@ def _compact_media_caption(content: str, fallback: str) -> str: def _extract_emoji_meta(attachment_url: str, image_path: str): - text = _safe_text(attachment_url) - md5 = "" - total_length = 0 - - # 只接受 XML 中的参数,不做文件名或文件大小回退,避免参数污染。 - if not text.startswith("<"): - return "", 0 - try: - root = ET.fromstring(text) - emoji_node = root.find(".//emoji") - if emoji_node is None: - return "", 0 - md5 = _safe_text(emoji_node.attrib.get("md5", "")).strip().lower() - for key in ("totallen", "total_len", "totalLen", "len"): - value = _safe_text(emoji_node.attrib.get(key, "")).strip() - if value.isdigit(): - total_length = int(value) - break - except Exception: - md5_match = _EMOJI_MD5_RE.search(text) - if md5_match: - md5 = md5_match.group(1).lower() - len_match = _EMOJI_TOTALLEN_RE.search(text) - if len_match: - try: - total_length = int(len_match.group(1)) - except Exception: - total_length = 0 - - return md5, total_length + # 这里保留原函数签名,内部切到共享解析模块,避免后台和自动回复两边出现语义漂移。 + return shared_extract_emoji_meta(attachment_url) def _read_protobuf_varint(payload: bytes, offset: int): @@ -341,18 +318,7 @@ def _is_emoji_semantic_candidate(value: str): def _dedupe_emoji_semantic_candidates(values): """按出现顺序去重候选语义文本。""" - seen = set() - results = [] - for item in values or []: - text = _sanitize_emoji_semantic_text(item) - if not _is_emoji_semantic_candidate(text): - continue - key = text.lower() - if key in seen: - continue - seen.add(key) - results.append(text) - return results + return shared_dedupe_emoji_semantic_candidates(values) def _maybe_decode_base64_payload(value: str): @@ -411,52 +377,9 @@ def _decode_emoji_semantic_value(value: str): def _extract_emoji_semantic_info(attachment_url: str): """从表情 XML 中提取“可读语义”。 - 说明: - 1. 当前表情库主要只有 md5/len,不方便后续让 AI 直接利用; - 2. 这里优先解析 desc、attachedtext、emojiattr 这些潜在语义字段; - 3. 返回主语义 + 别名列表 + 来源,后续无论是后台展示还是自动回复匹配都能复用。 + 这里统一走共享解析模块,保证后台展示、持久化回填和自动回复使用同一套语义规则。 """ - text = _safe_text(attachment_url).strip() - if not text.startswith("<"): - return { - "semantic_text": "", - "semantic_aliases": [], - "semantic_source": "", - } - - field_values = [] - try: - root = ET.fromstring(text) - emoji_node = root.find(".//emoji") - if emoji_node is not None: - for field_name in ("desc", "attachedtext", "emojiattr"): - field_values.append((field_name, _safe_text(emoji_node.attrib.get(field_name, "")).strip())) - except Exception: - for field_name in ("desc", "attachedtext", "emojiattr"): - match = re.search(rf'{field_name}\s*=\s*[\"\']([^\"\']+)[\"\']', text, re.IGNORECASE) - field_values.append((field_name, _safe_text(match.group(1) if match else "").strip())) - - aliases = [] - sources = [] - for field_name, field_value in field_values: - decoded_candidates = _decode_emoji_semantic_value(field_value) - if not decoded_candidates: - continue - sources.append(field_name) - aliases.extend(decoded_candidates) - - semantic_aliases = _dedupe_emoji_semantic_candidates(aliases) - semantic_text = "" - if semantic_aliases: - # 优先选中文最明显的候选,尽量把“哈哈哈”“害”这类直观语义放到第一位。 - chinese_first = [item for item in semantic_aliases if re.search(r"[\u4e00-\u9fff]", item)] - semantic_text = chinese_first[0] if chinese_first else semantic_aliases[0] - - return { - "semantic_text": semantic_text, - "semantic_aliases": semantic_aliases, - "semantic_source": ",".join(sources), - } + return shared_extract_emoji_semantic_info(attachment_url) def _parse_positive_int(value): @@ -484,6 +407,24 @@ def _get_emoji_asset_by_md5(message_storage, md5: str): if not message_storage or not md5: return None + # 优先读取持久化表情资产: + # 1. 这张表已经做过语义和参数收敛,命中速度更快; + # 2. 若拿不到,再回退到原始 messages 表反查,兼容老数据和初始化阶段; + # 3. 这样后台发送、表情库展示、自动回复三条链路都共享统一资产源。 + emoji_asset_db = getattr(message_storage, "emoji_asset_db", None) + if emoji_asset_db and hasattr(emoji_asset_db, "get_persisted_emoji_asset_by_md5"): + asset = emoji_asset_db.get_persisted_emoji_asset_by_md5(md5) + if asset: + return { + "attachment_url": "", + "image_path": _safe_text(asset.get("preview_url")).strip(), + "message_id": _safe_text(asset.get("sample_message_id")).strip(), + "group_id": _safe_text(asset.get("sample_group_id")).strip(), + "sender": _safe_text(asset.get("sample_sender")).strip(), + "md5": _safe_text(asset.get("md5")).strip(), + "total_length": asset.get("total_length"), + } + if hasattr(message_storage, "get_emoji_asset_by_md5"): return message_storage.get_emoji_asset_by_md5(md5) @@ -515,10 +456,13 @@ def _resolve_emoji_send_meta(message_storage, md5: str, total_length: int): if not asset: return normalized_md5, 0 - resolved_md5, resolved_total_length = _extract_emoji_meta( - _safe_text(asset.get("attachment_url")), - _safe_text(asset.get("image_path")) - ) + resolved_total_length = _parse_positive_int(asset.get("total_length")) + resolved_md5 = _safe_text(asset.get("md5")).strip().lower() + if not resolved_md5 or resolved_total_length <= 0: + resolved_md5, resolved_total_length = _extract_emoji_meta( + _safe_text(asset.get("attachment_url")), + _safe_text(asset.get("image_path")) + ) if resolved_md5 and resolved_md5 != normalized_md5: # 历史数据如果出现大小写或异常值,以前端传入的 md5 为准,避免串表情。 logger.warning(f"表情参数回填命中 md5 不一致,request_md5={normalized_md5}, record_md5={resolved_md5}") @@ -1023,61 +967,16 @@ def api_emoji_library(): try: server = current_app.dashboard_server limit = min(max(int(request.args.get("limit", 200)), 1), 500) - message_storage = getattr(server, "message_storage", None) - if not message_storage: - return jsonify({"success": False, "message": "消息存储未初始化"}), 503 + emoji_asset_db = getattr(server, "emoji_asset_db", None) + if emoji_asset_db is None: + return jsonify({"success": False, "message": "表情资产库未初始化"}), 503 - if hasattr(message_storage, "get_recent_emoji_assets"): - records = message_storage.get_recent_emoji_assets(limit=limit) - elif hasattr(message_storage, "message_db") and hasattr(message_storage.message_db, "get_recent_emoji_assets"): - records = message_storage.message_db.get_recent_emoji_assets(limit=limit) - else: - logger.error("当前 message_storage 不支持 get_recent_emoji_assets") - return jsonify({"success": False, "message": "当前消息存储版本不支持表情库"}), 500 - - dedup = {} - for item in records: - attachment_url = _safe_text(item.get("attachment_url")) - image_path = _safe_text(item.get("image_path")).strip() - md5, total_length = _extract_emoji_meta(attachment_url, image_path) - if not md5 or total_length <= 0: - continue - semantic_info = _extract_emoji_semantic_info(attachment_url) - - # 同一个 md5 可能在多条历史里反复出现: - # 1. 有的记录有预览图但没有语义; - # 2. 有的记录有语义但图片还没落盘; - # 3. 因此这里按 md5 聚合,尽量把“发送参数 + 预览图 + 中文语义”拼成一条完整资产。 - target = dedup.setdefault(md5, { - "md5": md5, - "total_length": total_length, - "preview_url": "", - "timestamp": _safe_text(item.get("timestamp")), - "group_id": _safe_text(item.get("group_id")), - "message_id": _safe_text(item.get("message_id")), - "semantic_text": "", - "semantic_aliases": [], - "semantic_source": "", - }) - - if not target.get("preview_url") and image_path: - target["preview_url"] = image_path - if not target.get("total_length") and total_length > 0: - target["total_length"] = total_length - - target_aliases = target.get("semantic_aliases") or [] - merged_aliases = _dedupe_emoji_semantic_candidates(target_aliases + (semantic_info.get("semantic_aliases") or [])) - target["semantic_aliases"] = merged_aliases - if not target.get("semantic_text") and semantic_info.get("semantic_text"): - target["semantic_text"] = semantic_info.get("semantic_text") - if not target.get("semantic_source") and semantic_info.get("semantic_source"): - target["semantic_source"] = semantic_info.get("semantic_source") - - # 只有带预览图的表情才回给前端弹窗: - # 1. 目前弹窗主要承担“人工挑选并发送”的作用,没有缩略图会很难用; - # 2. 语义可以从其他重复记录补过来,但最终仍要求至少有一条落盘图片; - # 3. 后续若要纯语义离线匹配,可再单独开放无预览的内部接口。 - emojis = [item for item in dedup.values() if item.get("preview_url")] + emojis = emoji_asset_db.list_emoji_assets(limit=limit, require_preview=True) + if not emojis: + # 只有当持久化表还是空的时候,才对最近一小批历史消息做一次兜底回填。 + # 正常运行时,新表情会在“消息归档 + 媒体补偿”阶段自动写入资产表,不需要每次接口都回扫历史。 + emoji_asset_db.sync_recent_emoji_assets(limit=min(max(limit, 50), 120)) + emojis = emoji_asset_db.list_emoji_assets(limit=limit, require_preview=True) return jsonify({ "success": True, "data": { diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index e5e53ca..d8521f8 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -12,6 +12,7 @@ from loguru import logger from db.contacts_db import ContactsDBOperator from db.admin_account_db import AdminAccountDBOperator +from db.emoji_asset_db import EmojiAssetDB from db.member_context_db import MemberContextDBOperator from db.message_storage import MessageStorageDB from db.stats_db import StatsDBOperator @@ -46,6 +47,7 @@ class DashboardServer: self.db_manager = robot_instance.db_manager self.stats_db = StatsDBOperator(self.db_manager) self.message_storage = MessageStorageDB(self.db_manager) + self.emoji_asset_db = getattr(robot_instance.message_storage, "emoji_asset_db", None) or EmojiAssetDB(self.db_manager) self.contact_db: ContactsDBOperator = ContactsDBOperator(self.db_manager) self.member_context_db = MemberContextDBOperator(self.db_manager) self.task_db: TaskDBOperator = TaskDBOperator(self.db_manager) diff --git a/db/emoji_asset_db.py b/db/emoji_asset_db.py index d58dbf9..97e1fea 100644 --- a/db/emoji_asset_db.py +++ b/db/emoji_asset_db.py @@ -1,8 +1,15 @@ # -*- coding: utf-8 -*- -from typing import Dict, List, Optional +import json +from typing import Any, Dict, List, Optional from db.base import BaseDBOperator from db.connection import DBConnectionManager +from utils.wechat.emoji_semantic_parser import ( + dedupe_emoji_semantic_candidates, + extract_emoji_meta, + extract_emoji_semantic_info, + safe_text, +) class EmojiAssetDB(BaseDBOperator): @@ -16,6 +23,37 @@ class EmojiAssetDB(BaseDBOperator): def __init__(self, db_manager: DBConnectionManager): super().__init__(db_manager) + self.init_tables() + + def init_tables(self) -> bool: + """初始化表情语义资产表。 + + 说明: + 1. 这张表专门保存“可发送表情”的结构化资产,避免每次都去 messages 现算语义; + 2. md5 作为主键,同一个表情后续只做增量补充:先有发送参数,后补预览图,再补人工修正都方便; + 3. semantic_aliases 用 JSON 字符串保存,便于后续扩展同义词和人工校准。 + """ + return self.execute_update( + """ + CREATE TABLE IF NOT EXISTS t_emoji_assets ( + md5 VARCHAR(64) PRIMARY KEY, + total_length INT NOT NULL DEFAULT 0, + semantic_text VARCHAR(255) DEFAULT '' COMMENT '主语义文本', + semantic_aliases LONGTEXT NULL COMMENT '语义别名列表(JSON数组)', + semantic_source VARCHAR(64) DEFAULT '' COMMENT '语义来源字段,如 desc/emojiattr', + preview_url VARCHAR(255) DEFAULT '' COMMENT '本地预览图路径', + sample_message_id VARCHAR(32) DEFAULT '' COMMENT '示例消息ID', + sample_group_id VARCHAR(100) DEFAULT '' COMMENT '示例群ID', + sample_sender VARCHAR(100) DEFAULT '' COMMENT '示例发送者', + first_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP, + last_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_emoji_last_seen (last_seen_at), + INDEX idx_emoji_preview (preview_url) + ) + """ + ) def get_recent_emoji_assets(self, limit: int = 500) -> List[Dict]: """获取近期表情消息记录。""" @@ -30,6 +68,32 @@ class EmojiAssetDB(BaseDBOperator): """ return self.execute_query(sql, (limit,)) or [] + def list_emoji_assets(self, limit: int = 500, require_preview: bool = False) -> List[Dict]: + """获取持久化后的表情资产列表。""" + sql = """ + SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source, + preview_url, sample_message_id, sample_group_id, sample_sender, + first_seen_at, last_seen_at + FROM t_emoji_assets + WHERE total_length > 0 + """ + params: List[Any] = [] + if require_preview: + sql += " AND preview_url IS NOT NULL AND preview_url <> ''" + sql += " ORDER BY last_seen_at DESC LIMIT %s" + params.append(int(limit)) + rows = self.execute_query(sql, tuple(params)) or [] + return [self._normalize_asset_row(row) for row in rows] + + def count_emoji_assets(self, require_preview: bool = False) -> int: + """统计当前持久化表情资产数量。""" + sql = "SELECT COUNT(*) AS total FROM t_emoji_assets WHERE total_length > 0" + params: List[Any] = [] + if require_preview: + sql += " AND preview_url IS NOT NULL AND preview_url <> ''" + row = self.execute_query(sql, tuple(params), fetch_one=True) or {} + return int(row.get("total") or 0) + def get_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]: """根据 md5 获取最近一条表情记录。""" sql = """ @@ -43,3 +107,129 @@ class EmojiAssetDB(BaseDBOperator): LIMIT 1 """ return self.execute_query(sql, (f'%md5="{md5}"%',), fetch_one=True) + + def get_persisted_emoji_asset_by_md5(self, md5: str) -> Optional[Dict]: + """按 md5 读取持久化表情资产。""" + row = self.execute_query( + """ + SELECT md5, total_length, semantic_text, semantic_aliases, semantic_source, + preview_url, sample_message_id, sample_group_id, sample_sender, + first_seen_at, last_seen_at + FROM t_emoji_assets + WHERE md5 = %s + LIMIT 1 + """, + (safe_text(md5).strip().lower(),), + fetch_one=True, + ) + return self._normalize_asset_row(row) if row else None + + def sync_recent_emoji_assets(self, limit: int = 500) -> int: + """从最近历史消息回填表情资产表。 + + 说明: + 1. 这一步既用于老数据补齐,也用于兜底线上漏网之鱼; + 2. 真正的“自动增量”仍由消息归档和媒体补偿流程触发,这里主要做懒回填; + 3. 返回成功同步的条数,便于上层做日志和观察。 + """ + synced = 0 + for row in self.get_recent_emoji_assets(limit=limit): + if self.upsert_asset_from_message_record(row): + synced += 1 + return synced + + def upsert_asset_from_message_record(self, row: Dict[str, Any]) -> bool: + """把一条 messages 表记录同步到表情资产表。""" + attachment_url = safe_text((row or {}).get("attachment_url")).strip() + md5, total_length = extract_emoji_meta(attachment_url) + if not md5 or total_length <= 0: + return False + + semantic_info = extract_emoji_semantic_info(attachment_url) + asset = { + "md5": md5, + "total_length": total_length, + "semantic_text": semantic_info.get("semantic_text", ""), + "semantic_aliases": semantic_info.get("semantic_aliases", []) or [], + "semantic_source": semantic_info.get("semantic_source", ""), + "preview_url": safe_text(row.get("image_path") or row.get("preview_url")).strip(), + "sample_message_id": safe_text(row.get("message_id")).strip(), + "sample_group_id": safe_text(row.get("group_id")).strip(), + "sample_sender": safe_text(row.get("sender")).strip(), + } + return self.upsert_asset(asset) + + def upsert_asset(self, asset: Dict[str, Any]) -> bool: + """写入或更新一条表情资产。""" + normalized_md5 = safe_text(asset.get("md5")).strip().lower() + total_length = int(asset.get("total_length") or 0) + if not normalized_md5 or total_length <= 0: + return False + + current = self.get_persisted_emoji_asset_by_md5(normalized_md5) or {} + merged_aliases = dedupe_emoji_semantic_candidates( + list(current.get("semantic_aliases") or []) + list(asset.get("semantic_aliases") or []) + ) + + # 合并策略: + # 1. preview_url、semantic_text 这类字段优先保留“新值非空,否则沿用旧值”; + # 2. 别名列表总是做并集,避免后来的历史样本把早先语义覆盖掉; + # 3. sample_* 记录最近一条有用样本,方便后续人工回查原消息来源。 + payload = { + "md5": normalized_md5, + "total_length": total_length if total_length > 0 else int(current.get("total_length") or 0), + "semantic_text": safe_text(asset.get("semantic_text")).strip() or safe_text(current.get("semantic_text")).strip(), + "semantic_aliases": merged_aliases, + "semantic_source": safe_text(asset.get("semantic_source")).strip() or safe_text(current.get("semantic_source")).strip(), + "preview_url": safe_text(asset.get("preview_url")).strip() or safe_text(current.get("preview_url")).strip(), + "sample_message_id": safe_text(asset.get("sample_message_id")).strip() or safe_text(current.get("sample_message_id")).strip(), + "sample_group_id": safe_text(asset.get("sample_group_id")).strip() or safe_text(current.get("sample_group_id")).strip(), + "sample_sender": safe_text(asset.get("sample_sender")).strip() or safe_text(current.get("sample_sender")).strip(), + } + + return self.execute_update( + """ + INSERT INTO t_emoji_assets ( + md5, total_length, semantic_text, semantic_aliases, semantic_source, + preview_url, sample_message_id, sample_group_id, sample_sender + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + total_length = VALUES(total_length), + semantic_text = VALUES(semantic_text), + semantic_aliases = VALUES(semantic_aliases), + semantic_source = VALUES(semantic_source), + preview_url = VALUES(preview_url), + sample_message_id = VALUES(sample_message_id), + sample_group_id = VALUES(sample_group_id), + sample_sender = VALUES(sample_sender), + last_seen_at = CURRENT_TIMESTAMP + """, + ( + payload["md5"], + payload["total_length"], + payload["semantic_text"], + json.dumps(payload["semantic_aliases"], ensure_ascii=False), + payload["semantic_source"], + payload["preview_url"], + payload["sample_message_id"], + payload["sample_group_id"], + payload["sample_sender"], + ), + ) + + @staticmethod + def _normalize_asset_row(row: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """把数据库行统一整理成业务侧更好消费的结构。""" + if not row: + return {} + normalized = dict(row) + aliases = normalized.get("semantic_aliases") + if isinstance(aliases, str): + try: + normalized["semantic_aliases"] = json.loads(aliases) + except Exception: + normalized["semantic_aliases"] = [] + elif aliases is None: + normalized["semantic_aliases"] = [] + return normalized diff --git a/db/scripts/init.sql b/db/scripts/init.sql index b803c94..5db917a 100644 --- a/db/scripts/init.sql +++ b/db/scripts/init.sql @@ -61,6 +61,31 @@ create or replace index idx_message_type create or replace index messages_message_id_index on message_archive.messages (message_id); +create or replace table message_archive.t_emoji_assets +( + md5 varchar(64) not null comment '表情MD5' + primary key, + total_length int not null default 0 comment '表情总长度', + semantic_text varchar(255) null comment '主语义文本', + semantic_aliases longtext null comment '语义别名列表(JSON数组)', + semantic_source varchar(64) null comment '语义来源字段,如desc/emojiattr', + preview_url varchar(255) null comment '本地预览图路径', + sample_message_id varchar(32) null comment '样例消息ID', + sample_group_id varchar(100) null comment '样例群ID', + sample_sender varchar(100) null comment '样例发送者', + first_seen_at datetime default current_timestamp() null comment '首次看到时间', + last_seen_at datetime default current_timestamp() null on update current_timestamp() comment '最近看到时间', + created_at datetime default current_timestamp() null comment '创建时间', + updated_at datetime default current_timestamp() null on update current_timestamp() comment '更新时间' +) + comment '表情语义资产表'; + +create or replace index idx_emoji_last_seen + on message_archive.t_emoji_assets (last_seen_at); + +create or replace index idx_emoji_preview + on message_archive.t_emoji_assets (preview_url); + create or replace table message_archive.t_message_mentions ( id bigint auto_increment diff --git a/db/scripts/migrations/20260427_add_emoji_assets_table.sql b/db/scripts/migrations/20260427_add_emoji_assets_table.sql new file mode 100644 index 0000000..21e8bec --- /dev/null +++ b/db/scripts/migrations/20260427_add_emoji_assets_table.sql @@ -0,0 +1,17 @@ +CREATE TABLE IF NOT EXISTS message_archive.t_emoji_assets ( + md5 VARCHAR(64) PRIMARY KEY COMMENT '表情MD5', + total_length INT NOT NULL DEFAULT 0 COMMENT '表情总长度', + semantic_text VARCHAR(255) DEFAULT '' COMMENT '主语义文本', + semantic_aliases LONGTEXT NULL COMMENT '语义别名列表(JSON数组)', + semantic_source VARCHAR(64) DEFAULT '' COMMENT '语义来源字段,如desc/emojiattr', + preview_url VARCHAR(255) DEFAULT '' COMMENT '本地预览图路径', + sample_message_id VARCHAR(32) DEFAULT '' COMMENT '样例消息ID', + sample_group_id VARCHAR(100) DEFAULT '' COMMENT '样例群ID', + sample_sender VARCHAR(100) DEFAULT '' COMMENT '样例发送者', + first_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '首次看到时间', + last_seen_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最近看到时间', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + INDEX idx_emoji_last_seen (last_seen_at), + INDEX idx_emoji_preview (preview_url) +); diff --git a/plugins/ai_auto_response/core/emoji_reply.py b/plugins/ai_auto_response/core/emoji_reply.py index 005a4aa..5e9b9f6 100644 --- a/plugins/ai_auto_response/core/emoji_reply.py +++ b/plugins/ai_auto_response/core/emoji_reply.py @@ -6,8 +6,6 @@ from typing import Any, Dict, List, Optional from db.emoji_asset_db import EmojiAssetDB from utils.wechat.emoji_semantic_parser import ( dedupe_emoji_semantic_candidates, - extract_emoji_meta, - extract_emoji_semantic_info, normalize_emoji_match_text, safe_text, ) @@ -88,38 +86,29 @@ class EmojiReplySelector: if self._cache_assets and now < self._cache_expires_at: return self._cache_assets - rows = self.asset_db.get_recent_emoji_assets(limit=self.asset_limit) - assets: Dict[str, Dict[str, Any]] = {} - for row in rows: - attachment_url = safe_text(row.get("attachment_url")) - md5, total_length = extract_emoji_meta(attachment_url) - if not md5 or total_length <= 0: - continue - - semantic_info = extract_emoji_semantic_info(attachment_url) + assets = self.asset_db.list_emoji_assets(limit=self.asset_limit, require_preview=False) + if not assets: + # 只有持久化资产表还是空时,才回补一小批最近历史; + # 正常情况下,归档和媒体补偿流程会自动维护这张表,自动回复不该在每次加载时去扫原始消息。 + self.asset_db.sync_recent_emoji_assets(limit=min(self.asset_limit, 120)) + assets = self.asset_db.list_emoji_assets(limit=self.asset_limit, require_preview=False) + normalized_assets: List[Dict[str, Any]] = [] + for asset in assets: semantic_aliases = [ alias - for alias in (semantic_info.get("semantic_aliases") or []) + for alias in (asset.get("semantic_aliases") or []) if len(alias) <= self.max_alias_chars ] if not semantic_aliases: continue - - target = assets.setdefault(md5, { - "md5": md5, - "total_length": total_length, - "semantic_text": "", - "semantic_aliases": [], + normalized_assets.append({ + "md5": asset.get("md5", ""), + "total_length": int(asset.get("total_length") or 0), + "semantic_text": asset.get("semantic_text", ""), + "semantic_aliases": dedupe_emoji_semantic_candidates(semantic_aliases), }) - if not target.get("total_length") and total_length > 0: - target["total_length"] = total_length - if not target.get("semantic_text") and semantic_info.get("semantic_text"): - target["semantic_text"] = semantic_info.get("semantic_text") - target["semantic_aliases"] = dedupe_emoji_semantic_candidates( - list(target.get("semantic_aliases") or []) + semantic_aliases - ) - self._cache_assets = [asset for asset in assets.values() if asset.get("semantic_aliases")] + self._cache_assets = [asset for asset in normalized_assets if asset.get("semantic_aliases")] self._cache_expires_at = now + self.cache_ttl_sec return self._cache_assets diff --git a/utils/wechat/message_to_db.py b/utils/wechat/message_to_db.py index 9f8029d..7063827 100644 --- a/utils/wechat/message_to_db.py +++ b/utils/wechat/message_to_db.py @@ -15,12 +15,14 @@ from typing import Dict, Optional from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator +from db.emoji_asset_db import EmojiAssetDB from db.levels_db import LevelsDBOperator from db.message_storage import MessageStorageDB # 导入积分系统 from db.points_db import PointsDBOperator, PointSource from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus from utils.wechat.contact_manager import ContactManager +from utils.wechat.emoji_semantic_parser import safe_text from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import WxMessage, MessageType @@ -36,6 +38,7 @@ class MessageStorage: self.db_manager = DBConnectionManager.get_instance() self.message_db = MessageStorageDB(self.db_manager) self.contacts_db = ContactsDBOperator(self.db_manager) + self.emoji_asset_db = EmojiAssetDB(self.db_manager) self.points_db = PointsDBOperator(self.db_manager) # 初始化本地缓存字典,使用 group_id 作为键 @@ -180,6 +183,8 @@ class MessageStorage: # 使用 MessageStorageDB 类存档消息 result = self.message_db.archive_message(msg) + if result and msg.msg_type in {MessageType.EMOTICON, MessageType.EMOJI}: + self._sync_emoji_asset_from_wx_message(msg) return { 'success': result, 'roomid': msg.roomid, @@ -198,6 +203,25 @@ class MessageStorage: 'error': str(e) } + def _sync_emoji_asset_from_wx_message(self, msg: WxMessage): + """将表情消息同步到持久化表情资产表。 + + 说明: + 1. 群里一旦收到表情,先把 md5/len/语义落表,不必等预览图下载完成; + 2. 这样自动回复和后台检索都能尽早看到结构化资产; + 3. 后续定时媒体任务补到 image_path 后,会再次 upsert 同一 md5,把 preview_url 回填完整。 + """ + attachment_url = safe_text(getattr(msg.content, "xml_content", "") or getattr(msg.content, "clean_content", "")).strip() + if not attachment_url.startswith("<"): + return + self.emoji_asset_db.upsert_asset_from_message_record({ + "attachment_url": attachment_url, + "image_path": "", + "message_id": safe_text(msg.msg_id), + "group_id": safe_text(msg.roomid), + "sender": safe_text(msg.sender), + }) + def process_image(self, msg: WxMessage): """图片消息已通过 archive_message 存入数据库,不再实时处理 改为定时任务批量处理,减少对主流程的影响和数据库锁竞争 @@ -243,6 +267,14 @@ class MessageStorage: if existing and existing.get("image_path"): linked_path = existing.get("image_path") success = self.message_db.update_message_image_file_path(message_id, linked_path) + if success and message_type in {str(MessageType.EMOTICON.value), str(MessageType.EMOJI.value)}: + self.emoji_asset_db.upsert_asset_from_message_record({ + "attachment_url": xml_content, + "image_path": linked_path, + "message_id": safe_text(message_id), + "group_id": safe_text(group_id), + "sender": safe_text(sender), + }) return { "success": bool(success), "message_id": message_id, @@ -283,6 +315,14 @@ class MessageStorage: success = self.message_db.update_message_image_file_path(message_id, web_path) if success: + if message_type in {str(MessageType.EMOTICON.value), str(MessageType.EMOJI.value)}: + self.emoji_asset_db.upsert_asset_from_message_record({ + "attachment_url": xml_content, + "image_path": web_path, + "message_id": safe_text(message_id), + "group_id": safe_text(group_id), + "sender": safe_text(sender), + }) logger.debug(f"媒体处理成功: message_id={message_id}, path={web_path}") else: logger.warning(f"媒体路径更新失败: message_id={message_id}")