135 lines
5.9 KiB
Python
135 lines
5.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional
|
|
|
|
from db.base import BaseDBOperator
|
|
from db.connection import DBConnectionManager
|
|
|
|
|
|
class MemberContextDBOperator(BaseDBOperator):
    """Database operations for group-member interaction summaries.

    Stores one row per ``(chatroom_id, wxid)`` pair in ``t_member_context``
    and converts the JSON-encoded columns (``topics_of_interest``,
    ``recent_focus``, ``meta_json``) to and from Python containers.
    """

    def __init__(self, db_manager: DBConnectionManager):
        """Bind to *db_manager* and make sure the backing table exists."""
        super().__init__(db_manager)
        self._create_tables()

    def _create_tables(self):
        """Create ``t_member_context`` if missing and back-fill newer columns.

        Any failure is logged and swallowed so that constructing the operator
        never raises because of schema setup.
        """
        try:
            self.execute_update("""
                CREATE TABLE IF NOT EXISTS t_member_context (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
                    wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID',
                    display_name VARCHAR(128) COMMENT '成员展示名',
                    activity_level VARCHAR(32) COMMENT '活跃等级',
                    message_pattern VARCHAR(255) COMMENT '发言模式',
                    interaction_style VARCHAR(255) COMMENT '互动风格',
                    response_style_hint VARCHAR(255) COMMENT '回复建议',
                    topics_of_interest TEXT COMMENT '兴趣主题(JSON)',
                    recent_focus TEXT COMMENT '近期关注(JSON)',
                    summary_text TEXT COMMENT '交互摘要',
                    confidence DECIMAL(4, 2) DEFAULT 0.00 COMMENT '摘要置信度',
                    source_message_count INT DEFAULT 0 COMMENT '样本消息数',
                    source_days INT DEFAULT 30 COMMENT '采样天数',
                    meta_json LONGTEXT COMMENT '附加元数据(JSON)',
                    last_profiled_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后生成时间',
                    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                    UNIQUE KEY idx_member_context (chatroom_id, wxid)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='群成员交互摘要表';
            """)
            # Compatibility with tables created before `interaction_style`
            # was introduced: add the column when it is missing.
            # `ADD COLUMN IF NOT EXISTS` is MariaDB-only syntax and is a
            # syntax error on MySQL, so the existence check is done explicitly.
            if not self._column_exists("t_member_context", "interaction_style"):
                self.execute_update("""
                    ALTER TABLE t_member_context
                    ADD COLUMN interaction_style VARCHAR(255) COMMENT '互动风格'
                """)
        except Exception as e:
            self.LOG.error(f"创建群成员交互摘要表失败: {e}")

    def _column_exists(self, table: str, column: str) -> bool:
        """Return True when *column* exists on *table* in the current schema.

        NOTE(review): relies on ``execute_query(..., fetch_one=True)``
        returning a falsy value when no row matches, as the other readers in
        this class already assume - confirm against ``BaseDBOperator``.
        """
        sql = """
            SELECT 1 AS present
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = DATABASE()
              AND TABLE_NAME = %s
              AND COLUMN_NAME = %s
            LIMIT 1
        """
        return bool(self.execute_query(sql, (table, column), fetch_one=True))

    def save_member_context(self, context: Dict) -> bool:
        """Insert or update the summary row for one group member.

        Args:
            context: Summary fields. ``chatroom_id`` and ``wxid`` are required
                and form the unique key; ``topics_of_interest``,
                ``recent_focus`` and ``meta`` are serialized to JSON. Missing
                or ``None`` optional values fall back to empty defaults, and
                ``last_profiled_at`` falls back to the current local time.

        Returns:
            The result of ``execute_update`` on success, ``False`` when the
            key fields are missing or any exception is raised (the error is
            logged).
        """
        try:
            chatroom_id = context.get("chatroom_id", "")
            wxid = context.get("wxid", "")
            # An empty key would upsert a junk row shared by every caller that
            # forgot to pass identifiers, so refuse it up front.
            if not chatroom_id or not wxid:
                self.LOG.error("保存群成员交互摘要失败: chatroom_id and wxid are required")
                return False

            # `or` (rather than a .get default) also covers keys that are
            # present but None, which would otherwise be stored as the JSON
            # text "null" or as NULL in the NOT NULL `last_profiled_at` column.
            data = {
                "chatroom_id": chatroom_id,
                "wxid": wxid,
                "display_name": context.get("display_name", ""),
                "activity_level": context.get("activity_level", ""),
                "message_pattern": context.get("message_pattern", ""),
                "interaction_style": context.get("interaction_style", ""),
                "response_style_hint": context.get("response_style_hint", ""),
                "topics_of_interest": json.dumps(context.get("topics_of_interest") or [], ensure_ascii=False),
                "recent_focus": json.dumps(context.get("recent_focus") or [], ensure_ascii=False),
                "summary_text": context.get("summary_text", ""),
                "confidence": context.get("confidence", 0),
                "source_message_count": context.get("source_message_count", 0),
                "source_days": context.get("source_days", 30),
                "meta_json": json.dumps(context.get("meta") or {}, ensure_ascii=False),
                "last_profiled_at": context.get("last_profiled_at")
                or datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            # Column names come from the fixed dict above, never from caller
            # input, so interpolating them into the statement is safe; the
            # values themselves are always bound as parameters.
            fields = ", ".join(data)
            placeholders = ", ".join(["%s"] * len(data))
            update_clause = ", ".join(
                f"{k}=VALUES({k})" for k in data if k not in ("chatroom_id", "wxid")
            )
            sql = f"""
                INSERT INTO t_member_context ({fields})
                VALUES ({placeholders})
                ON DUPLICATE KEY UPDATE {update_clause}
            """
            return self.execute_update(sql, tuple(data.values()))
        except Exception as e:
            self.LOG.error(f"保存群成员交互摘要失败: {e}")
            return False

    def get_member_context(self, chatroom_id: str, wxid: str) -> Optional[Dict]:
        """Fetch the summary row for one member of one group.

        Returns:
            The deserialized row, the falsy query result unchanged when no
            row matches, or ``None`` when the query raises (the error is
            logged).
        """
        try:
            sql = """
                SELECT *
                FROM t_member_context
                WHERE chatroom_id = %s AND wxid = %s
                LIMIT 1
            """
            result = self.execute_query(sql, (chatroom_id, wxid), fetch_one=True)
            return self._deserialize_row(result)
        except Exception as e:
            self.LOG.error(f"获取群成员交互摘要失败: {e}")
            return None

    def list_group_member_contexts(self, chatroom_id: str) -> List[Dict]:
        """List all member summaries of a group, most recently profiled first.

        Returns:
            The deserialized rows, or an empty list when there are none or the
            query raises (the error is logged).
        """
        try:
            sql = """
                SELECT *
                FROM t_member_context
                WHERE chatroom_id = %s
                ORDER BY last_profiled_at DESC
            """
            results = self.execute_query(sql, (chatroom_id,)) or []
            return [self._deserialize_row(row) for row in results]
        except Exception as e:
            self.LOG.error(f"获取群成员交互摘要列表失败: {e}")
            return []

    @staticmethod
    def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]:
        """Decode the JSON columns of *row* in place and return it.

        ``topics_of_interest`` and ``recent_focus`` fall back to ``[]`` and
        ``meta_json`` to ``{}`` when the stored value is empty, is not valid
        JSON, or is a JSON ``null``. A ``datetime`` in ``last_profiled_at`` is
        formatted as ``YYYY-MM-DD HH:MM:SS`` and the decoded metadata is also
        exposed under the ``meta`` key. A falsy *row* is returned unchanged.
        """
        if not row:
            return row

        json_columns = (
            ("topics_of_interest", list),
            ("recent_focus", list),
            ("meta_json", dict),
        )
        for key, make_default in json_columns:
            value = row.get(key)
            if not value:
                row[key] = make_default()
                continue
            try:
                decoded = json.loads(value)
            except (TypeError, ValueError, RecursionError):
                # Best effort: an unreadable column must not break the row.
                decoded = None
            # A stored JSON `null` is treated like a missing value so callers
            # always receive a container.
            row[key] = make_default() if decoded is None else decoded

        last_profiled_at = row.get("last_profiled_at")
        if isinstance(last_profiled_at, datetime):
            row["last_profiled_at"] = last_profiled_at.strftime("%Y-%m-%d %H:%M:%S")

        # `meta` mirrors the key callers pass to save_member_context.
        row["meta"] = row["meta_json"]

        return row
|