Files
abot/plugins/member_context/digest_service.py
liuwei bfd0dbc15c 接入成员画像 Dify 工作流并清理旧提取逻辑
- 新增 member_context 专用 DifyClient,统一兼容 completion 与 workflow 两种调用模式
- 将成员画像插件默认切换到 Dify workflow 模式,配置改用新的 workflow 应用与 workflows/run 接口
- 生成可直接导入 Dify 的成员画像工作流 DSL 文件,方便后台一键导入和发布
- 补充 Dify 工作流接入说明文档,明确输入字段、输出字段、发布步骤与插件消费方式
- 清理旧的单成员日摘要提取链路,日级画像统一收敛到群日批量提取路径,减少无效分支和历史残留
- 去除 member_context 内部多处旧 requests 直连调用,统一改为通过 DifyClient 调用 AI 服务
- 优化群日批量结果解析逻辑,只按 wxid 作为唯一主键识别成员,不再依赖昵称做唯一判断
- 新增按 wxid 的结果去重与完整度评分逻辑,遇到重复成员结果时优先保留字段更完整、置信度更高的一条
- 保留现有初始化、增量、周/月聚合与最终画像生成链路,同时剔除 workflow 接入后已无效或低价值的旧逻辑
- 为后续继续收紧 fallback 标记、增强后台质量诊断和优化工作流输出稳定性打下基础
2026-04-02 14:25:50 +08:00

