完善成员画像插件的日/周/月分层提取与已结束日期处理逻辑
- 将成员画像能力进一步收敛到插件内部,强化按群启用、后台异步刷新、后台查看的完整链路 - 新增群维度按日批量提取能力:以群为单位按天处理一次,统一提取当天活跃成员的日级画像摘要 - 日级画像输出扩展为更适合长期累计的结构化信号,补充身份线索、技能信号、家庭线索、阶段线索、价值偏好、群内角色、决策风格等字段 - 优化提示词设计,明确要求优先提取可复用、可累计、可验证的行为线索,减少一次性情绪和短期噪声对长期画像的干扰 - 打通日 -> 周 -> 月 -> 最终画像 的分层汇总链路,让后续月度画像直接消费日/周级结构化摘要,而不是重复回扫长窗口原始消息 - 新增/完善画像融合策略:identity_traits、skill_profile、family_profile、life_stage_profile、value_profile 也纳入长期分数累计,不再仅依赖最近一次结果覆盖旧结果 - 将活跃群、活跃成员、辅助消息样本等口径统一调整为只处理已结束日期,避免当天未完结数据进入画像计算 - 调整日级批处理逻辑,默认只处理昨天及更早日期,确保不会处理当天消息 - 修复重复执行时仍然先调用 AI 再跳过的问题,改为先检查当天候选成员是否已完成生成,全部已存在时直接跳过,减少无效 AI 请求和耗时 - 增加群日批处理、周摘要、月摘要、群刷新进度等日志,方便后台定位当前刷新到哪些群、哪些成员、进度如何 - 丰富后台画像展示字段,支持查看更完整的长期画像维度与摘要统计 - 更新插件配置默认值,收敛为近 60 天启动窗口、每日滚动处理与群级日摘要模式 - 补充 message_storage 读取能力,支持按群按日提取消息,为群日批量画像与后续周期汇总提供底层数据支撑
This commit is contained in:
@@ -12,6 +12,7 @@ from db.contacts_db import ContactsDBOperator
|
||||
from db.member_digest_db import MemberDigestDBOperator
|
||||
from db.message_storage import MessageStorageDB
|
||||
from plugins.member_context.prompt_builder import MemberContextPromptBuilder
|
||||
from utils.compress_chat_data import compress_chat_data
|
||||
|
||||
|
||||
class MemberDigestService:
|
||||
@@ -37,28 +38,47 @@ class MemberDigestService:
|
||||
self.bootstrap_days = int(profile_config.get("bootstrap_days", 365))
|
||||
self.daily_message_limit = int(profile_config.get("daily_message_limit", 120))
|
||||
self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6))
|
||||
self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 45))
|
||||
self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 0))
|
||||
self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16))
|
||||
self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12))
|
||||
self.final_daily_limit = int(profile_config.get("final_daily_limit", 8))
|
||||
self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6))
|
||||
self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6))
|
||||
self.group_digest_days = int(profile_config.get("group_digest_days", 1))
|
||||
|
||||
def ensure_recent_group_daily_digests(self, chatroom_id: str, days: Optional[int] = None,
|
||||
force: bool = False) -> Dict:
|
||||
days = days or self.group_digest_days
|
||||
built_daily = 0
|
||||
touched_members = set()
|
||||
|
||||
for offset in range(days):
|
||||
target_date = (datetime.now() - timedelta(days=offset + 1)).strftime("%Y-%m-%d")
|
||||
digests = self._build_group_daily_digests(chatroom_id, target_date, force=force)
|
||||
for digest in digests:
|
||||
self.digest_db.save_digest(digest)
|
||||
built_daily += 1
|
||||
touched_members.add(digest.get("wxid"))
|
||||
|
||||
return {
|
||||
"built_daily": built_daily,
|
||||
"touched_members": list(touched_members),
|
||||
}
|
||||
|
||||
def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False) -> Dict:
|
||||
member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
|
||||
display_name = member.get("display_name") or member.get("nick_name") or wxid
|
||||
|
||||
active_dates = self.message_db.get_member_active_dates(chatroom_id, wxid, days=self.bootstrap_days)
|
||||
if not active_dates:
|
||||
daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
|
||||
if not daily_digests:
|
||||
return {
|
||||
"display_name": display_name,
|
||||
"daily_digests": [],
|
||||
"weekly_digests": [],
|
||||
"monthly_digests": [],
|
||||
"stats": {"daily": 0, "weekly": 0, "monthly": 0, "active_days": 0},
|
||||
"stats": {"daily": 0, "weekly": 0, "monthly": 0, "active_days": 0, "built_daily": 0},
|
||||
}
|
||||
|
||||
built_daily = self._ensure_daily_digests(chatroom_id, wxid, display_name, active_dates, force=force)
|
||||
built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force)
|
||||
built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)
|
||||
|
||||
@@ -75,45 +95,37 @@ class MemberDigestService:
|
||||
"daily": len(daily_digests),
|
||||
"weekly": len(weekly_digests),
|
||||
"monthly": len(monthly_digests),
|
||||
"active_days": len(active_dates),
|
||||
"built_daily": built_daily,
|
||||
"active_days": len(self.digest_db.list_digest_keys(chatroom_id, wxid, "daily")),
|
||||
"built_daily": 0,
|
||||
"built_weekly": built_weekly,
|
||||
"built_monthly": built_monthly,
|
||||
},
|
||||
}
|
||||
|
||||
def _ensure_daily_digests(self, chatroom_id: str, wxid: str, display_name: str,
|
||||
active_dates: List[Dict], force: bool = False) -> int:
|
||||
existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "daily"))
|
||||
built = 0
|
||||
processed = 0
|
||||
sorted_dates = sorted(active_dates, key=lambda item: str(item.get("message_date")))
|
||||
current_day = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
for item in sorted_dates:
|
||||
period_key = str(item.get("message_date"))
|
||||
msg_count = int(item.get("msg_count", 0))
|
||||
if msg_count < self.daily_digest_min_messages:
|
||||
continue
|
||||
if not force and period_key in existing_keys and period_key != current_day:
|
||||
continue
|
||||
messages = self.message_db.get_member_messages_on_date(
|
||||
chatroom_id, wxid, period_key, limit=self.daily_message_limit
|
||||
)
|
||||
if len(messages) < self.daily_digest_min_messages:
|
||||
continue
|
||||
digest = self._build_daily_digest(chatroom_id, wxid, display_name, period_key, messages)
|
||||
if digest:
|
||||
self.digest_db.save_digest(digest)
|
||||
built += 1
|
||||
processed += 1
|
||||
self.LOG.info(
|
||||
f"[成员交互摘要][日摘要] 完成: group={chatroom_id}, wxid={wxid}, "
|
||||
f"date={period_key}, messages={len(messages)}"
|
||||
)
|
||||
if not force and processed >= self.max_daily_digests_per_run:
|
||||
break
|
||||
return built
|
||||
@staticmethod
|
||||
def _normalize_profile_item(item: Dict) -> Dict:
|
||||
normalized = {}
|
||||
list_keys = {
|
||||
"topics", "identity_clues", "skill_signals", "family_signals", "life_stage_signals",
|
||||
"value_preferences", "habit_signals", "engagement_traits", "reply_taboos",
|
||||
"representative_messages", "stable_topics", "identity_traits", "skill_profile",
|
||||
"family_profile", "life_stage_profile", "value_profile", "stable_traits",
|
||||
"habit_patterns", "reply_preferences", "long_term_topics", "long_term_reply_preferences",
|
||||
"phase_state", "recent_state"
|
||||
}
|
||||
for key, value in item.items():
|
||||
if key in list_keys:
|
||||
if isinstance(value, list):
|
||||
normalized[key] = [str(v).strip() for v in value if str(v).strip()]
|
||||
elif value:
|
||||
normalized[key] = [str(value).strip()]
|
||||
else:
|
||||
normalized[key] = []
|
||||
elif isinstance(value, (int, float)):
|
||||
normalized[key] = value
|
||||
else:
|
||||
normalized[key] = str(value).strip()
|
||||
return normalized
|
||||
|
||||
def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
|
||||
daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
|
||||
@@ -123,7 +135,7 @@ class MemberDigestService:
|
||||
grouped[week_key].append(item)
|
||||
|
||||
existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly"))
|
||||
current_week_key, _, _ = self._week_period_bounds(datetime.now().strftime("%Y-%m-%d"))
|
||||
current_week_key, _, _ = self._week_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))
|
||||
built = 0
|
||||
for week_key, items in sorted(grouped.items()):
|
||||
if len(items) < 2:
|
||||
@@ -151,7 +163,7 @@ class MemberDigestService:
|
||||
grouped[month_key].append(item)
|
||||
|
||||
existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly"))
|
||||
current_month_key, _, _ = self._month_period_bounds(datetime.now().strftime("%Y-%m-%d"))
|
||||
current_month_key, _, _ = self._month_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))
|
||||
built = 0
|
||||
for month_key, items in sorted(grouped.items()):
|
||||
if len(items) < 2:
|
||||
@@ -171,6 +183,83 @@ class MemberDigestService:
|
||||
)
|
||||
return built
|
||||
|
||||
def _build_group_daily_digests(self, chatroom_id: str, digest_date: str, force: bool = False) -> List[Dict]:
|
||||
members = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
|
||||
member_name_map = {}
|
||||
for member in members:
|
||||
wxid = member.get("wxid")
|
||||
if not wxid:
|
||||
continue
|
||||
member_name_map[wxid] = member.get("display_name") or member.get("nick_name") or wxid
|
||||
|
||||
messages = self.message_db.get_member_messages_for_group_date(chatroom_id, digest_date)
|
||||
if not messages:
|
||||
return []
|
||||
|
||||
sender_messages = defaultdict(list)
|
||||
for msg in messages:
|
||||
wxid = msg.get("sender")
|
||||
if not wxid:
|
||||
continue
|
||||
sender_messages[wxid].append(msg)
|
||||
|
||||
candidate_wxids = [
|
||||
wxid for wxid, items in sender_messages.items()
|
||||
if len(items) >= self.daily_digest_min_messages
|
||||
]
|
||||
if not candidate_wxids:
|
||||
return []
|
||||
|
||||
pending_wxids = []
|
||||
for wxid in candidate_wxids:
|
||||
if not force and self.digest_db.get_digest(chatroom_id, wxid, "daily", digest_date):
|
||||
continue
|
||||
pending_wxids.append(wxid)
|
||||
if not pending_wxids:
|
||||
self.LOG.info(
|
||||
f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
|
||||
f"reason=all_candidates_already_built, candidates={len(candidate_wxids)}"
|
||||
)
|
||||
return []
|
||||
|
||||
member_labels = [f"{wxid} | {member_name_map.get(wxid, wxid)}" for wxid in pending_wxids]
|
||||
compact_chat = self._format_group_messages_optimized(messages, member_name_map)
|
||||
try:
|
||||
compact_chat = compress_chat_data(compact_chat)
|
||||
except Exception as e:
|
||||
self.LOG.warning(f"[成员交互摘要] 压缩群日消息失败: group={chatroom_id}, date={digest_date}, error={e}")
|
||||
|
||||
parsed_members = self._request_group_daily_json(chatroom_id, digest_date, member_labels, compact_chat)
|
||||
parsed_map = {item.get("wxid"): item for item in parsed_members if item.get("wxid")} if parsed_members else {}
|
||||
|
||||
digests = []
|
||||
for wxid in pending_wxids:
|
||||
parsed = parsed_map.get(wxid) or self._build_daily_digest_fallback(sender_messages.get(wxid, []))
|
||||
if not parsed:
|
||||
continue
|
||||
parsed = self._normalize_profile_item(parsed)
|
||||
digests.append({
|
||||
"chatroom_id": chatroom_id,
|
||||
"wxid": wxid,
|
||||
"digest_type": "daily",
|
||||
"period_key": digest_date,
|
||||
"period_start": f"{digest_date} 00:00:00",
|
||||
"period_end": f"{digest_date} 23:59:59",
|
||||
"display_name": member_name_map.get(wxid, wxid),
|
||||
"source_count": len(sender_messages.get(wxid, [])),
|
||||
"summary_text": parsed.get("summary_text", ""),
|
||||
"structured": parsed,
|
||||
"meta": {
|
||||
"source_type": "group_daily_messages",
|
||||
"representative_messages": parsed.get("representative_messages", []),
|
||||
},
|
||||
})
|
||||
self.LOG.info(
|
||||
f"[成员交互摘要][群日批处理] 完成: group={chatroom_id}, date={digest_date}, "
|
||||
f"candidates={len(candidate_wxids)}, pending={len(pending_wxids)}, built={len(digests)}"
|
||||
)
|
||||
return digests
|
||||
|
||||
def _build_daily_digest(self, chatroom_id: str, wxid: str, display_name: str,
|
||||
digest_date: str, messages: List[Dict]) -> Optional[Dict]:
|
||||
prompt = MemberContextPromptBuilder.build_daily_digest_prompt(
|
||||
@@ -256,6 +345,39 @@ class MemberDigestService:
|
||||
self.LOG.warning(f"[成员交互摘要][AI] 摘要请求失败: group={chatroom_id}, wxid={wxid}, tag={tag}, error={e}")
|
||||
return None
|
||||
|
||||
def _request_group_daily_json(self, chatroom_id: str, digest_date: str,
|
||||
member_labels: List[str], compressed_chat: str) -> List[Dict]:
|
||||
if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key:
|
||||
return []
|
||||
prompt = MemberContextPromptBuilder.build_group_daily_digest_prompt(
|
||||
chatroom_id, digest_date, member_labels, compressed_chat
|
||||
)
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.ai_api_key}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
payload = {
|
||||
"inputs": {"query": prompt},
|
||||
"response_mode": "blocking",
|
||||
"user": f"member-digest:{chatroom_id}:group-daily:{digest_date}",
|
||||
}
|
||||
url = f"{self.ai_base_url}/{self.ai_endpoint}"
|
||||
try:
|
||||
self.LOG.info(
|
||||
f"[成员交互摘要][AI] 发起群日批量摘要请求: group={chatroom_id}, "
|
||||
f"date={digest_date}, members={len(member_labels)}"
|
||||
)
|
||||
response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
parsed = self._parse_group_daily_answer(data.get("answer", ""))
|
||||
return parsed
|
||||
except Exception as e:
|
||||
self.LOG.warning(
|
||||
f"[成员交互摘要][AI] 群日批量摘要失败: group={chatroom_id}, date={digest_date}, error={e}"
|
||||
)
|
||||
return []
|
||||
|
||||
def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
|
||||
if not answer:
|
||||
return None
|
||||
@@ -277,6 +399,29 @@ class MemberDigestService:
|
||||
normalized[key] = str(value).strip()
|
||||
return normalized
|
||||
|
||||
def _parse_group_daily_answer(self, answer: str) -> List[Dict]:
|
||||
if not answer:
|
||||
return []
|
||||
text = answer.strip()
|
||||
match = re.search(r"\{.*\}", text, re.S)
|
||||
if match:
|
||||
text = match.group(0)
|
||||
try:
|
||||
parsed = json.loads(text)
|
||||
except Exception:
|
||||
return []
|
||||
members = parsed.get("members", [])
|
||||
if not isinstance(members, list):
|
||||
return []
|
||||
normalized = []
|
||||
for item in members:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
normalized_item = self._normalize_profile_item(item)
|
||||
if normalized_item.get("wxid"):
|
||||
normalized.append(normalized_item)
|
||||
return normalized
|
||||
|
||||
def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]:
|
||||
if not messages:
|
||||
return None
|
||||
@@ -288,11 +433,18 @@ class MemberDigestService:
|
||||
message_pattern = "短句居多" if avg_len <= 16 else "表达较完整" if avg_len >= 35 else "表达中等长度"
|
||||
return {
|
||||
"topics": [],
|
||||
"identity_clues": [],
|
||||
"skill_signals": [],
|
||||
"family_signals": [],
|
||||
"life_stage_signals": [],
|
||||
"value_preferences": [],
|
||||
"interaction_style": "自然跟随式互动",
|
||||
"message_pattern": message_pattern,
|
||||
"response_style_hint": "保持简洁自然,先回应核心点",
|
||||
"habit_signals": [],
|
||||
"engagement_traits": [],
|
||||
"decision_style": "",
|
||||
"social_role": "",
|
||||
"reply_taboos": [],
|
||||
"temperament_signal": "当天样本有限,暂以中性沟通观察为主",
|
||||
"summary_text": f"当日消息约{len(messages)}条,{message_pattern}。",
|
||||
@@ -332,9 +484,16 @@ class MemberDigestService:
|
||||
if digest_type == "weekly":
|
||||
return {
|
||||
"stable_topics": top_topics,
|
||||
"identity_traits": [],
|
||||
"skill_profile": [],
|
||||
"family_profile": [],
|
||||
"life_stage_profile": [],
|
||||
"value_profile": [],
|
||||
"stable_traits": top_traits,
|
||||
"habit_patterns": top_habits,
|
||||
"reply_preferences": top_reply,
|
||||
"group_role": "",
|
||||
"decision_profile": "",
|
||||
"recent_state": top_topics[:3],
|
||||
"temperament_tendency": temperament,
|
||||
"summary_text": "本周沟通特征已按重复信号汇总。",
|
||||
@@ -343,15 +502,53 @@ class MemberDigestService:
|
||||
|
||||
return {
|
||||
"long_term_topics": top_topics,
|
||||
"identity_traits": [],
|
||||
"skill_profile": [],
|
||||
"family_profile": [],
|
||||
"life_stage_profile": [],
|
||||
"value_profile": [],
|
||||
"stable_traits": top_traits,
|
||||
"habit_patterns": top_habits,
|
||||
"long_term_reply_preferences": top_reply,
|
||||
"group_role": "",
|
||||
"decision_profile": "",
|
||||
"phase_state": top_topics[:3],
|
||||
"temperament_tendency": temperament,
|
||||
"summary_text": "本月沟通特征已按周摘要汇总。",
|
||||
"confidence": 0.5,
|
||||
}
|
||||
|
||||
def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str:
|
||||
if not messages:
|
||||
return ""
|
||||
time_groups = defaultdict(lambda: defaultdict(list))
|
||||
for msg in messages:
|
||||
timestamp = msg.get("timestamp")
|
||||
sender = msg.get("sender", "")
|
||||
content = str(msg.get("content", "")).strip()
|
||||
if not sender or not content:
|
||||
continue
|
||||
try:
|
||||
dt = datetime.strptime(str(timestamp), "%Y-%m-%d %H:%M:%S")
|
||||
except Exception:
|
||||
dt = None
|
||||
if not dt:
|
||||
continue
|
||||
time_key = dt.strftime("%H:%M")
|
||||
sender_name = member_name_map.get(sender, sender)
|
||||
time_groups[time_key][sender_name].append(content)
|
||||
|
||||
lines = []
|
||||
for time_key in sorted(time_groups.keys()):
|
||||
lines.append(f"【{time_key}】")
|
||||
for sender_name, contents in time_groups[time_key].items():
|
||||
for idx, content in enumerate(contents):
|
||||
if idx == 0:
|
||||
lines.append(f"{sender_name}:{content}")
|
||||
else:
|
||||
lines.append(f" {content}")
|
||||
return "\n".join(lines)
|
||||
|
||||
@staticmethod
|
||||
def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
|
||||
target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
|
||||
|
||||
Reference in New Issue
Block a user