Files
abot/plugins/member_context/service.py
liuwei bfd0dbc15c 接入成员画像 Dify 工作流并清理旧提取逻辑
- 新增 member_context 专用 DifyClient,统一兼容 completion 与 workflow 两种调用模式
- 将成员画像插件默认切换到 Dify workflow 模式,配置改用新的 workflow 应用与 workflows/run 接口
- 生成可直接导入 Dify 的成员画像工作流 DSL 文件,方便后台一键导入和发布
- 补充 Dify 工作流接入说明文档,明确输入字段、输出字段、发布步骤与插件消费方式
- 清理旧的单成员日摘要提取链路,日级画像统一收敛到群日批量提取路径,减少无效分支和历史残留
- 去除 member_context 内部多处旧 requests 直连调用,统一改为通过 DifyClient 调用 AI 服务
- 优化群日批量结果解析逻辑,只按 wxid 作为唯一主键识别成员,不再依赖昵称做唯一判断
- 新增按 wxid 的结果去重与完整度评分逻辑,遇到重复成员结果时优先保留字段更完整、置信度更高的一条
- 保留现有初始化、增量、周/月聚合与最终画像生成链路,同时剔除 workflow 接入后已无效或低价值的旧逻辑
- 为后续继续收紧 fallback 标记、增强后台质量诊断和优化工作流输出稳定性打下基础
2026-04-02 14:25:50 +08:00

839 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import re
from collections import Counter
from datetime import datetime
from typing import Dict, List, Optional
from loguru import logger
from db.connection import DBConnectionManager
from db.contacts_db import ContactsDBOperator
from db.member_context_db import MemberContextDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.digest_service import MemberDigestService
from plugins.member_context.dify_client import DifyClient
from plugins.member_context.prompt_builder import MemberContextPromptBuilder
from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus
class MemberContextService:
    """Internal service of the member interaction summary plugin."""

    # Key handed to Feature.get_feature() when checking whether a group has this capability enabled.
    FEATURE_KEY = "MEMBER_CONTEXT_CAPABILITY"
    # Common Chinese filler words; none of the methods visible in this part of the file reads this set.
    STOPWORDS = {
        "这个", "那个", "就是", "然后", "怎么", "什么", "你们", "我们", "他们", "是不是", "可以",
        "一下", "一个", "已经", "还有", "没有", "因为", "所以", "如果", "但是", "还是", "今天",
        "昨天", "现在", "时候", "感觉", "真的", "应该", "知道", "觉得", "问题", "老师", "老板",
        "群里", "大家", "一下子", "自己", "东西", "这里", "那里", "进行", "需要", "关于"
    }
def __init__(self, db_manager: DBConnectionManager, plugin_config: Optional[Dict] = None):
    """Create the DB operators, the digest service and the Dify client, then load tunables.

    Args:
        db_manager: Connection manager shared by every DB operator built here.
        plugin_config: Plugin configuration. Its ``api``, ``profile`` and
            ``schedule`` sections are read; ``None`` means "use the defaults".
    """
    self.db_manager = db_manager
    self.contacts_db = ContactsDBOperator(db_manager)
    self.message_db = MessageStorageDB(db_manager)
    self.member_context_db = MemberContextDBOperator(db_manager)
    self.member_digest_db = MemberDigestDBOperator(db_manager)
    self.digest_service = MemberDigestService(
        self.contacts_db,
        self.message_db,
        self.member_digest_db,
        plugin_config or {},
    )
    self.LOG = logger
    self.plugin_config = plugin_config or {}

    settings = self.plugin_config
    api_section = settings.get("api", {})
    profile = settings.get("profile", {})
    schedule = settings.get("schedule", {})

    # The service mirrors the client's connection settings as its own attributes.
    client = DifyClient(api_section)
    self.dify_client = client
    self.ai_enabled = client.enabled
    self.ai_base_url = client.base_url
    self.ai_api_key = client.api_key
    self.ai_endpoint = client.endpoint
    self.ai_timeout = client.timeout

    def profile_int(key, fallback):
        return int(profile.get(key, fallback))

    def profile_float(key, fallback):
        return float(profile.get(key, fallback))

    # Per-member profiling knobs.
    self.sample_days = profile_int("sample_days", 30)
    self.refresh_limit_per_member = profile_int("refresh_limit_per_member", 200)
    self.active_member_hours = profile_int("active_member_hours", 72)
    self.min_member_messages = profile_int("min_member_messages", 3)
    self.max_members_per_group_per_run = profile_int("max_members_per_group_per_run", 30)
    self.stale_hours = profile_int("stale_hours", 24)
    # Long-term score bookkeeping knobs.
    self.stable_decay = profile_float("stable_decay", 0.96)
    self.stable_max_items = profile_int("stable_max_items", 6)
    self.stable_min_score = profile_float("stable_min_score", 0.9)
    self.stable_ready_days = profile_int("stable_ready_days", 180)
    # Group-level scheduling knobs.
    self.only_recent_active_groups = bool(schedule.get("only_recent_active_groups", False))
    self.active_hours = int(schedule.get("active_hours", 72))
    self.min_group_messages = int(schedule.get("min_group_messages", 20))
