# -*- coding: utf-8 -*- import json from datetime import datetime from typing import Dict, Optional from db.base import BaseDBOperator from db.connection import DBConnectionManager class MessageSummaryDBOperator(BaseDBOperator): """群消息总结数据库操作""" def __init__(self, db_manager: DBConnectionManager): super().__init__(db_manager) self._create_tables() def _create_tables(self): try: self.execute_update(""" CREATE TABLE IF NOT EXISTS t_message_summary ( id INT AUTO_INCREMENT PRIMARY KEY, chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID', group_name VARCHAR(128) DEFAULT '' COMMENT '群名称', summary_type VARCHAR(16) NOT NULL COMMENT '总结类型 daily|manual', period_key VARCHAR(32) NOT NULL COMMENT '周期主键,如 2026-04-01', period_start DATETIME NULL COMMENT '总结周期开始时间', period_end DATETIME NULL COMMENT '总结周期结束时间', source_message_count INT NOT NULL DEFAULT 0 COMMENT '源消息数量', summary_text LONGTEXT COMMENT '总结文本', image_path VARCHAR(255) DEFAULT NULL COMMENT '总结图片路径', meta_json LONGTEXT COMMENT '附加元数据JSON', last_generated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后生成时间', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', UNIQUE KEY idx_message_summary (chatroom_id, summary_type, period_key), KEY idx_message_summary_lookup (chatroom_id, period_end) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群消息总结表'; """) except Exception as e: self.LOG.error(f"创建群消息总结表失败: {e}") def save_summary(self, summary: Dict) -> bool: try: data = { "chatroom_id": summary.get("chatroom_id", ""), "group_name": summary.get("group_name", ""), "summary_type": summary.get("summary_type", "daily"), "period_key": summary.get("period_key", ""), "period_start": summary.get("period_start"), "period_end": summary.get("period_end"), "source_message_count": int(summary.get("source_message_count", 0) or 0), "summary_text": summary.get("summary_text", ""), "image_path": summary.get("image_path"), "meta_json": json.dumps(summary.get("meta", {}), ensure_ascii=False), "last_generated_at": summary.get( "last_generated_at", datetime.now().strftime("%Y-%m-%d %H:%M:%S") ), } fields = ", ".join(data.keys()) placeholders = ", ".join(["%s"] * len(data)) update_clause = ", ".join( [ f"{key}=VALUES({key})" for key in data.keys() if key not in ("chatroom_id", "summary_type", "period_key") ] ) sql = f""" INSERT INTO t_message_summary ({fields}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {update_clause} """ return self.execute_update(sql, tuple(data.values())) except Exception as e: self.LOG.error(f"保存群消息总结失败: {e}") return False def get_summary(self, chatroom_id: str, summary_type: str, period_key: str) -> Optional[Dict]: try: sql = """ SELECT * FROM t_message_summary WHERE chatroom_id = %s AND summary_type = %s AND period_key = %s LIMIT 1 """ row = self.execute_query(sql, (chatroom_id, summary_type, period_key), fetch_one=True) return self._deserialize_row(row) except Exception as e: self.LOG.error(f"获取群消息总结失败: {e}") return None @staticmethod def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]: if not row: return row meta_json = row.get("meta_json") if meta_json: try: row["meta_json"] = json.loads(meta_json) except Exception: row["meta_json"] = {} else: row["meta_json"] = {} for key in ("period_start", "period_end", "last_generated_at", "create_time", "update_time"): value = row.get(key) if isinstance(value, datetime): row[key] = value.strftime("%Y-%m-%d %H:%M:%S") row["meta"] = row.get("meta_json", {}) return row