Files
abot/plugins/member_context/digest_service.py
liuwei 75dc3b289d 优化成员画像初始化状态管理,避免空群重复回扫历史窗口
- 新增成员摘要群级状态表,记录每个群的初始化状态、最近一次初始化时间、初始化窗口天数、生成摘要数量与触达成员数量
- 将成员画像群日摘要逻辑拆分为初始化模式与日常增量模式
- 首次运行且群内尚无日摘要时,自动按 bootstrap_days 回补最近已结束日期
- 初始化完成后写入群级状态:有产出记为 done,无产出记为 empty
- 后续调度或手动刷新时,若群级状态已是 done/empty,则不再重复按 60 天历史窗口回扫,改为仅按日常增量窗口处理前一天数据
- 解决冷群、空群、长期低活跃群在每天定时任务中反复初始化扫描的问题,减少无意义数据库扫描与等待时间
- 调整刷新群画像逻辑,即使最近 72 小时无活跃成员,首次初始化也能先尝试补历史摘要,再决定是否需要刷新成员画像
- 初始化模式下,会把历史窗口中真正产出过日摘要的成员补充进候选刷新集合,避免只依赖最近 72 小时活跃成员导致历史初始化不完整
2026-04-02 13:54:24 +08:00

612 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import re
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import requests
from loguru import logger
from db.contacts_db import ContactsDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.prompt_builder import MemberContextPromptBuilder
from utils.compress_chat_data import compress_chat_data
class MemberDigestService:
    """Layered member digest service (daily -> weekly -> monthly).

    Daily digests are produced per group-day with one batched AI request
    (falling back to a heuristic summary per member). Weekly digests roll up a
    member's daily digests, and monthly digests roll up weekly digests.
    """

    # Profile keys whose values are always normalised to a list of non-empty strings.
    _PROFILE_LIST_KEYS = frozenset({
        "topics", "identity_clues", "skill_signals", "family_signals", "life_stage_signals",
        "value_preferences", "habit_signals", "engagement_traits", "reply_taboos",
        "representative_messages", "stable_topics", "identity_traits", "skill_profile",
        "family_profile", "life_stage_profile", "value_profile", "stable_traits",
        "habit_patterns", "reply_preferences", "long_term_topics", "long_term_reply_preferences",
        "phase_state", "recent_state",
    })

    # Greedy DOTALL match from the first "{" to the last "}", so a JSON object
    # wrapped in prose or a Markdown code fence can still be extracted.
    _JSON_OBJECT_RE = re.compile(r"\{.*\}", re.S)

    # Annotations are quoted (forward references) so defining the class does
    # not require the project DB types to be importable.
    def __init__(self, contacts_db: "ContactsDBOperator", message_db: "MessageStorageDB",
                 digest_db: "MemberDigestDBOperator", plugin_config: Optional[Dict] = None):
        """Store collaborators and read the ``api`` / ``profile`` config sections.

        Args:
            contacts_db: Source of group member lists and member info.
            message_db: Source of a group's messages for a given date.
            digest_db: Persistence for digests and per-group state.
            plugin_config: Plugin config dict; missing keys fall back to defaults.
        """
        self.contacts_db = contacts_db
        self.message_db = message_db
        self.digest_db = digest_db
        self.LOG = logger
        self.plugin_config = plugin_config or {}
        # `or {}` also tolerates sections that are present but set to null.
        api_config = self.plugin_config.get("api") or {}
        profile_config = self.plugin_config.get("profile") or {}
        self.ai_enabled = bool(api_config.get("enable", api_config.get("enabled", False)))
        self.ai_base_url = (api_config.get("base_url") or "").rstrip("/")
        self.ai_api_key = api_config.get("api_key", "")
        self.ai_endpoint = str(api_config.get("endpoint", "completion-messages")).lstrip("/")
        self.ai_timeout = int(api_config.get("request_timeout", 60))
        self.bootstrap_days = int(profile_config.get("bootstrap_days", 365))
        self.daily_message_limit = int(profile_config.get("daily_message_limit", 120))
        self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6))
        self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 0))
        self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16))
        self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12))
        self.final_daily_limit = int(profile_config.get("final_daily_limit", 8))
        self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6))
        self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6))
        self.group_digest_days = int(profile_config.get("group_digest_days", 1))

    # ------------------------------------------------------------------ #
    # Group-level daily digest window
    # ------------------------------------------------------------------ #
    def ensure_recent_group_daily_digests(self, chatroom_id: str, days: Optional[int] = None,
                                          force: bool = False) -> Dict:
        """Build and save daily digests for a group over a window of finished days.

        When ``days`` is None the window is chosen automatically. A group that
        still needs initialisation gets ``bootstrap_days``, and the outcome is
        persisted as group state ``done`` or ``empty``. Any other group gets
        ``group_digest_days`` (incremental mode). An explicit ``days`` always
        runs in incremental mode.

        Args:
            chatroom_id: Group to process.
            days: Number of days to look back, ending yesterday.
            force: Rebuild digests even if they already exist.

        Returns:
            Dict with ``built_daily``, ``touched_members`` (list of wxids),
            ``days``, ``mode`` and ``processed_dates`` (newest first).
        """
        bootstrap_mode = False
        if days is None:
            bootstrap_mode = self._should_bootstrap_group(chatroom_id)
            days = self.bootstrap_days if bootstrap_mode else self.group_digest_days
        # Anchor the window once so a run crossing midnight neither skips nor repeats a date.
        today = datetime.now()
        built_daily = 0
        touched_members = set()
        processed_dates = []
        for offset in range(1, days + 1):
            target_date = (today - timedelta(days=offset)).strftime("%Y-%m-%d")
            digests = self._build_group_daily_digests(chatroom_id, target_date, force=force)
            processed_dates.append(target_date)
            for digest in digests:
                self.digest_db.save_digest(digest)
                built_daily += 1
                touched_members.add(digest.get("wxid"))
        mode = "bootstrap" if bootstrap_mode else "incremental"
        if bootstrap_mode:
            # Every saved digest adds a member, so the digest count alone decides the status.
            bootstrap_status = "done" if built_daily > 0 else "empty"
            self.digest_db.save_group_state(
                chatroom_id,
                bootstrap_status=bootstrap_status,
                bootstrap_days=days,
                built_daily_count=built_daily,
                touched_member_count=len(touched_members),
                extra={"processed_dates": processed_dates},
            )
        self.LOG.info(
            f"[成员交互摘要][群日摘要窗口] group={chatroom_id}, mode={mode}, days={days}, "
            f"built_daily={built_daily}, touched_members={len(touched_members)}"
        )
        return {
            "built_daily": built_daily,
            "touched_members": list(touched_members),
            "days": days,
            "mode": mode,
            "processed_dates": processed_dates,
        }

    def _should_bootstrap_group(self, chatroom_id: str) -> bool:
        """Return True if the group still needs its initial history backfill.

        A group whose persisted state is ``done`` or ``empty`` never
        bootstraps again. Otherwise it bootstraps only if it has no daily
        digest at all. Errors are logged and treated as "no bootstrap", so the
        caller falls back to the cheaper incremental window.
        """
        try:
            group_state = self.digest_db.get_group_state(chatroom_id)
            if group_state and group_state.get("bootstrap_status") in {"done", "empty"}:
                return False
            sql = """
                SELECT 1
                FROM t_member_digest
                WHERE chatroom_id = %s AND digest_type = 'daily'
                LIMIT 1
            """
            row = self.digest_db.execute_query(sql, (chatroom_id,), fetch_one=True)
            return not row
        except Exception as e:
            self.LOG.warning(f"[成员交互摘要] 检查群初始化状态失败,按增量处理: group={chatroom_id}, error={e}")
            return False

    # ------------------------------------------------------------------ #
    # Per-member pipeline (weekly / monthly roll-ups)
    # ------------------------------------------------------------------ #
    def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False) -> Dict:
        """Roll a member's daily digests up into weekly and monthly digests.

        Returns the member's display name, the most recent daily, weekly and
        monthly digests (capped by the ``final_*_limit`` settings), and build
        statistics. A member without any daily digest gets empty results.
        """
        member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
        display_name = member.get("display_name") or member.get("nick_name") or wxid
        # Only existence matters here; the roll-up helpers load their own data.
        if not self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=1):
            return {
                "display_name": display_name,
                "daily_digests": [],
                "weekly_digests": [],
                "monthly_digests": [],
                "stats": {"daily": 0, "weekly": 0, "monthly": 0, "active_days": 0, "built_daily": 0},
            }
        built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force)
        built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)
        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=self.final_daily_limit)
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=self.final_weekly_limit)
        monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=self.final_monthly_limit)
        return {
            "display_name": display_name,
            "daily_digests": daily_digests,
            "weekly_digests": weekly_digests,
            "monthly_digests": monthly_digests,
            "stats": {
                "daily": len(daily_digests),
                "weekly": len(weekly_digests),
                "monthly": len(monthly_digests),
                "active_days": len(self.digest_db.list_digest_keys(chatroom_id, wxid, "daily")),
                "built_daily": 0,
                "built_weekly": built_weekly,
                "built_monthly": built_monthly,
            },
        }

    # ------------------------------------------------------------------ #
    # Normalisation helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _clean_text(value) -> str:
        """Return ``value`` as stripped text; None (JSON null) becomes "" instead of "None"."""
        return "" if value is None else str(value).strip()

    @staticmethod
    def _clean_list(values) -> List[str]:
        """Stringify and strip list items, dropping empty and None entries."""
        cleaned = (MemberDigestService._clean_text(value) for value in values)
        return [text for text in cleaned if text]

    @staticmethod
    def _normalize_profile_item(item: Dict) -> Dict:
        """Normalise one AI/fallback profile dict.

        Keys in ``_PROFILE_LIST_KEYS`` become lists of non-empty strings; a
        scalar value is wrapped in a one-element list. Numbers are kept
        as-is. Anything else becomes stripped text, with None mapped to "".
        """
        normalized = {}
        for key, value in item.items():
            if key in MemberDigestService._PROFILE_LIST_KEYS:
                if isinstance(value, list):
                    normalized[key] = MemberDigestService._clean_list(value)
                else:
                    text = MemberDigestService._clean_text(value) if value else ""
                    normalized[key] = [text] if text else []
            elif isinstance(value, (int, float)):
                normalized[key] = value
            else:
                normalized[key] = MemberDigestService._clean_text(value)
        return normalized

    # ------------------------------------------------------------------ #
    # Weekly / monthly roll-ups
    # ------------------------------------------------------------------ #
    def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
        """Build weekly digests from daily digests; return how many were saved.

        Weeks with fewer than two daily digests are skipped. An existing week
        is rebuilt only when ``force`` is set or it is the week containing
        yesterday, which may still be growing.
        """
        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
        grouped = defaultdict(list)
        for item in daily_digests:
            period_key = item.get("period_key")
            if not period_key:
                continue
            week_key, _, _ = self._week_period_bounds(period_key)
            grouped[week_key].append(item)
        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly"))
        current_week_key, _, _ = self._week_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))
        built = 0
        for week_key, items in sorted(grouped.items()):
            if len(items) < 2:
                continue
            if not force and week_key in existing_keys and week_key != current_week_key:
                continue
            # week_key is the week's Monday, so its bounds are this same week.
            period_key, period_start, period_end = self._week_period_bounds(week_key)
            digest = self._build_period_digest(
                "weekly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"week={period_key}, days={len(items)}"
                )
        return built

    def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
        """Build monthly digests from weekly digests; return how many were saved.

        A week belongs to the month containing its ``period_end``. Months with
        fewer than two weekly digests are skipped. An existing month is
        rebuilt only when ``force`` is set or it is yesterday's month.
        """
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
        grouped = defaultdict(list)
        for item in weekly_digests:
            period_end = item.get("period_end")
            if not period_end:
                continue
            month_key, _, _ = self._month_period_bounds(period_end)
            grouped[month_key].append(item)
        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly"))
        current_month_key, _, _ = self._month_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))
        built = 0
        for month_key, items in sorted(grouped.items()):
            if len(items) < 2:
                continue
            if not force and month_key in existing_keys and month_key != current_month_key:
                continue
            period_key, period_start, period_end = self._month_period_bounds(f"{month_key}-01")
            digest = self._build_period_digest(
                "monthly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"month={period_key}, weeks={len(items)}"
                )
        return built

    # ------------------------------------------------------------------ #
    # Daily digests
    # ------------------------------------------------------------------ #
    def _build_group_daily_digests(self, chatroom_id: str, digest_date: str, force: bool = False) -> List[Dict]:
        """Build (but do not save) daily digests for one group on one date.

        Members with at least ``daily_digest_min_messages`` messages that day
        are candidates. Unless ``force`` is set, candidates that already have
        a digest are skipped. The rest are summarised with a single batched AI
        request, falling back to a heuristic digest per member.
        """
        members = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
        member_name_map = {
            member["wxid"]: member.get("display_name") or member.get("nick_name") or member["wxid"]
            for member in members
            if member.get("wxid")
        }
        messages = self.message_db.get_member_messages_for_group_date(chatroom_id, digest_date)
        if not messages:
            return []
        sender_messages = defaultdict(list)
        for msg in messages:
            sender = msg.get("sender")
            if sender:
                sender_messages[sender].append(msg)
        candidate_wxids = [
            wxid for wxid, items in sender_messages.items()
            if len(items) >= self.daily_digest_min_messages
        ]
        if not candidate_wxids:
            return []
        pending_wxids = [
            wxid for wxid in candidate_wxids
            if force or not self.digest_db.get_digest(chatroom_id, wxid, "daily", digest_date)
        ]
        if not pending_wxids:
            self.LOG.info(
                f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
                f"reason=all_candidates_already_built, candidates={len(candidate_wxids)}"
            )
            return []
        member_labels = [f"{wxid} | {member_name_map.get(wxid, wxid)}" for wxid in pending_wxids]
        compact_chat = self._format_group_messages_optimized(messages, member_name_map)
        try:
            compact_chat = compress_chat_data(compact_chat)
        except Exception as e:
            # Best effort: fall back to the uncompressed transcript.
            self.LOG.warning(f"[成员交互摘要] 压缩群日消息失败: group={chatroom_id}, date={digest_date}, error={e}")
        parsed_members = self._request_group_daily_json(chatroom_id, digest_date, member_labels, compact_chat)
        parsed_map = {item.get("wxid"): item for item in parsed_members or [] if item.get("wxid")}
        digests = []
        for wxid in pending_wxids:
            own_messages = sender_messages[wxid]
            parsed = parsed_map.get(wxid) or self._build_daily_digest_fallback(own_messages)
            if not parsed:
                continue
            parsed = self._normalize_profile_item(parsed)
            digests.append({
                "chatroom_id": chatroom_id,
                "wxid": wxid,
                "digest_type": "daily",
                "period_key": digest_date,
                "period_start": f"{digest_date} 00:00:00",
                "period_end": f"{digest_date} 23:59:59",
                "display_name": member_name_map.get(wxid, wxid),
                "source_count": len(own_messages),
                "summary_text": parsed.get("summary_text", ""),
                "structured": parsed,
                "meta": {
                    "source_type": "group_daily_messages",
                    "representative_messages": parsed.get("representative_messages", []),
                },
            })
        self.LOG.info(
            f"[成员交互摘要][群日批处理] 完成: group={chatroom_id}, date={digest_date}, "
            f"candidates={len(candidate_wxids)}, pending={len(pending_wxids)}, built={len(digests)}"
        )
        return digests

    def _build_daily_digest(self, chatroom_id: str, wxid: str, display_name: str,
                            digest_date: str, messages: List[Dict]) -> Optional[Dict]:
        """Build (but do not save) one member's daily digest with a per-member AI request.

        Falls back to the heuristic digest when AI is unavailable or fails.
        Returns None when neither source yields anything.
        """
        prompt = MemberContextPromptBuilder.build_daily_digest_prompt(
            chatroom_id, wxid, display_name, digest_date, messages
        )
        parsed = self._request_ai_json(prompt, tag=f"daily:{digest_date}", chatroom_id=chatroom_id, wxid=wxid)
        if not parsed:
            parsed = self._build_daily_digest_fallback(messages)
        if not parsed:
            return None
        return {
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "digest_type": "daily",
            "period_key": digest_date,
            "period_start": f"{digest_date} 00:00:00",
            "period_end": f"{digest_date} 23:59:59",
            "display_name": display_name,
            "source_count": len(messages),
            "summary_text": parsed.get("summary_text", ""),
            "structured": parsed,
            "meta": {
                "source_type": "messages",
                "representative_messages": parsed.get("representative_messages", []),
            },
        }

    def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
                             period_key: str, period_start: str, period_end: str,
                             items: List[Dict]) -> Optional[Dict]:
        """Build (but do not save) a weekly or monthly digest from lower-level digests.

        Uses AI when available and falls back to frequency-based aggregation.
        """
        prompt = MemberContextPromptBuilder.build_period_digest_prompt(
            digest_type, chatroom_id, wxid, display_name, period_key, items
        )
        parsed = self._request_ai_json(prompt, tag=f"{digest_type}:{period_key}", chatroom_id=chatroom_id, wxid=wxid)
        if not parsed:
            parsed = self._build_period_digest_fallback(digest_type, items)
        if not parsed:
            return None
        return {
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "digest_type": digest_type,
            "period_key": period_key,
            "period_start": period_start,
            "period_end": period_end,
            "display_name": display_name,
            "source_count": len(items),
            "summary_text": parsed.get("summary_text", ""),
            "structured": parsed,
            "meta": {
                "source_keys": [item.get("period_key") for item in items],
            },
        }

    # ------------------------------------------------------------------ #
    # AI transport and parsing
    # ------------------------------------------------------------------ #
    def _ai_available(self) -> bool:
        """True when AI is enabled and both base URL and API key are configured."""
        return bool(self.ai_enabled and self.ai_base_url and self.ai_api_key)

    def _post_completion(self, prompt: str, user: str) -> Dict:
        """POST ``prompt`` to the configured completion endpoint and return the decoded JSON body.

        Propagates any ``requests`` error, including HTTP errors raised by
        ``raise_for_status`` and JSON decode errors; callers catch and log.
        """
        headers = {
            "Authorization": f"Bearer {self.ai_api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "inputs": {"query": prompt},
            "response_mode": "blocking",
            "user": user,
        }
        url = f"{self.ai_base_url}/{self.ai_endpoint}"
        response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout)
        response.raise_for_status()
        return response.json()

    def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]:
        """Send a per-member digest prompt and return the parsed JSON object.

        Returns None when AI is unavailable or the request fails. On success
        the response's ``metadata.usage`` is attached as ``ai_usage``.
        """
        if not self._ai_available():
            return None
        try:
            self.LOG.info(f"[成员交互摘要][AI] 发起摘要请求: group={chatroom_id}, wxid={wxid}, tag={tag}")
            data = self._post_completion(prompt, f"member-digest:{chatroom_id}:{wxid}:{tag}")
            parsed = self._parse_ai_answer(data.get("answer", ""))
            if parsed:
                usage = (data.get("metadata") or {}).get("usage", {}) or {}
                parsed["ai_usage"] = usage
            return parsed
        except Exception as e:
            self.LOG.warning(f"[成员交互摘要][AI] 摘要请求失败: group={chatroom_id}, wxid={wxid}, tag={tag}, error={e}")
            return None

    def _request_group_daily_json(self, chatroom_id: str, digest_date: str,
                                  member_labels: List[str], compressed_chat: str) -> List[Dict]:
        """Send one batched daily-digest prompt for a group-day.

        Returns the normalised member entries, or [] when AI is unavailable or
        the request fails.
        """
        if not self._ai_available():
            return []
        prompt = MemberContextPromptBuilder.build_group_daily_digest_prompt(
            chatroom_id, digest_date, member_labels, compressed_chat
        )
        try:
            self.LOG.info(
                f"[成员交互摘要][AI] 发起群日批量摘要请求: group={chatroom_id}, "
                f"date={digest_date}, members={len(member_labels)}"
            )
            data = self._post_completion(prompt, f"member-digest:{chatroom_id}:group-daily:{digest_date}")
            return self._parse_group_daily_answer(data.get("answer", ""))
        except Exception as e:
            self.LOG.warning(
                f"[成员交互摘要][AI] 群日批量摘要失败: group={chatroom_id}, date={digest_date}, error={e}"
            )
            return []

    @classmethod
    def _load_answer_json(cls, answer: str):
        """Decode the JSON embedded in an AI answer; return None if it cannot be parsed."""
        if not answer:
            return None
        text = answer.strip()
        match = cls._JSON_OBJECT_RE.search(text)
        if match:
            text = match.group(0)
        try:
            return json.loads(text)
        except (TypeError, ValueError):
            return None

    def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
        """Parse a per-member AI answer into a flat dict.

        Lists become lists of non-empty strings, numbers are kept, and other
        values become stripped text (null becomes ""). Returns None unless the
        answer contains a JSON object.
        """
        data = self._load_answer_json(answer)
        if not isinstance(data, dict):
            return None
        normalized = {}
        for key, value in data.items():
            if isinstance(value, list):
                normalized[key] = self._clean_list(value)
            elif isinstance(value, (int, float)):
                normalized[key] = value
            else:
                normalized[key] = self._clean_text(value)
        return normalized

    def _parse_group_daily_answer(self, answer: str) -> List[Dict]:
        """Parse a batched answer of the form ``{"members": [...]}``.

        Returns the normalised member dicts that carry a non-empty ``wxid``.
        """
        parsed = self._load_answer_json(answer)
        if not isinstance(parsed, dict):
            return []
        members = parsed.get("members", [])
        if not isinstance(members, list):
            return []
        normalized = []
        for item in members:
            if not isinstance(item, dict):
                continue
            normalized_item = self._normalize_profile_item(item)
            if normalized_item.get("wxid"):
                normalized.append(normalized_item)
        return normalized

    # ------------------------------------------------------------------ #
    # Heuristic fallbacks
    # ------------------------------------------------------------------ #
    def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]:
        """Heuristic daily digest based on message count and average length.

        Returns None when no message has non-blank content.
        """
        if not messages:
            return None
        # Whitespace-only contents are not real messages and would skew the length stats.
        contents = [text for text in (self._clean_text(item.get("content") or "") for item in messages) if text]
        if not contents:
            return None
        short_samples = [content[:60] for content in contents[:3]]
        avg_len = sum(len(content) for content in contents) / len(contents)
        message_pattern = "短句居多" if avg_len <= 16 else "表达较完整" if avg_len >= 35 else "表达中等长度"
        return {
            "topics": [],
            "identity_clues": [],
            "skill_signals": [],
            "family_signals": [],
            "life_stage_signals": [],
            "value_preferences": [],
            "interaction_style": "自然跟随式互动",
            "message_pattern": message_pattern,
            "response_style_hint": "保持简洁自然,先回应核心点",
            "habit_signals": [],
            "engagement_traits": [],
            "decision_style": "",
            "social_role": "",
            "reply_taboos": [],
            "temperament_signal": "当天样本有限,暂以中性沟通观察为主",
            "summary_text": f"当日消息约{len(messages)}条,{message_pattern}",
            "representative_messages": short_samples,
            "confidence": 0.35,
        }

    @staticmethod
    def _signals(structured: Dict, *keys: str) -> List:
        """Concatenate the values under ``keys``, treating None as [] and a scalar as a one-item list."""
        merged = []
        for key in keys:
            value = structured.get(key)
            if value is None:
                continue
            merged.extend(value if isinstance(value, list) else [value])
        return merged

    @staticmethod
    def _top_keys(counts: Dict, limit: int) -> List:
        """Return up to ``limit`` keys by descending count; ties keep first-seen order."""
        ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
        return [key for key, _ in ranked[:limit]]

    def _build_period_digest_fallback(self, digest_type: str, items: List[Dict]) -> Optional[Dict]:
        """Aggregate lower-level digests by signal frequency.

        Produces the weekly shape when ``digest_type == "weekly"`` and the
        monthly shape for any other type. Returns None for no items.
        """
        if not items:
            return None
        topic_counts = defaultdict(int)
        trait_counts = defaultdict(int)
        habit_counts = defaultdict(int)
        reply_counts = defaultdict(int)
        temperament_values = []
        for item in items:
            structured = item.get("structured") or {}
            if not isinstance(structured, dict):
                structured = {}
            for topic in self._signals(structured, "topics", "stable_topics", "long_term_topics"):
                topic_counts[topic] += 1
            for trait in self._signals(structured, "engagement_traits", "stable_traits"):
                trait_counts[trait] += 1
            for habit in self._signals(structured, "habit_signals", "habit_patterns"):
                habit_counts[habit] += 1
            for pref in self._signals(structured, "reply_preferences", "long_term_reply_preferences"):
                reply_counts[pref] += 1
            if structured.get("temperament_signal"):
                temperament_values.append(structured.get("temperament_signal"))
            if structured.get("temperament_tendency"):
                temperament_values.append(structured.get("temperament_tendency"))
        top_topics = self._top_keys(topic_counts, 5)
        top_traits = self._top_keys(trait_counts, 5)
        top_habits = self._top_keys(habit_counts, 5)
        top_reply = self._top_keys(reply_counts, 4)
        temperament = temperament_values[0] if temperament_values else "整体保持中性沟通特征"
        if digest_type == "weekly":
            return {
                "stable_topics": top_topics,
                "identity_traits": [],
                "skill_profile": [],
                "family_profile": [],
                "life_stage_profile": [],
                "value_profile": [],
                "stable_traits": top_traits,
                "habit_patterns": top_habits,
                "reply_preferences": top_reply,
                "group_role": "",
                "decision_profile": "",
                "recent_state": top_topics[:3],
                "temperament_tendency": temperament,
                "summary_text": "本周沟通特征已按重复信号汇总。",
                "confidence": 0.45,
            }
        return {
            "long_term_topics": top_topics,
            "identity_traits": [],
            "skill_profile": [],
            "family_profile": [],
            "life_stage_profile": [],
            "value_profile": [],
            "stable_traits": top_traits,
            "habit_patterns": top_habits,
            "long_term_reply_preferences": top_reply,
            "group_role": "",
            "decision_profile": "",
            "phase_state": top_topics[:3],
            "temperament_tendency": temperament,
            "summary_text": "本月沟通特征已按周摘要汇总。",
            "confidence": 0.5,
        }

    # ------------------------------------------------------------------ #
    # Transcript formatting and period arithmetic
    # ------------------------------------------------------------------ #
    @staticmethod
    def _parse_timestamp(value) -> Optional[datetime]:
        """Accept a datetime or a ``YYYY-MM-DD HH:MM:SS`` string; return None otherwise."""
        if isinstance(value, datetime):
            return value
        try:
            return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return None

    def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str:
        """Render a day's messages as a compact transcript grouped by minute and sender.

        Each minute becomes an ``HH:MM`` header line. Under it, each sender's
        first message is written as ``name: text`` and follow-up messages in
        the same minute are indented. Messages without a sender, content or
        parseable timestamp are skipped.
        """
        if not messages:
            return ""
        # minute -> sender name -> contents (insertion order preserved)
        time_groups = defaultdict(lambda: defaultdict(list))
        for msg in messages:
            sender = msg.get("sender", "")
            content = self._clean_text(msg.get("content"))
            if not sender or not content:
                continue
            dt = self._parse_timestamp(msg.get("timestamp"))
            if dt is None:
                continue
            sender_name = member_name_map.get(sender, sender)
            time_groups[dt.strftime("%H:%M")][sender_name].append(content)
        lines = []
        for time_key in sorted(time_groups):
            lines.append(time_key)
            for sender_name, contents in time_groups[time_key].items():
                # The separator keeps the speaker distinguishable from the message text.
                lines.append(f"{sender_name}: {contents[0]}")
                lines.extend(f" {content}" for content in contents[1:])
        return "\n".join(lines)

    @staticmethod
    def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return (Monday ``YYYY-MM-DD`` key, week start, week end) for the week containing ``date_value``."""
        target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        week_start = target_date - timedelta(days=target_date.weekday())
        week_end = week_start + timedelta(days=6)
        week_key = week_start.strftime("%Y-%m-%d")
        return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59")

    @staticmethod
    def _month_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return (``YYYY-MM`` key, month start, month end) for the month containing ``date_value``."""
        target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        month_start = target_dt.replace(day=1)
        if month_start.month == 12:
            next_month = month_start.replace(year=month_start.year + 1, month=1, day=1)
        else:
            next_month = month_start.replace(month=month_start.month + 1, day=1)
        month_end = next_month - timedelta(days=1)
        month_key = month_start.strftime("%Y-%m")
        return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59")