# -*- coding: utf-8 -*-
import json
import re
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

from loguru import logger

from db.contacts_db import ContactsDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.dify_client import DifyClient
from utils.compress_chat_data import compress_chat_data


class MemberDigestService:
    """Layered (daily / weekly / monthly) member digest service.

    Daily digests are produced per group and date from raw messages through
    the Dify client; weekly digests are aggregated from daily digests and
    monthly digests from weekly digests.
    """

    # Keys whose values are always normalized to a list of non-empty strings.
    _LIST_KEYS = frozenset({
        "topics", "identity_clues", "skill_signals", "family_signals",
        "life_stage_signals", "discussion_scenarios", "problem_solving_signals",
        "value_preferences", "habit_signals", "expression_markers",
        "engagement_traits", "reply_entry_points", "reply_taboos",
        "representative_messages", "stable_topics", "identity_traits",
        "skill_profile", "problem_solving_profile", "family_profile",
        "life_stage_profile", "value_profile", "stable_traits", "habit_patterns",
        "expression_profile", "reply_entry_profile", "reply_preferences",
        "long_term_topics", "long_term_reply_preferences", "common_scenarios",
        "phase_state", "recent_state",
    })

    # (output key, candidate source keys in priority order, default is a list?)
    # Order matters: it defines the key order of the serialized source items.
    _PERIOD_SOURCE_FIELDS = (
        ("topics", ("topics", "stable_topics", "long_term_topics"), True),
        ("discussion_scenarios", ("discussion_scenarios", "common_scenarios"), True),
        ("identity_clues", ("identity_clues", "identity_traits"), True),
        ("skill_signals", ("skill_signals", "skill_profile"), True),
        ("problem_solving_signals", ("problem_solving_signals", "problem_solving_profile"), True),
        ("family_signals", ("family_signals", "family_profile"), True),
        ("life_stage_signals", ("life_stage_signals", "life_stage_profile"), True),
        ("value_preferences", ("value_preferences", "value_profile"), True),
        ("habit_signals", ("habit_signals", "habit_patterns"), True),
        ("expression_markers", ("expression_markers", "expression_profile"), True),
        ("engagement_traits", ("engagement_traits", "stable_traits"), True),
        ("reply_entry_points", ("reply_entry_points", "reply_entry_profile"), True),
        ("reply_preferences", ("reply_preferences", "long_term_reply_preferences"), True),
        ("social_role", ("social_role", "group_role"), False),
        ("decision_style", ("decision_style", "decision_profile"), False),
        ("temperament_signal", ("temperament_signal", "temperament_tendency"), False),
        ("recent_state", ("recent_state", "phase_state"), True),
    )

    def __init__(self,
                 contacts_db: ContactsDBOperator,
                 message_db: MessageStorageDB,
                 digest_db: MemberDigestDBOperator,
                 plugin_config: Optional[Dict] = None):
        """Wire up the storage back ends and read tunables from ``plugin_config``.

        Args:
            contacts_db: Source of group member information.
            message_db: Source of raw group messages.
            digest_db: Storage for digests and per-group bootstrap state.
            plugin_config: Optional config; the ``api`` section is handed to
                ``DifyClient`` and the ``profile`` section holds the limits below.
        """
        self.contacts_db = contacts_db
        self.message_db = message_db
        self.digest_db = digest_db
        self.LOG = logger
        self.plugin_config = plugin_config or {}
        # ``or {}`` also covers sections that are present but explicitly null.
        api_config = self.plugin_config.get("api") or {}
        profile_config = self.plugin_config.get("profile") or {}

        self.dify_client = DifyClient(api_config)
        self.ai_enabled = self.dify_client.enabled
        self.ai_base_url = self.dify_client.base_url
        self.ai_api_key = self.dify_client.api_key
        self.ai_endpoint = self.dify_client.endpoint
        self.ai_timeout = self.dify_client.timeout

        self.bootstrap_days = int(profile_config.get("bootstrap_days", 365))
        self.daily_message_limit = int(profile_config.get("daily_message_limit", 120))
        self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6))
        # NOTE(review): read from config but not referenced in the visible
        # part of this class - confirm whether a per-run cap is still intended.
        self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 0))
        self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16))
        self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12))
        self.final_daily_limit = int(profile_config.get("final_daily_limit", 8))
        self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6))
        self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6))
        self.group_digest_days = int(profile_config.get("group_digest_days", 1))

    def ensure_recent_group_daily_digests(self, chatroom_id: str, days: Optional[int] = None,
                                          force: bool = False) -> Dict:
        """Build and save daily digests for the last ``days`` closed days of a group.

        Args:
            chatroom_id: Group to process.
            days: Number of past days (starting with yesterday). When ``None``
                the window is ``bootstrap_days`` for a group that still needs
                bootstrapping and ``group_digest_days`` otherwise.
            force: Rebuild digests even when one already exists.

        Returns:
            Dict with ``built_daily``, ``touched_members``, ``days``, ``mode``
            (``"bootstrap"`` or ``"incremental"``) and ``processed_dates``.
        """
        bootstrap_mode = False
        if days is None:
            bootstrap_mode = self._should_bootstrap_group(chatroom_id)
            days = self.bootstrap_days if bootstrap_mode else self.group_digest_days

        built_daily = 0
        touched_members = set()
        processed_dates = []
        # Read the clock once: a long run that crosses midnight would otherwise
        # shift the window and process the same date twice.
        today = datetime.now()
        for offset in range(days):
            target_date = (today - timedelta(days=offset + 1)).strftime("%Y-%m-%d")
            digests = self._build_group_daily_digests(chatroom_id, target_date, force=force)
            processed_dates.append(target_date)
            for digest in digests:
                self.digest_db.save_digest(digest)
                built_daily += 1
                touched_members.add(digest.get("wxid"))

        mode = "bootstrap" if bootstrap_mode else "incremental"
        if bootstrap_mode:
            # NOTE(review): when the AI client is unavailable no digest is built,
            # so the state is persisted as "empty" and _should_bootstrap_group
            # will not bootstrap this group again - confirm this is intended.
            bootstrap_status = "done" if built_daily > 0 or touched_members else "empty"
            self.digest_db.save_group_state(
                chatroom_id,
                bootstrap_status=bootstrap_status,
                bootstrap_days=days,
                built_daily_count=built_daily,
                touched_member_count=len(touched_members),
                extra={"processed_dates": processed_dates},
            )

        self.LOG.info(
            f"[成员交互摘要][群日摘要窗口] group={chatroom_id}, mode={mode}, days={days}, "
            f"built_daily={built_daily}, touched_members={len(touched_members)}"
        )
        return {
            "built_daily": built_daily,
            "touched_members": list(touched_members),
            "days": days,
            "mode": mode,
            "processed_dates": processed_dates,
        }

    def _should_bootstrap_group(self, chatroom_id: str) -> bool:
        """Return True when the group has no bootstrap state and no daily digest yet.

        Any error while checking is logged and treated as "do not bootstrap".
        """
        try:
            group_state = self.digest_db.get_group_state(chatroom_id)
            if group_state and group_state.get("bootstrap_status") in {"done", "empty"}:
                return False

            sql = """
                SELECT 1
                FROM t_member_digest
                WHERE chatroom_id = %s AND digest_type = 'daily'
                LIMIT 1
            """
            row = self.digest_db.execute_query(sql, (chatroom_id,), fetch_one=True)
            return not bool(row)
        except Exception as e:
            self.LOG.warning(f"[成员交互摘要] 检查群初始化状态失败,按增量处理: group={chatroom_id}, error={e}")
            return False

    def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False,
                                      enable_weekly: bool = True,
                                      enable_monthly: bool = True) -> Dict:
        """Backfill weekly/monthly digests for one member and return all layers.

        Args:
            chatroom_id: Group the member belongs to.
            wxid: Member identifier.
            force: Rebuild periods even if they exist or are still in progress.
            enable_weekly: Run the weekly backfill.
            enable_monthly: Run the monthly backfill.

        Returns:
            Dict with ``display_name``, the truncated ``*_digests`` lists, the
            full ``all_*_digests`` lists and a ``stats`` dict.
        """
        member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
        display_name = member.get("display_name") or member.get("nick_name") or wxid

        all_daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
        if not all_daily_digests:
            return {
                "display_name": display_name,
                "daily_digests": [],
                "weekly_digests": [],
                "monthly_digests": [],
                "all_daily_digests": [],
                "all_weekly_digests": [],
                "all_monthly_digests": [],
                # Same key set as the regular return so callers can read the
                # built_* counters unconditionally.
                "stats": {
                    "daily": 0,
                    "weekly": 0,
                    "monthly": 0,
                    "active_days": 0,
                    "built_daily": 0,
                    "built_weekly": 0,
                    "built_monthly": 0,
                },
            }

        built_weekly = 0
        built_monthly = 0
        # Weekly/monthly digests follow a "backfill on trigger" approach:
        # 1. whenever the weekly/monthly task runs, every historical period that
        #    is closed but missing is checked;
        # 2. missing periods are built, instead of relying on a single window
        #    such as "today happens to be Sunday / month end";
        # 3. whether the current unfinished period is skipped is decided inside
        #    _ensure_weekly_digests / _ensure_monthly_digests.
        if enable_weekly:
            built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force)
        # Monthly digests are built from weekly digests, so monthly gaps are
        # checked only after the weekly backfill has run.
        if enable_monthly:
            built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)

        all_weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
        all_monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=120)
        daily_digests = all_daily_digests[:self.final_daily_limit]
        weekly_digests = all_weekly_digests[:self.final_weekly_limit]
        monthly_digests = all_monthly_digests[:self.final_monthly_limit]

        return {
            "display_name": display_name,
            "daily_digests": daily_digests,
            "weekly_digests": weekly_digests,
            "monthly_digests": monthly_digests,
            "all_daily_digests": all_daily_digests,
            "all_weekly_digests": all_weekly_digests,
            "all_monthly_digests": all_monthly_digests,
            "stats": {
                "daily": len(all_daily_digests),
                "weekly": len(all_weekly_digests),
                "monthly": len(all_monthly_digests),
                "active_days": len(all_daily_digests),
                "built_daily": 0,
                "built_weekly": built_weekly,
                "built_monthly": built_monthly,
            },
        }

    @staticmethod
    def _get_closed_reference_date() -> datetime:
        """Return "yesterday", the most recent day that has fully ended."""
        # Backfill decisions are based only on days that are completely over.
        # When the job runs in the early morning, today is still in progress,
        # so yesterday is used as the reference for telling finished weeks and
        # months apart from the period that is still running.
        return datetime.now() - timedelta(days=1)

    @staticmethod
    def _normalize_profile_item(item: Dict) -> Dict:
        """Normalize one AI profile dict into predictable value types.

        Keys in ``_LIST_KEYS`` become lists of non-empty stripped strings,
        numbers are kept as they are, ``None`` becomes an empty string and
        everything else is converted to a stripped string.
        """
        normalized = {}
        for key, value in item.items():
            if key in MemberDigestService._LIST_KEYS:
                if isinstance(value, list):
                    # Drop nulls explicitly so they do not turn into "None".
                    normalized[key] = [
                        str(v).strip() for v in value
                        if v is not None and str(v).strip()
                    ]
                elif value:
                    text = str(value).strip()
                    # A whitespace-only scalar yields an empty list, not [""].
                    normalized[key] = [text] if text else []
                else:
                    normalized[key] = []
            elif isinstance(value, (int, float)):
                normalized[key] = value
            elif value is None:
                # JSON null must not leak out as the literal string "None".
                normalized[key] = ""
            else:
                normalized[key] = str(value).strip()
        return normalized

    def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str,
                               force: bool = False) -> int:
        """Build missing weekly digests from daily digests; return how many were saved."""
        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400)
        if not daily_digests:
            return 0

        grouped = defaultdict(list)
        for item in daily_digests:
            week_key, _, _ = self._week_period_bounds(item.get("period_key"))
            grouped[week_key].append(item)

        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly"))
        reference_date = self._get_closed_reference_date()
        current_week_key, _, _ = self._week_period_bounds(reference_date.strftime("%Y-%m-%d"))
        built = 0
        for week_key, items in sorted(grouped.items()):
            # A week needs at least two daily digests to be worth aggregating.
            if len(items) < 2:
                continue
            # Without force, only finished historical weeks are backfilled:
            # 1. the reference week is treated as still running and is not built early;
            # 2. weeks that already have a digest are not rebuilt;
            # 3. so every trigger of the weekly task fills in weeks missed earlier.
            # NOTE(review): when the reference day is a Sunday its week has
            # already ended but is still skipped until the next run - confirm
            # that this one-run lag is intended.
            if not force and week_key == current_week_key:
                continue
            if not force and week_key in existing_keys:
                continue
            period_key, period_start, period_end = self._week_period_bounds(items[0].get("period_key"))
            digest = self._build_period_digest(
                "weekly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"week={period_key}, days={len(items)}"
                )
        return built

    def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str,
                                force: bool = False) -> int:
        """Build missing monthly digests from weekly digests; return how many were saved."""
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200)
        if not weekly_digests:
            return 0

        grouped = defaultdict(list)
        for item in weekly_digests:
            # A weekly digest is assigned to the month its period ends in.
            month_key, _, _ = self._month_period_bounds(item.get("period_end"))
            grouped[month_key].append(item)

        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly"))
        reference_date = self._get_closed_reference_date()
        current_month_key, _, _ = self._month_period_bounds(reference_date.strftime("%Y-%m-%d"))
        built = 0
        for month_key, items in sorted(grouped.items()):
            # A month needs at least two weekly digests to be worth aggregating.
            if len(items) < 2:
                continue
            # Monthly digests are likewise only backfilled for finished months:
            # 1. the reference month may still receive new weekly digests, so it
            #    is not frozen early;
            # 2. missing historical months are filled whenever this runs;
            # 3. so a missed monthly run is caught up by a later one.
            # NOTE(review): when the reference day is the last day of a month
            # that month is still skipped until the next run - confirm intended.
            if not force and month_key == current_month_key:
                continue
            if not force and month_key in existing_keys:
                continue
            period_key, period_start, period_end = self._month_period_bounds(items[-1].get("period_end"))
            digest = self._build_period_digest(
                "monthly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"month={period_key}, weeks={len(items)}"
                )
        return built

    def _build_group_daily_digests(self, chatroom_id: str, digest_date: str,
                                   force: bool = False) -> List[Dict]:
        """Build (but do not save) daily digests for one group and date.

        Only senders with at least ``daily_digest_min_messages`` messages on
        that date are candidates; unless ``force`` is set, candidates that
        already have a daily digest are skipped. One AI request covers all
        pending members of the day.
        """
        members = self.contacts_db.get_chatroom_member_list(chatroom_id) or []
        member_name_map = {}
        for member in members:
            wxid = member.get("wxid")
            if not wxid:
                continue
            member_name_map[wxid] = member.get("display_name") or member.get("nick_name") or wxid

        messages = self.message_db.get_member_messages_for_group_date(chatroom_id, digest_date)
        if not messages:
            return []

        sender_messages = defaultdict(list)
        for msg in messages:
            wxid = msg.get("sender")
            if not wxid:
                continue
            sender_messages[wxid].append(msg)

        candidate_wxids = [
            wxid for wxid, items in sender_messages.items()
            if len(items) >= self.daily_digest_min_messages
        ]
        if not candidate_wxids:
            return []

        pending_wxids = []
        for wxid in candidate_wxids:
            if not force and self.digest_db.get_digest(chatroom_id, wxid, "daily", digest_date):
                continue
            pending_wxids.append(wxid)
        if not pending_wxids:
            self.LOG.info(
                f"[成员交互摘要][群日批处理] 跳过: group={chatroom_id}, date={digest_date}, "
                f"reason=all_candidates_already_built, candidates={len(candidate_wxids)}"
            )
            return []

        member_labels = [f"{wxid} | {member_name_map.get(wxid, wxid)}" for wxid in pending_wxids]
        compact_chat = self._format_group_messages_optimized(messages, member_name_map)
        try:
            compact_chat = compress_chat_data(compact_chat)
        except Exception as e:
            # Best effort: fall back to the uncompressed transcript.
            self.LOG.warning(f"[成员交互摘要] 压缩群日消息失败: group={chatroom_id}, date={digest_date}, error={e}")

        parsed_members = self._request_group_daily_json(chatroom_id, digest_date, member_labels, compact_chat)
        parsed_map = (
            {item.get("wxid"): item for item in parsed_members if item.get("wxid")}
            if parsed_members else {}
        )

        digests = []
        for wxid in pending_wxids:
            parsed = parsed_map.get(wxid)
            if not parsed:
                self.LOG.warning(
                    f"[成员交互摘要][群日批处理] 跳过成员(未提取到有效结果): "
                    f"group={chatroom_id}, date={digest_date}, wxid={wxid}, "
                    f"source_count={len(sender_messages.get(wxid, []))}"
                )
                continue

            parsed = self._normalize_profile_item(parsed)
            digests.append({
                "chatroom_id": chatroom_id,
                "wxid": wxid,
                "digest_type": "daily",
                "period_key": digest_date,
                "period_start": f"{digest_date} 00:00:00",
                "period_end": f"{digest_date} 23:59:59",
                "display_name": member_name_map.get(wxid, wxid),
                "source_count": len(sender_messages.get(wxid, [])),
                "summary_text": parsed.get("summary_text", ""),
                "structured": parsed,
                "meta": {
                    "source_type": "group_daily_messages",
                    "representative_messages": parsed.get("representative_messages", []),
                },
            })

        self.LOG.info(
            f"[成员交互摘要][群日批处理] 完成: group={chatroom_id}, date={digest_date}, "
            f"candidates={len(candidate_wxids)}, pending={len(pending_wxids)}, built={len(digests)}"
        )
        return digests

    def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
                             period_key: str, period_start: str, period_end: str,
                             items: List[Dict]) -> Optional[Dict]:
        """Ask the AI to aggregate ``items`` into one period digest record.

        Returns:
            The digest dict ready for ``save_digest``, or ``None`` when no
            usable AI result was obtained.
        """
        parsed = self._request_period_json(
            digest_type=digest_type,
            chatroom_id=chatroom_id,
            wxid=wxid,
            display_name=display_name,
            period_key=period_key,
            items=items,
        )
        if not parsed:
            self.LOG.warning(
                f"[成员交互摘要][{digest_type}] 跳过周期摘要(未提取到有效结果): "
                f"group={chatroom_id}, wxid={wxid}, period={period_key}, source_count={len(items)}, "
                f"last_error={self.dify_client.last_error}"
            )
            return None

        parsed = self._normalize_profile_item(parsed)
        return {
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "digest_type": digest_type,
            "period_key": period_key,
            "period_start": period_start,
            "period_end": period_end,
            "display_name": display_name,
            "source_count": len(items),
            "summary_text": parsed.get("summary_text", ""),
            "structured": parsed,
            "meta": {
                "source_keys": [item.get("period_key") for item in items],
            },
        }

    def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]:
        """Send a free-form prompt and return the parsed JSON object, or ``None``.

        On success the response's ``usage`` is attached under ``ai_usage``.
        """
        if not self.dify_client.is_available():
            return None

        response = self.dify_client.run(
            prompt=prompt,
            user=f"member-digest:{chatroom_id}:{wxid}:{tag}",
            inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid, "tag": tag},
            tag=tag,
        )
        if not response:
            return None

        parsed = self._parse_ai_answer(response.get("text", ""))
        if parsed:
            parsed["ai_usage"] = response.get("usage", {}) or {}
        return parsed

    def _request_period_json(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
                             period_key: str, items: List[Dict]) -> Optional[Dict]:
        """Request a weekly/monthly aggregation and return the parsed JSON object.

        The source digests are sent as a JSON string in ``source_items_json``.
        Returns ``None`` when the client is unavailable, the call yields no
        response, or the answer cannot be parsed.
        """
        if not self.dify_client.is_available():
            return None

        inputs = {
            "digest_type": digest_type,
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "display_name": display_name,
            "period_key": period_key,
            "source_items_json": json.dumps(self._build_period_source_items(items), ensure_ascii=False),
            "source_item_count": str(len(items)),
        }
        response = self.dify_client.run(
            prompt="",
            user=f"member-digest:{chatroom_id}:{wxid}:{digest_type}:{period_key}",
            inputs=inputs,
            tag=f"{digest_type}:{period_key}",
        )
        if not response:
            return None

        parsed = self._parse_ai_answer(response.get("text", ""))
        if parsed:
            parsed["ai_usage"] = response.get("usage", {}) or {}
        return parsed

    def _request_group_daily_json(self, chatroom_id: str, digest_date: str, member_labels: List[str],
                                  compressed_chat: str) -> List[Dict]:
        """Request per-member daily profiles for one group/date.

        Returns:
            A list of normalized member dicts; empty when the client is
            unavailable, returns nothing, or the answer cannot be parsed.
        """
        if not self.dify_client.is_available():
            return []

        response = self.dify_client.run(
            prompt="",
            user=f"member-digest:{chatroom_id}:group-daily:{digest_date}",
            inputs={
                "digest_type": "daily",
                "chatroom_id": chatroom_id,
                "digest_date": digest_date,
                "member_labels": "\n".join(member_labels),
                "compressed_chat": compressed_chat,
            },
            tag=f"group-daily:{digest_date}",
        )
        if not response:
            return []

        parsed = self._parse_group_daily_answer(response.get("text", ""))
        return parsed

    @staticmethod
    def _build_period_source_items(items: List[Dict]) -> List[Dict]:
        """Project stored digests onto the flat field set sent to the AI.

        For every output field the first truthy value among its candidate
        source keys in ``structured`` is used, otherwise an empty list or an
        empty string depending on the field.
        """
        source_items = []
        for item in items:
            structured = item.get("structured", {}) or {}
            source = {
                "period_key": item.get("period_key"),
                "summary_text": item.get("summary_text", ""),
            }
            for out_key, candidates, is_list in MemberDigestService._PERIOD_SOURCE_FIELDS:
                value = None
                for candidate in candidates:
                    value = structured.get(candidate)
                    if value:
                        break
                source[out_key] = value if value else ([] if is_list else "")
            source_items.append(source)
        return source_items

    @staticmethod
    def _load_json_payload(answer: str):
        """Extract and decode the JSON payload of an AI answer.

        The span from the first ``{`` to the last ``}`` is decoded when
        present, otherwise the whole stripped text. Returns the decoded value
        (any JSON type) or ``None`` when the answer is empty or not valid JSON.
        """
        if not answer:
            return None
        text = answer.strip()
        # The model may wrap the JSON in prose or code fences; keep the braces only.
        match = re.search(r"\{.*\}", text, re.S)
        if match:
            text = match.group(0)
        try:
            return json.loads(text)
        except (ValueError, RecursionError):
            return None

    def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
        """Parse an AI answer into a flat dict of strings, numbers and string lists.

        Returns ``None`` when the answer is empty, is not valid JSON, or does
        not decode to a JSON object.
        """
        data = self._load_json_payload(answer)
        # json.loads may legitimately return a list/number/string; only an
        # object can be iterated as key/value pairs.
        if not isinstance(data, dict):
            return None

        normalized = {}
        for key, value in data.items():
            if isinstance(value, list):
                normalized[key] = [
                    str(item).strip() for item in value
                    if item is not None and str(item).strip()
                ]
            elif isinstance(value, (int, float)):
                normalized[key] = value
            elif value is None:
                # JSON null must not leak out as the literal string "None".
                normalized[key] = ""
            else:
                normalized[key] = str(value).strip()
        return normalized

    def _parse_group_daily_answer(self, answer: str) -> List[Dict]:
        """Parse a group-daily AI answer into one normalized dict per member.

        The answer must be a JSON object with a ``members`` list. Entries
        without a ``wxid`` are dropped; when a ``wxid`` occurs more than once
        the entry with the higher ``_score_profile_item`` score is kept.
        """
        parsed = self._load_json_payload(answer)
        if not isinstance(parsed, dict):
            return []

        members = parsed.get("members", [])
        if not isinstance(members, list):
            return []

        best_by_wxid = {}
        for item in members:
            if not isinstance(item, dict):
                continue
            candidate = self._normalize_profile_item(item)
            wxid = candidate.get("wxid")
            if not wxid:
                continue
            existing = best_by_wxid.get(wxid)
            if not existing or self._score_profile_item(candidate) > self._score_profile_item(existing):
                best_by_wxid[wxid] = candidate
        return list(best_by_wxid.values())

    @staticmethod
    def _score_profile_item(item: Dict) -> float:
        """Score how much information a profile dict carries (higher is richer).

        List entries count 1.0 each, numeric values add their value, other
        non-empty values add 0.8; ``confidence`` additionally adds twice its
        numeric value when it can be read as a number.
        """
        if not item:
            return 0.0
        score = 0.0
        for key, value in item.items():
            if key in {"wxid", "display_name"}:
                continue
            if isinstance(value, list):
                score += len([v for v in value if str(v).strip()]) * 1.0
            elif isinstance(value, (int, float)):
                score += float(value)
            elif str(value).strip():
                score += 0.8
        try:
            score += float(item.get("confidence", 0)) * 2
        except (TypeError, ValueError):
            # A non-numeric confidence simply adds no bonus.
            pass
        return score

    def _format_group_messages_optimized(self, messages: List[Dict],
                                         member_name_map: Dict[str, str]) -> str:
        """Render messages as a compact transcript grouped by minute and sender.

        Messages without a sender, without content, or with a timestamp that
        cannot be read are left out. Within a minute each sender's first
        message is prefixed with the display name and further messages of that
        sender follow on continuation lines.
        """
        if not messages:
            return ""

        time_groups = defaultdict(lambda: defaultdict(list))
        for msg in messages:
            timestamp = msg.get("timestamp")
            sender = msg.get("sender", "")
            raw_content = msg.get("content", "")
            # A null content must not be rendered as the string "None".
            content = "" if raw_content is None else str(raw_content).strip()
            if not sender or not content:
                continue
            if isinstance(timestamp, datetime):
                dt = timestamp
            else:
                try:
                    # Only the first 19 characters are parsed so that values
                    # with fractional seconds are still accepted.
                    dt = datetime.strptime(str(timestamp)[:19], "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    continue
            time_key = dt.strftime("%H:%M")
            sender_name = member_name_map.get(sender, sender)
            time_groups[time_key][sender_name].append(content)

        lines = []
        for time_key in sorted(time_groups):
            lines.append(f"【{time_key}】")
            for sender_name, contents in time_groups[time_key].items():
                for idx, content in enumerate(contents):
                    if idx == 0:
                        lines.append(f"{sender_name}:{content}")
                    else:
                        lines.append(f" {content}")
        return "\n".join(lines)

    @staticmethod
    def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return ``(week_key, start, end)`` of the Monday-Sunday week of a date.

        ``date_value`` is read from its first 10 characters as ``YYYY-MM-DD``;
        ``week_key`` is the Monday's date.

        Raises:
            ValueError: If the first 10 characters are not a valid date.
        """
        target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        week_start = target_date - timedelta(days=target_date.weekday())
        week_end = week_start + timedelta(days=6)
        week_key = week_start.strftime("%Y-%m-%d")
        return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59")

    @staticmethod
    def _month_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return ``(month_key, start, end)`` of the calendar month of a date.

        ``date_value`` is read from its first 10 characters as ``YYYY-MM-DD``;
        ``month_key`` has the form ``YYYY-MM``.

        Raises:
            ValueError: If the first 10 characters are not a valid date.
        """
        target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        month_start = target_dt.replace(day=1)
        if month_start.month == 12:
            next_month = month_start.replace(year=month_start.year + 1, month=1, day=1)
        else:
            next_month = month_start.replace(month=month_start.month + 1, day=1)
        month_end = next_month - timedelta(days=1)
        month_key = month_start.strftime("%Y-%m")
        return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59")