# -*- coding: utf-8 -*-
import json
from datetime import datetime
from typing import Dict, Optional

from db.base import BaseDBOperator
from db.connection import DBConnectionManager


class GroupProfileSnapshotDBOperator(BaseDBOperator):
    """Database operations for group-profile snapshots.

    Each chatroom owns at most one row in ``t_group_profile_snapshot`` (the
    table has a unique key on ``chatroom_id``). A row stores the serialized
    profile plus the source timestamps/counters that were current when the
    profile was built.

    ``execute_update``, ``execute_query`` and ``LOG`` are used as provided by
    ``BaseDBOperator`` (not visible in this file).
    """

    def __init__(self, db_manager: DBConnectionManager):
        """Initialise the operator and make sure the snapshot table exists."""
        super().__init__(db_manager)
        self._create_tables()

    def _create_tables(self):
        """Create ``t_group_profile_snapshot`` if it is missing.

        Failures are logged and swallowed so that constructing the operator
        never raises because of the DDL statement.
        """
        try:
            self.execute_update("""
                CREATE TABLE IF NOT EXISTS t_group_profile_snapshot (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
                    group_name VARCHAR(128) DEFAULT '' COMMENT '群名称',
                    profile_json LONGTEXT COMMENT '群画像快照JSON',
                    source_summary_latest_at DATETIME NULL COMMENT '构建时参考的最近群总结更新时间',
                    source_message_latest_at DATETIME NULL COMMENT '构建时参考的最近群消息时间',
                    source_summary_count INT NOT NULL DEFAULT 0 COMMENT '构建时参考的群总结条数',
                    source_message_sample_count INT NOT NULL DEFAULT 0 COMMENT '构建时参考的消息样本数',
                    last_generated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后一次生成时间',
                    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                    UNIQUE KEY idx_group_profile_snapshot (chatroom_id),
                    KEY idx_group_profile_generated_at (last_generated_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群画像快照表';
            """)
        except Exception as e:
            self.LOG.error(f"创建群画像快照表失败: {e}")

    def get_snapshot(self, chatroom_id: str) -> Optional[Dict]:
        """Return the stored snapshot for ``chatroom_id``.

        Args:
            chatroom_id: Chatroom whose snapshot row is looked up.

        Returns:
            The row as a dict with ``profile_json``/``profile`` decoded and
            datetime columns rendered as ``YYYY-MM-DD HH:MM:SS`` strings, or
            ``None`` when there is no row, the id is empty, or the query fails.
        """
        if not chatroom_id:
            # An empty id can never match a real snapshot; skip the round trip.
            return None
        try:
            sql = """
                SELECT *
                FROM t_group_profile_snapshot
                WHERE chatroom_id = %s
                LIMIT 1
            """
            row = self.execute_query(sql, (chatroom_id,), fetch_one=True)
            return self._deserialize_row(row)
        except Exception as e:
            self.LOG.error(f"获取群画像快照失败: {e}")
            return None

    def save_snapshot(self, snapshot: Dict) -> bool:
        """Insert or update the snapshot row for ``snapshot["chatroom_id"]``.

        Args:
            snapshot: Mapping with ``chatroom_id`` (required) and the optional
                keys ``group_name``, ``profile``, ``source_summary_latest_at``,
                ``source_message_latest_at``, ``source_summary_count``,
                ``source_message_sample_count`` and ``last_generated_at``.

        Returns:
            Whatever ``execute_update`` returns on success; ``False`` when the
            chatroom id is empty or any step raises.
        """
        try:
            chatroom_id = snapshot.get("chatroom_id") or ""
            if not str(chatroom_id).strip():
                # Without a key the upsert would create/overwrite a row keyed
                # by '' that no lookup is ever meant to hit.
                self.LOG.error("保存群画像快照失败: chatroom_id 为空")
                return False

            data = {
                "chatroom_id": chatroom_id,
                "group_name": snapshot.get("group_name") or "",
                # `or {}` keeps a None profile from being stored as JSON "null".
                "profile_json": json.dumps(snapshot.get("profile") or {}, ensure_ascii=False),
                # Empty strings are not valid DATETIME values; store NULL instead.
                "source_summary_latest_at": snapshot.get("source_summary_latest_at") or None,
                "source_message_latest_at": snapshot.get("source_message_latest_at") or None,
                "source_summary_count": int(snapshot.get("source_summary_count", 0) or 0),
                "source_message_sample_count": int(snapshot.get("source_message_sample_count", 0) or 0),
                # The column is NOT NULL, so a present-but-empty value must
                # fall back to "now" as well (dict.get's default only covers
                # a missing key).
                "last_generated_at": (
                    snapshot.get("last_generated_at")
                    or datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                ),
            }
            # Column names come from the fixed dict above, never from caller
            # input, so interpolating them into the statement is safe; all
            # values still go through placeholders.
            fields = ", ".join(data)
            placeholders = ", ".join(["%s"] * len(data))
            update_clause = ", ".join(
                f"{key}=VALUES({key})" for key in data if key != "chatroom_id"
            )
            sql = f"""
                INSERT INTO t_group_profile_snapshot ({fields})
                VALUES ({placeholders})
                ON DUPLICATE KEY UPDATE {update_clause}
            """
            return self.execute_update(sql, tuple(data.values()))
        except Exception as e:
            self.LOG.error(f"保存群画像快照失败: {e}")
            return False

    @staticmethod
    def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]:
        """Convert a raw table row into the dict shape handed to callers.

        * ``profile_json`` is decoded from JSON; anything that is missing,
          undecodable, or not a JSON object becomes ``{}``.
        * ``profile`` is added as an alias of the decoded ``profile_json``.
        * Datetime columns are formatted as ``YYYY-MM-DD HH:MM:SS`` strings.

        The input row is left untouched; a shallow copy is returned.

        Args:
            row: Row mapping returned by the query, or ``None``.

        Returns:
            The converted copy, or ``None`` for a missing/empty row.
        """
        if not row:
            return None

        result = dict(row)

        profile: Dict = {}
        raw_profile = result.get("profile_json")
        if isinstance(raw_profile, dict):
            profile = raw_profile
        elif raw_profile:
            try:
                decoded = json.loads(raw_profile)
            except (TypeError, ValueError):
                decoded = None
            # Only a JSON object is a usable profile; lists/strings/numbers
            # would break callers that treat the value as a mapping.
            if isinstance(decoded, dict):
                profile = decoded
        result["profile_json"] = profile
        result["profile"] = profile

        for key in (
            "source_summary_latest_at",
            "source_message_latest_at",
            "last_generated_at",
            "create_time",
            "update_time",
        ):
            value = result.get(key)
            if isinstance(value, datetime):
                result[key] = value.strftime("%Y-%m-%d %H:%M:%S")

        return result
