Files
abot/plugins/member_context/digest_service.py

609 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import re
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from loguru import logger
from db.contacts_db import ContactsDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.dify_client import DifyClient
from utils.compress_chat_data import compress_chat_data
class MemberDigestService:
    """Layered (daily / weekly / monthly) digest service for group members."""

    def __init__(self, contacts_db: ContactsDBOperator, message_db: MessageStorageDB,
                 digest_db: MemberDigestDBOperator, plugin_config: Optional[Dict] = None):
        """Store the backends, create the Dify client and load numeric limits.

        Args:
            contacts_db: Contacts store used to look up group members.
            message_db: Message store used to read a group's messages per day.
            digest_db: Digest store used to read/write digests and group state.
            plugin_config: Optional plugin configuration. Its ``api`` section is
                passed to ``DifyClient``; its ``profile`` section overrides the
                integer limits listed below.
        """
        self.contacts_db = contacts_db
        self.message_db = message_db
        self.digest_db = digest_db
        self.LOG = logger
        self.plugin_config = plugin_config or {}

        client = DifyClient(self.plugin_config.get("api", {}))
        self.dify_client = client
        # Mirror the client's connection settings on the service itself.
        self.ai_enabled = client.enabled
        self.ai_base_url = client.base_url
        self.ai_api_key = client.api_key
        self.ai_endpoint = client.endpoint
        self.ai_timeout = client.timeout

        # (attribute / config key, default) -- every value is coerced to int.
        profile_config = self.plugin_config.get("profile", {})
        int_options = (
            ("bootstrap_days", 365),
            ("daily_message_limit", 120),
            ("daily_digest_min_messages", 6),
            ("max_daily_digests_per_run", 0),
            ("weekly_digest_limit", 16),
            ("monthly_digest_limit", 12),
            ("final_daily_limit", 8),
            ("final_weekly_limit", 6),
            ("final_monthly_limit", 6),
            ("group_digest_days", 1),
        )
        for option, fallback in int_options:
            setattr(self, option, int(profile_config.get(option, fallback)))
def ensure_recent_group_daily_digests(self, chatroom_id: str, days: Optional[int] = None,
                                      force: bool = False) -> Dict:
    """Build and persist per-member daily digests for the most recent days.

    The window covers ``days`` full days ending yesterday. When ``days`` is
    ``None`` the size is chosen automatically: ``self.bootstrap_days`` if
    ``_should_bootstrap_group`` says the group still needs its initial
    backfill, otherwise ``self.group_digest_days``.

    Args:
        chatroom_id: Group whose messages are digested.
        days: Number of past days to process, or ``None`` to auto-select.
        force: Forwarded to ``_build_group_daily_digests``.

    Returns:
        Dict with ``built_daily`` (digests saved), ``touched_members``
        (list of wxids), ``days``, ``mode`` (``"bootstrap"`` or
        ``"incremental"``) and ``processed_dates`` (``YYYY-MM-DD`` strings).
    """
    bootstrap_mode = False
    if days is None:
        bootstrap_mode = self._should_bootstrap_group(chatroom_id)
        days = self.bootstrap_days if bootstrap_mode else self.group_digest_days

    # Read the clock once: a long run (e.g. a bootstrap backfill) may cross
    # midnight, and re-reading it per iteration would repeat one date and
    # drop the oldest day of the window.
    anchor = datetime.now()

    built_daily = 0
    touched_members = set()
    processed_dates = []
    for offset in range(1, days + 1):
        target_date = (anchor - timedelta(days=offset)).strftime("%Y-%m-%d")
        digests = self._build_group_daily_digests(chatroom_id, target_date, force=force)
        processed_dates.append(target_date)
        for digest in digests:
            self.digest_db.save_digest(digest)
            built_daily += 1
            touched_members.add(digest.get("wxid"))

    mode = "bootstrap" if bootstrap_mode else "incremental"
    if bootstrap_mode:
        # Record the outcome; _should_bootstrap_group stops bootstrapping
        # once the stored status is "done" or "empty".
        bootstrap_status = "done" if built_daily > 0 or touched_members else "empty"
        self.digest_db.save_group_state(
            chatroom_id,
            bootstrap_status=bootstrap_status,
            bootstrap_days=days,
            built_daily_count=built_daily,
            touched_member_count=len(touched_members),
            extra={"processed_dates": processed_dates},
        )
    self.LOG.info(
        f"[成员交互摘要][群日摘要窗口] group={chatroom_id}, mode={mode}, days={days}, "
        f"built_daily={built_daily}, touched_members={len(touched_members)}"
    )
    return {
        "built_daily": built_daily,
        "touched_members": list(touched_members),
        "days": days,
        "mode": mode,
        "processed_dates": processed_dates,
    }
