为自动回复增加群画像数据库快照缓存

This commit is contained in:
liuwei
2026-04-24 16:21:00 +08:00
parent 8a813df4a3
commit fa9dc44bbe
3 changed files with 236 additions and 1 deletions

View File

@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
import json
from datetime import datetime
from typing import Dict, Optional
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class GroupProfileSnapshotDBOperator(BaseDBOperator):
    """Database operations for persisted group-profile snapshots.

    One row per chatroom is kept in ``t_group_profile_snapshot`` (enforced by
    the unique key on ``chatroom_id``). Every public method is best-effort:
    failures are logged through ``self.LOG`` and reported as ``None`` /
    ``False`` instead of being raised to the caller.
    """

    def __init__(self, db_manager: DBConnectionManager):
        """Bind the operator to ``db_manager`` and make sure the table exists."""
        super().__init__(db_manager)
        self._create_tables()

    def _create_tables(self):
        """Create the snapshot table if it is missing; errors are only logged."""
        try:
            self.execute_update("""
                CREATE TABLE IF NOT EXISTS t_group_profile_snapshot (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
                    group_name VARCHAR(128) DEFAULT '' COMMENT '群名称',
                    profile_json LONGTEXT COMMENT '群画像快照JSON',
                    source_summary_latest_at DATETIME NULL COMMENT '构建时参考的最近群总结更新时间',
                    source_message_latest_at DATETIME NULL COMMENT '构建时参考的最近群消息时间',
                    source_summary_count INT NOT NULL DEFAULT 0 COMMENT '构建时参考的群总结条数',
                    source_message_sample_count INT NOT NULL DEFAULT 0 COMMENT '构建时参考的消息样本数',
                    last_generated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后一次生成时间',
                    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                    UNIQUE KEY idx_group_profile_snapshot (chatroom_id),
                    KEY idx_group_profile_generated_at (last_generated_at)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群画像快照表';
            """)
        except Exception as e:
            self.LOG.error(f"创建群画像快照表失败: {e}")

    def get_snapshot(self, chatroom_id: str) -> Optional[Dict]:
        """Fetch the snapshot row for ``chatroom_id``.

        Returns:
            The row passed through ``_deserialize_row`` (``profile`` decoded
            to a dict, datetime columns formatted as strings), the falsy
            query result when no row exists, or ``None`` when the query
            raises.
        """
        try:
            sql = """
                SELECT *
                FROM t_group_profile_snapshot
                WHERE chatroom_id = %s
                LIMIT 1
            """
            row = self.execute_query(sql, (chatroom_id,), fetch_one=True)
            return self._deserialize_row(row)
        except Exception as e:
            self.LOG.error(f"获取群画像快照失败: {e}")
            return None

    def save_snapshot(self, snapshot: Dict) -> bool:
        """Insert or update the snapshot row identified by ``chatroom_id``.

        Args:
            snapshot: Mapping with ``chatroom_id``, ``group_name``,
                ``profile`` (JSON-serialisable), the ``source_*`` metadata
                and optionally ``last_generated_at``.

        Returns:
            Whatever ``execute_update`` returns on success; ``False`` when
            ``chatroom_id`` is empty or any step raises.
        """
        try:
            chatroom_id = snapshot.get("chatroom_id") or ""
            if not chatroom_id:
                # An empty id would collapse every anonymous caller onto one
                # shared row under the unique key, so refuse to write it.
                self.LOG.error("保存群画像快照失败: chatroom_id 为空")
                return False

            data = {
                "chatroom_id": chatroom_id,
                # `or ""` keeps an explicit None from being stored as NULL.
                "group_name": snapshot.get("group_name") or "",
                "profile_json": json.dumps(snapshot.get("profile", {}), ensure_ascii=False),
                "source_summary_latest_at": snapshot.get("source_summary_latest_at"),
                "source_message_latest_at": snapshot.get("source_message_latest_at"),
                "source_summary_count": int(snapshot.get("source_summary_count", 0) or 0),
                "source_message_sample_count": int(snapshot.get("source_message_sample_count", 0) or 0),
                # The column is NOT NULL: fall back to "now" both when the key
                # is missing and when it is present but None/empty.
                "last_generated_at": snapshot.get("last_generated_at")
                or datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
            fields = ", ".join(data)
            placeholders = ", ".join(["%s"] * len(data))
            # chatroom_id is the conflict key, so it is not part of the
            # update list.
            update_clause = ", ".join(
                f"{key}=VALUES({key})" for key in data if key != "chatroom_id"
            )
            sql = f"""
                INSERT INTO t_group_profile_snapshot ({fields})
                VALUES ({placeholders})
                ON DUPLICATE KEY UPDATE {update_clause}
            """
            return self.execute_update(sql, tuple(data.values()))
        except Exception as e:
            self.LOG.error(f"保存群画像快照失败: {e}")
            return False

    @staticmethod
    def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]:
        """Decode a raw snapshot row in place and return it.

        * ``profile_json`` is parsed from JSON; anything that is missing,
          malformed, or not a JSON object becomes ``{}`` so that consumers
          can always treat the profile as a dict.
        * ``datetime`` values in the timestamp columns are formatted as
          ``YYYY-MM-DD HH:MM:SS`` strings.
        * ``profile`` is added as an alias of the decoded ``profile_json``.

        A falsy ``row`` is returned unchanged.
        """
        if not row:
            return row

        profile: Dict = {}
        raw_profile = row.get("profile_json")
        if raw_profile:
            try:
                decoded = json.loads(raw_profile)
            except (TypeError, ValueError):
                decoded = None
            # JSON null / list / scalar payloads are not usable as a profile.
            if isinstance(decoded, dict):
                profile = decoded
        row["profile_json"] = profile

        for key in (
            "source_summary_latest_at",
            "source_message_latest_at",
            "last_generated_at",
            "create_time",
            "update_time",
        ):
            value = row.get(key)
            if isinstance(value, datetime):
                row[key] = value.strftime("%Y-%m-%d %H:%M:%S")

        row["profile"] = profile
        return row

