改动结果:

聊天窗口工具栏新增了“表情”按钮,打开表情库弹窗。
表情库会从历史“已下载落盘的表情消息”里自动聚合。
选中后直接通过 send_emoji_message(wxid, md5, total_length) 发原生表情,不是当普通图片发。
仍保持你现在的发送通道和聊天刷新逻辑。
主要改动文件:

后端接口与发送支持:contacts.py
表情资源查询:message_storage.py
前端表情面板与发送交互:contacts_management.html
新增接口:

GET /contacts/api/emojis:返回聚合后的表情库(md5、total_length、预览图)。
POST /contacts/api/send_message 新增 type=emoji。
我也做了 Python 语法检查,相关后端文件都通过了。
你可以直接在聊天弹窗里点“表情”试一下。如果表情库为空,通常是该群还没落盘到 image_path,让媒体下载功能先抓几条表情就会出现。
This commit is contained in:
liuwei
2026-04-15 11:21:32 +08:00
parent 96f50d929b
commit 47f8bd5717
3 changed files with 191 additions and 1 deletions

View File

@@ -1,4 +1,6 @@
import asyncio
import os
import re
import threading
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
@@ -15,6 +17,9 @@ message_thread_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="mes
# 创建共享的事件循环
shared_loop = None
loop_lock = threading.Lock()
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
_EMOJI_MD5_RE = re.compile(r'md5\s*=\s*[\"\']([0-9a-fA-F]{16,64})[\"\']', re.IGNORECASE)
_EMOJI_TOTALLEN_RE = re.compile(r'(?:totallen|total_len|len)\s*=\s*[\"\'](\d+)[\"\']', re.IGNORECASE)
def get_or_create_loop():
"""获取或创建共享的事件循环"""
@@ -139,6 +144,39 @@ def _compact_media_caption(content: str, fallback: str) -> str:
return text
def _extract_emoji_meta(attachment_url: str, image_path: str):
text = _safe_text(attachment_url)
md5 = ""
total_length = 0
md5_match = _EMOJI_MD5_RE.search(text)
if md5_match:
md5 = md5_match.group(1).lower()
len_match = _EMOJI_TOTALLEN_RE.search(text)
if len_match:
try:
total_length = int(len_match.group(1))
except Exception:
total_length = 0
if not md5 and image_path:
filename = os.path.basename(_safe_text(image_path))
stem = os.path.splitext(filename)[0]
if re.fullmatch(r"[0-9a-fA-F]{16,64}", stem):
md5 = stem.lower()
if total_length <= 0 and image_path and image_path.startswith("/static/"):
abs_path = os.path.join(_PROJECT_ROOT, image_path.lstrip("/").replace("/", os.sep))
if os.path.isfile(abs_path):
try:
total_length = int(os.path.getsize(abs_path))
except Exception:
total_length = 0
return md5, total_length
def _normalize_recent_message(server, raw_message: dict, chat_type: str, target_wxid: str):
sender = _safe_text(raw_message.get("sender")).strip()
message_type = str(raw_message.get("message_type", ""))
@@ -499,6 +537,46 @@ def api_recent_messages():
return jsonify({"success": False, "message": str(e)}), 500
@contacts_bp.route('/api/emojis', methods=['GET'])
@login_required
def api_emoji_library():
"""获取已下载表情库(从历史消息聚合)。"""
try:
server = current_app.dashboard_server
limit = min(max(int(request.args.get("limit", 200)), 1), 500)
records = server.message_storage.message_db.get_recent_emoji_assets(limit=limit)
dedup = {}
for item in records:
image_path = _safe_text(item.get("image_path")).strip()
if not image_path:
continue
md5, total_length = _extract_emoji_meta(_safe_text(item.get("attachment_url")), image_path)
if not md5 or total_length <= 0:
continue
if md5 in dedup:
continue
dedup[md5] = {
"md5": md5,
"total_length": total_length,
"preview_url": image_path,
"timestamp": _safe_text(item.get("timestamp")),
"group_id": _safe_text(item.get("group_id")),
"message_id": _safe_text(item.get("message_id")),
}
emojis = list(dedup.values())
return jsonify({
"success": True,
"data": {
"emojis": emojis,
"count": len(emojis)
}
})
except Exception as e:
logger.exception(f"获取表情库失败: {e}")
return jsonify({"success": False, "message": str(e)}), 500
@contacts_bp.route('/api/send_message', methods=['POST'])
@login_required
def api_send_message():
@@ -582,6 +660,19 @@ def api_send_message():
'message': '消息发送中'
})
elif msg_type == 'emoji':
if not isinstance(content, dict):
return jsonify({'success': False, 'message': '表情参数格式错误'})
md5 = _safe_text(content.get('md5')).strip().lower()
total_length = int(content.get('total_length') or 0)
if not md5 or total_length <= 0:
return jsonify({'success': False, 'message': '缺少表情 md5 或长度'})
send_message_in_thread(server.client.send_emoji_message, wxid, md5, total_length)
return jsonify({
'success': True,
'message': '消息发送中'
})
else:
return jsonify({'success': False, 'message': '不支持的消息类型'})