Files
abot/plugins/member_context/service.py

932 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import re
from collections import Counter
from datetime import datetime
from typing import Dict, List, Optional
from loguru import logger
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
from db.member_context_db import MemberContextDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.digest_service import MemberDigestService
from plugins.member_context.dify_client import DifyClient
from plugins.member_context.prompt_builder import MemberContextPromptBuilder
from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus
class MemberContextService:
    """Internal service of the member-interaction-summary plugin."""
    # Feature-flag key resolved through ``Feature.get_feature`` in ``is_group_enabled``.
    FEATURE_KEY = "MEMBER_CONTEXT_CAPABILITY"
    # Common Chinese filler words. NOTE(review): not referenced in the visible part
    # of the class; presumably used by keyword/topic extraction elsewhere - confirm.
    STOPWORDS = {
        "这个", "那个", "就是", "然后", "怎么", "什么", "你们", "我们", "他们", "是不是", "可以",
        "一下", "一个", "已经", "还有", "没有", "因为", "所以", "如果", "但是", "还是", "今天",
        "昨天", "现在", "时候", "感觉", "真的", "应该", "知道", "觉得", "问题", "老师", "老板",
        "群里", "大家", "一下子", "自己", "东西", "这里", "那里", "进行", "需要", "关于"
    }
def __init__(self, db_manager: DBConnectionManager, plugin_config: Optional[Dict] = None):
    """Wire up the DB operators, the digest pipeline and the Dify client.

    Args:
        db_manager: Connection manager handed to every DB operator.
        plugin_config: Optional plugin settings. The ``api``, ``profile`` and
            ``schedule`` sections are read here; every option has a default.
    """
    self.db_manager = db_manager

    # Storage layer: one operator per table family, all on the same manager.
    self.contacts_db = ContactsDBOperator(db_manager)
    self.message_db = MessageStorageDB(db_manager)
    self.member_context_db = MemberContextDBOperator(db_manager)
    self.member_digest_db = MemberDigestDBOperator(db_manager)
    self.digest_service = MemberDigestService(
        self.contacts_db, self.message_db, self.member_digest_db, plugin_config or {}
    )

    self.LOG = logger
    self.plugin_config = plugin_config or {}
    api_section = self.plugin_config.get("api", {})
    profile_section = self.plugin_config.get("profile", {})
    schedule_section = self.plugin_config.get("schedule", {})

    # AI backend; its connection settings are mirrored onto the service.
    client = DifyClient(api_section)
    self.dify_client = client
    self.ai_enabled = client.enabled
    self.ai_base_url = client.base_url
    self.ai_api_key = client.api_key
    self.ai_endpoint = client.endpoint
    self.ai_timeout = client.timeout

    def profile_int(key: str, fallback: int) -> int:
        return int(profile_section.get(key, fallback))

    def profile_float(key: str, fallback: float) -> float:
        return float(profile_section.get(key, fallback))

    # Profile sampling / refresh thresholds.
    self.sample_days = profile_int("sample_days", 30)
    self.refresh_limit_per_member = profile_int("refresh_limit_per_member", 200)
    self.active_member_hours = profile_int("active_member_hours", 72)
    self.min_member_messages = profile_int("min_member_messages", 3)
    self.max_members_per_group_per_run = profile_int("max_members_per_group_per_run", 30)
    self.stale_hours = profile_int("stale_hours", 24)

    # Long-term ("stable") trait scoring parameters.
    self.stable_decay = profile_float("stable_decay", 0.96)
    self.stable_max_items = profile_int("stable_max_items", 6)
    self.stable_min_score = profile_float("stable_min_score", 0.9)
    self.stable_ready_days = profile_int("stable_ready_days", 180)

    # Batch scheduling filters.
    self.only_recent_active_groups = bool(schedule_section.get("only_recent_active_groups", False))
    self.active_hours = int(schedule_section.get("active_hours", 72))
    self.min_group_messages = int(schedule_section.get("min_group_messages", 20))