587 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import re
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from loguru import logger
from db.contacts_db import ContactsDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.dify_client import DifyClient
from plugins.member_context.prompt_builder import MemberContextPromptBuilder
from utils.compress_chat_data import compress_chat_data
class MemberDigestService:
    """Layered (daily / weekly / monthly) member digest service.

    Daily digests are extracted for a whole group in one batched AI request
    per day; weekly digests aggregate daily digests and monthly digests
    aggregate weekly digests. Whenever the AI service is unavailable or its
    answer cannot be parsed, a rule-based fallback digest is produced instead.
    """

    # Keys whose values are always normalised to a list of non-empty strings.
    _LIST_KEYS = frozenset({
        "topics", "identity_clues", "skill_signals", "family_signals", "life_stage_signals",
        "value_preferences", "habit_signals", "engagement_traits", "reply_taboos",
        "representative_messages", "stable_topics", "identity_traits", "skill_profile",
        "family_profile", "life_stage_profile", "value_profile", "stable_traits",
        "habit_patterns", "reply_preferences", "long_term_topics", "long_term_reply_preferences",
        "phase_state", "recent_state",
    })

    def __init__(self, contacts_db: "ContactsDBOperator", message_db: "MessageStorageDB",
                 digest_db: "MemberDigestDBOperator", plugin_config: Optional[Dict] = None):
        """Wire up storage backends and read tunables from ``plugin_config``.

        Args:
            contacts_db: Source of chatroom member information.
            message_db: Source of raw group messages.
            digest_db: Storage for digests and per-group bootstrap state.
            plugin_config: Optional plugin configuration; the ``api`` section is
                handed to ``DifyClient`` and the ``profile`` section supplies
                the limits below.
        """
        self.contacts_db = contacts_db
        self.message_db = message_db
        self.digest_db = digest_db
        self.LOG = logger
        self.plugin_config = plugin_config or {}

        api_config = self.plugin_config.get("api", {})
        profile_config = self.plugin_config.get("profile", {})

        self.dify_client = DifyClient(api_config)
        # Mirror the client's settings on the service instance.
        self.ai_enabled = self.dify_client.enabled
        self.ai_base_url = self.dify_client.base_url
        self.ai_api_key = self.dify_client.api_key
        self.ai_endpoint = self.dify_client.endpoint
        self.ai_timeout = self.dify_client.timeout

        self.bootstrap_days = int(profile_config.get("bootstrap_days", 365))
        self.daily_message_limit = int(profile_config.get("daily_message_limit", 120))
        self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6))
        self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 0))
        self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16))
        self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12))
        self.final_daily_limit = int(profile_config.get("final_daily_limit", 8))
        self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6))
        self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6))
        self.group_digest_days = int(profile_config.get("group_digest_days", 1))

    def ensure_recent_group_daily_digests(self, chatroom_id: str, days: Optional[int] = None,
                                          force: bool = False) -> Dict:
        """Build and save daily digests for the most recent ``days`` full days.

        The window ends yesterday. When ``days`` is ``None`` the window size is
        ``bootstrap_days`` for a group that still needs bootstrapping and
        ``group_digest_days`` otherwise; in bootstrap mode the outcome is also
        recorded through ``digest_db.save_group_state``.

        Args:
            chatroom_id: Group to process.
            days: Explicit window size, or ``None`` to decide automatically.
            force: Rebuild digests even when one is already stored.

        Returns:
            A dict with ``built_daily``, ``touched_members``, ``days``, ``mode``
            (``"bootstrap"`` or ``"incremental"``) and ``processed_dates``.
        """
        bootstrap_mode = False
        if days is None:
            bootstrap_mode = self._should_bootstrap_group(chatroom_id)
            days = self.bootstrap_days if bootstrap_mode else self.group_digest_days

        built_daily = 0
        touched_members = set()
        processed_dates = []

        # Read the clock once: a long bootstrap run may cross midnight, and
        # re-reading it per iteration would shift the window mid-run.
        anchor = datetime.now()
        for offset in range(days):
            target_date = (anchor - timedelta(days=offset + 1)).strftime("%Y-%m-%d")
            digests = self._build_group_daily_digests(chatroom_id, target_date, force=force)
            processed_dates.append(target_date)
            for digest in digests:
                self.digest_db.save_digest(digest)
                built_daily += 1
                touched_members.add(digest.get("wxid"))

        mode = "bootstrap" if bootstrap_mode else "incremental"
        if bootstrap_mode:
            bootstrap_status = "done" if built_daily > 0 or touched_members else "empty"
            self.digest_db.save_group_state(
                chatroom_id,
                bootstrap_status=bootstrap_status,
                bootstrap_days=days,
                built_daily_count=built_daily,
                touched_member_count=len(touched_members),
                extra={"processed_dates": processed_dates},
            )

        self.LOG.info(
            f"[成员交互摘要][群日摘要窗口] group={chatroom_id}, mode={mode}, days={days}, "
            f"built_daily={built_daily}, touched_members={len(touched_members)}"
        )
        return {
            "built_daily": built_daily,
            "touched_members": list(touched_members),
            "days": days,
            "mode": mode,
            "processed_dates": processed_dates,
        }

    def _should_bootstrap_group(self, chatroom_id: str) -> bool:
        """Return True when the group has no recorded bootstrap and no daily digest.

        Any error while checking is logged and treated as "do not bootstrap",
        so a storage failure falls back to the incremental window.
        """
        try:
            group_state = self.digest_db.get_group_state(chatroom_id)
            if group_state and group_state.get("bootstrap_status") in {"done", "empty"}:
                return False
            sql = """
                SELECT 1
                FROM t_member_digest
                WHERE chatroom_id = %s AND digest_type = 'daily'
                LIMIT 1
            """
            row = self.digest_db.execute_query(sql, (chatroom_id,), fetch_one=True)
            return not bool(row)
        except Exception as e:
            self.LOG.warning(f"[成员交互摘要] 检查群初始化状态失败,按增量处理: group={chatroom_id}, error={e}")
            return False

    def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False) -> Dict:
        """Refresh weekly/monthly digests for one member and return recent digests.

        Daily digests are never built here (``built_daily`` is always 0); they
        come from the group-level daily batch.

        Returns:
            A dict with ``display_name``, ``daily_digests``, ``weekly_digests``,
            ``monthly_digests`` and a ``stats`` sub-dict. ``stats`` carries the
            same keys whether or not the member has any daily digest.
        """
        member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
        display_name = member.get("display_name") or member.get("nick_name") or wxid

        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
        if not daily_digests:
            return {
                "display_name": display_name,
                "daily_digests": [],
                "weekly_digests": [],
                "monthly_digests": [],
                "stats": {
                    "daily": 0,
                    "weekly": 0,
                    "monthly": 0,
                    "active_days": 0,
                    "built_daily": 0,
                    # Same keys as the non-empty result below.
                    "built_weekly": 0,
                    "built_monthly": 0,
                },
            }

        # Weekly first: monthly digests are aggregated from weekly ones.
        built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force)
        built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)

        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=self.final_daily_limit)
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=self.final_weekly_limit)
        monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=self.final_monthly_limit)
        return {
            "display_name": display_name,
            "daily_digests": daily_digests,
            "weekly_digests": weekly_digests,
            "monthly_digests": monthly_digests,
            "stats": {
                "daily": len(daily_digests),
                "weekly": len(weekly_digests),
                "monthly": len(monthly_digests),
                "active_days": len(self.digest_db.list_digest_keys(chatroom_id, wxid, "daily")),
                "built_daily": 0,
                "built_weekly": built_weekly,
                "built_monthly": built_monthly,
            },
        }

    @staticmethod
    def _as_str_list(value) -> List[str]:
        """Coerce ``value`` to a list of stripped, non-empty strings.

        A list is cleaned element by element (``None`` entries are dropped);
        a truthy non-list value becomes a one-element list; anything falsy
        becomes an empty list.
        """
        if isinstance(value, list):
            candidates = value
        elif value:
            candidates = [value]
        else:
            return []
        cleaned = (str(entry).strip() for entry in candidates if entry is not None)
        return [text for text in cleaned if text]

    @staticmethod
    def _normalize_profile_item(item: Dict) -> Dict:
        """Return a copy of ``item`` with predictable value types.

        Known list fields become lists of non-empty strings, numbers are kept
        as-is, ``None`` becomes an empty string (not the text ``"None"``) and
        everything else is stringified and stripped.
        """
        normalized = {}
        for key, value in item.items():
            if key in MemberDigestService._LIST_KEYS:
                normalized[key] = MemberDigestService._as_str_list(value)
            elif isinstance(value, (int, float)):
                normalized[key] = value
            elif value is None:
                normalized[key] = ""
            else:
                normalized[key] = str(value).strip()
        return normalized

    def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
        """Build missing weekly digests from stored daily digests.

        A week is built when it has at least two daily digests and is either
        not stored yet, is the week containing yesterday, or ``force`` is set.

        Returns:
            Number of weekly digests saved.
        """
        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
        grouped = defaultdict(list)
        for item in daily_digests:
            raw_key = item.get("period_key")
            try:
                week_key, _, _ = self._week_period_bounds(raw_key)
            except ValueError:
                # One unparsable row must not abort the whole roll-up.
                self.LOG.warning(
                    f"[成员交互摘要][周摘要] 跳过无效日期的日摘要: group={chatroom_id}, wxid={wxid}, "
                    f"period_key={raw_key!r}"
                )
                continue
            grouped[week_key].append(item)

        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly"))
        current_week_key, _, _ = self._week_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))

        built = 0
        for week_key, items in sorted(grouped.items()):
            if len(items) < 2:
                continue
            # The current week is always rebuilt because it may still grow.
            if not force and week_key in existing_keys and week_key != current_week_key:
                continue
            period_key, period_start, period_end = self._week_period_bounds(items[0].get("period_key"))
            digest = self._build_period_digest(
                "weekly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"week={period_key}, days={len(items)}"
                )
        return built

    def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
        """Build missing monthly digests from stored weekly digests.

        Weekly digests are assigned to the month of their ``period_end``. A
        month is built when it has at least two weekly digests and is either
        not stored yet, is the month containing yesterday, or ``force`` is set.

        Returns:
            Number of monthly digests saved.
        """
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
        grouped = defaultdict(list)
        for item in weekly_digests:
            raw_end = item.get("period_end")
            try:
                month_key, _, _ = self._month_period_bounds(raw_end)
            except ValueError:
                # One unparsable row must not abort the whole roll-up.
                self.LOG.warning(
                    f"[成员交互摘要][月摘要] 跳过无效日期的周摘要: group={chatroom_id}, wxid={wxid}, "
                    f"period_end={raw_end!r}"
                )
                continue
            grouped[month_key].append(item)

        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly"))
        current_month_key, _, _ = self._month_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))

        built = 0
        for month_key, items in sorted(grouped.items()):
            if len(items) < 2:
                continue
            # The current month is always rebuilt because it may still grow.
            if not force and month_key in existing_keys and month_key != current_month_key:
                continue
            period_key, period_start, period_end = self._month_period_bounds(items[-1].get("period_end"))
            digest = self._build_period_digest(
                "monthly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"month={period_key}, weeks={len(items)}"
                )
        return built

    def _build_group_daily_digests(self, chatroom_id: str, digest_date: str, force: bool = False) -> List[Dict]:
        """Build (but do not save) daily digests for every eligible member of a group.

        A member is eligible when they sent at least
        ``daily_digest_min_messages`` messages on ``digest_date``; members who
        already have a stored digest for that day are skipped unless ``force``
        is set. All pending members are extracted with one AI request; members
        missing from the AI result get a rule-based fallback digest.

        Returns:
            Digest dicts ready for ``digest_db.save_digest``; empty when there
            is nothing to build.
        """
        members = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
        member_name_map = {}
        for member in members:
            wxid = member.get("wxid")
            if not wxid:
                continue
            member_name_map[wxid] = member.get("display_name") or member.get("nick_name") or wxid

        messages = self.message_db.get_member_messages_for_group_date(chatroom_id, digest_date)
        if not messages:
            return []

        sender_messages = defaultdict(list)
        for msg in messages:
            wxid = msg.get("sender")
            if not wxid:
                continue
            sender_messages[wxid].append(msg)

        candidate_wxids = [
            wxid for wxid, items in sender_messages.items()
            if len(items) >= self.daily_digest_min_messages
        ]
        if not candidate_wxids:
            return []

        pending_wxids = []
        for wxid in candidate_wxids:
            if not force and self.digest_db.get_digest(chatroom_id, wxid, "daily", digest_date):
                continue
            pending_wxids.append(wxid)
        if not pending_wxids:
            self.LOG.info(
                f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
                f"reason=all_candidates_already_built, candidates={len(candidate_wxids)}"
            )
            return []

        member_labels = [f"{wxid} | {member_name_map.get(wxid, wxid)}" for wxid in pending_wxids]
        # The transcript covers the whole day (all senders), not just pending members.
        compact_chat = self._format_group_messages_optimized(messages, member_name_map)
        try:
            compact_chat = compress_chat_data(compact_chat)
        except Exception as e:
            # Best effort: fall through with the uncompressed transcript.
            self.LOG.warning(f"[成员交互摘要] 压缩群日消息失败: group={chatroom_id}, date={digest_date}, error={e}")

        parsed_members = self._request_group_daily_json(chatroom_id, digest_date, member_labels, compact_chat)
        # wxid is the only key used to match AI results back to members.
        parsed_map = {item.get("wxid"): item for item in (parsed_members or []) if item.get("wxid")}

        digests = []
        for wxid in pending_wxids:
            own_messages = sender_messages.get(wxid, [])
            parsed = parsed_map.get(wxid) or self._build_daily_digest_fallback(own_messages)
            if not parsed:
                continue
            parsed = self._normalize_profile_item(parsed)
            digests.append({
                "chatroom_id": chatroom_id,
                "wxid": wxid,
                "digest_type": "daily",
                "period_key": digest_date,
                "period_start": f"{digest_date} 00:00:00",
                "period_end": f"{digest_date} 23:59:59",
                "display_name": member_name_map.get(wxid, wxid),
                "source_count": len(own_messages),
                "summary_text": parsed.get("summary_text", ""),
                "structured": parsed,
                "meta": {
                    "source_type": "group_daily_messages",
                    "representative_messages": parsed.get("representative_messages", []),
                },
            })

        self.LOG.info(
            f"[成员交互摘要][群日批处理] 完成: group={chatroom_id}, date={digest_date}, "
            f"candidates={len(candidate_wxids)}, pending={len(pending_wxids)}, built={len(digests)}"
        )
        return digests

    def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
                             period_key: str, period_start: str, period_end: str,
                             items: List[Dict]) -> Optional[Dict]:
        """Aggregate lower-level digests into one weekly or monthly digest.

        The AI service is asked first; if it yields nothing usable the
        rule-based fallback is used.

        Returns:
            A digest dict ready for ``digest_db.save_digest``, or ``None`` when
            neither the AI nor the fallback produced a result.
        """
        prompt = MemberContextPromptBuilder.build_period_digest_prompt(
            digest_type, chatroom_id, wxid, display_name, period_key, items
        )
        parsed = self._request_ai_json(prompt, tag=f"{digest_type}:{period_key}", chatroom_id=chatroom_id, wxid=wxid)
        if not parsed:
            parsed = self._build_period_digest_fallback(digest_type, items)
        if not parsed:
            return None
        return {
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "digest_type": digest_type,
            "period_key": period_key,
            "period_start": period_start,
            "period_end": period_end,
            "display_name": display_name,
            "source_count": len(items),
            "summary_text": parsed.get("summary_text", ""),
            "structured": parsed,
            "meta": {
                "source_keys": [item.get("period_key") for item in items],
            },
        }

    def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]:
        """Run a single-member prompt through Dify and parse the JSON answer.

        Returns:
            The parsed dict (with an added ``ai_usage`` entry when non-empty),
            or ``None`` when the client is unavailable, returns nothing, or the
            answer is not a JSON object.
        """
        if not self.dify_client.is_available():
            return None
        response = self.dify_client.run(
            prompt=prompt,
            user=f"member-digest:{chatroom_id}:{wxid}:{tag}",
            inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid, "tag": tag},
            tag=tag,
        )
        if not response:
            return None
        parsed = self._parse_ai_answer(response.get("text", ""))
        if parsed:
            parsed["ai_usage"] = response.get("usage", {}) or {}
        return parsed

    def _request_group_daily_json(self, chatroom_id: str, digest_date: str,
                                  member_labels: List[str], compressed_chat: str) -> List[Dict]:
        """Run the group-daily batch extraction through Dify.

        Returns:
            One normalised dict per distinct wxid found in the answer; empty
            when the client is unavailable or the answer is unusable.
        """
        if not self.dify_client.is_available():
            return []
        prompt = MemberContextPromptBuilder.build_group_daily_digest_prompt(
            chatroom_id, digest_date, member_labels, compressed_chat
        )
        response = self.dify_client.run(
            prompt=prompt,
            user=f"member-digest:{chatroom_id}:group-daily:{digest_date}",
            inputs={
                "query": prompt,
                "chatroom_id": chatroom_id,
                "digest_date": digest_date,
                "member_labels": "\n".join(member_labels),
                "compressed_chat": compressed_chat,
            },
            tag=f"group-daily:{digest_date}",
        )
        if not response:
            return []
        return self._parse_group_daily_answer(response.get("text", ""))

    @staticmethod
    def _extract_json_object(answer) -> Optional[Dict]:
        """Extract the JSON object embedded in an AI answer.

        The span from the first ``{`` to the last ``}`` is parsed, which
        tolerates prose or code fences around the JSON.

        Returns:
            The decoded dict, or ``None`` when ``answer`` is empty, is not a
            string, is not valid JSON, or decodes to something other than an
            object (for example a bare list).
        """
        if not answer or not isinstance(answer, str):
            return None
        text = answer.strip()
        match = re.search(r"\{.*\}", text, re.S)
        if match:
            text = match.group(0)
        try:
            data = json.loads(text)
        except (ValueError, TypeError, RecursionError):
            return None
        return data if isinstance(data, dict) else None

    def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
        """Parse a single-object AI answer into a flat, normalised dict.

        Lists become lists of non-empty strings, numbers are kept, ``None``
        becomes an empty string and everything else is stringified.

        Returns:
            The normalised dict, or ``None`` when no JSON object can be read.
        """
        data = self._extract_json_object(answer)
        if data is None:
            return None
        normalized = {}
        for key, value in data.items():
            if isinstance(value, list):
                normalized[key] = self._as_str_list(value)
            elif isinstance(value, (int, float)):
                normalized[key] = value
            elif value is None:
                normalized[key] = ""
            else:
                normalized[key] = str(value).strip()
        return normalized

    def _parse_group_daily_answer(self, answer: str) -> List[Dict]:
        """Parse the group-daily answer (``{"members": [...]}``) into member dicts.

        Entries without a ``wxid`` are dropped. When the same wxid appears more
        than once, the entry with the higher ``_score_profile_item`` wins.
        """
        parsed = self._extract_json_object(answer)
        if parsed is None:
            return []
        members = parsed.get("members", [])
        if not isinstance(members, list):
            return []

        best_by_wxid = {}
        for item in members:
            if not isinstance(item, dict):
                continue
            normalized_item = self._normalize_profile_item(item)
            wxid = normalized_item.get("wxid")
            if not wxid:
                continue
            existing = best_by_wxid.get(wxid)
            if not existing or self._score_profile_item(normalized_item) > self._score_profile_item(existing):
                best_by_wxid[wxid] = normalized_item
        return list(best_by_wxid.values())

    @staticmethod
    def _score_profile_item(item: Dict) -> float:
        """Score how complete a profile item is; higher means richer.

        Each non-empty list entry adds 1.0, each non-empty string field adds
        0.8 and each numeric field adds its own value. ``confidence`` is then
        added again with weight 2, so it effectively counts three times when
        it is numeric. ``wxid`` and ``display_name`` are ignored.
        """
        if not item:
            return 0.0
        score = 0.0
        for key, value in item.items():
            if key in {"wxid", "display_name"}:
                continue
            if isinstance(value, list):
                score += len([v for v in value if str(v).strip()]) * 1.0
            elif isinstance(value, (int, float)):
                score += float(value)
            elif str(value).strip():
                score += 0.8
        try:
            score += float(item.get("confidence", 0)) * 2
        except Exception:
            # A non-numeric confidence simply earns no bonus.
            pass
        return score

    def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]:
        """Build a low-confidence daily digest from raw messages without AI.

        Returns:
            A digest payload with ``confidence`` 0.35, or ``None`` when there
            are no messages with content.
        """
        if not messages:
            return None
        contents = [str(item.get("content", "")).strip() for item in messages if item.get("content")]
        if not contents:
            return None

        short_samples = [content[:60] for content in contents[:3]]
        avg_len = sum(len(content) for content in contents) / max(len(contents), 1)
        if avg_len <= 16:
            message_pattern = "短句居多"
        elif avg_len >= 35:
            message_pattern = "表达较完整"
        else:
            message_pattern = "表达中等长度"

        return {
            "topics": [],
            "identity_clues": [],
            "skill_signals": [],
            "family_signals": [],
            "life_stage_signals": [],
            "value_preferences": [],
            "interaction_style": "自然跟随式互动",
            "message_pattern": message_pattern,
            "response_style_hint": "保持简洁自然,先回应核心点",
            "habit_signals": [],
            "engagement_traits": [],
            "decision_style": "",
            "social_role": "",
            "reply_taboos": [],
            "temperament_signal": "当天样本有限,暂以中性沟通观察为主",
            "summary_text": f"当日消息约{len(messages)}条,{message_pattern}",
            "representative_messages": short_samples,
            "confidence": 0.35,
        }

    def _build_period_digest_fallback(self, digest_type: str, items: List[Dict]) -> Optional[Dict]:
        """Build a weekly/monthly digest by counting repeated signals, without AI.

        Topics, traits, habits and reply preferences are ranked by how many
        source digests mention them. Source fields are coerced to lists first,
        because stored digests may hold a plain string or ``None`` where a
        list is expected.

        Returns:
            The weekly payload when ``digest_type == "weekly"``, otherwise the
            monthly payload; ``None`` when ``items`` is empty.
        """
        if not items:
            return None

        as_list = self._as_str_list
        topic_counts = defaultdict(int)
        trait_counts = defaultdict(int)
        habit_counts = defaultdict(int)
        reply_counts = defaultdict(int)
        temperament_values = []

        for item in items:
            structured = item.get("structured", {}) or {}
            if not isinstance(structured, dict):
                # Nothing countable in a non-dict payload.
                continue
            for field in ("topics", "stable_topics", "long_term_topics"):
                for topic in as_list(structured.get(field)):
                    topic_counts[topic] += 1
            for field in ("engagement_traits", "stable_traits"):
                for trait in as_list(structured.get(field)):
                    trait_counts[trait] += 1
            for field in ("habit_signals", "habit_patterns"):
                for habit in as_list(structured.get(field)):
                    habit_counts[habit] += 1
            for field in ("reply_preferences", "long_term_reply_preferences"):
                for pref in as_list(structured.get(field)):
                    reply_counts[pref] += 1
            if structured.get("temperament_signal"):
                temperament_values.append(structured.get("temperament_signal"))
            if structured.get("temperament_tendency"):
                temperament_values.append(structured.get("temperament_tendency"))

        def most_frequent(counts: Dict[str, int], limit: int) -> List[str]:
            # Stable sort: ties keep first-seen order.
            ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
            return [key for key, _ in ranked[:limit]]

        top_topics = most_frequent(topic_counts, 5)
        top_traits = most_frequent(trait_counts, 5)
        top_habits = most_frequent(habit_counts, 5)
        top_reply = most_frequent(reply_counts, 4)
        temperament = temperament_values[0] if temperament_values else "整体保持中性沟通特征"

        if digest_type == "weekly":
            return {
                "stable_topics": top_topics,
                "identity_traits": [],
                "skill_profile": [],
                "family_profile": [],
                "life_stage_profile": [],
                "value_profile": [],
                "stable_traits": top_traits,
                "habit_patterns": top_habits,
                "reply_preferences": top_reply,
                "group_role": "",
                "decision_profile": "",
                "recent_state": top_topics[:3],
                "temperament_tendency": temperament,
                "summary_text": "本周沟通特征已按重复信号汇总。",
                "confidence": 0.45,
            }
        return {
            "long_term_topics": top_topics,
            "identity_traits": [],
            "skill_profile": [],
            "family_profile": [],
            "life_stage_profile": [],
            "value_profile": [],
            "stable_traits": top_traits,
            "habit_patterns": top_habits,
            "long_term_reply_preferences": top_reply,
            "group_role": "",
            "decision_profile": "",
            "phase_state": top_topics[:3],
            "temperament_tendency": temperament,
            "summary_text": "本月沟通特征已按周摘要汇总。",
            "confidence": 0.5,
        }

    def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str:
        """Render messages as a compact transcript grouped by minute and sender.

        Each ``HH:MM`` bucket is written on its own line, followed by one line
        per message; only a sender's first message in the bucket carries the
        sender name. Messages without sender, content or a usable timestamp
        are skipped.
        """
        if not messages:
            return ""

        time_groups = defaultdict(lambda: defaultdict(list))
        for msg in messages:
            timestamp = msg.get("timestamp")
            sender = msg.get("sender", "")
            content = str(msg.get("content", "")).strip()
            if not sender or not content:
                continue
            if isinstance(timestamp, datetime):
                # Use datetime objects directly; str() on one with microseconds
                # would not match the format below.
                dt = timestamp
            else:
                try:
                    dt = datetime.strptime(str(timestamp), "%Y-%m-%d %H:%M:%S")
                except Exception:
                    dt = None
            if not dt:
                continue
            time_key = dt.strftime("%H:%M")
            sender_name = member_name_map.get(sender, sender)
            time_groups[time_key][sender_name].append(content)

        lines = []
        for time_key in sorted(time_groups.keys()):
            lines.append(f"{time_key}")
            for sender_name, contents in time_groups[time_key].items():
                for idx, content in enumerate(contents):
                    if idx == 0:
                        # NOTE(review): name and content are joined with no
                        # separator — confirm this is the intended format.
                        lines.append(f"{sender_name}{content}")
                    else:
                        lines.append(f" {content}")
        return "\n".join(lines)

    @staticmethod
    def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return ``(week_key, start, end)`` for the Monday–Sunday week of a date.

        Only the first 10 characters of ``str(date_value)`` are read, so both
        ``YYYY-MM-DD`` and ``YYYY-MM-DD HH:MM:SS`` inputs work. ``week_key`` is
        the Monday's date.

        Raises:
            ValueError: If the value does not start with a ``YYYY-MM-DD`` date.
        """
        target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        week_start = target_date - timedelta(days=target_date.weekday())
        week_end = week_start + timedelta(days=6)
        week_key = f"{week_start.strftime('%Y-%m-%d')}"
        return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59")

    @staticmethod
    def _month_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return ``(month_key, start, end)`` for the calendar month of a date.

        Only the first 10 characters of ``str(date_value)`` are read.
        ``month_key`` has the form ``YYYY-MM``.

        Raises:
            ValueError: If the value does not start with a ``YYYY-MM-DD`` date.
        """
        target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        month_start = target_dt.replace(day=1)
        if month_start.month == 12:
            next_month = month_start.replace(year=month_start.year + 1, month=1, day=1)
        else:
            next_month = month_start.replace(month=month_start.month + 1, day=1)
        # Last day of the month = day before the first of the next month.
        month_end = next_month - timedelta(days=1)
        month_key = month_start.strftime("%Y-%m")
        return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59")