同时保留条数上限,避免群摘要本身反过来把 prompt 挤爆。 self.summary_history_limit = max(int(self.config.get("summary_history_limit", 5) or 5), 1) self.summary_item_limit = max(int(self.config.get("summary_item_limit", 4) or 4), 1) + # 群画像快照缓存: + # 1. 自动回复是高频路径,群画像如果每条消息都重新聚合,会重复扫群总结和近期消息; + # 2. 这里引入数据库快照,只要在 TTL 内且源数据没有变化,就直接复用; + # 3. 这样快照既能跨进程/重启保留,又能把每条消息的聚合成本压下来。 + self.cache_ttl_sec = max(int(self.config.get("cache_ttl_sec", 600) or 600), 0) def build_group_memory_profile(self, room_id: str, group_name: str = "") -> Dict: + source_summary_latest_at = self._get_latest_summary_time(room_id) + source_message_latest_at = self._get_latest_group_message_time(room_id) + cached_profile = self._load_cached_profile_if_fresh( + room_id=room_id, + group_name=group_name, + source_summary_latest_at=source_summary_latest_at, + source_message_latest_at=source_message_latest_at, + ) + if cached_profile: + return cached_profile + recent_messages = self.message_db.get_messages_for_summary( room_id, hours_ago=48, min_messages=20, max_hours=168, max_results=300 ) or [] @@ -100,7 +119,7 @@ class GroupMemoryService: serious_hits=serious_hits, short_message_ratio=(short_message_count / message_count) if message_count else 0.0, ) - return { + profile = { "room_id": room_id, "group_name": group_name, "inferred_domain": inferred_domain, @@ -111,7 +130,19 @@ class GroupMemoryService: "structured_summary": structured_summary, "summary_source_count": len(summary_records), "summary_timeline": structured_summary.get("timeline", []) or [], + "cache_status": "rebuilt", + "last_generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } + self._save_profile_snapshot( + room_id=room_id, + group_name=group_name, + profile=profile, + source_summary_latest_at=source_summary_latest_at, + source_message_latest_at=source_message_latest_at, + source_summary_count=len(summary_records), + source_message_sample_count=len(recent_messages), + ) + return profile @staticmethod def _count_hits(text: str, keywords: List[str]) -> 
# Snapshot-cache helper methods of GroupMemoryService, shown dedented
# (they belong inside the class body, one indentation level deeper).


def _load_cached_profile_if_fresh(
    self,
    *,
    room_id: str,
    group_name: str,
    source_summary_latest_at: str,
    source_message_latest_at: str,
) -> Optional[Dict]:
    """Return the stored profile snapshot when it may still be reused.

    A snapshot is reused only when all of the following hold:

    1. caching is enabled (``self.cache_ttl_sec > 0``);
    2. the snapshot's ``last_generated_at`` is no older than the TTL;
    3. both source markers recorded in the snapshot (latest summary time
       and latest message time) equal the values passed in;
    4. the stored profile is a non-empty mapping.

    Args:
        room_id: Chatroom whose snapshot is looked up.
        group_name: Current group name; overrides the stored one when set.
        source_summary_latest_at: Latest summary time observed right now.
        source_message_latest_at: Latest message time observed right now.

    Returns:
        A copy of the stored profile with ``cache_status`` set to ``"hit"``
        and ``last_generated_at`` taken from the snapshot row, or ``None``
        when the profile has to be rebuilt.

    NOTE(review): the message-time marker presumably changes with every new
    group message, so in an active group the snapshot may be invalidated by
    each incoming message - confirm this is the intended invalidation rule.
    """
    if self.cache_ttl_sec <= 0:
        return None
    snapshot = self.snapshot_db.get_snapshot(room_id)
    if not snapshot:
        return None

    # Layer 1: the TTL bounds how long any snapshot can be reused.
    generated_at = self._parse_dt(snapshot.get("last_generated_at"))
    if generated_at is None:
        return None
    # Clamp at 0 so a timestamp slightly in the future counts as "just built".
    age_sec = max((datetime.now() - generated_at).total_seconds(), 0.0)
    if age_sec > float(self.cache_ttl_sec):
        return None

    # Layer 2: the source data must be unchanged since the snapshot was built.
    source_pairs = (
        (snapshot.get("source_summary_latest_at"), source_summary_latest_at),
        (snapshot.get("source_message_latest_at"), source_message_latest_at),
    )
    for cached_value, current_value in source_pairs:
        if not self._source_time_matches(cached_value, current_value):
            return None

    cached_profile = snapshot.get("profile")
    if not isinstance(cached_profile, dict) or not cached_profile:
        return None
    profile = dict(cached_profile)
    profile["group_name"] = group_name or profile.get("group_name", "")
    profile["cache_status"] = "hit"
    profile["last_generated_at"] = str(snapshot.get("last_generated_at", "") or "")
    return profile


def _source_time_matches(self, cached_value: Any, current_value: Any) -> bool:
    """Tell whether two source-time markers denote the same moment.

    The texts are compared first. When they differ but both parse as
    timestamps, the parsed values (second precision) are compared, so that a
    mere formatting difference (ISO ``T`` separator, fractional seconds)
    does not make the cache miss forever. An empty marker only matches
    another empty marker.
    """
    cached_text = str(cached_value or "").strip()
    current_text = str(current_value or "").strip()
    if cached_text == current_text:
        return True
    if not cached_text or not current_text:
        return False
    cached_dt = self._parse_dt(cached_text)
    current_dt = self._parse_dt(current_text)
    return cached_dt is not None and cached_dt == current_dt


