为 messages 表增加 raw_payload 并落盘完整原始消息

- 在 messages 表结构中新增 raw_payload LONGTEXT 字段(init.sql)

- 新增数据库迁移脚本 20260421_add_raw_payload_to_messages.sql,支持现网平滑加列

- 改造 MessageStorageDB.archive_message:优先写入 raw_payload,若旧库未加列自动回退旧 SQL

- 新增 _serialize_raw_payload 序列化逻辑,按要求保存完整消息信息且不做脱敏

- 增加详细中文注释,说明设计目的与兼容策略
This commit is contained in:
liuwei
2026-04-21 13:26:57 +08:00
parent a0c9c23e2c
commit dfa17c5f95
3 changed files with 72 additions and 9 deletions

View File

@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
from datetime import datetime
import json
from typing import Dict, List, Optional
from db.base import BaseDBOperator
@@ -15,18 +16,76 @@ class MessageStorageDB(BaseDBOperator):
super().__init__(db_manager)
def archive_message(self, msg: WxMessage) -> bool:
"""存档消息"""
"""存档消息
说明:
1. 结构化字段content/message_type/message_xml 等)继续保留,便于高频查询;
2. 新增 raw_payload 字段,落盘 API 原始消息的完整序列化内容,便于后续排障与二次特征提取;
3. 为兼容尚未升级表结构的老环境,先尝试新 SQL失败后自动回退旧 SQL。
"""
now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sql = """
INSERT INTO messages (group_id, timestamp, sender, content, message_type, attachment_url, message_id, message_xml, message_thumb)
# 先准备好可复用字段,避免在两套 SQL 中重复拼接业务字段。
base_params = (
msg.roomid,
now_time,
msg.sender,
str(msg.content.clean_content),
msg.msg_type.value,
str(msg.content.xml_content),
msg.msg_id,
msg.msg_source,
"",
)
# 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。
raw_payload = self._serialize_raw_payload(msg)
sql_with_raw_payload = """
INSERT INTO messages (
group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, raw_payload, message_thumb
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params_with_raw_payload = (*base_params[:8], raw_payload, base_params[8])
if self.execute_update(sql_with_raw_payload, params_with_raw_payload):
return True
# 兼容旧表结构:如果线上还没执行 ALTER TABLE加列前仍可继续正常归档。
sql_legacy = """
INSERT INTO messages (
group_id, timestamp, sender, content, message_type,
attachment_url, message_id, message_xml, message_thumb
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
msg.roomid, now_time, msg.sender, str(msg.content.clean_content), msg.msg_type.value,
str(msg.content.xml_content), msg.msg_id,
msg.msg_source, "")
result = self.execute_update(sql, params)
return result
return self.execute_update(sql_legacy, base_params)
def _serialize_raw_payload(self, msg: WxMessage) -> str:
"""将消息对象尽量完整地序列化为 JSON 字符串。
序列化优先级(从高到低):
1. pydantic v2: model_dump_json
2. pydantic v1: json
3. 自定义方法: to_json
4. 通用兜底: __dict__
说明:
- 不做脱敏,按用户要求保存完整信息;
- 使用 ensure_ascii=False 保留中文原文,便于后续排查。
"""
try:
if hasattr(msg, "model_dump_json") and callable(getattr(msg, "model_dump_json")):
return msg.model_dump_json(exclude_none=False)
if hasattr(msg, "json") and callable(getattr(msg, "json")):
return msg.json(ensure_ascii=False)
if hasattr(msg, "to_json") and callable(getattr(msg, "to_json")):
raw_text = msg.to_json()
return raw_text if isinstance(raw_text, str) else json.dumps(raw_text, ensure_ascii=False, default=str)
return json.dumps(getattr(msg, "__dict__", {"repr": str(msg)}), ensure_ascii=False, default=str)
except Exception:
# 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。
return str(msg)
def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]:
"""获取最近的消息"""

View File

@@ -39,6 +39,7 @@ create or replace table message_archive.messages
attachment_url text null comment '附件URL图片、视频链接',
message_id varchar(32) null comment '消息 id',
message_xml text null comment '消息 xml 部分',
raw_payload longtext null comment 'API 原始消息完整负载(完整序列化数据)',
message_thumb longtext null comment '视频或图片消息的缩略图路径',
image_path varchar(255) null comment '图片URL路径'
)

View File

@@ -0,0 +1,3 @@
-- 为 messages 表新增 raw_payload 字段,用于存储 API 原始消息完整负载
ALTER TABLE message_archive.messages
ADD COLUMN IF NOT EXISTS raw_payload LONGTEXT NULL COMMENT 'API 原始消息完整负载(完整序列化数据)' AFTER message_xml;