def build_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None,
                         limit: Optional[int] = None, force_digest_rebuild: bool = False,
                         ensure_group_daily: bool = True, enable_weekly_digest: bool = True,
                         enable_monthly_digest: bool = True) -> Dict:
    """Build (without persisting) the interaction profile of one group member.

    The profile is first derived from the member's daily / weekly / monthly
    digests and a sample of recent messages, then overlaid with the
    AI-generated profile when one is available, and finally merged with the
    previously stored profile.

    Args:
        chatroom_id: Group the member belongs to.
        wxid: Member identifier.
        days: Sampling window in days; defaults to ``self.sample_days``. The
            recent-message sample itself never spans more than 7 days.
        limit: Accepted for interface compatibility; currently unused (the
            recent-message sample is capped at 120 rows).
        force_digest_rebuild: Forwarded as ``force`` to the digest service.
        ensure_group_daily: When true, ask the digest service to ensure the
            group's recent daily digests before building the profile.
        enable_weekly_digest: Forwarded to the member digest pipeline.
        enable_monthly_digest: Forwarded to the member digest pipeline.

    Returns:
        The profile dict: top-level fields plus a ``meta`` sub-dict.
    """
    days = days or self.sample_days
    existing_context = self.member_context_db.get_member_context(chatroom_id, wxid)
    member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
    display_name = member.get("display_name") or member.get("nick_name") or wxid

    group_digest_stats = {"built_daily": 0, "touched_members": []}
    if ensure_group_daily:
        group_digest_stats = self.digest_service.ensure_recent_group_daily_digests(
            chatroom_id, force=force_digest_rebuild
        )
    digest_snapshot = self.digest_service.ensure_member_digest_pipeline(
        chatroom_id, wxid, force=force_digest_rebuild,
        enable_weekly=enable_weekly_digest, enable_monthly=enable_monthly_digest
    )
    daily_digests = digest_snapshot.get("daily_digests", [])
    weekly_digests = digest_snapshot.get("weekly_digests", [])
    monthly_digests = digest_snapshot.get("monthly_digests", [])
    # The "all_*" lists fall back to the plain lists when the snapshot omits them.
    all_daily_digests = digest_snapshot.get("all_daily_digests", daily_digests)
    all_weekly_digests = digest_snapshot.get("all_weekly_digests", weekly_digests)
    all_monthly_digests = digest_snapshot.get("all_monthly_digests", monthly_digests)

    recent_window_days = min(days, 7)
    recent_messages = self.message_db.get_member_recent_messages(
        chatroom_id,
        wxid,
        days=recent_window_days,
        limit=120,
        include_today=False,
    )

    monthly_structured = [item.get("structured", {}) or {} for item in monthly_digests]
    weekly_structured = [item.get("structured", {}) or {} for item in weekly_digests]
    daily_structured = [item.get("structured", {}) or {} for item in daily_digests]
    # Order matters: _extract_scored_items weights earlier entries higher.
    long_term = monthly_structured + weekly_structured
    all_levels = long_term + daily_structured

    observation_days = self._calc_observation_days(all_daily_digests)
    activity_level = self._calc_activity_level(len(recent_messages), max(recent_window_days, 1))
    previous_meta = (existing_context or {}).get("meta", {}) or {}
    scored = self._extract_scored_items
    best = self._best_text

    context = {
        "chatroom_id": chatroom_id,
        "wxid": wxid,
        "display_name": display_name,
        "activity_level": activity_level,
        "message_pattern": best(
            daily_structured, ["message_pattern"], default=self._build_message_pattern(recent_messages)
        ),
        "interaction_style": best(
            daily_structured, ["interaction_style"], default=self._build_interaction_style(recent_messages)
        ),
        "response_style_hint": self._build_response_style_hint_from_digests(
            daily_structured, weekly_structured, monthly_structured
        ),
        "topics_of_interest": scored(
            long_term, ["long_term_topics", "stable_topics", "topics"], limit=5
        ),
        "recent_focus": scored(daily_structured, ["topics"], limit=4),
        "summary_text": "",
        "confidence": self._calc_digest_confidence(all_monthly_digests, all_weekly_digests, all_daily_digests),
        "source_message_count": len(recent_messages),
        "source_days": days,
        "last_profiled_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "meta": {
            "identity_traits": scored(all_levels, ["identity_traits", "identity_clues"], limit=5),
            "common_scenarios": scored(all_levels, ["common_scenarios", "discussion_scenarios"], limit=5),
            "skill_profile": scored(all_levels, ["skill_profile", "skill_signals"], limit=6),
            "problem_solving_profile": scored(
                all_levels, ["problem_solving_profile", "problem_solving_signals"], limit=5
            ),
            "family_profile": scored(all_levels, ["family_profile", "family_signals"], limit=4),
            "life_stage_profile": scored(all_levels, ["life_stage_profile", "life_stage_signals"], limit=4),
            "value_profile": scored(all_levels, ["value_profile", "value_preferences"], limit=5),
            "stable_traits": scored(
                long_term, ["stable_traits", "engagement_traits"], limit=self.stable_max_items
            ),
            "habit_patterns": scored(
                all_levels, ["habit_patterns", "habit_signals"], limit=self.stable_max_items
            ),
            "expression_profile": scored(all_levels, ["expression_profile", "expression_markers"], limit=5),
            "reply_entry_profile": scored(all_levels, ["reply_entry_profile", "reply_entry_points"], limit=4),
            "long_term_reply_preferences": scored(
                long_term, ["long_term_reply_preferences", "reply_preferences"], limit=4
            ),
            "recent_state": scored(
                weekly_structured + daily_structured, ["recent_state", "phase_state", "topics"], limit=4
            ),
            "temperament_tendency": best(
                all_levels, ["temperament_tendency", "temperament_signal"], default=""
            ),
            "group_role": best(all_levels, ["group_role", "social_role"], default=""),
            "decision_profile": best(all_levels, ["decision_profile", "decision_style"], default=""),
            "engagement_traits": scored(
                daily_structured + weekly_structured, ["engagement_traits", "stable_traits"], limit=4
            ),
            "reply_taboos": scored(daily_structured, ["reply_taboos"], limit=3),
            "observation_days": observation_days,
            "stable_ready": observation_days >= self.stable_ready_days,
            "profile_iterations": int(previous_meta.get("profile_iterations", 0)) + 1,
            "history_message_count": self._sum_digest_source_count(all_daily_digests),
            "digest_daily_count": len(all_daily_digests),
            "digest_weekly_count": len(all_weekly_digests),
            "digest_monthly_count": len(all_monthly_digests),
            "last_daily_digest_at": daily_digests[0].get("last_generated_at") if daily_digests else "",
            "last_weekly_digest_at": weekly_digests[0].get("last_generated_at") if weekly_digests else "",
            "last_monthly_digest_at": monthly_digests[0].get("last_generated_at") if monthly_digests else "",
            "refresh_mode": self._build_refresh_mode(existing_context, digest_snapshot, group_digest_stats),
        },
    }

    ai_context = self._generate_ai_context_from_digests(
        chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests
    )
    if ai_context:
        # The AI answer wins only where it actually says something; blank
        # fields keep the digest-derived value.
        for field in ("activity_level", "message_pattern", "interaction_style",
                      "response_style_hint", "topics_of_interest", "recent_focus",
                      "summary_text"):
            context[field] = ai_context.get(field) or context[field]
        # The parser reports a missing confidence as 0.0, so 0 means
        # "no opinion" and must not wipe out the digest-based confidence.
        context["confidence"] = ai_context.get("confidence") or context["confidence"]
        # The parser emits every meta key, using "" / [] for fields the model
        # left out; skip those so they do not erase digest-derived values.
        ai_meta = ai_context.get("meta", {}) or {}
        context["meta"].update({
            key: value for key, value in ai_meta.items()
            if not (isinstance(value, (str, list)) and not value)
        })

    context = self._merge_with_existing_context(existing_context, context)
    context["summary_text"] = context.get("summary_text") or self._build_summary_text_from_context(context)
    return context