def build_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None,
                         limit: Optional[int] = None, force_digest_rebuild: bool = False,
                         ensure_group_daily: bool = True) -> Dict:
    """Assemble (without saving) the profile dict for one member of one group.

    The profile is derived from the member's daily/weekly/monthly digests plus a
    short window of recent messages, optionally overlaid with the AI-generated
    result, and finally merged with the previously stored profile.

    Args:
        chatroom_id: Group identifier.
        wxid: Member identifier.
        days: Sampling window; defaults to ``self.sample_days``.
        limit: Defaults to ``self.refresh_limit_per_member``.
        force_digest_rebuild: Forwarded as ``force`` to the digest service.
        ensure_group_daily: When true, the group's daily digests are ensured first.

    Returns:
        The merged context dict, including a non-empty-if-possible ``summary_text``.
    """
    days = days or self.sample_days
    # Resolved like in the other refresh entry points, but not read again below:
    # the recent-message query uses a fixed cap of 120.
    limit = limit or self.refresh_limit_per_member

    previous = self.member_context_db.get_member_context(chatroom_id, wxid)
    member_row = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
    display_name = member_row.get("display_name") or member_row.get("nick_name") or wxid

    if ensure_group_daily:
        group_stats = self.digest_service.ensure_recent_group_daily_digests(
            chatroom_id, force=force_digest_rebuild
        )
    else:
        group_stats = {"built_daily": 0, "touched_members": []}

    snapshot = self.digest_service.ensure_member_digest_pipeline(
        chatroom_id, wxid, force=force_digest_rebuild
    )
    daily_digests = snapshot.get("daily_digests", [])
    weekly_digests = snapshot.get("weekly_digests", [])
    monthly_digests = snapshot.get("monthly_digests", [])

    # Recent raw messages only ever cover at most the last 7 days.
    window_days = min(days, 7)
    recent_messages = self.message_db.get_member_recent_messages(
        chatroom_id, wxid, days=window_days, limit=120, include_today=False
    )

    def structured_of(digests):
        return [digest.get("structured", {}) or {} for digest in digests]

    def newest_stamp(digests):
        return digests[0].get("last_generated_at") if digests else ""

    monthly = structured_of(monthly_digests)
    weekly = structured_of(weekly_digests)
    daily = structured_of(daily_digests)
    long_term = monthly + weekly
    all_levels = long_term + daily
    pick = self._extract_scored_items
    text = self._best_text

    observation_days = self._calc_observation_days(daily_digests)
    activity_level = self._calc_activity_level(len(recent_messages), max(window_days, 1))
    previous_meta = (previous or {}).get("meta", {}) or {}

    meta = {
        "identity_traits": pick(all_levels, ["identity_traits", "identity_clues"], limit=5),
        "skill_profile": pick(all_levels, ["skill_profile", "skill_signals"], limit=6),
        "family_profile": pick(all_levels, ["family_profile", "family_signals"], limit=4),
        "life_stage_profile": pick(all_levels, ["life_stage_profile", "life_stage_signals"], limit=4),
        "value_profile": pick(all_levels, ["value_profile", "value_preferences"], limit=5),
        "stable_traits": pick(long_term, ["stable_traits", "engagement_traits"], limit=self.stable_max_items),
        "habit_patterns": pick(all_levels, ["habit_patterns", "habit_signals"], limit=self.stable_max_items),
        "long_term_reply_preferences": pick(
            long_term, ["long_term_reply_preferences", "reply_preferences"], limit=4
        ),
        "recent_state": pick(weekly + daily, ["recent_state", "phase_state", "topics"], limit=4),
        "temperament_tendency": text(
            all_levels, ["temperament_tendency", "temperament_signal"], default=""
        ),
        "group_role": text(all_levels, ["group_role", "social_role"], default=""),
        "decision_profile": text(all_levels, ["decision_profile", "decision_style"], default=""),
        "engagement_traits": pick(daily + weekly, ["engagement_traits", "stable_traits"], limit=4),
        "reply_taboos": pick(daily, ["reply_taboos"], limit=3),
        "observation_days": observation_days,
        "stable_ready": observation_days >= self.stable_ready_days,
        "profile_iterations": int(previous_meta.get("profile_iterations", 0)) + 1,
        "history_message_count": self._sum_digest_source_count(daily_digests),
        "digest_daily_count": len(daily_digests),
        "digest_weekly_count": len(weekly_digests),
        "digest_monthly_count": len(monthly_digests),
        "last_daily_digest_at": newest_stamp(daily_digests),
        "last_weekly_digest_at": newest_stamp(weekly_digests),
        "last_monthly_digest_at": newest_stamp(monthly_digests),
        "refresh_mode": self._build_refresh_mode(previous, snapshot, group_stats),
    }

    context = {
        "chatroom_id": chatroom_id,
        "wxid": wxid,
        "display_name": display_name,
        "activity_level": activity_level,
        "message_pattern": text(
            daily, ["message_pattern"], default=self._build_message_pattern(recent_messages)
        ),
        "interaction_style": text(
            daily, ["interaction_style"], default=self._build_interaction_style(recent_messages)
        ),
        "response_style_hint": self._build_response_style_hint_from_digests(daily, weekly, monthly),
        "topics_of_interest": pick(long_term, ["long_term_topics", "stable_topics", "topics"], limit=5),
        "recent_focus": pick(daily, ["topics"], limit=4),
        "summary_text": "",
        "confidence": self._calc_digest_confidence(monthly_digests, weekly_digests, daily_digests),
        "source_message_count": len(recent_messages),
        "source_days": days,
        "last_profiled_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "meta": meta,
    }

    ai_context = self._generate_ai_context_from_digests(
        chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests
    )
    if ai_context:
        # A non-empty AI value wins; otherwise the digest-derived value is kept.
        overlay_fields = (
            "activity_level",
            "message_pattern",
            "interaction_style",
            "response_style_hint",
            "topics_of_interest",
            "recent_focus",
            "summary_text",
        )
        for field in overlay_fields:
            context[field] = ai_context.get(field) or context[field]
        context["confidence"] = ai_context.get("confidence", context["confidence"])
        context["meta"].update(ai_context.get("meta", {}))

    context = self._merge_with_existing_context(previous, context)
    if not context.get("summary_text"):
        context["summary_text"] = self._build_summary_text_from_context(context)
    return context
