Files
abot/db/member_digest_db.py
liuwei 75dc3b289d 优化成员画像初始化状态管理,避免空群重复回扫历史窗口
- 新增成员摘要群级状态表,记录每个群的初始化状态、最近一次初始化时间、初始化窗口天数、生成摘要数量与触达成员数量
- 将成员画像群日摘要逻辑拆分为初始化模式与日常增量模式
- 首次运行且群内尚无日摘要时,自动按 bootstrap_days 回补最近已结束日期
- 初始化完成后写入群级状态:有产出记为 done,无产出记为 empty
- 后续调度或手动刷新时,若群级状态已是 done/empty,则不再重复按 60 天历史窗口回扫,改为仅按日常增量窗口处理前一天数据
- 解决冷群、空群、长期低活跃群在每天定时任务中反复初始化扫描的问题,减少无意义数据库扫描与等待时间
- 调整刷新群画像逻辑,即使最近 72 小时无活跃成员,首次初始化也能先尝试补历史摘要,再决定是否需要刷新成员画像
- 初始化模式下,会把历史窗口中真正产出过日摘要的成员补充进候选刷新集合,避免只依赖最近 72 小时活跃成员导致历史初始化不完整
2026-04-02 13:54:24 +08:00

229 lines
10 KiB
Python

# -*- coding: utf-8 -*-
import json
from datetime import datetime
from typing import Dict, List, Optional
from db.base import BaseDBOperator
from db.connection import DBConnectionManager
class MemberDigestDBOperator(BaseDBOperator):
"""成员分层摘要数据库操作"""
def __init__(self, db_manager: DBConnectionManager):
super().__init__(db_manager)
self._create_tables()
def _create_tables(self):
try:
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_member_digest (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID',
digest_type VARCHAR(16) NOT NULL COMMENT '摘要类型 daily|weekly|monthly',
period_key VARCHAR(32) NOT NULL COMMENT '周期主键',
period_start DATETIME NULL COMMENT '周期开始时间',
period_end DATETIME NULL COMMENT '周期结束时间',
display_name VARCHAR(128) COMMENT '成员展示名',
source_count INT DEFAULT 0 COMMENT '源数据条数',
summary_text TEXT COMMENT '摘要说明',
structured_json LONGTEXT COMMENT '结构化摘要JSON',
meta_json LONGTEXT COMMENT '附加元数据JSON',
last_generated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后生成时间',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY idx_member_digest (chatroom_id, wxid, digest_type, period_key),
KEY idx_digest_lookup (chatroom_id, wxid, digest_type, period_end)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='成员分层摘要表';
""")
self.execute_update("""
CREATE TABLE IF NOT EXISTS t_member_digest_group_state (
id INT AUTO_INCREMENT PRIMARY KEY,
chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID',
bootstrap_status VARCHAR(16) NOT NULL DEFAULT 'pending' COMMENT '初始化状态 pending|done|empty',
last_bootstrap_at DATETIME NULL COMMENT '最近一次初始化尝试时间',
bootstrap_days INT NOT NULL DEFAULT 0 COMMENT '最近一次初始化窗口天数',
built_daily_count INT NOT NULL DEFAULT 0 COMMENT '最近一次初始化生成的日摘要数量',
touched_member_count INT NOT NULL DEFAULT 0 COMMENT '最近一次初始化触达的成员数量',
extra_json LONGTEXT NULL COMMENT '附加状态信息',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY idx_digest_group_state (chatroom_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='成员分层摘要群级状态表';
""")
except Exception as e:
self.LOG.error(f"创建成员分层摘要表失败: {e}")
def save_digest(self, digest: Dict) -> bool:
try:
data = {
"chatroom_id": digest.get("chatroom_id", ""),
"wxid": digest.get("wxid", ""),
"digest_type": digest.get("digest_type", ""),
"period_key": digest.get("period_key", ""),
"period_start": digest.get("period_start"),
"period_end": digest.get("period_end"),
"display_name": digest.get("display_name", ""),
"source_count": digest.get("source_count", 0),
"summary_text": digest.get("summary_text", ""),
"structured_json": json.dumps(digest.get("structured", {}), ensure_ascii=False),
"meta_json": json.dumps(digest.get("meta", {}), ensure_ascii=False),
"last_generated_at": digest.get("last_generated_at", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
}
fields = ", ".join(data.keys())
placeholders = ", ".join(["%s"] * len(data))
update_clause = ", ".join(
[f"{key}=VALUES({key})" for key in data.keys() if key not in ("chatroom_id", "wxid", "digest_type", "period_key")]
)
sql = f"""
INSERT INTO t_member_digest ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
return self.execute_update(sql, tuple(data.values()))
except Exception as e:
self.LOG.error(f"保存成员分层摘要失败: {e}")
return False
def get_digest(self, chatroom_id: str, wxid: str, digest_type: str, period_key: str) -> Optional[Dict]:
try:
sql = """
SELECT *
FROM t_member_digest
WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s AND period_key = %s
LIMIT 1
"""
row = self.execute_query(sql, (chatroom_id, wxid, digest_type, period_key), fetch_one=True)
return self._deserialize_row(row)
except Exception as e:
self.LOG.error(f"获取成员分层摘要失败: {e}")
return None
def list_digests(self, chatroom_id: str, wxid: str, digest_type: str, limit: int = 20) -> List[Dict]:
try:
sql = """
SELECT *
FROM t_member_digest
WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s
ORDER BY period_end DESC, period_key DESC
LIMIT %s
"""
rows = self.execute_query(sql, (chatroom_id, wxid, digest_type, limit)) or []
return [self._deserialize_row(row) for row in rows]
except Exception as e:
self.LOG.error(f"获取成员分层摘要列表失败: {e}")
return []
def list_digest_keys(self, chatroom_id: str, wxid: str, digest_type: str) -> List[str]:
try:
sql = """
SELECT period_key
FROM t_member_digest
WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s
"""
rows = self.execute_query(sql, (chatroom_id, wxid, digest_type)) or []
return [str(row.get("period_key")) for row in rows if row.get("period_key")]
except Exception as e:
self.LOG.error(f"获取成员摘要key失败: {e}")
return []
def list_period_digests(self, chatroom_id: str, wxid: str, digest_type: str,
period_keys: List[str]) -> List[Dict]:
try:
if not period_keys:
return []
placeholders = ", ".join(["%s"] * len(period_keys))
sql = f"""
SELECT *
FROM t_member_digest
WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s AND period_key IN ({placeholders})
ORDER BY period_end ASC, period_key ASC
"""
params = (chatroom_id, wxid, digest_type, *period_keys)
rows = self.execute_query(sql, params) or []
return [self._deserialize_row(row) for row in rows]
except Exception as e:
self.LOG.error(f"批量获取成员分层摘要失败: {e}")
return []
def get_group_state(self, chatroom_id: str) -> Optional[Dict]:
try:
sql = """
SELECT *
FROM t_member_digest_group_state
WHERE chatroom_id = %s
LIMIT 1
"""
row = self.execute_query(sql, (chatroom_id,), fetch_one=True)
if not row:
return None
extra_json = row.get("extra_json")
if extra_json:
try:
row["extra"] = json.loads(extra_json)
except Exception:
row["extra"] = {}
else:
row["extra"] = {}
for key in ("last_bootstrap_at", "create_time", "update_time"):
value = row.get(key)
if isinstance(value, datetime):
row[key] = value.strftime("%Y-%m-%d %H:%M:%S")
return row
except Exception as e:
self.LOG.error(f"获取成员分层摘要群状态失败: {e}")
return None
def save_group_state(self, chatroom_id: str, bootstrap_status: str,
bootstrap_days: int = 0, built_daily_count: int = 0,
touched_member_count: int = 0, extra: Optional[Dict] = None) -> bool:
try:
data = {
"chatroom_id": chatroom_id,
"bootstrap_status": bootstrap_status,
"last_bootstrap_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"bootstrap_days": int(bootstrap_days or 0),
"built_daily_count": int(built_daily_count or 0),
"touched_member_count": int(touched_member_count or 0),
"extra_json": json.dumps(extra or {}, ensure_ascii=False),
}
fields = ", ".join(data.keys())
placeholders = ", ".join(["%s"] * len(data))
update_clause = ", ".join(
[f"{key}=VALUES({key})" for key in data.keys() if key != "chatroom_id"]
)
sql = f"""
INSERT INTO t_member_digest_group_state ({fields})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE {update_clause}
"""
return self.execute_update(sql, tuple(data.values()))
except Exception as e:
self.LOG.error(f"保存成员分层摘要群状态失败: {e}")
return False
@staticmethod
def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]:
if not row:
return row
for key in ("structured_json", "meta_json"):
value = row.get(key)
if not value:
row[key] = {}
continue
try:
row[key] = json.loads(value)
except Exception:
row[key] = {}
for key in ("period_start", "period_end", "last_generated_at"):
value = row.get(key)
if isinstance(value, datetime):
row[key] = value.strftime("%Y-%m-%d %H:%M:%S")
row["structured"] = row.get("structured_json", {})
row["meta"] = row.get("meta_json", {})
return row