def _should_bootstrap_group(self, chatroom_id: str) -> bool:
try:
group_state = self.digest_db.get_group_state(chatroom_id)
if group_state and group_state.get("bootstrap_status") in {"done", "empty"}:
return False
sql = """
SELECT 1
FROM t_member_digest
WHERE chatroom_id = %s AND digest_type = 'daily'
LIMIT 1
"""
row = self.digest_db.execute_query(sql, (chatroom_id,), fetch_one=True)
return not bool(row)
except Exception as e:
self.LOG.warning(f"[成员交互摘要] 检查群初始化状态失败,按增量处理: group={chatroom_id}, error={e}")
return False
def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False,
                                  enable_weekly: bool = True, enable_monthly: bool = True) -> Dict:
    """Roll a member's daily digests up into weekly/monthly ones and report.

    Weekly and monthly builds run only when enabled and either ``force`` is
    set or the newest daily digest falls inside the corresponding window
    (see ``_should_run_weekly`` / ``_should_run_monthly``). Daily digests
    are never built here, so ``stats["built_daily"]`` is always 0.

    Args:
        chatroom_id: Group the member belongs to.
        wxid: Member identifier.
        force: Build regardless of the weekly/monthly window.
        enable_weekly: Allow the weekly roll-up.
        enable_monthly: Allow the monthly roll-up.

    Returns:
        Dict with ``display_name``, the truncated ``daily_digests`` /
        ``weekly_digests`` / ``monthly_digests`` lists, the full ``all_*``
        lists, and ``stats``. ``stats`` has the same keys on every path.
    """
    member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
    display_name = member.get("display_name") or member.get("nick_name") or wxid
    all_daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
    if not all_daily_digests:
        return {
            "display_name": display_name,
            "daily_digests": [],
            "weekly_digests": [],
            "monthly_digests": [],
            "all_daily_digests": [],
            "all_weekly_digests": [],
            "all_monthly_digests": [],
            "stats": {
                "daily": 0,
                "weekly": 0,
                "monthly": 0,
                "active_days": 0,
                "built_daily": 0,
                # Present on the normal path too; callers reading them must
                # not hit KeyError for members without any daily digest.
                "built_weekly": 0,
                "built_monthly": 0,
            },
        }

    latest_daily_date = self._extract_latest_daily_date(all_daily_digests)
    built_weekly = 0
    built_monthly = 0
    if enable_weekly and (force or self._should_run_weekly(latest_daily_date)):
        built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force)
    elif enable_weekly:
        self.LOG.debug(
            f"[成员交互摘要][周摘要] 本次跳过(未到周处理窗口): "
            f"group={chatroom_id}, wxid={wxid}, latest_daily_date={latest_daily_date}"
        )
    if enable_monthly and (force or self._should_run_monthly(latest_daily_date)):
        built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)
    elif enable_monthly:
        self.LOG.debug(
            f"[成员交互摘要][月摘要] 本次跳过(未到月处理窗口): "
            f"group={chatroom_id}, wxid={wxid}, latest_daily_date={latest_daily_date}"
        )

    # Re-read after the builds so freshly saved digests are included.
    all_weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
    all_monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=120)
    daily_digests = all_daily_digests[:self.final_daily_limit]
    weekly_digests = all_weekly_digests[:self.final_weekly_limit]
    monthly_digests = all_monthly_digests[:self.final_monthly_limit]
    return {
        "display_name": display_name,
        "daily_digests": daily_digests,
        "weekly_digests": weekly_digests,
        "monthly_digests": monthly_digests,
        "all_daily_digests": all_daily_digests,
        "all_weekly_digests": all_weekly_digests,
        "all_monthly_digests": all_monthly_digests,
        "stats": {
            "daily": len(all_daily_digests),
            "weekly": len(all_weekly_digests),
            "monthly": len(all_monthly_digests),
            # One daily digest per day, so the count doubles as active days.
            "active_days": len(all_daily_digests),
            "built_daily": 0,
            "built_weekly": built_weekly,
            "built_monthly": built_monthly,
        },
    }
