完善表情资产后台能力并补充群总结落库

- 新增表情资产表,支持表情文件落盘后的资产沉淀、查询与发送时间回写
- 将表情下载从消息主链路中移出,改为后台定时批处理,降低同步入库阻塞风险
- 抽取通用 CDN 下载与 base64 落盘能力,统一图片与表情文件处理方式
- 在后台通讯录聊天窗口增加表情资产面板,支持查看资产并直接选择发送表情
- 新增后台表情资产接口,支持按群过滤最近表情素材
- 优化消息列表中的表情消息展示,支持在后台直接预览表情图片
- 启动时不再同步补偿历史表情,统一交由定时任务处理,避免影响系统稳定性
- 新增群总结落库表,支持将每日总结写入数据库,便于后续知识库提取与复用
- 将定时总结结果写入数据库,保留总结文本、周期信息、消息数量和元数据
This commit is contained in:
liuwei
2026-04-02 17:52:17 +08:00
parent a4b87f4c7a
commit 2a54650a6f
11 changed files with 671 additions and 17 deletions

View File

@@ -5,14 +5,13 @@ import xml.etree.ElementTree as ET
import concurrent.futures # 添加线程池支持
import os
import base64
import imghdr
import re
from threading import Lock
from typing import Dict
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
from db.emoji_asset_db import EmojiAssetDBOperator
from db.levels_db import LevelsDBOperator
from db.message_storage import MessageStorageDB
# 导入积分系统
@@ -33,6 +32,7 @@ class MessageStorage:
self.db_manager = DBConnectionManager.get_instance()
self.message_db = MessageStorageDB(self.db_manager)
self.contacts_db = ContactsDBOperator(self.db_manager)
self.emoji_asset_db = EmojiAssetDBOperator(self.db_manager)
self.points_db = PointsDBOperator(self.db_manager)
# 初始化本地缓存字典,使用 group_id 作为键
@@ -60,6 +60,9 @@ class MessageStorage:
# 正则(替代 XML 解析)
self._aeskey_re = re.compile(r'aeskey="(.*?)"')
self._cdn_re = re.compile(r'cdnthumburl="(.*?)"')
self._emoji_cdn_re = re.compile(r'cdnurl="(.*?)"')
self._emoji_encrypt_re = re.compile(r'encrypturl="(.*?)"')
self._emoji_thumb_re = re.compile(r'thumburl="(.*?)"')
# 修改为项目根目录下的 static/images
self.image_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static", "images")
@@ -119,6 +122,118 @@ class MessageStorage:
'error': str(e)
}
def _extract_emoji_download_info(self, xml_content: str) -> Dict:
if not xml_content:
return {}
aeskey_match = self._aeskey_re.search(xml_content)
if not aeskey_match:
return {}
url_match = (
self._emoji_cdn_re.search(xml_content)
or self._emoji_encrypt_re.search(xml_content)
or self._emoji_thumb_re.search(xml_content)
)
if not url_match:
return {}
md5_match = re.search(r'md5="(.*?)"', xml_content)
length_match = re.search(r'len="(\d+)"', xml_content)
return {
"aeskey": aeskey_match.group(1),
"url": url_match.group(1),
"md5": md5_match.group(1) if md5_match else "",
"length": int(length_match.group(1)) if length_match else 0,
}
async def _process_emoji_record(self, msg_record: Dict) -> bool:
if not self.client:
logger.warning("表情消息未处理,微信客户端未初始化")
return False
message_id = int(msg_record.get("message_id") or 0)
room_id = msg_record.get("group_id") or "unknown"
sender = msg_record.get("sender", "")
xml_content = msg_record.get("attachment_url") or msg_record.get("message_xml") or ""
emoji_info = self._extract_emoji_download_info(xml_content)
if not emoji_info:
logger.warning(f"表情消息解析失败,未提取到下载参数: msg_id={message_id}")
return False
try:
base64_str = await self.client.download_cdn_file(emoji_info["aeskey"], emoji_info["url"])
if not base64_str:
logger.warning(f"表情下载返回为空: msg_id={message_id}")
return False
group_dir = os.path.join(self.image_dir, room_id)
file_stem = f"{message_id}_emoji"
file_path = await self.client.base64_to_file_autoext(
base64_str,
file_stem=file_stem,
file_path=group_dir,
default_ext=".bin",
)
ext = os.path.splitext(file_path)[1] or ".bin"
file_name = os.path.basename(file_path)
web_path = f"/static/images/{room_id}/{file_name}"
updated = self.message_db.update_message_image_file_path(message_id, web_path)
if updated:
if emoji_info.get("md5"):
self.emoji_asset_db.save_asset({
"md5": emoji_info.get("md5", ""),
"total_length": emoji_info.get("length", 0),
"file_path": web_path,
"file_ext": ext,
"source_message_id": message_id,
"source_chatroom_id": room_id,
"source_wxid": sender,
})
else:
logger.warning(f"表情已落盘但缺少md5跳过资产入库: msg_id={message_id}")
logger.info(
f"表情处理成功: msg_id={message_id}, roomid={room_id}, ext={ext}, path={web_path}"
)
else:
logger.warning(
f"表情文件已落盘但数据库未更新: msg_id={message_id}, roomid={room_id}, path={web_path}"
)
return updated
except Exception as e:
logger.exception(f"处理表情消息出错: msg_id={message_id}, error={e}")
return False
async def process_pending_emojis(self, minutes_ago: int = 1440, batch_size: int = 20):
"""定时处理最近未落盘的表情消息"""
if not self.client:
logger.warning("微信客户端未初始化,跳过表情处理")
return
try:
pending_messages = self.message_db.get_pending_emoji_messages(minutes_ago, batch_size)
if not pending_messages:
logger.debug(f"未发现待处理的表情消息(最近{minutes_ago}分钟)")
return
logger.info(f"开始处理 {len(pending_messages)} 条待处理表情消息")
success_count = 0
fail_count = 0
for msg_record in pending_messages:
if await self._process_emoji_record(msg_record):
success_count += 1
else:
fail_count += 1
logger.info(
f"表情处理完成: 成功={success_count}, 失败={fail_count}, 总计={len(pending_messages)}"
)
except Exception as e:
logger.exception(f"定时处理表情任务出错: {e}")
def process_image(self, msg: WxMessage):
"""图片消息已通过 archive_message 存入数据库,不再实时处理
改为定时任务批量处理,减少对主流程的影响和数据库锁竞争