diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index e49ae17..7d8f999 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -53,6 +53,17 @@ def send_message_in_thread(func, *args, **kwargs): message_thread_pool.submit(run) +def run_member_context_refresh_in_thread(func, *args, **kwargs): + """在线程池中异步刷新成员交互摘要,避免阻塞请求线程""" + def run(): + try: + func(*args, **kwargs) + except Exception as e: + logger.error(f"成员交互摘要后台刷新失败: {e}") + + message_thread_pool.submit(run) + + # 联系人管理页面 @contacts_bp.route('/') @login_required @@ -272,17 +283,17 @@ def api_refresh_group_member_context(): if roomid and wxid: if not server.member_context_service.is_group_enabled(roomid): return jsonify({"success": False, "error": "该群未启用成员交互摘要功能"}), 403 - context = server.member_context_service.refresh_member_context(roomid, wxid) - return jsonify({"success": True, "data": {"context": context}}) + run_member_context_refresh_in_thread(server.member_context_service.refresh_member_context, roomid, wxid) + return jsonify({"success": True, "message": "成员交互摘要刷新任务已提交"}) if roomid: if not server.member_context_service.is_group_enabled(roomid): return jsonify({"success": False, "error": "该群未启用成员交互摘要功能"}), 403 - result = server.member_context_service.refresh_group_contexts(roomid) - return jsonify({"success": True, "data": result}) + run_member_context_refresh_in_thread(server.member_context_service.refresh_group_contexts, roomid) + return jsonify({"success": True, "message": "本群成员交互摘要刷新任务已提交"}) - result = server.member_context_service.refresh_all_chatrooms() - return jsonify({"success": True, "data": result}) + run_member_context_refresh_in_thread(server.member_context_service.refresh_all_chatrooms) + return jsonify({"success": True, "message": "全量成员交互摘要刷新任务已提交"}) except Exception as e: logger.error(f"刷新群成员交互摘要失败: {e}") return jsonify({"success": False, "error": str(e)}), 500 diff --git a/admin/dashboard/templates/contacts_management.html b/admin/dashboard/templates/contacts_management.html index 8f57cc9..548d97a 100644 --- a/admin/dashboard/templates/contacts_management.html +++ b/admin/dashboard/templates/contacts_management.html @@ -462,8 +462,8 @@ axios.post('/contacts/api/group_member_context/refresh', { roomid: this.currentGroup.wxid }) .then(response => { if (response.data.success) { - this.$message.success('本群成员交互摘要已刷新'); - this.loadGroupMembers(this.currentGroup.wxid); + this.$message.success(response.data.message || '本群成员交互摘要刷新任务已提交'); + setTimeout(() => this.loadGroupMembers(this.currentGroup.wxid), 2500); } else { this.$message.error('刷新本群成员交互摘要失败'); } @@ -509,8 +509,8 @@ wxid: this.currentContextMember.wxid }).then(response => { if (response.data.success) { - this.memberContext = response.data.data.context; - this.$message.success('成员交互摘要已刷新'); + this.$message.success(response.data.message || '成员交互摘要刷新任务已提交'); + setTimeout(() => this.loadMemberContext(), 2500); } else { this.$message.error('刷新成员交互摘要失败'); } diff --git a/plugins/member_context/service.py b/plugins/member_context/service.py index 707776b..5feadd4 100644 --- a/plugins/member_context/service.py +++ b/plugins/member_context/service.py @@ -47,6 +47,11 @@ class MemberContextService: self.sample_days = int(profile_config.get("sample_days", 30)) self.ai_sample_limit = int(profile_config.get("sample_message_limit", 80)) self.refresh_limit_per_member = int(profile_config.get("refresh_limit_per_member", 200)) + self.ai_min_member_messages = int(profile_config.get("ai_min_member_messages", 12)) + self.active_member_hours = int(profile_config.get("active_member_hours", 72)) + self.min_member_messages = int(profile_config.get("min_member_messages", 3)) + self.max_members_per_group_per_run = int(profile_config.get("max_members_per_group_per_run", 30)) + self.stale_hours = int(profile_config.get("stale_hours", 24)) schedule_config = self.plugin_config.get("schedule", {}) self.only_recent_active_groups = bool(schedule_config.get("only_recent_active_groups", False)) self.active_hours = int(schedule_config.get("active_hours", 72)) @@ -106,8 +111,14 @@ class MemberContextService: limit: Optional[int] = None) -> Dict: if not self.is_group_enabled(chatroom_id): raise ValueError(f"群 {chatroom_id} 未启用成员交互摘要功能") + self.LOG.info(f"[成员交互摘要] 开始刷新单个成员: group={chatroom_id}, wxid={wxid}") context = self.build_member_context(chatroom_id, wxid, days=days, limit=limit) self.member_context_db.save_member_context(context) + self.LOG.info( + f"[成员交互摘要] 单个成员刷新完成: group={chatroom_id}, wxid={wxid}, " + f"display_name={context.get('display_name', wxid)}, messages={context.get('source_message_count', 0)}, " + f"ai={'yes' if context.get('meta', {}).get('ai_provider') else 'no'}" + ) return context def refresh_group_contexts(self, chatroom_id: str, days: Optional[int] = None, @@ -119,24 +130,65 @@ class MemberContextService: self.LOG.info(f"群 {chatroom_id} 未启用成员交互摘要功能,跳过刷新") return {"refreshed": 0, "skipped": 0, "disabled": True} + active_members = self._get_recent_active_members(chatroom_id) + if not active_members: + self.LOG.info(f"群 {chatroom_id} 最近没有满足条件的活跃成员,跳过刷新") + return {"refreshed": 0, "skipped": 0, "disabled": False, "active_candidates": 0} + members = self.contacts_db.get_chatroom_member_list(chatroom_id) or [] + enabled_members = { + member.get("wxid"): member for member in members + if member.get("status", 1) == 1 and member.get("wxid") + } refreshed = 0 skipped = 0 + total = len(active_members) - for member in members: - if member.get("status", 1) != 1: + self.LOG.info( + f"[成员交互摘要] 开始刷新群: group={chatroom_id}, active_candidates={total}, " + f"days={days}, limit_per_member={limit_per_member}" + ) + + for index, active_member in enumerate(active_members, start=1): + wxid = active_member.get("wxid") + if wxid not in enabled_members: + self.LOG.debug( + f"[成员交互摘要] 跳过成员(不在当前在群名单): group={chatroom_id}, " + f"index={index}/{total}, wxid={wxid}" + ) continue - wxid = member.get("wxid") - if not wxid: + existing_context = self.member_context_db.get_member_context(chatroom_id, wxid) + if not self._should_refresh_context(existing_context, active_member): + skipped += 1 + self.LOG.debug( + f"[成员交互摘要] 跳过成员(画像仍新鲜): group={chatroom_id}, " + f"index={index}/{total}, wxid={wxid}, latest_message_time={active_member.get('latest_message_time')}, " + f"last_profiled_at={(existing_context or {}).get('last_profiled_at')}" + ) continue context = self.build_member_context(chatroom_id, wxid, days=days, limit=limit_per_member) if context["source_message_count"] <= 0: skipped += 1 + self.LOG.debug( + f"[成员交互摘要] 跳过成员(样本不足): group={chatroom_id}, " + f"index={index}/{total}, wxid={wxid}" + ) continue self.member_context_db.save_member_context(context) refreshed += 1 + self.LOG.info( + f"[成员交互摘要] 刷新成员进度: group={chatroom_id}, index={index}/{total}, " + f"wxid={wxid}, display_name={context.get('display_name', wxid)}, " + f"messages={context.get('source_message_count', 0)}, " + f"activity={context.get('activity_level', '')}, " + f"ai={'yes' if context.get('meta', {}).get('ai_provider') else 'no'}" + ) - return {"refreshed": refreshed, "skipped": skipped} + self.LOG.info( + f"[成员交互摘要] 群刷新完成: group={chatroom_id}, refreshed={refreshed}, " + f"skipped={skipped}, active_candidates={total}" + ) + return {"refreshed": refreshed, "skipped": skipped, "active_candidates": len(active_members)} def refresh_all_chatrooms(self, days: Optional[int] = None, limit_per_member: Optional[int] = None) -> Dict: days = days or self.sample_days @@ -149,6 +201,19 @@ class MemberContextService: skipped = 0 disabled = 0 inactive = 0 + processed_groups = 0 + + candidate_groups = [ + group.get("chatroom_id") for group in groups + if group.get("chatroom_id") and (active_group_ids is None or group.get("chatroom_id") in active_group_ids) + ] + total_groups = len(candidate_groups) + + self.LOG.info( + f"[成员交互摘要] 开始批量刷新: candidate_groups={total_groups}, " + f"only_recent_active_groups={self.only_recent_active_groups}, active_hours={self.active_hours}, " + f"min_group_messages={self.min_group_messages}" + ) for group in groups: chatroom_id = group.get("chatroom_id") @@ -157,6 +222,10 @@ class MemberContextService: if active_group_ids is not None and chatroom_id not in active_group_ids: inactive += 1 continue + processed_groups += 1 + self.LOG.info( + f"[成员交互摘要] 批量刷新进度: group_index={processed_groups}/{total_groups}, group={chatroom_id}" + ) result = self.refresh_group_contexts(chatroom_id, days=days, limit_per_member=limit_per_member) if result.get("disabled"): disabled += 1 @@ -164,6 +233,10 @@ class MemberContextService: group_count += 1 member_count += result["refreshed"] skipped += result["skipped"] + self.LOG.info( + f"[成员交互摘要] 批量群结果: group={chatroom_id}, refreshed={result.get('refreshed', 0)}, " + f"skipped={result.get('skipped', 0)}, active_candidates={result.get('active_candidates', 0)}" + ) self.LOG.info(f"成员交互摘要刷新完成: 启用活跃群={group_count}, 成员={member_count}, 跳过={skipped}, 未启用群={disabled}, 非活跃群={inactive}") return {"groups": group_count, "members": member_count, "skipped": skipped, "disabled_groups": disabled, "inactive_groups": inactive} @@ -297,6 +370,63 @@ class MemberContextService: "latest_message_time": latest_time, } + def _get_recent_active_members(self, chatroom_id: str) -> List[Dict]: + sql = """ + SELECT + sender AS wxid, + COUNT(*) AS msg_count, + MAX(timestamp) AS latest_message_time + FROM messages + WHERE group_id = %s + AND sender IS NOT NULL + AND sender <> '' + AND timestamp >= DATE_SUB(NOW(), INTERVAL %s HOUR) + AND message_type IN (1, 49) + GROUP BY sender + HAVING COUNT(*) >= %s + ORDER BY latest_message_time DESC, msg_count DESC + LIMIT %s + """ + rows = self.message_db.execute_query( + sql, + (chatroom_id, self.active_member_hours, self.min_member_messages, self.max_members_per_group_per_run) + ) or [] + for row in rows: + latest_time = row.get("latest_message_time") + if isinstance(latest_time, datetime): + row["latest_message_time"] = latest_time.strftime("%Y-%m-%d %H:%M:%S") + return rows + + def _should_refresh_context(self, existing_context: Optional[Dict], active_member: Dict) -> bool: + if not existing_context: + return True + + latest_message_time = active_member.get("latest_message_time") + context_time = existing_context.get("last_profiled_at") + latest_dt = self._parse_datetime(latest_message_time) + context_dt = self._parse_datetime(context_time) + if not latest_dt or not context_dt: + return True + + if latest_dt > context_dt and (latest_dt - context_dt).total_seconds() >= self.stale_hours * 3600: + return True + + if (datetime.now() - context_dt).total_seconds() >= self.stale_hours * 3600 * 2: + return True + + return False + + @staticmethod + def _parse_datetime(value) -> Optional[datetime]: + if isinstance(value, datetime): + return value + if not value: + return None + try: + return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S") + except Exception: + return None + def _get_recent_active_chatrooms(self) -> set: sql = """ SELECT group_id, COUNT(*) AS msg_count @@ -313,7 +443,7 @@ class MemberContextService: base_context: Dict, messages: List[Dict]) -> Optional[Dict]: if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key: return None - if len(messages) < 8: + if len(messages) < self.ai_min_member_messages: return None prompt = self._build_ai_prompt(chatroom_id, wxid, display_name, base_context, messages[-self.ai_sample_limit:])