def refresh_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None,
                           limit: Optional[int] = None) -> Dict:
    """Rebuild and persist the profile of a single member.

    Raises:
        ValueError: If the feature is not enabled for ``chatroom_id``.

    Returns:
        The context dict that was saved.
    """
    if not self.is_group_enabled(chatroom_id):
        raise ValueError(f"{chatroom_id} 未启用成员交互摘要功能")

    log = self.LOG
    log.info(f"[成员交互摘要] 开始刷新单个成员: group={chatroom_id}, wxid={wxid}")

    context = self.build_member_context(chatroom_id, wxid, days=days, limit=limit)
    self.member_context_db.save_member_context(context)

    meta = context.get("meta", {})
    ai_flag = "yes" if meta.get("ai_provider") else "no"
    log.info(
        f"[成员交互摘要] 单个成员刷新完成: group={chatroom_id}, wxid={wxid}, "
        f"display_name={context.get('display_name', wxid)}, messages={context.get('source_message_count', 0)}, "
        f"mode={meta.get('refresh_mode', '')}, "
        f"digests={meta.get('digest_daily_count', 0)}/"
        f"{meta.get('digest_weekly_count', 0)}/"
        f"{meta.get('digest_monthly_count', 0)}, "
        f"ai={ai_flag}"
    )
    return context
def refresh_group_contexts(self, chatroom_id: str, days: Optional[int] = None,
                           limit_per_member: Optional[int] = None) -> Dict:
    """Refresh the stored profiles of the recently active members of one group.

    Candidates are the recently active senders, plus (in ``bootstrap`` digest
    mode) every member the digest batch touched. Members that are not on the
    current roster are ignored; fresh profiles and members without any sample
    are counted as skipped.

    Returns:
        Counters ``refreshed`` / ``skipped`` and, depending on the exit path,
        ``disabled`` and ``active_candidates``.
    """
    days = days or self.sample_days
    limit_per_member = limit_per_member or self.refresh_limit_per_member
    log = self.LOG

    if not self.is_group_enabled(chatroom_id):
        log.info(f"{chatroom_id} 未启用成员交互摘要功能,跳过刷新")
        return {"refreshed": 0, "skipped": 0, "disabled": True}

    recent_rows = self._get_recent_active_members(chatroom_id)
    roster = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
    # Only the identifiers of members still in the group (status 1) are needed.
    in_group = {
        entry.get("wxid")
        for entry in roster
        if entry.get("status", 1) == 1 and entry.get("wxid")
    }

    log.info(
        f"[成员交互摘要] 开始刷新群: group={chatroom_id}, active_candidates={len(recent_rows)}, "
        f"days={days}, limit_per_member={limit_per_member}"
    )

    digest_stats = self.digest_service.ensure_recent_group_daily_digests(chatroom_id)
    log.info(
        f"[成员交互摘要] 群日摘要批处理完成: group={chatroom_id}, "
        f"mode={digest_stats.get('mode', '')}, "
        f"days={digest_stats.get('days', 0)}, "
        f"built_daily={digest_stats.get('built_daily', 0)}, "
        f"touched_members={len(digest_stats.get('touched_members', []))}"
    )

    # One candidate per wxid; each row is copied so the query result stays untouched.
    candidates = {}
    for row in recent_rows:
        if row.get("wxid"):
            candidates[row.get("wxid") or ""] = dict(row)

    if digest_stats.get("mode") == "bootstrap":
        for touched_wxid in digest_stats.get("touched_members", []):
            if touched_wxid and touched_wxid not in candidates:
                candidates[touched_wxid] = {
                    "wxid": touched_wxid,
                    "msg_count": 0,
                    "latest_message_time": "",
                }
        log.info(
            f"[成员交互摘要] 初始化模式补充成员: group={chatroom_id}, "
            f"bootstrap_members={len(digest_stats.get('touched_members', []))}, "
            f"candidate_total={len(candidates)}"
        )

    queue = list(candidates.values())
    total = len(queue)
    if not queue:
        log.info(f"{chatroom_id} 初始化/增量后仍无可刷新成员,跳过刷新")
        return {"refreshed": 0, "skipped": 0, "disabled": False, "active_candidates": 0}

    refreshed = 0
    skipped = 0
    for index, candidate in enumerate(queue, start=1):
        wxid = candidate.get("wxid")

        if wxid not in in_group:
            log.debug(
                f"[成员交互摘要] 跳过成员(不在当前在群名单): group={chatroom_id}, "
                f"index={index}/{total}, wxid={wxid}"
            )
            continue

        stored = self.member_context_db.get_member_context(chatroom_id, wxid)
        if not self._should_refresh_context(stored, candidate):
            skipped += 1
            log.debug(
                f"[成员交互摘要] 跳过成员(画像仍新鲜): group={chatroom_id}, "
                f"index={index}/{total}, wxid={wxid}, latest_message_time={candidate.get('latest_message_time')}, "
                f"last_profiled_at={(stored or {}).get('last_profiled_at')}"
            )
            continue

        # The group-level digests were already ensured once above.
        context = self.build_member_context(
            chatroom_id, wxid, days=days, limit=limit_per_member, ensure_group_daily=False
        )
        no_messages = context["source_message_count"] <= 0
        if no_messages and context.get("meta", {}).get("digest_daily_count", 0) <= 0:
            skipped += 1
            log.debug(
                f"[成员交互摘要] 跳过成员(样本不足): group={chatroom_id}, "
                f"index={index}/{total}, wxid={wxid}"
            )
            continue

        self.member_context_db.save_member_context(context)
        refreshed += 1
        meta = context.get("meta", {})
        ai_flag = "yes" if meta.get("ai_provider") else "no"
        log.info(
            f"[成员交互摘要] 刷新成员进度: group={chatroom_id}, index={index}/{total}, "
            f"wxid={wxid}, display_name={context.get('display_name', wxid)}, "
            f"messages={context.get('source_message_count', 0)}, "
            f"activity={context.get('activity_level', '')}, "
            f"mode={meta.get('refresh_mode', '')}, "
            f"digests={meta.get('digest_daily_count', 0)}/"
            f"{meta.get('digest_weekly_count', 0)}/"
            f"{meta.get('digest_monthly_count', 0)}, "
            f"ai={ai_flag}"
        )

    log.info(
        f"[成员交互摘要] 群刷新完成: group={chatroom_id}, refreshed={refreshed}, "
        f"skipped={skipped}, active_candidates={total}"
    )
    return {"refreshed": refreshed, "skipped": skipped, "active_candidates": total}
def refresh_all_chatrooms(self, days: Optional[int] = None, limit_per_member: Optional[int] = None) -> Dict:
    """Run ``refresh_group_contexts`` over every known group and total the results.

    When ``only_recent_active_groups`` is set, groups outside the recently
    active set are counted as inactive and not processed.

    Returns:
        Totals under ``groups``, ``members``, ``skipped``, ``disabled_groups``
        and ``inactive_groups``.
    """
    days = days or self.sample_days
    limit_per_member = limit_per_member or self.refresh_limit_per_member
    log = self.LOG

    groups = self.contacts_db.get_chatroom_list() or []
    # None means "no activity filter".
    active_ids = self._get_recent_active_chatrooms() if self.only_recent_active_groups else None

    def is_candidate(group_id):
        return active_ids is None or group_id in active_ids

    total_groups = sum(
        1 for group in groups
        if group.get("chatroom_id") and is_candidate(group.get("chatroom_id"))
    )
    log.info(
        f"[成员交互摘要] 开始批量刷新: candidate_groups={total_groups}, "
        f"only_recent_active_groups={self.only_recent_active_groups}, active_hours={self.active_hours}, "
        f"min_group_messages={self.min_group_messages}"
    )

    tally = {"groups": 0, "members": 0, "skipped": 0, "disabled": 0, "inactive": 0}
    position = 0
    for group in groups:
        chatroom_id = group.get("chatroom_id")
        if not chatroom_id:
            continue
        if not is_candidate(chatroom_id):
            tally["inactive"] += 1
            continue

        position += 1
        log.info(
            f"[成员交互摘要] 批量刷新进度: group_index={position}/{total_groups}, group={chatroom_id}"
        )
        result = self.refresh_group_contexts(chatroom_id, days=days, limit_per_member=limit_per_member)
        if result.get("disabled"):
            tally["disabled"] += 1
            continue

        tally["groups"] += 1
        tally["members"] += result["refreshed"]
        tally["skipped"] += result["skipped"]
        log.info(
            f"[成员交互摘要] 批量群结果: group={chatroom_id}, refreshed={result.get('refreshed', 0)}, "
            f"skipped={result.get('skipped', 0)}, active_candidates={result.get('active_candidates', 0)}"
        )

    log.info(
        f"成员交互摘要刷新完成: 启用活跃群={tally['groups']}, 成员={tally['members']}, 跳过={tally['skipped']}, "
        f"未启用群={tally['disabled']}, 非活跃群={tally['inactive']}"
    )
    return {
        "groups": tally["groups"],
        "members": tally["members"],
        "skipped": tally["skipped"],
        "disabled_groups": tally["disabled"],
        "inactive_groups": tally["inactive"],
    }
def is_group_enabled(self, chatroom_id: str) -> bool:
    """Tell whether the member-context feature is switched on for a group.

    If the feature key is not registered at all, every group counts as enabled.
    """
    feature = Feature.get_feature(self.FEATURE_KEY)
    if feature is not None:
        permission = GroupBotManager.get_group_permission(chatroom_id, feature)
        return permission == PermissionStatus.ENABLED
    return True
def _get_recent_active_members(self, chatroom_id: str) -> List[Dict]:
    """Query the senders that were recently active in a group.

    The window ends at the start of the current day and reaches back
    ``active_member_hours`` hours; only message types 1 and 49 are counted and
    senders below ``min_member_messages`` are dropped.

    Returns:
        Rows with ``wxid``, ``msg_count`` and ``latest_message_time``; datetime
        values of the latter are rendered as ``YYYY-MM-DD HH:MM:SS`` strings.
    """
    query = """
        SELECT
        sender AS wxid,
        COUNT(*) AS msg_count,
        MAX(timestamp) AS latest_message_time
        FROM messages
        WHERE group_id = %s
        AND sender IS NOT NULL
        AND sender <> ''
        AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s HOUR)
        AND timestamp < CURDATE()
        AND message_type IN (1, 49)
        GROUP BY sender
        HAVING COUNT(*) >= %s
        ORDER BY latest_message_time DESC, msg_count DESC
        LIMIT %s
    """
    params = (
        chatroom_id,
        self.active_member_hours,
        self.min_member_messages,
        self.max_members_per_group_per_run,
    )
    rows = self.message_db.execute_query(query, params) or []
    for row in rows:
        stamp = row.get("latest_message_time")
        if not isinstance(stamp, datetime):
            continue
        # Rows are updated in place so callers always see a plain string.
        row["latest_message_time"] = stamp.strftime("%Y-%m-%d %H:%M:%S")
    return rows
def _should_refresh_context(self, existing_context: Optional[Dict], active_member: Dict) -> bool:
    """Decide whether a member's stored profile is due for a rebuild.

    A rebuild is due when there is no stored profile, when either timestamp is
    missing or unparsable, when the newest message is at least ``stale_hours``
    newer than the profile, or when the profile is older than twice
    ``stale_hours``.
    """
    if not existing_context:
        return True

    profiled_at = self._parse_datetime(existing_context.get("last_profiled_at"))
    last_message_at = self._parse_datetime(active_member.get("latest_message_time"))
    if not (last_message_at and profiled_at):
        return True

    stale_seconds = self.stale_hours * 3600
    lag = (last_message_at - profiled_at).total_seconds()
    if last_message_at > profiled_at and lag >= stale_seconds:
        return True

    age = (datetime.now() - profiled_at).total_seconds()
    return age >= stale_seconds * 2
@staticmethod
def _parse_datetime(value) -> Optional[datetime]:
    """Best-effort conversion of a DB/config value to a naive ``datetime``.

    Accepted inputs:
        * a ``datetime`` instance (returned unchanged);
        * ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DDTHH:MM:SS``, each optionally
          followed by fractional seconds;
        * anything whose first ten characters are ``YYYY-MM-DD`` (parsed as
          midnight of that day).

    Returns:
        The parsed ``datetime``, or ``None`` for empty or unparsable input.
    """
    if isinstance(value, datetime):
        return value
    if not value:
        return None

    text = str(value).strip()
    full_formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
    )
    for fmt in full_formats:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            # Not this layout; try the next one.
            continue

    # Last resort: keep only the calendar date.
    try:
        return datetime.strptime(text[:10], "%Y-%m-%d")
    except ValueError:
        return None
def _get_recent_active_chatrooms(self) -> set:
    """Return the ids of groups with enough recent traffic.

    Counts messages of ids ending in ``@chatroom`` inside the window that ends
    at the start of the current day and reaches back ``active_hours`` hours;
    groups below ``min_group_messages`` are excluded.
    """
    query = """
        SELECT group_id, COUNT(*) AS msg_count
        FROM messages
        WHERE group_id LIKE %s
        AND timestamp >= DATE_SUB(CURDATE(), INTERVAL %s HOUR)
        AND timestamp < CURDATE()
        GROUP BY group_id
        HAVING COUNT(*) >= %s
    """
    params = ("%@chatroom", self.active_hours, self.min_group_messages)
    active = set()
    for row in self.message_db.execute_query(query, params) or []:
        group_id = row.get("group_id")
        if group_id:
            active.add(group_id)
    return active
def _generate_ai_context_from_digests(self, chatroom_id: str, wxid: str, display_name: str,
                                      monthly_digests: List[Dict], weekly_digests: List[Dict],
                                      daily_digests: List[Dict]) -> Optional[Dict]:
    """Ask the Dify client for a final profile built from the member's digests.

    Nothing is requested when the client is unavailable or when there is
    neither a weekly/monthly digest nor at least two daily digests.

    Returns:
        The parsed answer with provider/usage details added to its ``meta``,
        or ``None`` when no usable answer was obtained.
    """
    client = self.dify_client
    if not client.is_available():
        return None

    enough_material = (
        len(daily_digests) >= 2 or len(weekly_digests) >= 1 or len(monthly_digests) >= 1
    )
    if not enough_material:
        return None

    prompt = MemberContextPromptBuilder.build_final_context_prompt(
        chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests
    )
    response = client.run(
        prompt=prompt,
        user=f"member-context-final:{chatroom_id}:{wxid}",
        inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid},
        tag=f"final:{wxid}",
    )
    if not response:
        return None

    answer = response.get("text", "")
    parsed = self._parse_ai_answer(answer)
    if not parsed:
        self.LOG.warning(
            f"[成员交互摘要][AI] 最终画像JSON解析失败: group={chatroom_id}, wxid={wxid}, "
            f"answer_preview={(answer or '')[:200]}"
        )
        return None

    usage = response.get("usage", {}) or {}
    enriched_meta = parsed.get("meta", {}) or {}
    # Record where the profile came from and what the call cost.
    enriched_meta["ai_provider"] = "dify"
    enriched_meta["ai_mode"] = client.mode
    enriched_meta["ai_tokens"] = usage.get("total_tokens")
    enriched_meta["ai_latency"] = usage.get("latency")
    parsed["meta"] = enriched_meta
    return parsed
def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
    """Turn the model's raw answer into a normalized profile dict.

    The outermost ``{...}`` span of the answer (if any) is parsed as JSON, so
    surrounding prose or code fences are tolerated.

    Normalization rules:
        * text fields: stripped strings; missing or ``null`` becomes ``""``;
        * list fields: non-list values become ``[]``; ``null`` and blank items
          are dropped before the per-field limit is applied;
        * ``confidence``: clamped to ``[0.0, 1.0]``; non-numeric or NaN is ``0.0``.

    Returns:
        The normalized dict, or ``None`` when the answer is empty, is not valid
        JSON, or is not a JSON object.
    """
    if not answer:
        return None

    text = answer.strip()
    match = re.search(r"\{.*\}", text, re.S)
    if match:
        text = match.group(0)
    try:
        data = json.loads(text)
    except (ValueError, RecursionError):
        return None
    # A bare JSON array/string/number parses fine but has no fields to read.
    if not isinstance(data, dict):
        return None

    def norm_text(key):
        value = data.get(key)
        return "" if value is None else str(value).strip()

    def norm_list(key, limit):
        value = data.get(key)
        if not isinstance(value, list):
            return []
        cleaned = (str(item).strip() for item in value if item is not None)
        return [item for item in cleaned if item][:limit]

    try:
        confidence = float(data.get("confidence", 0))
    except (TypeError, ValueError, OverflowError):
        confidence = 0.0
    if confidence != confidence:
        # NaN would otherwise slip through the clamp below as 1.0.
        confidence = 0.0

    return {
        "activity_level": norm_text("activity_level"),
        "message_pattern": norm_text("message_pattern"),
        "interaction_style": norm_text("interaction_style"),
        "response_style_hint": norm_text("response_style_hint"),
        "topics_of_interest": norm_list("topics_of_interest", 5),
        "recent_focus": norm_list("recent_focus", 4),
        "summary_text": norm_text("summary_text"),
        "confidence": max(0.0, min(1.0, confidence)),
        "meta": {
            "identity_traits": norm_list("identity_traits", 5),
            "skill_profile": norm_list("skill_profile", 6),
            "family_profile": norm_list("family_profile", 4),
            "life_stage_profile": norm_list("life_stage_profile", 4),
            "value_profile": norm_list("value_profile", 5),
            "stable_traits": norm_list("stable_traits", self.stable_max_items),
            "habit_patterns": norm_list("habit_patterns", self.stable_max_items),
            "long_term_reply_preferences": norm_list("long_term_reply_preferences", 4),
            "group_role": norm_text("group_role"),
            "decision_profile": norm_text("decision_profile"),
            "recent_state": norm_list("recent_state", 4),
            "temperament_tendency": norm_text("temperament_tendency"),
            "engagement_traits": norm_list("engagement_traits", 4),
            "reply_taboos": norm_list("reply_taboos", 3),
        },
    }
def _merge_with_existing_context(self, existing_context: Optional[Dict], current_context: Dict) -> Dict:
    """Fold the previously stored profile into the freshly built one.

    Every scored field keeps a ``*_scores`` map: the stored scores are merged
    with the newly observed items, and the visible list is re-derived from the
    merged scores. Plain fields fall back to the stored value when the new one
    is empty; counters keep the larger of the two values.

    Args:
        existing_context: Stored profile, or ``None`` when there is none.
        current_context: Freshly built profile; updated in place.

    Returns:
        ``current_context`` with merged values and the updated ``meta``.
    """
    existing_context = existing_context or {}
    previous_meta = existing_context.get("meta", {}) or {}
    meta = current_context.get("meta", {}) or {}

    def larger_of(key):
        return max(int(meta.get(key, 0)), int(previous_meta.get(key, 0)))

    span_days = larger_of("observation_days")
    meta["observation_days"] = span_days
    meta["stable_ready"] = span_days >= self.stable_ready_days

    temperament_now = meta.get("temperament_tendency")
    # (score key, newly observed items, confidence weight; None = unweighted)
    score_plan = (
        ("topic_scores", current_context.get("topics_of_interest", []), None),
        ("stable_trait_scores", meta.get("stable_traits", []), None),
        ("habit_pattern_scores", meta.get("habit_patterns", []), None),
        ("identity_trait_scores", meta.get("identity_traits", []), 0.75),
        ("skill_profile_scores", meta.get("skill_profile", []), 0.85),
        ("family_profile_scores", meta.get("family_profile", []), 0.55),
        ("life_stage_profile_scores", meta.get("life_stage_profile", []), 0.65),
        ("value_profile_scores", meta.get("value_profile", []), 0.75),
        ("long_term_reply_preference_scores", meta.get("long_term_reply_preferences", []), None),
        ("temperament_tendency_scores", [temperament_now] if temperament_now else [], 0.9),
    )
    merged = {}
    for score_key, observed, weight in score_plan:
        confidence = current_context.get("confidence", 0)
        if weight is not None:
            confidence = confidence * weight
        merged[score_key] = self._merge_scored_items(
            previous_meta.get(score_key, {}), observed, confidence
        )
    meta.update(merged)

    # (list key, score key, max items, fall back to the stored list when empty)
    list_plan = (
        ("identity_traits", "identity_trait_scores", 5, True),
        ("skill_profile", "skill_profile_scores", 6, True),
        ("family_profile", "family_profile_scores", 4, True),
        ("life_stage_profile", "life_stage_profile_scores", 4, True),
        ("value_profile", "value_profile_scores", 5, True),
        ("stable_traits", "stable_trait_scores", self.stable_max_items, False),
        ("habit_patterns", "habit_pattern_scores", self.stable_max_items, False),
        ("long_term_reply_preferences", "long_term_reply_preference_scores", 4, False),
    )
    for list_key, score_key, cap, keep_stored in list_plan:
        ranked = self._top_scored_items(merged[score_key], limit=cap)
        if keep_stored and not ranked:
            ranked = (previous_meta.get(list_key) or [])[:cap]
        meta[list_key] = ranked

    top_temperament = self._top_scored_items(merged["temperament_tendency_scores"], limit=1)
    meta["temperament_tendency"] = (
        top_temperament[0] if top_temperament else meta.get("temperament_tendency", "")
    )

    for key in ("group_role", "decision_profile"):
        meta[key] = meta.get(key) or previous_meta.get(key) or ""
    for key, cap in (("engagement_traits", 4), ("reply_taboos", 3), ("recent_state", 4)):
        meta[key] = (meta.get(key) or previous_meta.get(key) or [])[:cap]

    meta["profile_iterations"] = larger_of("profile_iterations")
    meta["history_message_count"] = larger_of("history_message_count")

    ranked_topics = self._top_scored_items(merged["topic_scores"], limit=5)
    current_context["topics_of_interest"] = ranked_topics or current_context.get("topics_of_interest", [])
    current_context["recent_focus"] = (
        current_context.get("recent_focus") or existing_context.get("recent_focus") or []
    )[:4]
    current_context["response_style_hint"] = (
        current_context.get("response_style_hint")
        or existing_context.get("response_style_hint")
        or ""
    )
    current_context["meta"] = meta
    return current_context
def _extract_scored_items(self, items: List[Dict], keys: List[str], limit: int) -> List[str]:
    """Rank the labels found under ``keys`` across ``items``.

    Earlier entries weigh more: the weight starts at 1.2 and drops by 0.08 per
    position, never going below 0.5. Only list values are read; blank labels
    are ignored.

    Returns:
        Up to ``limit`` labels, highest accumulated weight first.
    """
    tally = {}
    for position, entry in enumerate(items):
        weight = max(0.5, 1.2 - position * 0.08)
        for field in keys:
            candidates = entry.get(field, [])
            if isinstance(candidates, list):
                for raw in candidates:
                    label = str(raw).strip()
                    if label:
                        tally[label] = tally.get(label, 0.0) + weight
    ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
    return [label for label, _ in ranked[:limit]]
def _best_text(self, items: List[Dict], keys: List[str], default: str = "") -> str:
    """Return the most frequent non-blank text found under ``keys`` in ``items``.

    Values are stringified and stripped before counting. ``None`` values are
    skipped so that a JSON ``null`` is not counted as the literal text "None".
    Among equally frequent texts the one seen first wins.

    Returns:
        The winning text, or ``default`` when nothing usable was found.
    """
    counter = Counter()
    for item in items:
        for key in keys:
            raw = item.get(key)
            if raw is None:
                continue
            value = str(raw).strip()
            if value:
                counter[value] += 1
    if counter:
        return counter.most_common(1)[0][0]
    return default
def _build_response_style_hint_from_digests(self, daily_structured: List[Dict],
                                            weekly_structured: List[Dict],
                                            monthly_structured: List[Dict]) -> str:
    """Pick a reply-style hint for the member.

    Preference order: the most common ``response_style_hint`` of the daily
    digests, then the top long-term reply preferences of the monthly/weekly
    digests, then a generic default sentence.

    The preferences are joined with the ideographic comma; joining them with an
    empty string ran them together into one unreadable phrase.
    """
    hint = self._best_text(daily_structured, ["response_style_hint"])
    if hint:
        return hint

    preferences = self._extract_scored_items(
        monthly_structured + weekly_structured,
        ["long_term_reply_preferences", "reply_preferences"],
        limit=3,
    )
    if preferences:
        item_separator = "\u3001"  # ideographic comma
        return "更适合:" + item_separator.join(preferences[:3])
    return "保持自然口语化,结论和解释尽量平衡"
def _calc_digest_confidence(self, monthly_digests: List[Dict], weekly_digests: List[Dict],
                            daily_digests: List[Dict]) -> float:
    """Score how much digest material backs a profile.

    Starts at 0.25 and adds a capped bonus per digest level (monthly up to
    0.35, weekly up to 0.2, daily up to 0.15). The result is capped at 0.95
    and rounded to two decimals.
    """
    monthly_bonus = min(0.35, len(monthly_digests) * 0.08)
    weekly_bonus = min(0.2, len(weekly_digests) * 0.04)
    daily_bonus = min(0.15, len(daily_digests) * 0.02)
    score = 0.25 + monthly_bonus + weekly_bonus + daily_bonus
    return round(min(0.95, score), 2)
def _calc_observation_days(self, daily_digests: List[Dict]) -> int:
    """Count the whole days between the last digest's start and the first digest's end.

    Reads ``period_end`` of the first entry and ``period_start`` of the last
    entry; returns 0 when the list is empty, a bound cannot be parsed, or the
    span is negative.
    """
    if daily_digests:
        first_entry, last_entry = daily_digests[0], daily_digests[-1]
        window_end = self._parse_datetime(first_entry.get("period_end"))
        window_start = self._parse_datetime(last_entry.get("period_start"))
        if window_start and window_end:
            span = (window_end - window_start).days
            return span if span > 0 else 0
    return 0
@staticmethod
def _sum_digest_source_count(daily_digests: List[Dict]) -> int:
    """Add up the ``source_count`` of all daily digests.

    A missing, ``None`` or otherwise empty ``source_count`` counts as 0, so a
    null column value no longer raises ``TypeError``.
    """
    return sum(int(item.get("source_count") or 0) for item in daily_digests)
def _build_refresh_mode(self, existing_context: Optional[Dict], digest_snapshot: Dict,
                        group_digest_stats: Optional[Dict] = None) -> str:
    """Label how this profile refresh came about.

    Returns:
        ``"bootstrap"`` when there is no stored profile, ``"daily_rollup"`` when
        the group batch built daily digests, ``"recalibration"`` when the member
        pipeline built a monthly digest, otherwise ``"incremental"``.
    """
    if not existing_context:
        return "bootstrap"

    group_stats = group_digest_stats or {}
    if group_stats.get("built_daily", 0) > 0:
        return "daily_rollup"

    member_stats = digest_snapshot.get("stats", {}) or {}
    built_monthly = member_stats.get("built_monthly", 0) > 0
    return "recalibration" if built_monthly else "incremental"
def _build_summary_text_from_context(self, context: Dict) -> str:
    """Compose a short fallback summary from the profile's own fields.

    Sections are emitted in a fixed order (temperament, stable traits,
    identity, skills, values, habits, recent state, reply hint) and only the
    first five non-empty sections are kept.

    Items inside a section are separated by the ideographic comma and sections
    by a full-width semicolon; joining on the empty string ran every item and
    section together. The temperament section now carries the same ``:`` after
    its label as all the other sections.

    Returns:
        The summary text, or ``""`` when no section has content.
    """
    item_separator = "\u3001"  # ideographic comma
    section_separator = "\uff1b"  # full-width semicolon

    meta = context.get("meta", {}) or {}
    parts = []

    temperament = meta.get("temperament_tendency")
    if temperament:
        label = "长期沟通倾向" if meta.get("stable_ready") else "阶段性沟通倾向"
        parts.append(f"{label}:{temperament}")

    # (section label, meta key, max items shown)
    list_sections = (
        ("长期特征", "stable_traits", 3),
        ("身份线索", "identity_traits", 2),
        ("技能画像", "skill_profile", 3),
        ("判断偏好", "value_profile", 2),
        ("习惯模式", "habit_patterns", 3),
        ("近期状态", "recent_state", 3),
    )
    for label, key, cap in list_sections:
        values = meta.get(key)
        if not values:
            continue
        if isinstance(values, str):
            # A lone string is one item, not a sequence of characters.
            values = [values]
        shown = item_separator.join(str(value) for value in values[:cap])
        parts.append(f"{label}:{shown}")

    hint = context.get("response_style_hint")
    if hint:
        parts.append(f"回复建议:{hint}")

    return section_separator.join(parts[:5])
