Files
abot/plugins/member_context/digest_service.py

605 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import json
import re
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from loguru import logger
from db.contacts_db import ContactsDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.dify_client import DifyClient
from utils.compress_chat_data import compress_chat_data
class MemberDigestService:
    """Layered member digest service.

    Builds per-member daily digests for a group chat through the Dify AI backend,
    rolls completed days up into weekly digests and completed weeks up into
    monthly digests, and persists everything through the digest DB operator.
    """

    # Profile fields whose value is always normalized to a list of non-empty
    # strings; a truthy scalar value is wrapped into a one-element list.
    _PROFILE_LIST_KEYS = frozenset({
        "topics", "identity_clues", "skill_signals", "family_signals", "life_stage_signals",
        "discussion_scenarios", "problem_solving_signals", "value_preferences", "habit_signals",
        "expression_markers", "engagement_traits", "reply_entry_points", "reply_taboos",
        "representative_messages", "stable_topics", "identity_traits", "skill_profile",
        "problem_solving_profile", "family_profile", "life_stage_profile", "value_profile",
        "stable_traits", "habit_patterns", "expression_profile", "reply_entry_profile",
        "reply_preferences", "long_term_topics", "long_term_reply_preferences", "common_scenarios",
        "phase_state", "recent_state",
    })

    # Greedy match from the first "{" to the last "}": pulls a JSON object out of
    # an answer that wraps it in prose or a Markdown code fence.
    _JSON_OBJECT_RE = re.compile(r"\{.*\}", re.S)

    def __init__(self, contacts_db: "ContactsDBOperator", message_db: "MessageStorageDB",
                 digest_db: "MemberDigestDBOperator", plugin_config: Optional[Dict] = None):
        """Wire up the storage backends, the AI client and the tuning knobs.

        Args:
            contacts_db: Source of group member lists and display names.
            message_db: Source of raw group messages for a given day.
            digest_db: Storage for digests and per-group bootstrap state.
            plugin_config: Plugin configuration. The ``"api"`` section is handed to
                ``DifyClient``; the ``"profile"`` section supplies the limits below.
        """
        self.contacts_db = contacts_db
        self.message_db = message_db
        self.digest_db = digest_db
        self.LOG = logger
        self.plugin_config = plugin_config or {}
        api_config = self.plugin_config.get("api", {})
        profile_config = self.plugin_config.get("profile", {})
        self.dify_client = DifyClient(api_config)
        self.ai_enabled = self.dify_client.enabled
        self.ai_base_url = self.dify_client.base_url
        self.ai_api_key = self.dify_client.api_key
        self.ai_endpoint = self.dify_client.endpoint
        self.ai_timeout = self.dify_client.timeout
        # Number of days processed the first time a group is seen.
        self.bootstrap_days = int(profile_config.get("bootstrap_days", 365))
        # NOTE(review): daily_message_limit, max_daily_digests_per_run,
        # weekly_digest_limit and monthly_digest_limit are not read by any method
        # of this class; presumably consumed by callers — confirm before removing.
        self.daily_message_limit = int(profile_config.get("daily_message_limit", 120))
        # A member needs at least this many messages on a day to get a daily digest.
        self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6))
        self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 0))
        self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16))
        self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12))
        # How many digests of each layer ensure_member_digest_pipeline returns in
        # its trimmed lists.
        self.final_daily_limit = int(profile_config.get("final_daily_limit", 8))
        self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6))
        self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6))
        # Number of days processed on an incremental (non-bootstrap) run.
        self.group_digest_days = int(profile_config.get("group_digest_days", 1))

    def ensure_recent_group_daily_digests(self, chatroom_id: str, days: Optional[int] = None,
                                          force: bool = False) -> Dict:
        """Build and save per-member daily digests for the last ``days`` closed days.

        When ``days`` is None the window is chosen automatically:
        ``bootstrap_days`` for a group that has never been bootstrapped, otherwise
        ``group_digest_days``. Days are processed from yesterday backwards.

        Args:
            chatroom_id: Group chat id.
            days: Number of days to process; None selects bootstrap/incremental mode.
            force: Rebuild digests even when one already exists for a member/day.

        Returns:
            Dict with ``built_daily`` (digests saved), ``touched_members`` (wxids
            that got a digest), ``days``, ``mode`` ("bootstrap" or "incremental")
            and ``processed_dates`` (YYYY-MM-DD strings, newest first).
        """
        bootstrap_mode = False
        if days is None:
            bootstrap_mode = self._should_bootstrap_group(chatroom_id)
            days = self.bootstrap_days if bootstrap_mode else self.group_digest_days
        built_daily = 0
        touched_members = set()
        processed_dates = []
        # Anchor the window once. A bootstrap run makes one AI call per day and can
        # last hours; re-reading the clock on every iteration would shift the
        # window (skipping or repeating a date) if the run crosses midnight.
        latest_closed_day = self._get_closed_reference_date()
        for offset in range(days):
            target_date = (latest_closed_day - timedelta(days=offset)).strftime("%Y-%m-%d")
            digests = self._build_group_daily_digests(chatroom_id, target_date, force=force)
            processed_dates.append(target_date)
            for digest in digests:
                self.digest_db.save_digest(digest)
                built_daily += 1
                touched_members.add(digest.get("wxid"))
        mode = "bootstrap" if bootstrap_mode else "incremental"
        if bootstrap_mode:
            if built_daily == 0 and not self.dify_client.is_available():
                # Nothing could have been built without the AI backend. Recording
                # "empty" here would make _should_bootstrap_group skip this group
                # forever, so leave the state unset and retry on the next run.
                self.LOG.warning(
                    f"[成员交互摘要][群日摘要窗口] AI 不可用,暂不记录初始化状态: group={chatroom_id}, days={days}"
                )
            else:
                bootstrap_status = "done" if built_daily > 0 else "empty"
                self.digest_db.save_group_state(
                    chatroom_id,
                    bootstrap_status=bootstrap_status,
                    bootstrap_days=days,
                    built_daily_count=built_daily,
                    touched_member_count=len(touched_members),
                    extra={"processed_dates": processed_dates},
                )
        self.LOG.info(
            f"[成员交互摘要][群日摘要窗口] group={chatroom_id}, mode={mode}, days={days}, "
            f"built_daily={built_daily}, touched_members={len(touched_members)}"
        )
        return {
            "built_daily": built_daily,
            "touched_members": list(touched_members),
            "days": days,
            "mode": mode,
            "processed_dates": processed_dates,
        }

    def _should_bootstrap_group(self, chatroom_id: str) -> bool:
        """Return True when the group still needs its initial (bootstrap) backfill.

        A group is considered bootstrapped when its stored state is "done" or
        "empty", or when at least one daily digest already exists for it. Any
        error is logged and treated as "already bootstrapped" (incremental mode).
        """
        try:
            group_state = self.digest_db.get_group_state(chatroom_id)
            if group_state and group_state.get("bootstrap_status") in {"done", "empty"}:
                return False
            sql = """
            SELECT 1
            FROM t_member_digest
            WHERE chatroom_id = %s AND digest_type = 'daily'
            LIMIT 1
            """
            row = self.digest_db.execute_query(sql, (chatroom_id,), fetch_one=True)
            return not bool(row)
        except Exception as e:
            self.LOG.warning(f"[成员交互摘要] 检查群初始化状态失败,按增量处理: group={chatroom_id}, error={e}")
            return False

    def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False,
                                      enable_weekly: bool = True, enable_monthly: bool = True) -> Dict:
        """Back-fill a member's weekly/monthly digests and return all digest layers.

        Args:
            chatroom_id: Group chat id.
            wxid: Member id.
            force: Rebuild weekly/monthly digests even if they exist or the period
                is still in progress.
            enable_weekly: Run the weekly back-fill step.
            enable_monthly: Run the monthly back-fill step.

        Returns:
            Dict with ``display_name``; ``daily_digests``/``weekly_digests``/
            ``monthly_digests`` (the first ``final_*_limit`` entries as returned by
            ``list_digests``); the untrimmed ``all_*`` lists; and ``stats``.
        """
        member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
        display_name = member.get("display_name") or member.get("nick_name") or wxid
        all_daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
        if not all_daily_digests:
            return {
                "display_name": display_name,
                "daily_digests": [],
                "weekly_digests": [],
                "monthly_digests": [],
                "all_daily_digests": [],
                "all_weekly_digests": [],
                "all_monthly_digests": [],
                # Same key set as the non-empty path so callers can rely on it.
                "stats": {
                    "daily": 0, "weekly": 0, "monthly": 0, "active_days": 0,
                    "built_daily": 0, "built_weekly": 0, "built_monthly": 0,
                },
            }
        built_weekly = 0
        built_monthly = 0
        # Weekly/monthly digests are "back-filled on every trigger":
        # 1. whenever the weekly/monthly step runs it checks every completed period
        #    in history that is still missing a digest;
        # 2. missing ones are built, instead of relying on today happening to be a
        #    Sunday / month end;
        # 3. whether the current, unfinished period is skipped is decided inside
        #    _ensure_weekly_digests / _ensure_monthly_digests.
        if enable_weekly:
            built_weekly = self._ensure_weekly_digests(
                chatroom_id, wxid, display_name, force=force, daily_digests=all_daily_digests
            )
        # Monthly digests are built from weekly ones, so the monthly gap check
        # runs after the weekly back-fill has finished.
        if enable_monthly:
            built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)
        all_weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
        all_monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=120)
        return {
            "display_name": display_name,
            "daily_digests": all_daily_digests[:self.final_daily_limit],
            "weekly_digests": all_weekly_digests[:self.final_weekly_limit],
            "monthly_digests": all_monthly_digests[:self.final_monthly_limit],
            "all_daily_digests": all_daily_digests,
            "all_weekly_digests": all_weekly_digests,
            "all_monthly_digests": all_monthly_digests,
            "stats": {
                "daily": len(all_daily_digests),
                "weekly": len(all_weekly_digests),
                "monthly": len(all_monthly_digests),
                "active_days": len(all_daily_digests),
                "built_daily": 0,
                "built_weekly": built_weekly,
                "built_monthly": built_monthly,
            },
        }

    @staticmethod
    def _get_closed_reference_date() -> datetime:
        """Return "now minus one day", the latest day treated as fully closed.

        Digests are only built from days that have completely ended. When a job
        runs shortly after midnight, today is still in progress, so yesterday is
        used as the reference for deciding which weeks/months are finished and
        which one is still current.
        """
        return datetime.now() - timedelta(days=1)

    @staticmethod
    def _clean_text(value: object) -> str:
        """Return ``value`` as a stripped string, mapping None to ``""``."""
        if value is None:
            return ""
        return str(value).strip()

    @staticmethod
    def _clean_list(values: List) -> List[str]:
        """Apply ``_clean_text`` to every element and drop the empty results."""
        cleaned = (MemberDigestService._clean_text(v) for v in values)
        return [text for text in cleaned if text]

    @staticmethod
    def _normalize_profile_item(item: Dict) -> Dict:
        """Return a cleaned copy of one AI profile item.

        Fields listed in ``_PROFILE_LIST_KEYS`` become lists of non-empty stripped
        strings (a truthy scalar is wrapped, a falsy one becomes ``[]``). Other
        int/float values are kept as-is; everything else becomes a stripped
        string. ``None`` maps to ``""`` (or is dropped from lists) rather than the
        literal text ``"None"``.
        """
        clean_text = MemberDigestService._clean_text
        normalized = {}
        for key, value in item.items():
            if key in MemberDigestService._PROFILE_LIST_KEYS:
                if isinstance(value, list):
                    normalized[key] = MemberDigestService._clean_list(value)
                elif value:
                    text = clean_text(value)
                    normalized[key] = [text] if text else []
                else:
                    normalized[key] = []
            elif isinstance(value, (int, float)):
                normalized[key] = value
            else:
                normalized[key] = clean_text(value)
        return normalized

    def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False,
                               daily_digests: Optional[List[Dict]] = None) -> int:
        """Build the missing weekly digests from a member's daily digests.

        Daily digests are grouped into Monday-start weeks. Weeks with fewer than
        two daily digests are ignored. Unless ``force`` is set, the week containing
        the reference day and weeks that already have a digest are skipped, so
        every call back-fills any finished week that was missed earlier.

        Args:
            chatroom_id: Group chat id.
            wxid: Member id.
            display_name: Name passed to the AI request and stored on the digest.
            force: Build every qualifying week, including existing/current ones.
            daily_digests: Already-loaded daily digests; fetched (limit 400) when None.

        Returns:
            Number of weekly digests built and saved.
        """
        if daily_digests is None:
            daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
        if not daily_digests:
            return 0
        grouped = defaultdict(list)
        for item in daily_digests:
            try:
                week_key, _, _ = self._week_period_bounds(item.get("period_key"))
            except ValueError:
                # One malformed row must not abort the whole member pipeline.
                self.LOG.warning(
                    f"[成员交互摘要][周摘要] 跳过无效日摘要: group={chatroom_id}, wxid={wxid}, "
                    f"period_key={item.get('period_key')}"
                )
                continue
            grouped[week_key].append(item)
        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly"))
        reference_date = self._get_closed_reference_date()
        current_week_key, _, _ = self._week_period_bounds(reference_date.strftime("%Y-%m-%d"))
        built = 0
        for week_key, items in sorted(grouped.items()):
            if len(items) < 2:
                continue
            # Without force, only finished historical weeks are back-filled:
            # 1. the week containing the reference day is not built early;
            # 2. weeks that already have a digest are not rebuilt;
            # 3. so each trigger fills in any week that was missed before.
            # NOTE(review): the reference week is skipped even when the reference
            # day is its Sunday, so a just-finished week is built one day later —
            # confirm this grace period is intended.
            if not force and week_key == current_week_key:
                continue
            if not force and week_key in existing_keys:
                continue
            period_key, period_start, period_end = self._week_period_bounds(items[0].get("period_key"))
            digest = self._build_period_digest(
                "weekly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"week={period_key}, days={len(items)}"
                )
        return built

    def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int:
        """Build the missing monthly digests from a member's weekly digests.

        Each weekly digest is assigned to the month containing its ``period_end``.
        Months with fewer than two weekly digests are ignored. Unless ``force`` is
        set, the month containing the reference day and months that already have
        a digest are skipped.

        Returns:
            Number of monthly digests built and saved.
        """
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
        if not weekly_digests:
            return 0
        grouped = defaultdict(list)
        for item in weekly_digests:
            try:
                month_key, _, _ = self._month_period_bounds(item.get("period_end"))
            except ValueError:
                # One malformed row must not abort the whole member pipeline.
                self.LOG.warning(
                    f"[成员交互摘要][月摘要] 跳过无效周摘要: group={chatroom_id}, wxid={wxid}, "
                    f"period_end={item.get('period_end')}"
                )
                continue
            grouped[month_key].append(item)
        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly"))
        reference_date = self._get_closed_reference_date()
        current_month_key, _, _ = self._month_period_bounds(reference_date.strftime("%Y-%m-%d"))
        built = 0
        for month_key, items in sorted(grouped.items()):
            if len(items) < 2:
                continue
            # Monthly digests likewise only cover finished months:
            # 1. the current month may still gain weekly digests, so it is not frozen early;
            # 2. missing historical months are filled whenever the weekly/monthly step runs;
            # 3. so a skipped monthly run is caught up by a later one.
            if not force and month_key == current_month_key:
                continue
            if not force and month_key in existing_keys:
                continue
            period_key, period_start, period_end = self._month_period_bounds(items[-1].get("period_end"))
            digest = self._build_period_digest(
                "monthly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"month={period_key}, weeks={len(items)}"
                )
        return built

    def _build_group_daily_digests(self, chatroom_id: str, digest_date: str, force: bool = False) -> List[Dict]:
        """Build (but do not save) daily digests for every active member on one day.

        Members with at least ``daily_digest_min_messages`` messages that day are
        candidates. Those that already have a digest are skipped unless ``force``
        is set. The whole day's transcript goes to the AI backend in one request.

        Returns:
            One digest dict per member the AI answer covered; ``[]`` when there is
            nothing to do or nothing usable came back.
        """
        members = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
        member_name_map = {}
        for member in members:
            wxid = member.get("wxid")
            if not wxid:
                continue
            member_name_map[wxid] = member.get("display_name") or member.get("nick_name") or wxid
        messages = self.message_db.get_member_messages_for_group_date(chatroom_id, digest_date)
        if not messages:
            return []
        sender_messages = defaultdict(list)
        for msg in messages:
            wxid = msg.get("sender")
            if not wxid:
                continue
            sender_messages[wxid].append(msg)
        candidate_wxids = [
            wxid for wxid, items in sender_messages.items()
            if len(items) >= self.daily_digest_min_messages
        ]
        if not candidate_wxids:
            return []
        pending_wxids = [
            wxid for wxid in candidate_wxids
            if force or not self.digest_db.get_digest(chatroom_id, wxid, "daily", digest_date)
        ]
        if not pending_wxids:
            self.LOG.info(
                f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
                f"reason=all_candidates_already_built, candidates={len(candidate_wxids)}"
            )
            return []
        member_labels = [f"{wxid} | {member_name_map.get(wxid, wxid)}" for wxid in pending_wxids]
        compact_chat = self._format_group_messages_optimized(messages, member_name_map)
        if not compact_chat:
            # Every message lacked content or a parsable timestamp; don't spend an
            # AI call on an empty transcript.
            self.LOG.info(
                f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
                f"reason=empty_formatted_chat, candidates={len(candidate_wxids)}"
            )
            return []
        try:
            compact_chat = compress_chat_data(compact_chat)
        except Exception as e:
            # Best effort: fall back to the uncompressed transcript.
            self.LOG.warning(f"[成员交互摘要] 压缩群日消息失败: group={chatroom_id}, date={digest_date}, error={e}")
        parsed_members = self._request_group_daily_json(chatroom_id, digest_date, member_labels, compact_chat)
        parsed_map = {item.get("wxid"): item for item in parsed_members if item.get("wxid")} if parsed_members else {}
        digests = []
        for wxid in pending_wxids:
            parsed = parsed_map.get(wxid)
            if not parsed:
                self.LOG.warning(
                    f"[成员交互摘要][群日批处理] 跳过成员(未提取到有效结果): "
                    f"group={chatroom_id}, date={digest_date}, wxid={wxid}, "
                    f"source_count={len(sender_messages.get(wxid, []))}"
                )
                continue
            parsed = self._normalize_profile_item(parsed)
            digests.append({
                "chatroom_id": chatroom_id,
                "wxid": wxid,
                "digest_type": "daily",
                "period_key": digest_date,
                "period_start": f"{digest_date} 00:00:00",
                "period_end": f"{digest_date} 23:59:59",
                "display_name": member_name_map.get(wxid, wxid),
                "source_count": len(sender_messages.get(wxid, [])),
                "summary_text": parsed.get("summary_text", ""),
                "structured": parsed,
                "meta": {
                    "source_type": "group_daily_messages",
                    "representative_messages": parsed.get("representative_messages", []),
                },
            })
        self.LOG.info(
            f"[成员交互摘要][群日批处理] 完成: group={chatroom_id}, date={digest_date}, "
            f"candidates={len(candidate_wxids)}, pending={len(pending_wxids)}, built={len(digests)}"
        )
        return digests

    def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
                             period_key: str, period_start: str, period_end: str,
                             items: List[Dict]) -> Optional[Dict]:
        """Summarize lower-layer digests (``items``) into one weekly/monthly digest.

        Returns:
            The digest dict ready for ``save_digest``, or None when the AI request
            produced no usable result (the client's ``last_error`` is logged).
        """
        parsed = self._request_period_json(
            digest_type=digest_type,
            chatroom_id=chatroom_id,
            wxid=wxid,
            display_name=display_name,
            period_key=period_key,
            items=items,
        )
        if not parsed:
            self.LOG.warning(
                f"[成员交互摘要][{digest_type}] 跳过周期摘要(未提取到有效结果): "
                f"group={chatroom_id}, wxid={wxid}, period={period_key}, source_count={len(items)}, "
                f"last_error={self.dify_client.last_error}"
            )
            return None
        parsed = self._normalize_profile_item(parsed)
        return {
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "digest_type": digest_type,
            "period_key": period_key,
            "period_start": period_start,
            "period_end": period_end,
            "display_name": display_name,
            "source_count": len(items),
            "summary_text": parsed.get("summary_text", ""),
            "structured": parsed,
            "meta": {
                "source_keys": [item.get("period_key") for item in items],
            },
        }

    def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]:
        """Send a free-form prompt to the AI backend and parse a JSON-object answer.

        Returns:
            The parsed dict with the backend's usage info under ``"ai_usage"``, or
            None when the client is unavailable, the call fails, or the answer is
            not a JSON object.
        """
        if not self.dify_client.is_available():
            return None
        response = self.dify_client.run(
            prompt=prompt,
            user=f"member-digest:{chatroom_id}:{wxid}:{tag}",
            inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid, "tag": tag},
            tag=tag,
        )
        if not response:
            return None
        parsed = self._parse_ai_answer(response.get("text", ""))
        if parsed:
            parsed["ai_usage"] = response.get("usage", {}) or {}
        return parsed

    def _request_period_json(self, digest_type: str, chatroom_id: str, wxid: str,
                             display_name: str, period_key: str, items: List[Dict]) -> Optional[Dict]:
        """Ask the AI backend to summarize ``items`` into one period digest.

        The source digests are sent as a JSON string in the ``source_items_json``
        input. Returns the parsed answer (with ``"ai_usage"``) or None.
        """
        if not self.dify_client.is_available():
            return None
        inputs = {
            "digest_type": digest_type,
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "display_name": display_name,
            "period_key": period_key,
            "source_items_json": json.dumps(self._build_period_source_items(items), ensure_ascii=False),
            "source_item_count": str(len(items)),
        }
        response = self.dify_client.run(
            prompt="",
            user=f"member-digest:{chatroom_id}:{wxid}:{digest_type}:{period_key}",
            inputs=inputs,
            tag=f"{digest_type}:{period_key}",
        )
        if not response:
            return None
        parsed = self._parse_ai_answer(response.get("text", ""))
        if parsed:
            parsed["ai_usage"] = response.get("usage", {}) or {}
        return parsed

    def _request_group_daily_json(self, chatroom_id: str, digest_date: str,
                                  member_labels: List[str], compressed_chat: str) -> List[Dict]:
        """Ask the AI backend for per-member daily profiles of one group day.

        Returns:
            Normalized member items (at most one per wxid); ``[]`` when the client
            is unavailable, the call fails, or the answer cannot be parsed.
        """
        if not self.dify_client.is_available():
            return []
        response = self.dify_client.run(
            prompt="",
            user=f"member-digest:{chatroom_id}:group-daily:{digest_date}",
            inputs={
                "digest_type": "daily",
                "chatroom_id": chatroom_id,
                "digest_date": digest_date,
                "member_labels": "\n".join(member_labels),
                "compressed_chat": compressed_chat,
            },
            tag=f"group-daily:{digest_date}",
        )
        if not response:
            return []
        return self._parse_group_daily_answer(response.get("text", ""))

    @staticmethod
    def _build_period_source_items(items: List[Dict]) -> List[Dict]:
        """Project lower-layer digests onto the field set sent to the period prompt.

        Each output field reads the daily-layer key first and falls back to the
        weekly/monthly-layer key(s), so daily and weekly digests can both serve as
        sources.
        """
        source_items = []
        for item in items:
            structured = item.get("structured", {}) or {}
            source_items.append({
                "period_key": item.get("period_key"),
                "summary_text": item.get("summary_text", ""),
                "topics": structured.get("topics") or structured.get("stable_topics") or structured.get("long_term_topics") or [],
                "discussion_scenarios": structured.get("discussion_scenarios") or structured.get("common_scenarios") or [],
                "identity_clues": structured.get("identity_clues") or structured.get("identity_traits") or [],
                "skill_signals": structured.get("skill_signals") or structured.get("skill_profile") or [],
                "problem_solving_signals": structured.get("problem_solving_signals") or structured.get("problem_solving_profile") or [],
                "family_signals": structured.get("family_signals") or structured.get("family_profile") or [],
                "life_stage_signals": structured.get("life_stage_signals") or structured.get("life_stage_profile") or [],
                "value_preferences": structured.get("value_preferences") or structured.get("value_profile") or [],
                "habit_signals": structured.get("habit_signals") or structured.get("habit_patterns") or [],
                "expression_markers": structured.get("expression_markers") or structured.get("expression_profile") or [],
                "engagement_traits": structured.get("engagement_traits") or structured.get("stable_traits") or [],
                "reply_entry_points": structured.get("reply_entry_points") or structured.get("reply_entry_profile") or [],
                "reply_preferences": structured.get("reply_preferences") or structured.get("long_term_reply_preferences") or [],
                "social_role": structured.get("social_role") or structured.get("group_role") or "",
                "decision_style": structured.get("decision_style") or structured.get("decision_profile") or "",
                "temperament_signal": structured.get("temperament_signal") or structured.get("temperament_tendency") or "",
                "recent_state": structured.get("recent_state") or structured.get("phase_state") or [],
            })
        return source_items

    @staticmethod
    def _extract_json(answer: str):
        """Decode the JSON payload of an AI answer, or return None.

        The whole (stripped) answer is tried first; if that fails, the span from
        the first "{" to the last "}" is tried, which handles prose or code
        fences around a single JSON object.
        """
        if not answer:
            return None
        text = answer.strip()
        try:
            return json.loads(text)
        except ValueError:
            pass
        match = MemberDigestService._JSON_OBJECT_RE.search(text)
        if not match:
            return None
        try:
            return json.loads(match.group(0))
        except ValueError:
            return None

    def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
        """Parse an AI answer into a flat dict of cleaned values.

        Lists become lists of non-empty stripped strings, int/float values are
        kept, and everything else becomes a stripped string (None -> ""). Returns
        None when the answer does not contain a JSON object.
        """
        data = self._extract_json(answer)
        if not isinstance(data, dict):
            return None
        normalized = {}
        for key, value in data.items():
            if isinstance(value, list):
                normalized[key] = self._clean_list(value)
            elif isinstance(value, (int, float)):
                normalized[key] = value
            else:
                normalized[key] = self._clean_text(value)
        return normalized

    def _parse_group_daily_answer(self, answer: str) -> List[Dict]:
        """Parse a group-daily AI answer into normalized member items.

        Accepts ``{"members": [...]}`` or a bare top-level array of member
        objects. Items without a ``wxid`` are dropped; when a wxid appears more
        than once, the entry with the higher ``_score_profile_item`` wins.
        """
        parsed = self._extract_json(answer)
        if isinstance(parsed, dict):
            members = parsed.get("members", [])
        elif isinstance(parsed, list):
            members = parsed
        else:
            return []
        if not isinstance(members, list):
            return []
        best_by_wxid = {}
        for item in members:
            if not isinstance(item, dict):
                continue
            candidate = self._normalize_profile_item(item)
            wxid = candidate.get("wxid")
            if not wxid:
                continue
            current = best_by_wxid.get(wxid)
            if current is None or self._score_profile_item(candidate) > self._score_profile_item(current):
                best_by_wxid[wxid] = candidate
        return list(best_by_wxid.values())

    @staticmethod
    def _score_profile_item(item: Dict) -> float:
        """Heuristic richness score used to pick between duplicate member items.

        Each non-empty list element counts 1.0, each numeric value adds its own
        value, each non-empty string counts 0.8 (``wxid``/``display_name`` are
        ignored), and ``confidence`` adds a further 2x bonus.
        """
        if not item:
            return 0.0
        score = 0.0
        for key, value in item.items():
            if key in {"wxid", "display_name"}:
                continue
            if isinstance(value, list):
                score += sum(1.0 for v in value if str(v).strip())
            elif isinstance(value, (int, float)):
                score += float(value)
            elif str(value).strip():
                score += 0.8
        # NOTE(review): a numeric "confidence" was already added once in the loop
        # above, so it effectively weighs 3x — confirm this is intended.
        try:
            score += float(item.get("confidence", 0)) * 2
        except (TypeError, ValueError, OverflowError):
            pass
        return score

    @staticmethod
    def _parse_message_time(value: object) -> Optional[datetime]:
        """Parse a message timestamp; accepts datetime objects or "YYYY-MM-DD HH:MM:SS..." text.

        Anything after the first 19 characters (e.g. fractional seconds) is
        ignored. Returns None when the value cannot be parsed.
        """
        if isinstance(value, datetime):
            return value
        if value is None:
            return None
        try:
            return datetime.strptime(str(value)[:19], "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return None

    def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str:
        """Render one day's messages as a compact, minute-bucketed transcript.

        Messages are bucketed by ``HH:MM``. Within a minute, each sender's messages
        are grouped under one ``"<name>: <content>"`` line followed by
        continuation lines indented by one space. Messages with no sender, empty
        content or an unparsable timestamp are dropped. Buckets are sorted by the
        ``HH:MM`` text, which is chronological only for a single day's messages.
        """
        if not messages:
            return ""
        time_groups = defaultdict(lambda: defaultdict(list))
        for msg in messages:
            sender = msg.get("sender") or ""
            content = self._clean_text(msg.get("content"))
            if not sender or not content:
                continue
            sent_at = self._parse_message_time(msg.get("timestamp"))
            if sent_at is None:
                continue
            sender_name = member_name_map.get(sender, sender)
            time_groups[sent_at.strftime("%H:%M")][sender_name].append(content)
        lines = []
        for time_key in sorted(time_groups):
            lines.append(time_key)
            for sender_name, contents in time_groups[time_key].items():
                # Separator keeps the speaker name distinguishable from the text.
                lines.append(f"{sender_name}: {contents[0]}")
                lines.extend(f" {content}" for content in contents[1:])
        return "\n".join(lines)

    @staticmethod
    def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return (week_key, start, end) for the Monday-start week containing ``date_value``.

        Only the first 10 characters (``YYYY-MM-DD``) of ``date_value`` are used.
        ``week_key`` is the Monday's date. Raises ValueError if the date cannot be parsed.
        """
        target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        week_start = target_date - timedelta(days=target_date.weekday())
        week_end = week_start + timedelta(days=6)
        week_key = week_start.strftime("%Y-%m-%d")
        return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59")

    @staticmethod
    def _month_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return (month_key, start, end) for the calendar month containing ``date_value``.

        Only the first 10 characters (``YYYY-MM-DD``) of ``date_value`` are used.
        ``month_key`` is ``YYYY-MM``. Raises ValueError if the date cannot be parsed.
        """
        target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        month_start = target_dt.replace(day=1)
        if month_start.month == 12:
            next_month = month_start.replace(year=month_start.year + 1, month=1, day=1)
        else:
            next_month = month_start.replace(month=month_start.month + 1, day=1)
        month_end = next_month - timedelta(days=1)
        month_key = month_start.strftime("%Y-%m")
        return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59")