From dfa17c5f9546ec53311cd9319743c73a1d52cbf0 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 21 Apr 2026 13:26:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=BA=20messages=20=E8=A1=A8=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=20raw=5Fpayload=20=E5=B9=B6=E8=90=BD=E7=9B=98?= =?UTF-8?q?=E5=AE=8C=E6=95=B4=E5=8E=9F=E5=A7=8B=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 messages 表结构中新增 raw_payload LONGTEXT 字段(init.sql) - 新增数据库迁移脚本 20260421_add_raw_payload_to_messages.sql,支持现网平滑加列 - 改造 MessageStorageDB.archive_message:优先写入 raw_payload,若旧库未加列自动回退旧 SQL - 新增 _serialize_raw_payload 序列化逻辑,按要求保存完整消息信息且不做脱敏 - 增加详细中文注释,说明设计目的与兼容策略 --- db/message_storage.py | 77 ++++++++++++++++--- db/scripts/init.sql | 1 + .../20260421_add_raw_payload_to_messages.sql | 3 + 3 files changed, 72 insertions(+), 9 deletions(-) create mode 100644 db/scripts/migrations/20260421_add_raw_payload_to_messages.sql diff --git a/db/message_storage.py b/db/message_storage.py index ec5f4b6..234114c 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- from datetime import datetime +import json from typing import Dict, List, Optional from db.base import BaseDBOperator @@ -15,18 +16,76 @@ class MessageStorageDB(BaseDBOperator): super().__init__(db_manager) def archive_message(self, msg: WxMessage) -> bool: - """存档消息""" + """存档消息 + + 说明: + 1. 结构化字段(content/message_type/message_xml 等)继续保留,便于高频查询; + 2. 新增 raw_payload 字段,落盘 API 原始消息的完整序列化内容,便于后续排障与二次特征提取; + 3. 为兼容尚未升级表结构的老环境,先尝试新 SQL,失败后自动回退旧 SQL。 + """ now_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - sql = """ - INSERT INTO messages (group_id, timestamp, sender, content, message_type, attachment_url, message_id, message_xml, message_thumb) + # 先准备好可复用字段,避免在两套 SQL 中重复拼接业务字段。 + base_params = ( + msg.roomid, + now_time, + msg.sender, + str(msg.content.clean_content), + msg.msg_type.value, + str(msg.content.xml_content), + msg.msg_id, + msg.msg_source, + "", + ) + + # 尽可能保存完整原始负载:优先使用对象自带序列化能力,其次兜底到 __dict__。 + raw_payload = self._serialize_raw_payload(msg) + + sql_with_raw_payload = """ + INSERT INTO messages ( + group_id, timestamp, sender, content, message_type, + attachment_url, message_id, message_xml, raw_payload, message_thumb + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + params_with_raw_payload = (*base_params[:8], raw_payload, base_params[8]) + if self.execute_update(sql_with_raw_payload, params_with_raw_payload): + return True + + # 兼容旧表结构:如果线上还没执行 ALTER TABLE,加列前仍可继续正常归档。 + sql_legacy = """ + INSERT INTO messages ( + group_id, timestamp, sender, content, message_type, + attachment_url, message_id, message_xml, message_thumb + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ - params = ( - msg.roomid, now_time, msg.sender, str(msg.content.clean_content), msg.msg_type.value, - str(msg.content.xml_content), msg.msg_id, - msg.msg_source, "") - result = self.execute_update(sql, params) - return result + return self.execute_update(sql_legacy, base_params) + + def _serialize_raw_payload(self, msg: WxMessage) -> str: + """将消息对象尽量完整地序列化为 JSON 字符串。 + + 序列化优先级(从高到低): + 1. pydantic v2: model_dump_json + 2. pydantic v1: json + 3. 自定义方法: to_json + 4. 通用兜底: __dict__ + + 说明: + - 不做脱敏,按用户要求保存完整信息; + - 使用 ensure_ascii=False 保留中文原文,便于后续排查。 + """ + try: + if hasattr(msg, "model_dump_json") and callable(getattr(msg, "model_dump_json")): + return msg.model_dump_json(exclude_none=False) + if hasattr(msg, "json") and callable(getattr(msg, "json")): + return msg.json(ensure_ascii=False) + if hasattr(msg, "to_json") and callable(getattr(msg, "to_json")): + raw_text = msg.to_json() + return raw_text if isinstance(raw_text, str) else json.dumps(raw_text, ensure_ascii=False, default=str) + return json.dumps(getattr(msg, "__dict__", {"repr": str(msg)}), ensure_ascii=False, default=str) + except Exception: + # 最后的保底策略:即使序列化失败,也确保字段有可追溯文本,避免丢失原始上下文。 + return str(msg) def get_recent_messages(self, group_id: str, hours_ago: int = 8, min_content_length: int = 6) -> List[Dict]: """获取最近的消息""" diff --git a/db/scripts/init.sql b/db/scripts/init.sql index 5f3e123..dfe804b 100644 --- a/db/scripts/init.sql +++ b/db/scripts/init.sql @@ -39,6 +39,7 @@ create or replace table message_archive.messages attachment_url text null comment '附件URL(图片、视频链接)', message_id varchar(32) null comment '消息 id', message_xml text null comment '消息 xml 部分', + raw_payload longtext null comment 'API 原始消息完整负载(完整序列化数据)', message_thumb longtext null comment '视频或图片消息的缩略图路径', image_path varchar(255) null comment '图片URL路径' ) diff --git a/db/scripts/migrations/20260421_add_raw_payload_to_messages.sql b/db/scripts/migrations/20260421_add_raw_payload_to_messages.sql new file mode 100644 index 0000000..c397f1b --- /dev/null +++ b/db/scripts/migrations/20260421_add_raw_payload_to_messages.sql @@ -0,0 +1,3 @@ +-- 为 messages 表新增 raw_payload 字段,用于存储 API 原始消息完整负载 +ALTER TABLE message_archive.messages + ADD COLUMN IF NOT EXISTS raw_payload LONGTEXT NULL COMMENT 'API 原始消息完整负载(完整序列化数据)' AFTER message_xml;