def get_group_state(self, chatroom_id: str) -> Optional[Dict]:
    """Fetch the group-level digest bootstrap state for one chatroom.

    Reads at most one row from ``t_member_digest_group_state`` and
    post-processes it for callers:

    * ``row["extra"]`` is always a ``dict``. It holds the decoded
      ``extra_json`` column, or ``{}`` when that column is empty, is not
      valid JSON, or decodes to something other than a JSON object.
    * ``last_bootstrap_at``, ``create_time`` and ``update_time`` are
      rendered as ``"%Y-%m-%d %H:%M:%S"`` strings when the driver returned
      ``datetime`` objects; other values are left untouched.

    Args:
        chatroom_id: Chatroom identifier used in the ``WHERE`` clause.

    Returns:
        The decorated row, or ``None`` when no row exists or the lookup
        failed (failures are logged, not raised).
    """
    sql = """
        SELECT *
        FROM t_member_digest_group_state
        WHERE chatroom_id = %s
        LIMIT 1
    """
    try:
        row = self.execute_query(sql, (chatroom_id,), fetch_one=True)
        if not row:
            return None

        extra: Dict = {}
        extra_json = row.get("extra_json")
        if extra_json:
            try:
                decoded = json.loads(extra_json)
            except (TypeError, ValueError):
                # Corrupt payloads degrade to an empty mapping instead of
                # failing the whole state lookup.
                decoded = None
            # A JSON list/number/null would break callers that treat
            # ``extra`` as a mapping, so only objects are accepted.
            if isinstance(decoded, dict):
                extra = decoded
        row["extra"] = extra

        for key in ("last_bootstrap_at", "create_time", "update_time"):
            value = row.get(key)
            if isinstance(value, datetime):
                row[key] = value.strftime("%Y-%m-%d %H:%M:%S")
        return row
    except Exception as e:
        self.LOG.error(f"获取成员分层摘要群状态失败: {e}")
        return None


def save_group_state(self, chatroom_id: str, bootstrap_status: str,
                     bootstrap_days: int = 0, built_daily_count: int = 0,
                     touched_member_count: int = 0, extra: Optional[Dict] = None) -> bool:
    """Insert or update the group-level bootstrap state of a chatroom.

    Issues ``INSERT ... ON DUPLICATE KEY UPDATE`` so that a second call for
    the same ``chatroom_id`` overwrites every column except ``chatroom_id``
    itself. ``last_bootstrap_at`` is stamped with the current local time.

    Args:
        chatroom_id: Chatroom identifier (the upsert key).
        bootstrap_status: Status string to store.
        bootstrap_days: Size of the bootstrap window in days; falsy -> 0.
        built_daily_count: Number of daily digests produced; falsy -> 0.
        touched_member_count: Number of members touched; falsy -> 0.
        extra: Optional JSON-serialisable mapping stored in ``extra_json``;
            ``None``/empty is stored as ``{}``.

    Returns:
        The result of ``execute_update``; ``False`` when anything raised
        (the error is logged, not propagated).
    """
    try:
        data = {
            "chatroom_id": chatroom_id,
            "bootstrap_status": bootstrap_status,
            "last_bootstrap_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "bootstrap_days": int(bootstrap_days or 0),
            "built_daily_count": int(built_daily_count or 0),
            "touched_member_count": int(touched_member_count or 0),
            "extra_json": json.dumps(extra or {}, ensure_ascii=False),
        }
        fields = ", ".join(data)
        placeholders = ", ".join(["%s"] * len(data))
        # The key column is excluded: it is what identifies the row.
        # NOTE(review): VALUES() inside ON DUPLICATE KEY UPDATE is deprecated
        # in recent MySQL 8.0 releases; kept for compatibility with older
        # servers - confirm the deployed server version before changing it.
        update_clause = ", ".join(
            f"{key}=VALUES({key})" for key in data if key != "chatroom_id"
        )
        sql = f"""
            INSERT INTO t_member_digest_group_state ({fields})
            VALUES ({placeholders})
            ON DUPLICATE KEY UPDATE {update_clause}
        """
        return self.execute_update(sql, tuple(data.values()))
    except Exception as e:
        self.LOG.error(f"保存成员分层摘要群状态失败: {e}")
        return False