def refresh_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None,
                           limit: Optional[int] = None, enable_weekly_digest: bool = True,
                           enable_monthly_digest: bool = True) -> Dict:
    """Rebuild and persist the profile of a single member.

    Args:
        chatroom_id: Group the member belongs to.
        wxid: Member identifier.
        days: Forwarded to ``build_member_context``.
        limit: Forwarded to ``build_member_context``.
        enable_weekly_digest: Forwarded to ``build_member_context``.
        enable_monthly_digest: Forwarded to ``build_member_context``.

    Returns:
        The freshly built profile that was saved.

    Raises:
        ValueError: If the feature is not enabled for ``chatroom_id``.
    """
    if not self.is_group_enabled(chatroom_id):
        raise ValueError(f"{chatroom_id} 未启用成员交互摘要功能")

    self.LOG.info(f"[成员交互摘要] 开始刷新单个成员: group={chatroom_id}, wxid={wxid}")
    context = self.build_member_context(
        chatroom_id,
        wxid,
        days=days,
        limit=limit,
        enable_weekly_digest=enable_weekly_digest,
        enable_monthly_digest=enable_monthly_digest,
    )
    self.member_context_db.save_member_context(context)

    # Summarise the result in a single log line.
    meta = context.get("meta", {})
    digest_counts = "/".join(
        str(meta.get(counter, 0))
        for counter in ("digest_daily_count", "digest_weekly_count", "digest_monthly_count")
    )
    ai_flag = "yes" if meta.get("ai_provider") else "no"
    self.LOG.info(
        f"[成员交互摘要] 单个成员刷新完成: group={chatroom_id}, wxid={wxid}, "
        f"display_name={context.get('display_name', wxid)}, messages={context.get('source_message_count', 0)}, "
        f"mode={meta.get('refresh_mode', '')}, "
        f"digests={digest_counts}, "
        f"ai={ai_flag}"
    )
    return context
def refresh_group_contexts(self, chatroom_id: str, days: Optional[int] = None,
                           limit_per_member: Optional[int] = None, enable_weekly_digest: bool = True,
                           enable_monthly_digest: bool = True) -> Dict:
    """Refresh the profiles of the recently active members of one group.

    Args:
        chatroom_id: Group to refresh.
        days: Sampling window; defaults to ``self.sample_days``.
        limit_per_member: Per-member limit; defaults to
            ``self.refresh_limit_per_member``.
        enable_weekly_digest: Forwarded to ``build_member_context``.
        enable_monthly_digest: Forwarded to ``build_member_context``.

    Returns:
        Counters ``refreshed`` / ``skipped`` plus ``active_candidates``; the
        early-exit paths also carry a ``disabled`` flag.
    """
    days = days or self.sample_days
    limit_per_member = limit_per_member or self.refresh_limit_per_member

    if not self.is_group_enabled(chatroom_id):
        self.LOG.info(f"{chatroom_id} 未启用成员交互摘要功能,跳过刷新")
        return {"refreshed": 0, "skipped": 0, "disabled": True}

    active_members = self._get_recent_active_members(chatroom_id)
    roster = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
    # wxids on the current roster whose status is 1 (a missing status counts as 1).
    in_group_wxids = {
        entry.get("wxid") for entry in roster
        if entry.get("status", 1) == 1 and entry.get("wxid")
    }

    refreshed = skipped = 0
    total = len(active_members)
    self.LOG.info(
        f"[成员交互摘要] 开始刷新群: group={chatroom_id}, active_candidates={total}, "
        f"days={days}, limit_per_member={limit_per_member}"
    )

    group_digest_stats = self.digest_service.ensure_recent_group_daily_digests(chatroom_id)
    touched_members = group_digest_stats.get("touched_members", [])
    self.LOG.info(
        f"[成员交互摘要] 群日摘要批处理完成: group={chatroom_id}, "
        f"mode={group_digest_stats.get('mode', '')}, "
        f"days={group_digest_stats.get('days', 0)}, "
        f"built_daily={group_digest_stats.get('built_daily', 0)}, "
        f"touched_members={len(touched_members)}"
    )

    # De-duplicate candidates by wxid, keeping a private copy of each row.
    candidate_map = {}
    for row in active_members:
        if row.get("wxid"):
            candidate_map[row.get("wxid") or ""] = dict(row)

    if group_digest_stats.get("mode") == "bootstrap":
        # In bootstrap mode every member touched by the digest batch becomes a candidate.
        for touched in group_digest_stats.get("touched_members", []):
            if touched and touched not in candidate_map:
                candidate_map[touched] = {
                    "wxid": touched,
                    "msg_count": 0,
                    "latest_message_time": "",
                }
        self.LOG.info(
            f"[成员交互摘要] 初始化模式补充成员: group={chatroom_id}, "
            f"bootstrap_members={len(group_digest_stats.get('touched_members', []))}, "
            f"candidate_total={len(candidate_map)}"
        )

    active_members = list(candidate_map.values())
    total = len(active_members)
    if not active_members:
        self.LOG.info(f"{chatroom_id} 初始化/增量后仍无可刷新成员,跳过刷新")
        return {"refreshed": 0, "skipped": 0, "disabled": False, "active_candidates": 0}

    # Weekly/monthly refreshes must not fully reuse the "profile is still fresh" skip:
    # 1. weekly_refresh/monthly_refresh are scheduled, but a member whose profile was
    #    updated within the last 24h would be skipped below and the digest pipeline
    #    would never run for them;
    # 2. weekly/monthly rows in t_member_digest would then stay unproduced for a long
    #    time, which looks as if the scheduled job never ran;
    # 3. so "does the profile need refreshing" and "does the weekly/monthly digest
    #    pipeline need to advance" are decided separately.
    force_digest_pipeline = bool(enable_weekly_digest or enable_monthly_digest)

    for index, active_member in enumerate(active_members, start=1):
        wxid = active_member.get("wxid")
        if wxid not in in_group_wxids:
            self.LOG.debug(
                f"[成员交互摘要] 跳过成员(不在当前在群名单): group={chatroom_id}, "
                f"index={index}/{total}, wxid={wxid}"
            )
            continue

        existing_context = self.member_context_db.get_member_context(chatroom_id, wxid)
        if not force_digest_pipeline and not self._should_refresh_context(existing_context, active_member):
            skipped += 1
            self.LOG.debug(
                f"[成员交互摘要] 跳过成员(画像仍新鲜): group={chatroom_id}, "
                f"index={index}/{total}, wxid={wxid}, latest_message_time={active_member.get('latest_message_time')}, "
                f"last_profiled_at={(existing_context or {}).get('last_profiled_at')}"
            )
            continue

        context = self.build_member_context(
            chatroom_id, wxid, days=days, limit=limit_per_member, ensure_group_daily=False,
            enable_weekly_digest=enable_weekly_digest, enable_monthly_digest=enable_monthly_digest
        )
        no_messages = context["source_message_count"] <= 0
        if no_messages and context.get("meta", {}).get("digest_daily_count", 0) <= 0:
            skipped += 1
            self.LOG.debug(
                f"[成员交互摘要] 跳过成员(样本不足): group={chatroom_id}, "
                f"index={index}/{total}, wxid={wxid}"
            )
            continue

        self.member_context_db.save_member_context(context)
        refreshed += 1
        meta = context.get("meta", {})
        self.LOG.info(
            f"[成员交互摘要] 刷新成员进度: group={chatroom_id}, index={index}/{total}, "
            f"wxid={wxid}, display_name={context.get('display_name', wxid)}, "
            f"messages={context.get('source_message_count', 0)}, "
            f"activity={context.get('activity_level', '')}, "
            f"mode={meta.get('refresh_mode', '')}, "
            f"digests={meta.get('digest_daily_count', 0)}/"
            f"{meta.get('digest_weekly_count', 0)}/"
            f"{meta.get('digest_monthly_count', 0)}, "
            f"ai={'yes' if meta.get('ai_provider') else 'no'}"
        )

    self.LOG.info(
        f"[成员交互摘要] 群刷新完成: group={chatroom_id}, refreshed={refreshed}, "
        f"skipped={skipped}, active_candidates={total}"
    )
    return {"refreshed": refreshed, "skipped": skipped, "active_candidates": len(active_members)}
def refresh_all_chatrooms(self, days: Optional[int] = None, limit_per_member: Optional[int] = None,
                          enable_weekly_digest: bool = True, enable_monthly_digest: bool = True) -> Dict:
    """Refresh member profiles for every known group, one group at a time.

    When ``only_recent_active_groups`` is set, groups outside the recently
    active set are counted as inactive and skipped.

    Args:
        days: Sampling window; defaults to ``self.sample_days``.
        limit_per_member: Defaults to ``self.refresh_limit_per_member``.
        enable_weekly_digest: Forwarded to ``refresh_group_contexts``.
        enable_monthly_digest: Forwarded to ``refresh_group_contexts``.

    Returns:
        Totals: ``groups``, ``members``, ``skipped``, ``disabled_groups`` and
        ``inactive_groups``.
    """
    days = days or self.sample_days
    limit_per_member = limit_per_member or self.refresh_limit_per_member
    groups = self.contacts_db.get_chatroom_list() or []
    # None means "no activity filter"; otherwise only ids in this set are processed.
    active_group_ids = self._get_recent_active_chatrooms() if self.only_recent_active_groups else None

    def is_candidate(group_id) -> bool:
        return bool(group_id) and (active_group_ids is None or group_id in active_group_ids)

    total_groups = sum(1 for entry in groups if is_candidate(entry.get("chatroom_id")))
    self.LOG.info(
        f"[成员交互摘要] 开始批量刷新: candidate_groups={total_groups}, "
        f"only_recent_active_groups={self.only_recent_active_groups}, active_hours={self.active_hours}, "
        f"min_group_messages={self.min_group_messages}"
    )

    group_count = member_count = skipped = 0
    disabled = inactive = processed_groups = 0
    for entry in groups:
        chatroom_id = entry.get("chatroom_id")
        if not chatroom_id:
            continue
        if active_group_ids is not None and chatroom_id not in active_group_ids:
            inactive += 1
            continue

        processed_groups += 1
        self.LOG.info(
            f"[成员交互摘要] 批量刷新进度: group_index={processed_groups}/{total_groups}, group={chatroom_id}"
        )
        result = self.refresh_group_contexts(
            chatroom_id,
            days=days,
            limit_per_member=limit_per_member,
            enable_weekly_digest=enable_weekly_digest,
            enable_monthly_digest=enable_monthly_digest,
        )
        if result.get("disabled"):
            disabled += 1
            continue

        group_count += 1
        member_count += result["refreshed"]
        skipped += result["skipped"]
        self.LOG.info(
            f"[成员交互摘要] 批量群结果: group={chatroom_id}, refreshed={result.get('refreshed', 0)}, "
            f"skipped={result.get('skipped', 0)}, active_candidates={result.get('active_candidates', 0)}"
        )

    self.LOG.info(
        f"成员交互摘要刷新完成: 启用活跃群={group_count}, 成员={member_count}, 跳过={skipped}, "
        f"未启用群={disabled}, 非活跃群={inactive}"
    )
    return {
        "groups": group_count,
        "members": member_count,
        "skipped": skipped,
        "disabled_groups": disabled,
        "inactive_groups": inactive,
    }
def is_group_enabled(self, chatroom_id: str) -> bool:
    """Tell whether the member-context feature is switched on for a group.

    When the feature itself cannot be resolved from ``FEATURE_KEY`` the
    group is treated as enabled.
    """
    feature = Feature.get_feature(self.FEATURE_KEY)
    if feature is not None:
        permission = GroupBotManager.get_group_permission(chatroom_id, feature)
        return permission == PermissionStatus.ENABLED
    return True
def _get_recent_active_members(self, chatroom_id: str) -> List[Dict]:
sql = """
SELECT
sender AS wxid,
COUNT(*) AS msg_count,
MAX(timestamp) AS latest_message_time
FROM messages
WHERE group_id = %s
AND sender IS NOT NULL
AND sender <> ''
AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s HOUR)
AND timestamp < CURDATE()
AND message_type IN (1, 49)
GROUP BY sender
HAVING COUNT(*) >= %s
ORDER BY latest_message_time DESC, msg_count DESC
LIMIT %s
"""
rows = self.message_db.execute_query(
sql,
(chatroom_id, self.active_member_hours, self.min_member_messages, self.max_members_per_group_per_run)
) or []
for row in rows:
latest_time = row.get("latest_message_time")
if isinstance(latest_time, datetime):
row["latest_message_time"] = latest_time.strftime("%Y-%m-%d %H:%M:%S")
return rows
def _should_refresh_context(self, existing_context: Optional[Dict], active_member: Dict) -> bool:
if not existing_context:
return True
latest_message_time = active_member.get("latest_message_time")
context_time = existing_context.get("last_profiled_at")
latest_dt = self._parse_datetime(latest_message_time)
context_dt = self._parse_datetime(context_time)
if not latest_dt or not context_dt:
return True
if latest_dt > context_dt and (latest_dt - context_dt).total_seconds() >= self.stale_hours * 3600:
return True
if (datetime.now() - context_dt).total_seconds() >= self.stale_hours * 3600 * 2:
return True
return False
@staticmethod
def _parse_datetime(value) -> Optional[datetime]:
if isinstance(value, datetime):
return value
if not value:
return None
try:
return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S")
except Exception:
try:
return datetime.strptime(str(value)[:10], "%Y-%m-%d")
except Exception:
return None
def _get_recent_active_chatrooms(self) -> set:
sql = """
SELECT group_id, COUNT(*) AS msg_count
FROM messages
WHERE group_id LIKE %s
AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s HOUR)
AND timestamp < CURDATE()
GROUP BY group_id
HAVING COUNT(*) >= %s
"""
rows = self.message_db.execute_query(sql, ("%@chatroom", self.active_hours, self.min_group_messages)) or []
return {row.get("group_id") for row in rows if row.get("group_id")}
def _generate_ai_context_from_digests(self, chatroom_id: str, wxid: str, display_name: str,
monthly_digests: List[Dict], weekly_digests: List[Dict],
daily_digests: List[Dict]) -> Optional[Dict]:
if not self.dify_client.is_available():
return None
if len(daily_digests) < 2 and len(weekly_digests) < 1 and len(monthly_digests) < 1:
return None
prompt = MemberContextPromptBuilder.build_final_context_prompt(
chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests
)
response = self.dify_client.run(
prompt=prompt,
user=f"member-context-final:{chatroom_id}:{wxid}",
inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid},
tag=f"final:{wxid}",
)
if not response:
return None
parsed = self._parse_ai_answer(response.get("text", ""))
if not parsed:
self.LOG.warning(
f"[成员交互摘要][AI] 最终画像JSON解析失败: group={chatroom_id}, wxid={wxid}, "
f"answer_preview={(response.get('text', '') or '')[:200]}"
)
return None
usage = response.get("usage", {}) or {}
parsed_meta = parsed.get("meta", {}) or {}
parsed_meta.update({
"ai_provider": self.dify_client.provider,
"ai_mode": self.dify_client.mode,
"ai_tokens": usage.get("total_tokens"),
"ai_latency": usage.get("latency"),
})
parsed["meta"] = parsed_meta
return parsed
def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
if not answer:
return None
text = answer.strip()
match = re.search(r"\{.*\}", text, re.S)
if match:
text = match.group(0)
try:
data = json.loads(text)
except Exception:
return None
def norm_list(value, limit):
if not isinstance(value, list):
return []
return [str(item).strip() for item in value[:limit] if str(item).strip()]
try:
confidence = float(data.get("confidence", 0))
except Exception:
confidence = 0.0
return {
"activity_level": str(data.get("activity_level", "")).strip(),
"message_pattern": str(data.get("message_pattern", "")).strip(),
"interaction_style": str(data.get("interaction_style", "")).strip(),
"response_style_hint": str(data.get("response_style_hint", "")).strip(),
"topics_of_interest": norm_list(data.get("topics_of_interest"), 5),
"recent_focus": norm_list(data.get("recent_focus"), 4),
"summary_text": str(data.get("summary_text", "")).strip(),
"confidence": max(0.0, min(1.0, confidence)),
"meta": {
"identity_traits": norm_list(data.get("identity_traits"), 5),
"common_scenarios": norm_list(data.get("common_scenarios"), 5),
"skill_profile": norm_list(data.get("skill_profile"), 6),
"problem_solving_profile": norm_list(data.get("problem_solving_profile"), 5),
"family_profile": norm_list(data.get("family_profile"), 4),
"life_stage_profile": norm_list(data.get("life_stage_profile"), 4),
"value_profile": norm_list(data.get("value_profile"), 5),
"stable_traits": norm_list(data.get("stable_traits"), self.stable_max_items),
"habit_patterns": norm_list(data.get("habit_patterns"), self.stable_max_items),
"expression_profile": norm_list(data.get("expression_profile"), 5),
"reply_entry_profile": norm_list(data.get("reply_entry_profile"), 4),
"long_term_reply_preferences": norm_list(data.get("long_term_reply_preferences"), 4),
"group_role": str(data.get("group_role", "")).strip(),
"decision_profile": str(data.get("decision_profile", "")).strip(),
"recent_state": norm_list(data.get("recent_state"), 4),
"temperament_tendency": str(data.get("temperament_tendency", "")).strip(),
"engagement_traits": norm_list(data.get("engagement_traits"), 4),
"reply_taboos": norm_list(data.get("reply_taboos"), 3),
}
}
def _merge_with_existing_context(self, existing_context: Optional[Dict], current_context: Dict) -> Dict:
existing_context = existing_context or {}
existing_meta = existing_context.get("meta", {}) or {}
meta = current_context.get("meta", {}) or {}
observation_days = max(
int(meta.get("observation_days", 0)),
int(existing_meta.get("observation_days", 0)),
)
meta["observation_days"] = observation_days
meta["stable_ready"] = observation_days >= self.stable_ready_days
merged_topic_scores = self._merge_scored_items(
existing_meta.get("topic_scores", {}),
current_context.get("topics_of_interest", []),
current_context.get("confidence", 0),
)
merged_trait_scores = self._merge_scored_items(
existing_meta.get("stable_trait_scores", {}),
meta.get("stable_traits", []),
current_context.get("confidence", 0),
)
merged_habit_scores = self._merge_scored_items(
existing_meta.get("habit_pattern_scores", {}),
meta.get("habit_patterns", []),
current_context.get("confidence", 0),
)
merged_identity_scores = self._merge_scored_items(
existing_meta.get("identity_trait_scores", {}),
meta.get("identity_traits", []),
current_context.get("confidence", 0) * 0.75,
)
merged_common_scenario_scores = self._merge_scored_items(
existing_meta.get("common_scenario_scores", {}),
meta.get("common_scenarios", []),
current_context.get("confidence", 0) * 0.85,
)
merged_skill_scores = self._merge_scored_items(
existing_meta.get("skill_profile_scores", {}),
meta.get("skill_profile", []),
current_context.get("confidence", 0) * 0.85,
)
merged_problem_solving_scores = self._merge_scored_items(
existing_meta.get("problem_solving_profile_scores", {}),
meta.get("problem_solving_profile", []),
current_context.get("confidence", 0) * 0.9,
)
merged_family_scores = self._merge_scored_items(
existing_meta.get("family_profile_scores", {}),
meta.get("family_profile", []),
current_context.get("confidence", 0) * 0.55,
)
merged_life_stage_scores = self._merge_scored_items(
existing_meta.get("life_stage_profile_scores", {}),
meta.get("life_stage_profile", []),
current_context.get("confidence", 0) * 0.65,
)
merged_value_scores = self._merge_scored_items(
existing_meta.get("value_profile_scores", {}),
meta.get("value_profile", []),
current_context.get("confidence", 0) * 0.75,
)
merged_expression_scores = self._merge_scored_items(
existing_meta.get("expression_profile_scores", {}),
meta.get("expression_profile", []),
current_context.get("confidence", 0) * 0.95,
)
merged_reply_entry_scores = self._merge_scored_items(
existing_meta.get("reply_entry_profile_scores", {}),
meta.get("reply_entry_profile", []),
current_context.get("confidence", 0),
)
merged_reply_pref_scores = self._merge_scored_items(
existing_meta.get("long_term_reply_preference_scores", {}),
meta.get("long_term_reply_preferences", []),
current_context.get("confidence", 0),
)
merged_temperament_scores = self._merge_scored_items(
existing_meta.get("temperament_tendency_scores", {}),
[meta.get("temperament_tendency")] if meta.get("temperament_tendency") else [],
current_context.get("confidence", 0) * 0.9,
)
meta["topic_scores"] = merged_topic_scores
meta["stable_trait_scores"] = merged_trait_scores
meta["habit_pattern_scores"] = merged_habit_scores
meta["identity_trait_scores"] = merged_identity_scores
meta["common_scenario_scores"] = merged_common_scenario_scores
meta["skill_profile_scores"] = merged_skill_scores
meta["problem_solving_profile_scores"] = merged_problem_solving_scores
meta["family_profile_scores"] = merged_family_scores
meta["life_stage_profile_scores"] = merged_life_stage_scores
meta["value_profile_scores"] = merged_value_scores
meta["expression_profile_scores"] = merged_expression_scores
meta["reply_entry_profile_scores"] = merged_reply_entry_scores
meta["long_term_reply_preference_scores"] = merged_reply_pref_scores
meta["temperament_tendency_scores"] = merged_temperament_scores
meta["identity_traits"] = self._top_scored_items(merged_identity_scores, limit=5)
meta["common_scenarios"] = self._top_scored_items(merged_common_scenario_scores, limit=5)
meta["skill_profile"] = self._top_scored_items(merged_skill_scores, limit=6)
meta["problem_solving_profile"] = self._top_scored_items(merged_problem_solving_scores, limit=5)
meta["family_profile"] = self._top_scored_items(merged_family_scores, limit=4)
meta["life_stage_profile"] = self._top_scored_items(merged_life_stage_scores, limit=4)
meta["value_profile"] = self._top_scored_items(merged_value_scores, limit=5)
meta["stable_traits"] = self._top_scored_items(merged_trait_scores, limit=self.stable_max_items)
meta["habit_patterns"] = self._top_scored_items(merged_habit_scores, limit=self.stable_max_items)
meta["expression_profile"] = self._top_scored_items(merged_expression_scores, limit=5)
meta["reply_entry_profile"] = self._top_scored_items(merged_reply_entry_scores, limit=4)
meta["long_term_reply_preferences"] = self._top_scored_items(merged_reply_pref_scores, limit=4)
temperament = self._top_scored_items(merged_temperament_scores, limit=1)
meta["temperament_tendency"] = temperament[0] if temperament else meta.get("temperament_tendency", "")
if not meta["identity_traits"]:
meta["identity_traits"] = (existing_meta.get("identity_traits") or [])[:5]
if not meta["common_scenarios"]:
meta["common_scenarios"] = (existing_meta.get("common_scenarios") or [])[:5]
if not meta["skill_profile"]:
meta["skill_profile"] = (existing_meta.get("skill_profile") or [])[:6]
if not meta["problem_solving_profile"]:
meta["problem_solving_profile"] = (existing_meta.get("problem_solving_profile") or [])[:5]
if not meta["family_profile"]:
meta["family_profile"] = (existing_meta.get("family_profile") or [])[:4]
if not meta["life_stage_profile"]:
meta["life_stage_profile"] = (existing_meta.get("life_stage_profile") or [])[:4]
if not meta["value_profile"]:
meta["value_profile"] = (existing_meta.get("value_profile") or [])[:5]
if not meta["expression_profile"]:
meta["expression_profile"] = (existing_meta.get("expression_profile") or [])[:5]
if not meta["reply_entry_profile"]:
meta["reply_entry_profile"] = (existing_meta.get("reply_entry_profile") or [])[:4]
meta["group_role"] = meta.get("group_role") or existing_meta.get("group_role") or ""
meta["decision_profile"] = meta.get("decision_profile") or existing_meta.get("decision_profile") or ""
meta["engagement_traits"] = (meta.get("engagement_traits") or existing_meta.get("engagement_traits") or [])[:4]
meta["reply_taboos"] = (meta.get("reply_taboos") or existing_meta.get("reply_taboos") or [])[:3]
meta["recent_state"] = (meta.get("recent_state") or existing_meta.get("recent_state") or [])[:4]
meta["profile_iterations"] = max(
int(meta.get("profile_iterations", 0)),
int(existing_meta.get("profile_iterations", 0)),
)
meta["history_message_count"] = max(
int(meta.get("history_message_count", 0)),
int(existing_meta.get("history_message_count", 0)),
)
current_context["topics_of_interest"] = self._top_scored_items(merged_topic_scores, limit=5) or current_context.get("topics_of_interest", [])
current_context["recent_focus"] = (current_context.get("recent_focus") or existing_context.get("recent_focus") or [])[:4]
current_context["response_style_hint"] = current_context.get("response_style_hint") or existing_context.get("response_style_hint") or ""
current_context["meta"] = meta
return current_context
def _extract_scored_items(self, items: List[Dict], keys: List[str], limit: int) -> List[str]:
scores = {}
for index, item in enumerate(items):
weight = max(0.5, 1.2 - index * 0.08)
for key in keys:
values = item.get(key, [])
if not isinstance(values, list):
continue
for value in values:
normalized = str(value).strip()
if not normalized:
continue
scores[normalized] = scores.get(normalized, 0.0) + weight
return [key for key, _ in sorted(scores.items(), key=lambda pair: pair[1], reverse=True)[:limit]]
def _best_text(self, items: List[Dict], keys: List[str], default: str = "") -> str:
counter = Counter()
for item in items:
for key in keys:
value = str(item.get(key, "")).strip()
if value:
counter[value] += 1
if counter:
return counter.most_common(1)[0][0]
return default
def _build_response_style_hint_from_digests(self, daily_structured: List[Dict],
weekly_structured: List[Dict],
monthly_structured: List[Dict]) -> str:
hint = self._best_text(daily_structured, ["response_style_hint"])
if hint:
return hint
preferences = self._extract_scored_items(
monthly_structured + weekly_structured,
["long_term_reply_preferences", "reply_preferences"],
limit=3,
)
if preferences:
return "更适合:" + "".join(preferences[:3])
entry_points = self._extract_scored_items(
monthly_structured + weekly_structured + daily_structured,
["reply_entry_profile", "reply_entry_points"],
limit=3,
)
if entry_points:
return "优先从这些点接话:" + "".join(entry_points[:3])
return "保持自然口语化,结论和解释尽量平衡"
def _calc_digest_confidence(self, monthly_digests: List[Dict], weekly_digests: List[Dict],
daily_digests: List[Dict]) -> float:
base = 0.25
base += min(0.35, len(monthly_digests) * 0.08)
base += min(0.2, len(weekly_digests) * 0.04)
base += min(0.15, len(daily_digests) * 0.02)
return round(min(0.95, base), 2)
def _calc_observation_days(self, daily_digests: List[Dict]) -> int:
if not daily_digests:
return 0
end_dt = self._parse_datetime(daily_digests[0].get("period_end"))
start_dt = self._parse_datetime(daily_digests[-1].get("period_start"))
if not start_dt or not end_dt:
return 0
return max(0, (end_dt - start_dt).days)
@staticmethod
def _sum_digest_source_count(daily_digests: List[Dict]) -> int:
return sum(int(item.get("source_count", 0)) for item in daily_digests)
def _build_refresh_mode(self, existing_context: Optional[Dict], digest_snapshot: Dict,
group_digest_stats: Optional[Dict] = None) -> str:
if not existing_context:
return "bootstrap"
if (group_digest_stats or {}).get("built_daily", 0) > 0:
return "daily_rollup"
if (digest_snapshot.get("stats", {}) or {}).get("built_monthly", 0) > 0:
return "recalibration"
return "incremental"
def _build_summary_text_from_context(self, context: Dict) -> str:
meta = context.get("meta", {}) or {}
parts = []
if meta.get("temperament_tendency"):
label = "长期沟通倾向" if meta.get("stable_ready") else "阶段性沟通倾向"
parts.append(f"{label}{meta.get('temperament_tendency')}")
if meta.get("common_scenarios"):
parts.append(f"常见场景:{''.join(meta.get('common_scenarios')[:3])}")
if meta.get("problem_solving_profile"):
parts.append(f"处理方式:{''.join(meta.get('problem_solving_profile')[:3])}")
if meta.get("stable_traits"):
parts.append(f"长期特征:{''.join(meta.get('stable_traits')[:3])}")
if meta.get("identity_traits"):
parts.append(f"身份线索:{''.join(meta.get('identity_traits')[:2])}")
if meta.get("skill_profile"):
parts.append(f"技能画像:{''.join(meta.get('skill_profile')[:3])}")
if meta.get("expression_profile"):
parts.append(f"表达标记:{''.join(meta.get('expression_profile')[:3])}")
if meta.get("value_profile"):
parts.append(f"判断偏好:{''.join(meta.get('value_profile')[:2])}")
if meta.get("habit_patterns"):
parts.append(f"习惯模式:{''.join(meta.get('habit_patterns')[:3])}")
if meta.get("recent_state"):
parts.append(f"近期状态:{''.join(meta.get('recent_state')[:3])}")
if context.get("response_style_hint"):
parts.append(f"回复建议:{context.get('response_style_hint')}")
return "".join(parts[:5])
def _merge_scored_items(self, existing_scores: Dict, current_items: List[str], confidence: float) -> Dict[str, float]:
merged = {}
for key, value in (existing_scores or {}).items():
try:
score = float(value) * self.stable_decay
except Exception:
continue
if score >= 0.2:
merged[str(key).strip()] = round(score, 4)
boost = max(0.6, min(1.8, 0.8 + confidence))
for item in current_items or []:
normalized = str(item).strip()
if not normalized:
continue
merged[normalized] = round(merged.get(normalized, 0.0) + boost, 4)
return merged
def _top_scored_items(self, scores: Dict, limit: int) -> List[str]:
ordered = sorted(
((str(key).strip(), float(value)) for key, value in (scores or {}).items() if str(key).strip()),
key=lambda item: item[1],
reverse=True,
)
return [key for key, value in ordered if value >= self.stable_min_score][:limit]
def _calc_activity_level(self, message_count: int, days: int) -> str:
daily_avg = message_count / max(days, 1)
if message_count >= 80 or daily_avg >= 3:
return "高活跃"
if message_count >= 25 or daily_avg >= 1:
return "中活跃"
if message_count > 0:
return "低活跃"
return "观察中"
def _build_message_pattern(self, messages: List[Dict]) -> str:
if not messages:
return "样本较少,暂不做明显模式判断"
contents = [m.get("content", "") for m in messages if m.get("content")]
if not contents:
return "样本较少,暂不做明显模式判断"
avg_len = sum(len(c) for c in contents) / len(contents)
question_ratio = sum(1 for c in contents if "?" in c or "" in c) / len(contents)
link_ratio = sum(1 for c in contents if "http://" in c or "https://" in c) / len(contents)
traits = []
if avg_len <= 12:
traits.append("短句居多")
elif avg_len >= 35:
traits.append("表达较完整")
else:
traits.append("表达中等长度")
if question_ratio >= 0.35:
traits.append("问题导向明显")
elif question_ratio >= 0.15:
traits.append("偶尔连续追问")
if link_ratio >= 0.15:
traits.append("常分享链接或资料")
return "".join(traits or ["发言较平稳"])
def _build_interaction_style(self, messages: List[Dict]) -> str:
if not messages:
return "互动样本较少"
contents = [m.get("content", "") for m in messages if m.get("content")]
question_ratio = sum(1 for c in contents if "?" in c or "" in c) / max(len(contents), 1)
emoji_ratio = sum(1 for c in contents if re.search(r"[\U0001F300-\U0001FAFF\u2600-\u27BF]", c)) / max(len(contents), 1)
mention_ratio = sum(1 for c in contents if "@" in c) / max(len(contents), 1)
parts = []
if question_ratio >= 0.3:
parts.append("偏提问推进")
if emoji_ratio >= 0.15:
parts.append("表情互动感较强")
if mention_ratio >= 0.1:
parts.append("会主动点名互动")
return "".join(parts or ["自然跟随式互动"])