View File

@@ -195,8 +195,10 @@ debug = true
# 1. 这里读取最近 5 份群摘要,再聚合成稳定主题/近期重点/未决问题;
# 2. 自动回复消费时优先走这些结构字段,减少 markdown 大段文本的理解损耗;
# 3. item_limit 控制每类字段带给模型的条数,避免群背景过重。
# 4. cache_ttl_sec 让群画像结果落库复用,在短时间内且源数据没变时直接读快照,避免每条消息重复聚合。
summary_history_limit = 5
summary_item_limit = 4
cache_ttl_sec = 600
[group_profiles.default]
mode = "social"

View File

@@ -3,8 +3,10 @@ from __future__ import annotations
import json
import re
from collections import Counter
from datetime import datetime
from typing import Any, Dict, List, Optional
from db.group_profile_snapshot_db import GroupProfileSnapshotDBOperator
from db.message_storage import MessageStorageDB
from db.message_summary_db import MessageSummaryDBOperator
@@ -29,14 +31,31 @@ class GroupMemoryService:
self.config = config or {}
self.message_db = MessageStorageDB(db_manager)
self.summary_db = MessageSummaryDBOperator(db_manager)
self.snapshot_db = GroupProfileSnapshotDBOperator(db_manager)
# 群聊自动回复不应该只盯着“昨天那一篇总结”:
# 1. 日摘要天然是日维度,如果只读最新一条,很容易把短期偶发波动误当成长期背景;
# 2. 这里改成读取最近几份摘要,再做轻量聚合,能让群长期画像更稳定;
# 3. 同时保留条数上限,避免群摘要本身反过来把 prompt 挤爆。
self.summary_history_limit = max(int(self.config.get("summary_history_limit", 5) or 5), 1)
self.summary_item_limit = max(int(self.config.get("summary_item_limit", 4) or 4), 1)
# 群画像快照缓存:
# 1. 自动回复是高频路径,群画像如果每条消息都重新聚合,会重复扫群总结和近期消息;
# 2. 这里引入数据库快照,只要在 TTL 内且源数据没有变化,就直接复用;
# 3. 这样快照既能跨进程/重启保留,又能把每条消息的聚合成本压下来。
self.cache_ttl_sec = max(int(self.config.get("cache_ttl_sec", 600) or 600), 0)
def build_group_memory_profile(self, room_id: str, group_name: str = "") -> Dict:
source_summary_latest_at = self._get_latest_summary_time(room_id)
source_message_latest_at = self._get_latest_group_message_time(room_id)
cached_profile = self._load_cached_profile_if_fresh(
room_id=room_id,
group_name=group_name,
source_summary_latest_at=source_summary_latest_at,
source_message_latest_at=source_message_latest_at,
)
if cached_profile:
return cached_profile
recent_messages = self.message_db.get_messages_for_summary(
room_id, hours_ago=48, min_messages=20, max_hours=168, max_results=300
) or []
@@ -100,7 +119,7 @@ class GroupMemoryService:
serious_hits=serious_hits,
short_message_ratio=(short_message_count / message_count) if message_count else 0.0,
)
return {
profile = {
"room_id": room_id,
"group_name": group_name,
"inferred_domain": inferred_domain,
@@ -111,7 +130,19 @@ class GroupMemoryService:
"structured_summary": structured_summary,
"summary_source_count": len(summary_records),
"summary_timeline": structured_summary.get("timeline", []) or [],
"cache_status": "rebuilt",
"last_generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
self._save_profile_snapshot(
room_id=room_id,
group_name=group_name,
profile=profile,
source_summary_latest_at=source_summary_latest_at,
source_message_latest_at=source_message_latest_at,
source_summary_count=len(summary_records),
source_message_sample_count=len(recent_messages),
)
return profile
@staticmethod
def _count_hits(text: str, keywords: List[str]) -> int:
@@ -181,6 +212,99 @@ class GroupMemoryService:
records.append(normalized)
return records
def _load_cached_profile_if_fresh(
self,
*,
room_id: str,
group_name: str,
source_summary_latest_at: str,
source_message_latest_at: str,
) -> Optional[Dict]:
if self.cache_ttl_sec <= 0:
return None
snapshot = self.snapshot_db.get_snapshot(room_id)
if not snapshot:
return None
# 快照新鲜度判断分两层:
# 1. 先看快照是否还在 TTL 内,避免长期无限复用旧画像;
# 2. 再看“最近群总结更新时间 / 最近群消息时间”是否和上次构建时一致,
# 只有源数据没变,才允许直接复用。
last_generated_at = self._parse_dt(snapshot.get("last_generated_at"))
if not last_generated_at:
return None
age_sec = max((datetime.now() - last_generated_at).total_seconds(), 0.0)
if age_sec > float(self.cache_ttl_sec):
return None
if str(snapshot.get("source_summary_latest_at", "") or "") != str(source_summary_latest_at or ""):
return None
if str(snapshot.get("source_message_latest_at", "") or "") != str(source_message_latest_at or ""):
return None
profile = dict(snapshot.get("profile", {}) or {})
if not profile:
return None
profile["group_name"] = group_name or profile.get("group_name", "")
profile["cache_status"] = "hit"
profile["last_generated_at"] = str(snapshot.get("last_generated_at", "") or "")
return profile
def _save_profile_snapshot(
self,
*,
room_id: str,
group_name: str,
profile: Dict,
source_summary_latest_at: str,
source_message_latest_at: str,
source_summary_count: int,
source_message_sample_count: int,
) -> bool:
return self.snapshot_db.save_snapshot({
"chatroom_id": room_id,
"group_name": group_name,
"profile": profile,
"source_summary_latest_at": source_summary_latest_at or None,
"source_message_latest_at": source_message_latest_at or None,
"source_summary_count": source_summary_count,
"source_message_sample_count": source_message_sample_count,
"last_generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
})
def _get_latest_summary_time(self, room_id: str) -> str:
sql = """
SELECT COALESCE(MAX(update_time), MAX(last_generated_at), MAX(period_end)) AS latest_time
FROM t_message_summary
WHERE chatroom_id = %s
AND summary_type IN ('daily', 'manual')
"""
row = self.summary_db.execute_query(sql, (room_id,), fetch_one=True) or {}
return self._normalize_dt(row.get("latest_time"))
def _get_latest_group_message_time(self, room_id: str) -> str:
latest_message = self.message_db.get_group_last_message(room_id) or {}
return self._normalize_dt(latest_message.get("timestamp"))
@staticmethod
def _normalize_dt(value: Any) -> str:
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %H:%M:%S")
text = str(value or "").strip()
return text
@staticmethod
def _parse_dt(value: Any) -> Optional[datetime]:
text = str(value or "").strip()
if not text:
return None
for fmt, size in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d", 10)):
try:
return datetime.strptime(text[:size], fmt)
except Exception:
continue
return None
def _build_structured_summary_digest(self, records: List[Dict]) -> Dict:
if not records:
return {