1)).strftime("%Y-%m-%d") digests = self._build_group_daily_digests(chatroom_id, target_date, force=force) + processed_dates.append(target_date) for digest in digests: self.digest_db.save_digest(digest) built_daily += 1 touched_members.add(digest.get("wxid")) + mode = "bootstrap" if bootstrap_mode else "incremental" + if bootstrap_mode: + bootstrap_status = "done" if built_daily > 0 or touched_members else "empty" + self.digest_db.save_group_state( + chatroom_id, + bootstrap_status=bootstrap_status, + bootstrap_days=days, + built_daily_count=built_daily, + touched_member_count=len(touched_members), + extra={"processed_dates": processed_dates}, + ) + self.LOG.info( + f"[成员交互摘要][群日摘要窗口] group={chatroom_id}, mode={mode}, days={days}, " + f"built_daily={built_daily}, touched_members={len(touched_members)}" + ) return { "built_daily": built_daily, "touched_members": list(touched_members), + "days": days, + "mode": mode, + "processed_dates": processed_dates, } + def _should_bootstrap_group(self, chatroom_id: str) -> bool: + try: + group_state = self.digest_db.get_group_state(chatroom_id) + if group_state and group_state.get("bootstrap_status") in {"done", "empty"}: + return False + sql = """ + SELECT 1 + FROM t_member_digest + WHERE chatroom_id = %s AND digest_type = 'daily' + LIMIT 1 + """ + row = self.digest_db.execute_query(sql, (chatroom_id,), fetch_one=True) + return not bool(row) + except Exception as e: + self.LOG.warning(f"[成员交互摘要] 检查群初始化状态失败,按增量处理: group={chatroom_id}, error={e}") + return False + def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False) -> Dict: member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {} display_name = member.get("display_name") or member.get("nick_name") or wxid diff --git a/plugins/member_context/service.py b/plugins/member_context/service.py index a4b7775..087b712 100644 --- a/plugins/member_context/service.py +++ b/plugins/member_context/service.py @@ -236,9 +236,6 @@ class 
MemberContextService: return {"refreshed": 0, "skipped": 0, "disabled": True} active_members = self._get_recent_active_members(chatroom_id) - if not active_members: - self.LOG.info(f"群 {chatroom_id} 最近没有满足条件的活跃成员,跳过刷新") - return {"refreshed": 0, "skipped": 0, "disabled": False, "active_candidates": 0} members = self.contacts_db.get_chatroom_member_list(chatroom_id) or [] enabled_members = { @@ -257,10 +254,38 @@ class MemberContextService: group_digest_stats = self.digest_service.ensure_recent_group_daily_digests(chatroom_id) self.LOG.info( f"[成员交互摘要] 群日摘要批处理完成: group={chatroom_id}, " + f"mode={group_digest_stats.get('mode', '')}, " + f"days={group_digest_stats.get('days', 0)}, " f"built_daily={group_digest_stats.get('built_daily', 0)}, " f"touched_members={len(group_digest_stats.get('touched_members', []))}" ) + candidate_map = { + (member.get("wxid") or ""): dict(member) + for member in active_members + if member.get("wxid") + } + if group_digest_stats.get("mode") == "bootstrap": + for wxid in group_digest_stats.get("touched_members", []): + if not wxid or wxid in candidate_map: + continue + candidate_map[wxid] = { + "wxid": wxid, + "msg_count": 0, + "latest_message_time": "", + } + self.LOG.info( + f"[成员交互摘要] 初始化模式补充成员: group={chatroom_id}, " + f"bootstrap_members={len(group_digest_stats.get('touched_members', []))}, " + f"candidate_total={len(candidate_map)}" + ) + + active_members = list(candidate_map.values()) + total = len(active_members) + if not active_members: + self.LOG.info(f"群 {chatroom_id} 初始化/增量后仍无可刷新成员,跳过刷新") + return {"refreshed": 0, "skipped": 0, "disabled": False, "active_candidates": 0} + for index, active_member in enumerate(active_members, start=1): wxid = active_member.get("wxid") if wxid not in enabled_members: