# -*- coding: utf-8 -*- import json import re from collections import Counter from datetime import datetime from typing import Dict, List, Optional from loguru import logger from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator from db.member_context_db import MemberContextDBOperator from db.member_digest_db import MemberDigestDBOperator from db.message_storage import MessageStorageDB from plugins.member_context.digest_service import MemberDigestService from plugins.member_context.dify_client import DifyClient from plugins.member_context.prompt_builder import MemberContextPromptBuilder from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus class MemberContextService: """成员交互摘要插件内部服务""" FEATURE_KEY = "MEMBER_CONTEXT_CAPABILITY" STOPWORDS = { "这个", "那个", "就是", "然后", "怎么", "什么", "你们", "我们", "他们", "是不是", "可以", "一下", "一个", "已经", "还有", "没有", "因为", "所以", "如果", "但是", "还是", "今天", "昨天", "现在", "时候", "感觉", "真的", "应该", "知道", "觉得", "问题", "老师", "老板", "群里", "大家", "一下子", "自己", "东西", "这里", "那里", "进行", "需要", "关于" } def __init__(self, db_manager: DBConnectionManager, plugin_config: Optional[Dict] = None): self.db_manager = db_manager self.contacts_db = ContactsDBOperator(self.db_manager) self.message_db = MessageStorageDB(self.db_manager) self.member_context_db = MemberContextDBOperator(self.db_manager) self.member_digest_db = MemberDigestDBOperator(self.db_manager) self.digest_service = MemberDigestService( self.contacts_db, self.message_db, self.member_digest_db, plugin_config or {} ) self.LOG = logger self.plugin_config = plugin_config or {} api_config = self.plugin_config.get("api", {}) profile_config = self.plugin_config.get("profile", {}) schedule_config = self.plugin_config.get("schedule", {}) self.dify_client = DifyClient(api_config) self.ai_enabled = self.dify_client.enabled self.ai_base_url = self.dify_client.base_url self.ai_api_key = self.dify_client.api_key self.ai_endpoint = self.dify_client.endpoint self.ai_timeout = self.dify_client.timeout self.sample_days = int(profile_config.get("sample_days", 30)) self.refresh_limit_per_member = int(profile_config.get("refresh_limit_per_member", 200)) self.active_member_hours = int(profile_config.get("active_member_hours", 72)) self.min_member_messages = int(profile_config.get("min_member_messages", 3)) self.max_members_per_group_per_run = int(profile_config.get("max_members_per_group_per_run", 30)) self.stale_hours = int(profile_config.get("stale_hours", 24)) self.stable_decay = float(profile_config.get("stable_decay", 0.96)) self.stable_max_items = int(profile_config.get("stable_max_items", 6)) self.stable_min_score = float(profile_config.get("stable_min_score", 0.9)) self.stable_ready_days = int(profile_config.get("stable_ready_days", 180)) self.only_recent_active_groups = bool(schedule_config.get("only_recent_active_groups", False)) self.active_hours = int(schedule_config.get("active_hours", 72)) self.min_group_messages = int(schedule_config.get("min_group_messages", 20)) def build_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None, limit: Optional[int] = None, force_digest_rebuild: bool = False, ensure_group_daily: bool = True) -> Dict: days = days or self.sample_days limit = limit or self.refresh_limit_per_member existing_context = self.member_context_db.get_member_context(chatroom_id, wxid) member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {} display_name = member.get("display_name") or member.get("nick_name") or wxid group_digest_stats = {"built_daily": 0, "touched_members": []} if ensure_group_daily: group_digest_stats = self.digest_service.ensure_recent_group_daily_digests( chatroom_id, force=force_digest_rebuild ) digest_snapshot = self.digest_service.ensure_member_digest_pipeline( chatroom_id, wxid, force=force_digest_rebuild ) daily_digests = digest_snapshot.get("daily_digests", []) weekly_digests = digest_snapshot.get("weekly_digests", []) monthly_digests = digest_snapshot.get("monthly_digests", []) recent_messages = self.message_db.get_member_recent_messages( chatroom_id, wxid, days=min(days, 7), limit=120, include_today=False, ) monthly_structured = [item.get("structured", {}) or {} for item in monthly_digests] weekly_structured = [item.get("structured", {}) or {} for item in weekly_digests] daily_structured = [item.get("structured", {}) or {} for item in daily_digests] observation_days = self._calc_observation_days(daily_digests) activity_level = self._calc_activity_level(len(recent_messages), max(min(days, 7), 1)) context = { "chatroom_id": chatroom_id, "wxid": wxid, "display_name": display_name, "activity_level": activity_level, "message_pattern": self._best_text( daily_structured, ["message_pattern"], default=self._build_message_pattern(recent_messages) ), "interaction_style": self._best_text( daily_structured, ["interaction_style"], default=self._build_interaction_style(recent_messages) ), "response_style_hint": self._build_response_style_hint_from_digests( daily_structured, weekly_structured, monthly_structured ), "topics_of_interest": self._extract_scored_items( monthly_structured + weekly_structured, ["long_term_topics", "stable_topics", "topics"], limit=5 ), "recent_focus": self._extract_scored_items(daily_structured, ["topics"], limit=4), "summary_text": "", "confidence": self._calc_digest_confidence(monthly_digests, weekly_digests, daily_digests), "source_message_count": len(recent_messages), "source_days": days, "last_profiled_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "meta": { "identity_traits": self._extract_scored_items( monthly_structured + weekly_structured + daily_structured, ["identity_traits", "identity_clues"], limit=5 ), "skill_profile": self._extract_scored_items( monthly_structured + weekly_structured + daily_structured, ["skill_profile", "skill_signals"], limit=6 ), "family_profile": self._extract_scored_items( monthly_structured + weekly_structured + daily_structured, ["family_profile", "family_signals"], limit=4 ), "life_stage_profile": self._extract_scored_items( monthly_structured + weekly_structured + daily_structured, ["life_stage_profile", "life_stage_signals"], limit=4 ), "value_profile": self._extract_scored_items( monthly_structured + weekly_structured + daily_structured, ["value_profile", "value_preferences"], limit=5 ), "stable_traits": self._extract_scored_items( monthly_structured + weekly_structured, ["stable_traits", "engagement_traits"], limit=self.stable_max_items ), "habit_patterns": self._extract_scored_items( monthly_structured + weekly_structured + daily_structured, ["habit_patterns", "habit_signals"], limit=self.stable_max_items ), "long_term_reply_preferences": self._extract_scored_items( monthly_structured + weekly_structured, ["long_term_reply_preferences", "reply_preferences"], limit=4 ), "recent_state": self._extract_scored_items( weekly_structured + daily_structured, ["recent_state", "phase_state", "topics"], limit=4 ), "temperament_tendency": self._best_text( monthly_structured + weekly_structured + daily_structured, ["temperament_tendency", "temperament_signal"], default="" ), "group_role": self._best_text( monthly_structured + weekly_structured + daily_structured, ["group_role", "social_role"], default="" ), "decision_profile": self._best_text( monthly_structured + weekly_structured + daily_structured, ["decision_profile", "decision_style"], default="" ), "engagement_traits": self._extract_scored_items( daily_structured + weekly_structured, ["engagement_traits", "stable_traits"], limit=4 ), "reply_taboos": self._extract_scored_items(daily_structured, ["reply_taboos"], limit=3), "observation_days": observation_days, "stable_ready": observation_days >= self.stable_ready_days, "profile_iterations": int(((existing_context or {}).get("meta", {}) or {}).get("profile_iterations", 0)) + 1, "history_message_count": self._sum_digest_source_count(daily_digests), "digest_daily_count": len(daily_digests), "digest_weekly_count": len(weekly_digests), "digest_monthly_count": len(monthly_digests), "last_daily_digest_at": daily_digests[0].get("last_generated_at") if daily_digests else "", "last_weekly_digest_at": weekly_digests[0].get("last_generated_at") if weekly_digests else "", "last_monthly_digest_at": monthly_digests[0].get("last_generated_at") if monthly_digests else "", "refresh_mode": self._build_refresh_mode(existing_context, digest_snapshot, group_digest_stats), }, } ai_context = self._generate_ai_context_from_digests( chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests ) if ai_context: context.update({ "activity_level": ai_context.get("activity_level") or context["activity_level"], "message_pattern": ai_context.get("message_pattern") or context["message_pattern"], "interaction_style": ai_context.get("interaction_style") or context["interaction_style"], "response_style_hint": ai_context.get("response_style_hint") or context["response_style_hint"], "topics_of_interest": ai_context.get("topics_of_interest") or context["topics_of_interest"], "recent_focus": ai_context.get("recent_focus") or context["recent_focus"], "summary_text": ai_context.get("summary_text") or context["summary_text"], "confidence": ai_context.get("confidence", context["confidence"]), }) context["meta"].update(ai_context.get("meta", {})) context = self._merge_with_existing_context(existing_context, context) context["summary_text"] = context.get("summary_text") or self._build_summary_text_from_context(context) return context def refresh_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None, limit: Optional[int] = None) -> Dict: if not self.is_group_enabled(chatroom_id): raise ValueError(f"群 {chatroom_id} 未启用成员交互摘要功能") self.LOG.info(f"[成员交互摘要] 开始刷新单个成员: group={chatroom_id}, wxid={wxid}") context = self.build_member_context(chatroom_id, wxid, days=days, limit=limit) self.member_context_db.save_member_context(context) self.LOG.info( f"[成员交互摘要] 单个成员刷新完成: group={chatroom_id}, wxid={wxid}, " f"display_name={context.get('display_name', wxid)}, messages={context.get('source_message_count', 0)}, " f"mode={context.get('meta', {}).get('refresh_mode', '')}, " f"digests={context.get('meta', {}).get('digest_daily_count', 0)}/" f"{context.get('meta', {}).get('digest_weekly_count', 0)}/" f"{context.get('meta', {}).get('digest_monthly_count', 0)}, " f"ai={'yes' if context.get('meta', {}).get('ai_provider') else 'no'}" ) return context def refresh_group_contexts(self, chatroom_id: str, days: Optional[int] = None, limit_per_member: Optional[int] = None) -> Dict: days = days or self.sample_days limit_per_member = limit_per_member or self.refresh_limit_per_member if not self.is_group_enabled(chatroom_id): self.LOG.info(f"群 {chatroom_id} 未启用成员交互摘要功能,跳过刷新") return {"refreshed": 0, "skipped": 0, "disabled": True} active_members = self._get_recent_active_members(chatroom_id) members = self.contacts_db.get_chatroom_member_list(chatroom_id) or [] enabled_members = { member.get("wxid"): member for member in members if member.get("status", 1) == 1 and member.get("wxid") } refreshed = 0 skipped = 0 total = len(active_members) self.LOG.info( f"[成员交互摘要] 开始刷新群: group={chatroom_id}, active_candidates={total}, " f"days={days}, limit_per_member={limit_per_member}" ) group_digest_stats = self.digest_service.ensure_recent_group_daily_digests(chatroom_id) self.LOG.info( f"[成员交互摘要] 群日摘要批处理完成: group={chatroom_id}, " f"mode={group_digest_stats.get('mode', '')}, " f"days={group_digest_stats.get('days', 0)}, " f"built_daily={group_digest_stats.get('built_daily', 0)}, " f"touched_members={len(group_digest_stats.get('touched_members', []))}" ) candidate_map = { (member.get("wxid") or ""): dict(member) for member in active_members if member.get("wxid") } if group_digest_stats.get("mode") == "bootstrap": for wxid in group_digest_stats.get("touched_members", []): if not wxid or wxid in candidate_map: continue candidate_map[wxid] = { "wxid": wxid, "msg_count": 0, "latest_message_time": "", } self.LOG.info( f"[成员交互摘要] 初始化模式补充成员: group={chatroom_id}, " f"bootstrap_members={len(group_digest_stats.get('touched_members', []))}, " f"candidate_total={len(candidate_map)}" ) active_members = list(candidate_map.values()) total = len(active_members) if not active_members: self.LOG.info(f"群 {chatroom_id} 初始化/增量后仍无可刷新成员,跳过刷新") return {"refreshed": 0, "skipped": 0, "disabled": False, "active_candidates": 0} for index, active_member in enumerate(active_members, start=1): wxid = active_member.get("wxid") if wxid not in enabled_members: self.LOG.debug( f"[成员交互摘要] 跳过成员(不在当前在群名单): group={chatroom_id}, " f"index={index}/{total}, wxid={wxid}" ) continue existing_context = self.member_context_db.get_member_context(chatroom_id, wxid) if not self._should_refresh_context(existing_context, active_member): skipped += 1 self.LOG.debug( f"[成员交互摘要] 跳过成员(画像仍新鲜): group={chatroom_id}, " f"index={index}/{total}, wxid={wxid}, latest_message_time={active_member.get('latest_message_time')}, " f"last_profiled_at={(existing_context or {}).get('last_profiled_at')}" ) continue context = self.build_member_context( chatroom_id, wxid, days=days, limit=limit_per_member, ensure_group_daily=False ) if context["source_message_count"] <= 0 and context.get("meta", {}).get("digest_daily_count", 0) <= 0: skipped += 1 self.LOG.debug( f"[成员交互摘要] 跳过成员(样本不足): group={chatroom_id}, " f"index={index}/{total}, wxid={wxid}" ) continue self.member_context_db.save_member_context(context) refreshed += 1 self.LOG.info( f"[成员交互摘要] 刷新成员进度: group={chatroom_id}, index={index}/{total}, " f"wxid={wxid}, display_name={context.get('display_name', wxid)}, " f"messages={context.get('source_message_count', 0)}, " f"activity={context.get('activity_level', '')}, " f"mode={context.get('meta', {}).get('refresh_mode', '')}, " f"digests={context.get('meta', {}).get('digest_daily_count', 0)}/" f"{context.get('meta', {}).get('digest_weekly_count', 0)}/" f"{context.get('meta', {}).get('digest_monthly_count', 0)}, " f"ai={'yes' if context.get('meta', {}).get('ai_provider') else 'no'}" ) self.LOG.info( f"[成员交互摘要] 群刷新完成: group={chatroom_id}, refreshed={refreshed}, " f"skipped={skipped}, active_candidates={total}" ) return {"refreshed": refreshed, "skipped": skipped, "active_candidates": len(active_members)} def refresh_all_chatrooms(self, days: Optional[int] = None, limit_per_member: Optional[int] = None) -> Dict: days = days or self.sample_days limit_per_member = limit_per_member or self.refresh_limit_per_member groups = self.contacts_db.get_chatroom_list() or [] active_group_ids = self._get_recent_active_chatrooms() if self.only_recent_active_groups else None group_count = 0 member_count = 0 skipped = 0 disabled = 0 inactive = 0 processed_groups = 0 candidate_groups = [ group.get("chatroom_id") for group in groups if group.get("chatroom_id") and (active_group_ids is None or group.get("chatroom_id") in active_group_ids) ] total_groups = len(candidate_groups) self.LOG.info( f"[成员交互摘要] 开始批量刷新: candidate_groups={total_groups}, " f"only_recent_active_groups={self.only_recent_active_groups}, active_hours={self.active_hours}, " f"min_group_messages={self.min_group_messages}" ) for group in groups: chatroom_id = group.get("chatroom_id") if not chatroom_id: continue if active_group_ids is not None and chatroom_id not in active_group_ids: inactive += 1 continue processed_groups += 1 self.LOG.info( f"[成员交互摘要] 批量刷新进度: group_index={processed_groups}/{total_groups}, group={chatroom_id}" ) result = self.refresh_group_contexts(chatroom_id, days=days, limit_per_member=limit_per_member) if result.get("disabled"): disabled += 1 continue group_count += 1 member_count += result["refreshed"] skipped += result["skipped"] self.LOG.info( f"[成员交互摘要] 批量群结果: group={chatroom_id}, refreshed={result.get('refreshed', 0)}, " f"skipped={result.get('skipped', 0)}, active_candidates={result.get('active_candidates', 0)}" ) self.LOG.info( f"成员交互摘要刷新完成: 启用活跃群={group_count}, 成员={member_count}, 跳过={skipped}, " f"未启用群={disabled}, 非活跃群={inactive}" ) return {"groups": group_count, "members": member_count, "skipped": skipped, "disabled_groups": disabled, "inactive_groups": inactive} def is_group_enabled(self, chatroom_id: str) -> bool: feature = Feature.get_feature(self.FEATURE_KEY) if feature is None: return True return GroupBotManager.get_group_permission(chatroom_id, feature) == PermissionStatus.ENABLED def _get_recent_active_members(self, chatroom_id: str) -> List[Dict]: sql = """ SELECT sender AS wxid, COUNT(*) AS msg_count, MAX(timestamp) AS latest_message_time FROM messages WHERE group_id = %s AND sender IS NOT NULL AND sender <> '' AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s HOUR) AND timestamp < CURDATE() AND message_type IN (1, 49) GROUP BY sender HAVING COUNT(*) >= %s ORDER BY latest_message_time DESC, msg_count DESC LIMIT %s """ rows = self.message_db.execute_query( sql, (chatroom_id, self.active_member_hours, self.min_member_messages, self.max_members_per_group_per_run) ) or [] for row in rows: latest_time = row.get("latest_message_time") if isinstance(latest_time, datetime): row["latest_message_time"] = latest_time.strftime("%Y-%m-%d %H:%M:%S") return rows def _should_refresh_context(self, existing_context: Optional[Dict], active_member: Dict) -> bool: if not existing_context: return True latest_message_time = active_member.get("latest_message_time") context_time = existing_context.get("last_profiled_at") latest_dt = self._parse_datetime(latest_message_time) context_dt = self._parse_datetime(context_time) if not latest_dt or not context_dt: return True if latest_dt > context_dt and (latest_dt - context_dt).total_seconds() >= self.stale_hours * 3600: return True if (datetime.now() - context_dt).total_seconds() >= self.stale_hours * 3600 * 2: return True return False @staticmethod def _parse_datetime(value) -> Optional[datetime]: if isinstance(value, datetime): return value if not value: return None try: return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S") except Exception: try: return datetime.strptime(str(value)[:10], "%Y-%m-%d") except Exception: return None def _get_recent_active_chatrooms(self) -> set: sql = """ SELECT group_id, COUNT(*) AS msg_count FROM messages WHERE group_id LIKE %s AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s HOUR) AND timestamp < CURDATE() GROUP BY group_id HAVING COUNT(*) >= %s """ rows = self.message_db.execute_query(sql, ("%@chatroom", self.active_hours, self.min_group_messages)) or [] return {row.get("group_id") for row in rows if row.get("group_id")} def _generate_ai_context_from_digests(self, chatroom_id: str, wxid: str, display_name: str, monthly_digests: List[Dict], weekly_digests: List[Dict], daily_digests: List[Dict]) -> Optional[Dict]: if not self.dify_client.is_available(): return None if len(daily_digests) < 2 and len(weekly_digests) < 1 and len(monthly_digests) < 1: return None prompt = MemberContextPromptBuilder.build_final_context_prompt( chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests ) response = self.dify_client.run( prompt=prompt, user=f"member-context-final:{chatroom_id}:{wxid}", inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid}, tag=f"final:{wxid}", ) if not response: return None parsed = self._parse_ai_answer(response.get("text", "")) if not parsed: self.LOG.warning( f"[成员交互摘要][AI] 最终画像JSON解析失败: group={chatroom_id}, wxid={wxid}, " f"answer_preview={(response.get('text', '') or '')[:200]}" ) return None usage = response.get("usage", {}) or {} parsed_meta = parsed.get("meta", {}) or {} parsed_meta.update({ "ai_provider": "dify", "ai_mode": self.dify_client.mode, "ai_tokens": usage.get("total_tokens"), "ai_latency": usage.get("latency"), }) parsed["meta"] = parsed_meta return parsed def _parse_ai_answer(self, answer: str) -> Optional[Dict]: if not answer: return None text = answer.strip() match = re.search(r"\{.*\}", text, re.S) if match: text = match.group(0) try: data = json.loads(text) except Exception: return None def norm_list(value, limit): if not isinstance(value, list): return [] return [str(item).strip() for item in value[:limit] if str(item).strip()] try: confidence = float(data.get("confidence", 0)) except Exception: confidence = 0.0 return { "activity_level": str(data.get("activity_level", "")).strip(), "message_pattern": str(data.get("message_pattern", "")).strip(), "interaction_style": str(data.get("interaction_style", "")).strip(), "response_style_hint": str(data.get("response_style_hint", "")).strip(), "topics_of_interest": norm_list(data.get("topics_of_interest"), 5), "recent_focus": norm_list(data.get("recent_focus"), 4), "summary_text": str(data.get("summary_text", "")).strip(), "confidence": max(0.0, min(1.0, confidence)), "meta": { "identity_traits": norm_list(data.get("identity_traits"), 5), "skill_profile": norm_list(data.get("skill_profile"), 6), "family_profile": norm_list(data.get("family_profile"), 4), "life_stage_profile": norm_list(data.get("life_stage_profile"), 4), "value_profile": norm_list(data.get("value_profile"), 5), "stable_traits": norm_list(data.get("stable_traits"), self.stable_max_items), "habit_patterns": norm_list(data.get("habit_patterns"), self.stable_max_items), "long_term_reply_preferences": norm_list(data.get("long_term_reply_preferences"), 4), "group_role": str(data.get("group_role", "")).strip(), "decision_profile": str(data.get("decision_profile", "")).strip(), "recent_state": norm_list(data.get("recent_state"), 4), "temperament_tendency": str(data.get("temperament_tendency", "")).strip(), "engagement_traits": norm_list(data.get("engagement_traits"), 4), "reply_taboos": norm_list(data.get("reply_taboos"), 3), } } def _merge_with_existing_context(self, existing_context: Optional[Dict], current_context: Dict) -> Dict: existing_context = existing_context or {} existing_meta = existing_context.get("meta", {}) or {} meta = current_context.get("meta", {}) or {} observation_days = max( int(meta.get("observation_days", 0)), int(existing_meta.get("observation_days", 0)), ) meta["observation_days"] = observation_days meta["stable_ready"] = observation_days >= self.stable_ready_days merged_topic_scores = self._merge_scored_items( existing_meta.get("topic_scores", {}), current_context.get("topics_of_interest", []), current_context.get("confidence", 0), ) merged_trait_scores = self._merge_scored_items( existing_meta.get("stable_trait_scores", {}), meta.get("stable_traits", []), current_context.get("confidence", 0), ) merged_habit_scores = self._merge_scored_items( existing_meta.get("habit_pattern_scores", {}), meta.get("habit_patterns", []), current_context.get("confidence", 0), ) merged_identity_scores = self._merge_scored_items( existing_meta.get("identity_trait_scores", {}), meta.get("identity_traits", []), current_context.get("confidence", 0) * 0.75, ) merged_skill_scores = self._merge_scored_items( existing_meta.get("skill_profile_scores", {}), meta.get("skill_profile", []), current_context.get("confidence", 0) * 0.85, ) merged_family_scores = self._merge_scored_items( existing_meta.get("family_profile_scores", {}), meta.get("family_profile", []), current_context.get("confidence", 0) * 0.55, ) merged_life_stage_scores = self._merge_scored_items( existing_meta.get("life_stage_profile_scores", {}), meta.get("life_stage_profile", []), current_context.get("confidence", 0) * 0.65, ) merged_value_scores = self._merge_scored_items( existing_meta.get("value_profile_scores", {}), meta.get("value_profile", []), current_context.get("confidence", 0) * 0.75, ) merged_reply_pref_scores = self._merge_scored_items( existing_meta.get("long_term_reply_preference_scores", {}), meta.get("long_term_reply_preferences", []), current_context.get("confidence", 0), ) merged_temperament_scores = self._merge_scored_items( existing_meta.get("temperament_tendency_scores", {}), [meta.get("temperament_tendency")] if meta.get("temperament_tendency") else [], current_context.get("confidence", 0) * 0.9, ) meta["topic_scores"] = merged_topic_scores meta["stable_trait_scores"] = merged_trait_scores meta["habit_pattern_scores"] = merged_habit_scores meta["identity_trait_scores"] = merged_identity_scores meta["skill_profile_scores"] = merged_skill_scores meta["family_profile_scores"] = merged_family_scores meta["life_stage_profile_scores"] = merged_life_stage_scores meta["value_profile_scores"] = merged_value_scores meta["long_term_reply_preference_scores"] = merged_reply_pref_scores meta["temperament_tendency_scores"] = merged_temperament_scores meta["identity_traits"] = self._top_scored_items(merged_identity_scores, limit=5) meta["skill_profile"] = self._top_scored_items(merged_skill_scores, limit=6) meta["family_profile"] = self._top_scored_items(merged_family_scores, limit=4) meta["life_stage_profile"] = self._top_scored_items(merged_life_stage_scores, limit=4) meta["value_profile"] = self._top_scored_items(merged_value_scores, limit=5) meta["stable_traits"] = self._top_scored_items(merged_trait_scores, limit=self.stable_max_items) meta["habit_patterns"] = self._top_scored_items(merged_habit_scores, limit=self.stable_max_items) meta["long_term_reply_preferences"] = self._top_scored_items(merged_reply_pref_scores, limit=4) temperament = self._top_scored_items(merged_temperament_scores, limit=1) meta["temperament_tendency"] = temperament[0] if temperament else meta.get("temperament_tendency", "") if not meta["identity_traits"]: meta["identity_traits"] = (existing_meta.get("identity_traits") or [])[:5] if not meta["skill_profile"]: meta["skill_profile"] = (existing_meta.get("skill_profile") or [])[:6] if not meta["family_profile"]: meta["family_profile"] = (existing_meta.get("family_profile") or [])[:4] if not meta["life_stage_profile"]: meta["life_stage_profile"] = (existing_meta.get("life_stage_profile") or [])[:4] if not meta["value_profile"]: meta["value_profile"] = (existing_meta.get("value_profile") or [])[:5] meta["group_role"] = meta.get("group_role") or existing_meta.get("group_role") or "" meta["decision_profile"] = meta.get("decision_profile") or existing_meta.get("decision_profile") or "" meta["engagement_traits"] = (meta.get("engagement_traits") or existing_meta.get("engagement_traits") or [])[:4] meta["reply_taboos"] = (meta.get("reply_taboos") or existing_meta.get("reply_taboos") or [])[:3] meta["recent_state"] = (meta.get("recent_state") or existing_meta.get("recent_state") or [])[:4] meta["profile_iterations"] = max( int(meta.get("profile_iterations", 0)), int(existing_meta.get("profile_iterations", 0)), ) meta["history_message_count"] = max( int(meta.get("history_message_count", 0)), int(existing_meta.get("history_message_count", 0)), ) current_context["topics_of_interest"] = self._top_scored_items(merged_topic_scores, limit=5) or current_context.get("topics_of_interest", []) current_context["recent_focus"] = (current_context.get("recent_focus") or existing_context.get("recent_focus") or [])[:4] current_context["response_style_hint"] = current_context.get("response_style_hint") or existing_context.get("response_style_hint") or "" current_context["meta"] = meta return current_context def _extract_scored_items(self, items: List[Dict], keys: List[str], limit: int) -> List[str]: scores = {} for index, item in enumerate(items): weight = max(0.5, 1.2 - index * 0.08) for key in keys: values = item.get(key, []) if not isinstance(values, list): continue for value in values: normalized = str(value).strip() if not normalized: continue scores[normalized] = scores.get(normalized, 0.0) + weight return [key for key, _ in sorted(scores.items(), key=lambda pair: pair[1], reverse=True)[:limit]] def _best_text(self, items: List[Dict], keys: List[str], default: str = "") -> str: counter = Counter() for item in items: for key in keys: value = str(item.get(key, "")).strip() if value: counter[value] += 1 if counter: return counter.most_common(1)[0][0] return default def _build_response_style_hint_from_digests(self, daily_structured: List[Dict], weekly_structured: List[Dict], monthly_structured: List[Dict]) -> str: hint = self._best_text(daily_structured, ["response_style_hint"]) if hint: return hint preferences = self._extract_scored_items( monthly_structured + weekly_structured, ["long_term_reply_preferences", "reply_preferences"], limit=3, ) if preferences: return "更适合:" + "、".join(preferences[:3]) return "保持自然口语化,结论和解释尽量平衡" def _calc_digest_confidence(self, monthly_digests: List[Dict], weekly_digests: List[Dict], daily_digests: List[Dict]) -> float: base = 0.25 base += min(0.35, len(monthly_digests) * 0.08) base += min(0.2, len(weekly_digests) * 0.04) base += min(0.15, len(daily_digests) * 0.02) return round(min(0.95, base), 2) def _calc_observation_days(self, daily_digests: List[Dict]) -> int: if not daily_digests: return 0 end_dt = self._parse_datetime(daily_digests[0].get("period_end")) start_dt = self._parse_datetime(daily_digests[-1].get("period_start")) if not start_dt or not end_dt: return 0 return max(0, (end_dt - start_dt).days) @staticmethod def _sum_digest_source_count(daily_digests: List[Dict]) -> int: return sum(int(item.get("source_count", 0)) for item in daily_digests) def _build_refresh_mode(self, existing_context: Optional[Dict], digest_snapshot: Dict, group_digest_stats: Optional[Dict] = None) -> str: if not existing_context: return "bootstrap" if (group_digest_stats or {}).get("built_daily", 0) > 0: return "daily_rollup" if (digest_snapshot.get("stats", {}) or {}).get("built_monthly", 0) > 0: return "recalibration" return "incremental" def _build_summary_text_from_context(self, context: Dict) -> str: meta = context.get("meta", {}) or {} parts = [] if meta.get("temperament_tendency"): label = "长期沟通倾向" if meta.get("stable_ready") else "阶段性沟通倾向" parts.append(f"{label}:{meta.get('temperament_tendency')}") if meta.get("stable_traits"): parts.append(f"长期特征:{'、'.join(meta.get('stable_traits')[:3])}") if meta.get("identity_traits"): parts.append(f"身份线索:{'、'.join(meta.get('identity_traits')[:2])}") if meta.get("skill_profile"): parts.append(f"技能画像:{'、'.join(meta.get('skill_profile')[:3])}") if meta.get("value_profile"): parts.append(f"判断偏好:{'、'.join(meta.get('value_profile')[:2])}") if meta.get("habit_patterns"): parts.append(f"习惯模式:{'、'.join(meta.get('habit_patterns')[:3])}") if meta.get("recent_state"): parts.append(f"近期状态:{'、'.join(meta.get('recent_state')[:3])}") if context.get("response_style_hint"): parts.append(f"回复建议:{context.get('response_style_hint')}") return ";".join(parts[:5]) def _merge_scored_items(self, existing_scores: Dict, current_items: List[str], confidence: float) -> Dict[str, float]: merged = {} for key, value in (existing_scores or {}).items(): try: score = float(value) * self.stable_decay except Exception: continue if score >= 0.2: merged[str(key).strip()] = round(score, 4) boost = max(0.6, min(1.8, 0.8 + confidence)) for item in current_items or []: normalized = str(item).strip() if not normalized: continue merged[normalized] = round(merged.get(normalized, 0.0) + boost, 4) return merged def _top_scored_items(self, scores: Dict, limit: int) -> List[str]: ordered = sorted( ((str(key).strip(), float(value)) for key, value in (scores or {}).items() if str(key).strip()), key=lambda item: item[1], reverse=True, ) return [key for key, value in ordered if value >= self.stable_min_score][:limit] def _calc_activity_level(self, message_count: int, days: int) -> str: daily_avg = message_count / max(days, 1) if message_count >= 80 or daily_avg >= 3: return "高活跃" if message_count >= 25 or daily_avg >= 1: return "中活跃" if message_count > 0: return "低活跃" return "观察中" def _build_message_pattern(self, messages: List[Dict]) -> str: if not messages: return "样本较少,暂不做明显模式判断" contents = [m.get("content", "") for m in messages if m.get("content")] if not contents: return "样本较少,暂不做明显模式判断" avg_len = sum(len(c) for c in contents) / len(contents) question_ratio = sum(1 for c in contents if "?" in c or "?" in c) / len(contents) link_ratio = sum(1 for c in contents if "http://" in c or "https://" in c) / len(contents) traits = [] if avg_len <= 12: traits.append("短句居多") elif avg_len >= 35: traits.append("表达较完整") else: traits.append("表达中等长度") if question_ratio >= 0.35: traits.append("问题导向明显") elif question_ratio >= 0.15: traits.append("偶尔连续追问") if link_ratio >= 0.15: traits.append("常分享链接或资料") return ",".join(traits or ["发言较平稳"]) def _build_interaction_style(self, messages: List[Dict]) -> str: if not messages: return "互动样本较少" contents = [m.get("content", "") for m in messages if m.get("content")] question_ratio = sum(1 for c in contents if "?" in c or "?" in c) / max(len(contents), 1) emoji_ratio = sum(1 for c in contents if re.search(r"[\U0001F300-\U0001FAFF\u2600-\u27BF]", c)) / max(len(contents), 1) mention_ratio = sum(1 for c in contents if "@" in c) / max(len(contents), 1) parts = [] if question_ratio >= 0.3: parts.append("偏提问推进") if emoji_ratio >= 0.15: parts.append("表情互动感较强") if mention_ratio >= 0.1: parts.append("会主动点名互动") return ",".join(parts or ["自然跟随式互动"])