def _save_profile_snapshot(
    self,
    *,
    room_id: str,
    group_name: str,
    profile: Dict,
    source_summary_latest_at: str,
    source_message_latest_at: str,
    source_summary_count: int,
    source_message_sample_count: int,
) -> bool:
    """Persist ``profile`` as the snapshot for ``room_id``.

    The row's ``last_generated_at`` reuses the timestamp already stored in
    ``profile`` (when present) so the row and the serialized profile cannot
    disagree; otherwise the current time is used. Empty source markers are
    stored as ``None``.

    Returns:
        The result of ``self.snapshot_db.save_snapshot``.
    """
    generated_at = str(profile.get("last_generated_at") or "").strip()
    if not generated_at:
        generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    return self.snapshot_db.save_snapshot({
        "chatroom_id": room_id,
        "group_name": group_name,
        "profile": profile,
        "source_summary_latest_at": source_summary_latest_at or None,
        "source_message_latest_at": source_message_latest_at or None,
        "source_summary_count": source_summary_count,
        "source_message_sample_count": source_message_sample_count,
        "last_generated_at": generated_at,
    })


def _get_latest_summary_time(self, room_id: str) -> str:
    """Return the most recent summary timestamp for ``room_id`` as text.

    Only ``daily`` and ``manual`` rows of ``t_message_summary`` are
    considered. Returns ``""`` when there is no row or the query fails: this
    value only feeds the cache-freshness check, so a failing probe is logged
    and must not abort the profile build.

    NOTE(review): the query reads ``update_time``, ``last_generated_at`` and
    ``period_end`` from ``t_message_summary`` - confirm all three columns
    exist in that table's schema.
    """
    import logging

    sql = """
        SELECT COALESCE(MAX(update_time), MAX(last_generated_at), MAX(period_end)) AS latest_time
        FROM t_message_summary
        WHERE chatroom_id = %s
          AND summary_type IN ('daily', 'manual')
    """
    try:
        row = self.summary_db.execute_query(sql, (room_id,), fetch_one=True) or {}
    except Exception:
        logging.getLogger(__name__).exception("获取最近群总结时间失败: room_id=%s", room_id)
        return ""
    return self._normalize_dt(row.get("latest_time"))


def _get_latest_group_message_time(self, room_id: str) -> str:
    """Return the ``timestamp`` of the group's last message as text.

    Returns ``""`` when ``get_group_last_message`` yields nothing or the
    message carries no ``timestamp``.
    """
    latest_message = self.message_db.get_group_last_message(room_id) or {}
    return self._normalize_dt(latest_message.get("timestamp"))


@staticmethod
def _normalize_dt(value: Any) -> str:
    """Render ``value`` as text for storage and comparison.

    ``datetime`` objects become ``YYYY-MM-DD HH:MM:SS``; any other value is
    stringified and stripped, with falsy values mapping to ``""``.
    """
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    return str(value or "").strip()


@staticmethod
def _parse_dt(value: Any) -> Optional[datetime]:
    """Parse ``value`` into a naive ``datetime`` with second precision.

    Accepted inputs:
      * ``datetime`` objects (microseconds and tzinfo are dropped);
      * ``YYYY-MM-DD HH:MM:SS`` text, optionally followed by more characters
        such as fractional seconds, and optionally using the ISO ``T``
        separator instead of a space;
      * ``YYYY-MM-DD`` text (parsed as midnight).

    Returns:
        The parsed ``datetime``, or ``None`` for empty or unparseable input.
    """
    if isinstance(value, datetime):
        return value.replace(microsecond=0, tzinfo=None)
    text = str(value or "").strip()
    if not text:
        return None
    # Without this, an ISO "2026-04-24T16:21:00" fails the full format and
    # silently degrades to the date-only parse, i.e. midnight.
    if len(text) > 10 and text[10] in "Tt":
        text = f"{text[:10]} {text[11:]}"
    for fmt, size in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d", 10)):
        try:
            return datetime.strptime(text[:size], fmt)
        except ValueError:
            continue
    return None