Files
abot/db/member_context_db.py
2026-04-02 11:49:20 +08:00

135 lines
5.9 KiB
Python

# -*- coding: utf-8 -*-
import json
from datetime import datetime
from typing import Dict, List, Optional
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class MemberContextDBOperator(BaseDBOperator):
    """Database operations for per-member interaction summaries of group chats.

    Rows are stored in ``t_member_context`` and are unique per
    ``(chatroom_id, wxid)``. The table is created (if missing) when the
    operator is constructed.
    """

    def __init__(self, db_manager: DBConnectionManager):
        """Initialise the base operator and make sure the table exists.

        Args:
            db_manager: Connection manager handed to ``BaseDBOperator``.
        """
        super().__init__(db_manager)
        self._create_tables()

    def _create_tables(self):
        """Create ``t_member_context`` and back-fill columns on older tables.

        Any exception is logged and swallowed so that constructing the
        operator never raises because of schema setup.
        """
        try:
            self.execute_update("""
                CREATE TABLE IF NOT EXISTS t_member_context (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
                    wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID',
                    display_name VARCHAR(128) COMMENT '成员展示名',
                    activity_level VARCHAR(32) COMMENT '活跃等级',
                    message_pattern VARCHAR(255) COMMENT '发言模式',
                    interaction_style VARCHAR(255) COMMENT '互动风格',
                    response_style_hint VARCHAR(255) COMMENT '回复建议',
                    topics_of_interest TEXT COMMENT '兴趣主题(JSON)',
                    recent_focus TEXT COMMENT '近期关注(JSON)',
                    summary_text TEXT COMMENT '交互摘要',
                    confidence DECIMAL(4, 2) DEFAULT 0.00 COMMENT '摘要置信度',
                    source_message_count INT DEFAULT 0 COMMENT '样本消息数',
                    source_days INT DEFAULT 30 COMMENT '采样天数',
                    meta_json LONGTEXT COMMENT '附加元数据(JSON)',
                    last_profiled_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后生成时间',
                    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                    UNIQUE KEY idx_member_context (chatroom_id, wxid)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群成员交互摘要表';
            """)
            # Compatibility with tables created before `interaction_style`
            # existed: add the column when it is missing.
            # NOTE(review): `ADD COLUMN IF NOT EXISTS` is MariaDB syntax; stock
            # MySQL is expected to reject it, in which case the error below is
            # logged even though the CREATE TABLE succeeded - confirm which
            # server this runs against.
            self.execute_update("""
                ALTER TABLE t_member_context
                ADD COLUMN IF NOT EXISTS interaction_style VARCHAR(255) COMMENT '互动风格'
            """)
        except Exception as e:
            self.LOG.error(f"创建群成员交互摘要表失败: {e}")

    def save_member_context(self, context: Dict) -> bool:
        """Insert or update the summary row for one member of one group.

        The row is keyed by ``(chatroom_id, wxid)``; when a row with that key
        already exists every other column is overwritten with the new values.

        Args:
            context: Summary fields. ``chatroom_id`` and ``wxid`` are
                required. ``topics_of_interest``, ``recent_focus`` and
                ``meta`` are JSON-encoded before being stored; a missing or
                ``None`` value is stored as an empty list / dict.
                ``last_profiled_at`` defaults to the current local time.

        Returns:
            The result of ``execute_update`` on success; ``False`` when the
            key fields are missing or any exception is raised.
        """
        try:
            chatroom_id = context.get("chatroom_id")
            wxid = context.get("wxid")
            # Both columns are NOT NULL and form the unique key. Without this
            # guard, None would violate the constraint and empty strings would
            # make unrelated contexts overwrite the same ("", "") row.
            if not chatroom_id or not wxid:
                self.LOG.error("保存群成员交互摘要失败: 缺少 chatroom_id 或 wxid")
                return False

            # `dict.get(key, default)` only falls back when the key is absent,
            # so explicit None values are normalised with `or` where the
            # column must hold a JSON container or a non-null timestamp.
            last_profiled_at = context.get("last_profiled_at") or datetime.now().strftime(
                "%Y-%m-%d %H:%M:%S"
            )
            data = {
                "chatroom_id": chatroom_id,
                "wxid": wxid,
                "display_name": context.get("display_name", ""),
                "activity_level": context.get("activity_level", ""),
                "message_pattern": context.get("message_pattern", ""),
                "interaction_style": context.get("interaction_style", ""),
                "response_style_hint": context.get("response_style_hint", ""),
                "topics_of_interest": json.dumps(
                    context.get("topics_of_interest") or [], ensure_ascii=False
                ),
                "recent_focus": json.dumps(
                    context.get("recent_focus") or [], ensure_ascii=False
                ),
                "summary_text": context.get("summary_text", ""),
                "confidence": context.get("confidence", 0),
                "source_message_count": context.get("source_message_count", 0),
                "source_days": context.get("source_days", 30),
                "meta_json": json.dumps(context.get("meta") or {}, ensure_ascii=False),
                "last_profiled_at": last_profiled_at,
            }

            # Column names come from the fixed dict above (never from caller
            # input), so interpolating them into the SQL text is safe; the
            # values themselves go through driver placeholders.
            fields = ", ".join(data)
            placeholders = ", ".join(["%s"] * len(data))
            update_clause = ", ".join(
                f"{column}=VALUES({column})"
                for column in data
                if column not in ("chatroom_id", "wxid")
            )
            sql = f"""
                INSERT INTO t_member_context ({fields})
                VALUES ({placeholders})
                ON DUPLICATE KEY UPDATE {update_clause}
            """
            return self.execute_update(sql, tuple(data.values()))
        except Exception as e:
            self.LOG.error(f"保存群成员交互摘要失败: {e}")
            return False

    def get_member_context(self, chatroom_id: str, wxid: str) -> Optional[Dict]:
        """Fetch the summary row of one member in one group.

        Args:
            chatroom_id: Group chat identifier.
            wxid: Member identifier.

        Returns:
            The row with its JSON columns decoded (see ``_deserialize_row``).
            A falsy query result is returned unchanged, and ``None`` is
            returned when an exception is raised.
        """
        try:
            sql = """
                SELECT *
                FROM t_member_context
                WHERE chatroom_id = %s AND wxid = %s
                LIMIT 1
            """
            result = self.execute_query(sql, (chatroom_id, wxid), fetch_one=True)
            return self._deserialize_row(result)
        except Exception as e:
            self.LOG.error(f"获取群成员交互摘要失败: {e}")
            return None

    def list_group_member_contexts(self, chatroom_id: str) -> List[Dict]:
        """List all member summaries of a group, most recently profiled first.

        Args:
            chatroom_id: Group chat identifier.

        Returns:
            Decoded rows ordered by ``last_profiled_at`` descending; an empty
            list when the query yields nothing or an exception is raised.
        """
        try:
            sql = """
                SELECT *
                FROM t_member_context
                WHERE chatroom_id = %s
                ORDER BY last_profiled_at DESC
            """
            results = self.execute_query(sql, (chatroom_id,)) or []
            return [self._deserialize_row(row) for row in results]
        except Exception as e:
            self.LOG.error(f"获取群成员交互摘要列表失败: {e}")
            return []

    @staticmethod
    def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]:
        """Decode the JSON columns of a row in place and normalise its shape.

        ``topics_of_interest`` and ``recent_focus`` always end up as lists and
        ``meta_json`` as a dict: empty, undecodable or wrongly-typed content
        (for example a stored JSON ``null``) is replaced by an empty
        container. A ``datetime`` in ``last_profiled_at`` is formatted as
        ``YYYY-MM-DD HH:MM:SS`` and ``meta`` is added as an alias of the
        decoded ``meta_json``.

        Args:
            row: A mapping of column name to value, or a falsy value.

        Returns:
            The same (mutated) row object; a falsy ``row`` is returned as-is.
        """
        if not row:
            return row

        json_columns = (
            ("topics_of_interest", list),
            ("recent_focus", list),
            ("meta_json", dict),
        )
        for key, expected_type in json_columns:
            value = row.get(key)
            if isinstance(value, expected_type):
                # Already decoded (e.g. the row is deserialised twice); keep
                # it instead of letting json.loads fail and wipe the data.
                decoded = value
            elif value:
                try:
                    decoded = json.loads(value)
                except (TypeError, ValueError, RecursionError):
                    # Best effort: corrupt JSON degrades to the empty default.
                    decoded = None
            else:
                decoded = None
            # Valid JSON of the wrong shape (null, number, string, ...) must
            # not leak to callers that expect a list / dict.
            row[key] = decoded if isinstance(decoded, expected_type) else expected_type()

        last_profiled_at = row.get("last_profiled_at")
        if isinstance(last_profiled_at, datetime):
            row["last_profiled_at"] = last_profiled_at.strftime("%Y-%m-%d %H:%M:%S")

        row["meta"] = row.get("meta_json", {})
        return row