def _extract_latest_daily_date(self, daily_digests: List[Dict]) -> Optional[datetime]:
if not daily_digests:
return None
latest_key = daily_digests[0].get("period_key") or daily_digests[0].get("period_end")
return self._parse_period_date(latest_key)
@staticmethod
def _parse_period_date(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
return datetime.strptime(str(value)[:10], "%Y-%m-%d")
except Exception:
return None
def _should_run_weekly(self, latest_daily_date: Optional[datetime]) -> bool:
if not latest_daily_date:
return False
return latest_daily_date.weekday() == 6
def _should_run_monthly(self, latest_daily_date: Optional[datetime]) -> bool:
if not latest_daily_date:
return False
return (latest_daily_date + timedelta(days=1)).day == 1
@staticmethod
def _normalize_profile_item(item: Dict) -> Dict:
normalized = {}
list_keys = {
"topics", "identity_clues", "skill_signals", "family_signals", "life_stage_signals",
"discussion_scenarios", "problem_solving_signals", "value_preferences", "habit_signals",
"expression_markers", "engagement_traits", "reply_entry_points", "reply_taboos",
"representative_messages", "stable_topics", "identity_traits", "skill_profile",
"problem_solving_profile", "family_profile", "life_stage_profile", "value_profile",
"stable_traits", "habit_patterns", "expression_profile", "reply_entry_profile",
"reply_preferences", "long_term_topics", "long_term_reply_preferences", "common_scenarios",
"phase_state", "recent_state"
}
for key, value in item.items():
if key in list_keys:
if isinstance(value, list):
normalized[key] = [str(v).strip() for v in value if str(v).strip()]
elif value:
normalized[key] = [str(value).strip()]
else:
normalized[key] = []
elif isinstance(value, (int, float)):
normalized[key] = value
else:
normalized[key] = str(value).strip()
return normalized
def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
grouped = defaultdict(list)
for item in daily_digests:
week_key, _, _ = self._week_period_bounds(item.get("period_key"))
grouped[week_key].append(item)
existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly"))
current_week_key, _, _ = self._week_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))
built = 0
for week_key, items in sorted(grouped.items()):
if len(items) < 2:
continue
if not force and week_key in existing_keys and week_key != current_week_key:
continue
period_key, period_start, period_end = self._week_period_bounds(items[0].get("period_key"))
digest = self._build_period_digest(
"weekly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
)
if digest:
self.digest_db.save_digest(digest)
built += 1
self.LOG.info(
f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, "
f"week={period_key}, days={len(items)}"
)
return built
def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
grouped = defaultdict(list)
for item in weekly_digests:
month_key, _, _ = self._month_period_bounds(item.get("period_end"))
grouped[month_key].append(item)
existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly"))
current_month_key, _, _ = self._month_period_bounds((datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"))
built = 0
for month_key, items in sorted(grouped.items()):
if len(items) < 2:
continue
if not force and month_key in existing_keys and month_key != current_month_key:
continue
period_key, period_start, period_end = self._month_period_bounds(items[-1].get("period_end"))
digest = self._build_period_digest(
"monthly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
)
if digest:
self.digest_db.save_digest(digest)
built += 1
self.LOG.info(
f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, "
f"month={period_key}, weeks={len(items)}"
)
return built
def _build_group_daily_digests(self, chatroom_id: str, digest_date: str, force: bool = False) -> List[Dict]:
members = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
member_name_map = {}
for member in members:
wxid = member.get("wxid")
if not wxid:
continue
member_name_map[wxid] = member.get("display_name") or member.get("nick_name") or wxid
messages = self.message_db.get_member_messages_for_group_date(chatroom_id, digest_date)
if not messages:
return []
sender_messages = defaultdict(list)
for msg in messages:
wxid = msg.get("sender")
if not wxid:
continue
sender_messages[wxid].append(msg)
candidate_wxids = [
wxid for wxid, items in sender_messages.items()
if len(items) >= self.daily_digest_min_messages
]
if not candidate_wxids:
return []
pending_wxids = []
for wxid in candidate_wxids:
if not force and self.digest_db.get_digest(chatroom_id, wxid, "daily", digest_date):
continue
pending_wxids.append(wxid)
if not pending_wxids:
self.LOG.info(
f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
f"reason=all_candidates_already_built, candidates={len(candidate_wxids)}"
)
return []
member_labels = [f"{wxid} | {member_name_map.get(wxid, wxid)}" for wxid in pending_wxids]
compact_chat = self._format_group_messages_optimized(messages, member_name_map)
try:
compact_chat = compress_chat_data(compact_chat)
except Exception as e:
self.LOG.warning(f"[成员交互摘要] 压缩群日消息失败: group={chatroom_id}, date={digest_date}, error={e}")
parsed_members = self._request_group_daily_json(chatroom_id, digest_date, member_labels, compact_chat)
parsed_map = {item.get("wxid"): item for item in parsed_members if item.get("wxid")} if parsed_members else {}
digests = []
for wxid in pending_wxids:
parsed = parsed_map.get(wxid)
if not parsed:
self.LOG.warning(
f"[成员交互摘要][群日批处理] 跳过成员(未提取到有效结果): "
f"group={chatroom_id}, date={digest_date}, wxid={wxid}, "
f"source_count={len(sender_messages.get(wxid, []))}"
)
continue
parsed = self._normalize_profile_item(parsed)
digests.append({
"chatroom_id": chatroom_id,
"wxid": wxid,
"digest_type": "daily",
"period_key": digest_date,
"period_start": f"{digest_date} 00:00:00",
"period_end": f"{digest_date} 23:59:59",
"display_name": member_name_map.get(wxid, wxid),
"source_count": len(sender_messages.get(wxid, [])),
"summary_text": parsed.get("summary_text", ""),
"structured": parsed,
"meta": {
"source_type": "group_daily_messages",
"representative_messages": parsed.get("representative_messages", []),
},
})
self.LOG.info(
f"[成员交互摘要][群日批处理] 完成: group={chatroom_id}, date={digest_date}, "
f"candidates={len(candidate_wxids)}, pending={len(pending_wxids)}, built={len(digests)}"
)
return digests
def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
period_key: str, period_start: str, period_end: str,
items: List[Dict]) -> Optional[Dict]:
parsed = self._request_period_json(
digest_type=digest_type,
chatroom_id=chatroom_id,
wxid=wxid,
display_name=display_name,
period_key=period_key,
items=items,
)
if not parsed:
self.LOG.warning(
f"[成员交互摘要][{digest_type}] 跳过周期摘要(未提取到有效结果): "
f"group={chatroom_id}, wxid={wxid}, period={period_key}, source_count={len(items)}, "
f"last_error={self.dify_client.last_error}"
)
return None
parsed = self._normalize_profile_item(parsed)
return {
"chatroom_id": chatroom_id,
"wxid": wxid,
"digest_type": digest_type,
"period_key": period_key,
"period_start": period_start,
"period_end": period_end,
"display_name": display_name,
"source_count": len(items),
"summary_text": parsed.get("summary_text", ""),
"structured": parsed,
"meta": {
"source_keys": [item.get("period_key") for item in items],
},
}
def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]:
if not self.dify_client.is_available():
return None
response = self.dify_client.run(
prompt=prompt,
user=f"member-digest:{chatroom_id}:{wxid}:{tag}",
inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid, "tag": tag},
tag=tag,
)
if not response:
return None
parsed = self._parse_ai_answer(response.get("text", ""))
if parsed:
parsed["ai_usage"] = response.get("usage", {}) or {}
return parsed
def _request_period_json(self, digest_type: str, chatroom_id: str, wxid: str,
display_name: str, period_key: str, items: List[Dict]) -> Optional[Dict]:
if not self.dify_client.is_available():
return None
inputs = {
"digest_type": digest_type,
"chatroom_id": chatroom_id,
"wxid": wxid,
"display_name": display_name,
"period_key": period_key,
"source_items_json": json.dumps(self._build_period_source_items(items), ensure_ascii=False),
"source_item_count": str(len(items)),
}
response = self.dify_client.run(
prompt="",
user=f"member-digest:{chatroom_id}:{wxid}:{digest_type}:{period_key}",
inputs=inputs,
tag=f"{digest_type}:{period_key}",
)
if not response:
return None
parsed = self._parse_ai_answer(response.get("text", ""))
if parsed:
parsed["ai_usage"] = response.get("usage", {}) or {}
return parsed
def _request_group_daily_json(self, chatroom_id: str, digest_date: str,
member_labels: List[str], compressed_chat: str) -> List[Dict]:
if not self.dify_client.is_available():
return []
response = self.dify_client.run(
prompt="",
user=f"member-digest:{chatroom_id}:group-daily:{digest_date}",
inputs={
"digest_type": "daily",
"chatroom_id": chatroom_id,
"digest_date": digest_date,
"member_labels": "\n".join(member_labels),
"compressed_chat": compressed_chat,
},
tag=f"group-daily:{digest_date}",
)
if not response:
return []
parsed = self._parse_group_daily_answer(response.get("text", ""))
return parsed
@staticmethod
def _build_period_source_items(items: List[Dict]) -> List[Dict]:
source_items = []
for item in items:
structured = item.get("structured", {}) or {}
source_items.append({
"period_key": item.get("period_key"),
"summary_text": item.get("summary_text", ""),
"topics": structured.get("topics") or structured.get("stable_topics") or structured.get("long_term_topics") or [],
"discussion_scenarios": structured.get("discussion_scenarios") or structured.get("common_scenarios") or [],
"identity_clues": structured.get("identity_clues") or structured.get("identity_traits") or [],
"skill_signals": structured.get("skill_signals") or structured.get("skill_profile") or [],
"problem_solving_signals": structured.get("problem_solving_signals") or structured.get("problem_solving_profile") or [],
"family_signals": structured.get("family_signals") or structured.get("family_profile") or [],
"life_stage_signals": structured.get("life_stage_signals") or structured.get("life_stage_profile") or [],
"value_preferences": structured.get("value_preferences") or structured.get("value_profile") or [],
"habit_signals": structured.get("habit_signals") or structured.get("habit_patterns") or [],
"expression_markers": structured.get("expression_markers") or structured.get("expression_profile") or [],
"engagement_traits": structured.get("engagement_traits") or structured.get("stable_traits") or [],
"reply_entry_points": structured.get("reply_entry_points") or structured.get("reply_entry_profile") or [],
"reply_preferences": structured.get("reply_preferences") or structured.get("long_term_reply_preferences") or [],
"social_role": structured.get("social_role") or structured.get("group_role") or "",
"decision_style": structured.get("decision_style") or structured.get("decision_profile") or "",
"temperament_signal": structured.get("temperament_signal") or structured.get("temperament_tendency") or "",
"recent_state": structured.get("recent_state") or structured.get("phase_state") or [],
})
return source_items
def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
if not answer:
return None
text = answer.strip()
match = re.search(r"\{.*\}", text, re.S)
if match:
text = match.group(0)
try:
data = json.loads(text)
except Exception:
return None
normalized = {}
for key, value in data.items():
if isinstance(value, list):
normalized[key] = [str(item).strip() for item in value if str(item).strip()]
elif isinstance(value, (int, float)):
normalized[key] = value
else:
normalized[key] = str(value).strip()
return normalized
def _parse_group_daily_answer(self, answer: str) -> List[Dict]:
if not answer:
return []
text = answer.strip()
match = re.search(r"\{.*\}", text, re.S)
if match:
text = match.group(0)
try:
parsed = json.loads(text)
except Exception:
return []
members = parsed.get("members", [])
if not isinstance(members, list):
return []
normalized_map = {}
for item in members:
if not isinstance(item, dict):
continue
normalized_item = self._normalize_profile_item(item)
wxid = normalized_item.get("wxid")
if not wxid:
continue
existing = normalized_map.get(wxid)
if not existing or self._score_profile_item(normalized_item) > self._score_profile_item(existing):
normalized_map[wxid] = normalized_item
return list(normalized_map.values())
@staticmethod
def _score_profile_item(item: Dict) -> float:
if not item:
return 0.0
score = 0.0
for key, value in item.items():
if key in {"wxid", "display_name"}:
continue
if isinstance(value, list):
score += len([v for v in value if str(v).strip()]) * 1.0
elif isinstance(value, (int, float)):
score += float(value)
elif str(value).strip():
score += 0.8
try:
score += float(item.get("confidence", 0)) * 2
except Exception:
pass
return score
def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str:
if not messages:
return ""
time_groups = defaultdict(lambda: defaultdict(list))
for msg in messages:
timestamp = msg.get("timestamp")
sender = msg.get("sender", "")
content = str(msg.get("content", "")).strip()
if not sender or not content:
continue
try:
dt = datetime.strptime(str(timestamp), "%Y-%m-%d %H:%M:%S")
except Exception:
dt = None
if not dt:
continue
time_key = dt.strftime("%H:%M")
sender_name = member_name_map.get(sender, sender)
time_groups[time_key][sender_name].append(content)
lines = []
for time_key in sorted(time_groups.keys()):
lines.append(f"{time_key}")
for sender_name, contents in time_groups[time_key].items():
for idx, content in enumerate(contents):
if idx == 0:
lines.append(f"{sender_name}{content}")
else:
lines.append(f" {content}")
return "\n".join(lines)
@staticmethod
def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
week_start = target_date - timedelta(days=target_date.weekday())
week_end = week_start + timedelta(days=6)
week_key = f"{week_start.strftime('%Y-%m-%d')}"
return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59")
@staticmethod
def _month_period_bounds(date_value: str) -> Tuple[str, str, str]:
target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
month_start = target_dt.replace(day=1)
if month_start.month == 12:
next_month = month_start.replace(year=month_start.year + 1, month=1, day=1)
else:
next_month = month_start.replace(month=month_start.month + 1, day=1)
month_end = next_month - timedelta(days=1)
month_key = month_start.strftime("%Y-%m")
return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59")