def _merge_scored_items(self, existing_scores: Dict, current_items: List[str], confidence: float) -> Dict[str, float]:
    """Decay the stored scores and boost the items observed this round.

    Stored scores are multiplied by ``stable_decay``; entries that cannot be
    converted to a number or that fall below 0.2 are dropped. Every observed
    item then gains ``0.8 + confidence``, limited to the range 0.6 to 1.8.

    Returns:
        A new ``label -> score`` dict with scores rounded to four decimals.
    """
    carried = {}
    for name, old_score in (existing_scores or {}).items():
        try:
            decayed = float(old_score) * self.stable_decay
        except Exception:
            continue
        if not decayed >= 0.2:
            continue
        carried[str(name).strip()] = round(decayed, 4)

    capped = min(1.8, 0.8 + confidence)
    increment = max(0.6, capped)
    for raw in current_items or []:
        label = str(raw).strip()
        if label:
            carried[label] = round(carried.get(label, 0.0) + increment, 4)
    return carried
def _top_scored_items(self, scores: Dict, limit: int) -> List[str]:
    """List the best-scoring labels that reach ``stable_min_score``.

    Blank labels are ignored. Labels are ordered by score, highest first, and
    at most ``limit`` of the qualifying ones are returned.
    """
    cleaned = [
        (str(name).strip(), float(score))
        for name, score in (scores or {}).items()
        if str(name).strip()
    ]
    cleaned.sort(key=lambda pair: pair[1], reverse=True)
    qualifying = [name for name, score in cleaned if score >= self.stable_min_score]
    return qualifying[:limit]
def _calc_activity_level(self, message_count: int, days: int) -> str:
    """Map a message count over a number of days to an activity label.

    A tier is reached through either the total count or the daily average
    (``days`` below 1 is treated as 1).
    """
    per_day = message_count / max(days, 1)
    # (minimum total, minimum daily average, label), highest tier first
    tiers = (
        (80, 3, "高活跃"),
        (25, 1, "中活跃"),
    )
    for min_total, min_daily, label in tiers:
        if message_count >= min_total or per_day >= min_daily:
            return label
    return "低活跃" if message_count > 0 else "观察中"
def _build_message_pattern(self, messages: List[Dict]) -> str:
    """Describe how the member tends to write, from simple message statistics.

    Looks at the average message length, the share of messages containing a
    question mark (ASCII or full-width) and the share containing a link.

    Two defects of the previous version are fixed here: the second
    question-mark test was an empty string, which is contained in every
    message, so every member was reported as question-driven; and the traits
    were joined on the empty string, fusing them into one phrase.

    Returns:
        The traits joined by the ideographic comma, or a "too few samples"
        notice when there is no message content.
    """
    too_few = "样本较少,暂不做明显模式判断"
    if not messages:
        return too_few
    contents = [m.get("content", "") for m in messages if m.get("content")]
    if not contents:
        return too_few

    total = len(contents)
    question_marks = ("?", "\uff1f")  # ASCII and full-width question mark
    link_prefixes = ("http://", "https://")

    avg_len = sum(len(c) for c in contents) / total
    question_ratio = sum(1 for c in contents if any(mark in c for mark in question_marks)) / total
    link_ratio = sum(1 for c in contents if any(prefix in c for prefix in link_prefixes)) / total

    traits = []
    if avg_len <= 12:
        traits.append("短句居多")
    elif avg_len >= 35:
        traits.append("表达较完整")
    else:
        traits.append("表达中等长度")

    if question_ratio >= 0.35:
        traits.append("问题导向明显")
    elif question_ratio >= 0.15:
        traits.append("偶尔连续追问")

    if link_ratio >= 0.15:
        traits.append("常分享链接或资料")

    item_separator = "\u3001"  # ideographic comma
    return item_separator.join(traits or ["发言较平稳"])
def _build_interaction_style(self, messages: List[Dict]) -> str:
    """Describe how the member interacts, from simple message statistics.

    Looks at the share of messages containing a question mark (ASCII or
    full-width), an emoji/symbol from the matched ranges, or an ``@`` mention.

    Two defects of the previous version are fixed here: the second
    question-mark test was an empty string, which is contained in every
    message, so every member was reported as question-driven; and the labels
    were joined on the empty string, fusing them into one phrase.

    Returns:
        The labels joined by the ideographic comma, a default label when no
        threshold is reached, or a "few samples" notice for empty input.
    """
    if not messages:
        return "互动样本较少"
    contents = [m.get("content", "") for m in messages if m.get("content")]
    total = max(len(contents), 1)

    question_marks = ("?", "\uff1f")  # ASCII and full-width question mark
    emoji_pattern = re.compile(r"[\U0001F300-\U0001FAFF\u2600-\u27BF]")

    question_ratio = sum(1 for c in contents if any(mark in c for mark in question_marks)) / total
    emoji_ratio = sum(1 for c in contents if emoji_pattern.search(c)) / total
    mention_ratio = sum(1 for c in contents if "@" in c) / total

    parts = []
    if question_ratio >= 0.3:
        parts.append("偏提问推进")
    if emoji_ratio >= 0.15:
        parts.append("表情互动感较强")
    if mention_ratio >= 0.1:
        parts.append("会主动点名互动")

    item_separator = "\u3001"  # ideographic comma
    return item_separator.join(parts or ["自然跟随式互动"])