# -*- coding: utf-8 -*-
import json
import re
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import requests
from loguru import logger

from db.contacts_db import ContactsDBOperator
from db.member_digest_db import MemberDigestDBOperator
from db.message_storage import MessageStorageDB
from plugins.member_context.prompt_builder import MemberContextPromptBuilder


class MemberDigestService:
    """Layered member digest service.

    Builds per-member digests in three layers: daily digests from raw
    messages, weekly digests from daily digests, and monthly digests from
    weekly digests. Each digest is produced by an AI request when the API is
    configured, otherwise (or on failure) by a local rule-based fallback.
    """

    def __init__(self, contacts_db: ContactsDBOperator, message_db: MessageStorageDB,
                 digest_db: MemberDigestDBOperator, plugin_config: Optional[Dict] = None):
        """Store the DB operators and read tunables from ``plugin_config``.

        Args:
            contacts_db: Used to look up a member's display name.
            message_db: Source of active dates and raw messages.
            digest_db: Storage for the generated digests.
            plugin_config: Optional dict; its ``"api"`` section configures the
                AI request and its ``"profile"`` section configures limits.
        """
        self.contacts_db = contacts_db
        self.message_db = message_db
        self.digest_db = digest_db
        self.LOG = logger
        self.plugin_config = plugin_config or {}

        api_config = self.plugin_config.get("api", {})
        profile_config = self.plugin_config.get("profile", {})

        # Both "enable" and "enabled" spellings are accepted; "enable" wins.
        self.ai_enabled = bool(api_config.get("enable", api_config.get("enabled", False)))
        self.ai_base_url = (api_config.get("base_url") or "").rstrip("/")
        self.ai_api_key = api_config.get("api_key", "")
        self.ai_endpoint = str(api_config.get("endpoint", "completion-messages")).lstrip("/")
        self.ai_timeout = int(api_config.get("request_timeout", 60))

        self.bootstrap_days = int(profile_config.get("bootstrap_days", 365))
        self.daily_message_limit = int(profile_config.get("daily_message_limit", 120))
        self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6))
        self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 45))
        self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16))
        self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12))
        self.final_daily_limit = int(profile_config.get("final_daily_limit", 8))
        self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6))
        self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6))

    def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False) -> Dict:
        """Build any missing daily/weekly/monthly digests and return the latest ones.

        Args:
            chatroom_id: Chatroom the member belongs to.
            wxid: Member identifier.
            force: When True, rebuild digests even if they already exist.

        Returns:
            Dict with ``display_name``, the ``daily_digests`` / ``weekly_digests``
            / ``monthly_digests`` lists and a ``stats`` dict. ``stats`` always
            carries the same keys, including when the member has no activity.
        """
        member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {}
        display_name = member.get("display_name") or member.get("nick_name") or wxid

        active_dates = self.message_db.get_member_active_dates(chatroom_id, wxid, days=self.bootstrap_days)
        if not active_dates:
            # Same stats shape as the normal path so callers can read
            # "built_*" without special-casing inactive members.
            return {
                "display_name": display_name,
                "daily_digests": [],
                "weekly_digests": [],
                "monthly_digests": [],
                "stats": {
                    "daily": 0,
                    "weekly": 0,
                    "monthly": 0,
                    "active_days": 0,
                    "built_daily": 0,
                    "built_weekly": 0,
                    "built_monthly": 0,
                },
            }

        # Order matters: each layer is built from the layer below it.
        built_daily = self._ensure_daily_digests(chatroom_id, wxid, display_name, active_dates, force=force)
        built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force)
        built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force)

        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=self.final_daily_limit)
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=self.final_weekly_limit)
        monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=self.final_monthly_limit)

        return {
            "display_name": display_name,
            "daily_digests": daily_digests,
            "weekly_digests": weekly_digests,
            "monthly_digests": monthly_digests,
            "stats": {
                "daily": len(daily_digests),
                "weekly": len(weekly_digests),
                "monthly": len(monthly_digests),
                "active_days": len(active_dates),
                "built_daily": built_daily,
                "built_weekly": built_weekly,
                "built_monthly": built_monthly,
            },
        }

    def _ensure_daily_digests(self, chatroom_id: str, wxid: str, display_name: str,
                              active_dates: List[Dict], force: bool = False) -> int:
        """Build daily digests for active dates that lack one.

        Dates are processed in ascending ``message_date`` order. Today's
        digest is always rebuilt because the day is still accumulating
        messages. Unless ``force`` is set, at most
        ``max_daily_digests_per_run`` dates are attempted per call.

        Returns:
            Number of daily digests saved.
        """
        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "daily") or [])
        built = 0
        processed = 0
        sorted_dates = sorted(active_dates, key=lambda item: str(item.get("message_date")))
        current_day = datetime.now().strftime("%Y-%m-%d")

        for item in sorted_dates:
            period_key = str(item.get("message_date"))
            # `or 0` also covers a present-but-None count.
            msg_count = int(item.get("msg_count") or 0)
            if msg_count < self.daily_digest_min_messages:
                continue
            if not force and period_key in existing_keys and period_key != current_day:
                continue

            messages = self.message_db.get_member_messages_on_date(
                chatroom_id, wxid, period_key, limit=self.daily_message_limit
            ) or []
            if len(messages) < self.daily_digest_min_messages:
                continue

            digest = self._build_daily_digest(chatroom_id, wxid, display_name, period_key, messages)
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][日摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"date={period_key}, messages={len(messages)}"
                )
            # NOTE(review): the original indentation was lost; `processed` is
            # taken to count every attempted date (it would otherwise always
            # equal `built`), so the per-run cap bounds the number of build
            # attempts. Confirm against the upstream file.
            processed += 1
            if not force and processed >= self.max_daily_digests_per_run:
                break

        return built

    def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str,
                               force: bool = False) -> int:
        """Build weekly digests from stored daily digests.

        Daily digests are grouped by the Monday of their week. A week needs at
        least two daily digests. The current week is always rebuilt; other
        weeks are skipped when a digest already exists, unless ``force``.

        Returns:
            Number of weekly digests saved.
        """
        daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400) or []

        grouped = defaultdict(list)
        bounds = {}
        for item in daily_digests:
            try:
                week_key, week_start, week_end = self._week_period_bounds(item.get("period_key"))
            except ValueError:
                # One malformed row must not abort the whole pipeline.
                self.LOG.warning(
                    f"[成员交互摘要][周摘要] 跳过无法解析日期的日摘要: group={chatroom_id}, "
                    f"wxid={wxid}, period_key={item.get('period_key')}"
                )
                continue
            grouped[week_key].append(item)
            bounds[week_key] = (week_start, week_end)

        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly") or [])
        current_week_key, _, _ = self._week_period_bounds(datetime.now().strftime("%Y-%m-%d"))

        built = 0
        for week_key in sorted(grouped):
            items = grouped[week_key]
            if len(items) < 2:
                continue
            if not force and week_key in existing_keys and week_key != current_week_key:
                continue

            period_start, period_end = bounds[week_key]
            digest = self._build_period_digest(
                "weekly", chatroom_id, wxid, display_name, week_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"week={week_key}, days={len(items)}"
                )

        return built

    def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str,
                                force: bool = False) -> int:
        """Build monthly digests from stored weekly digests.

        A weekly digest is assigned to the month of its ``period_end``. A
        month needs at least two weekly digests. The current month is always
        rebuilt; other months are skipped when a digest already exists, unless
        ``force``.

        Returns:
            Number of monthly digests saved.
        """
        weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200) or []

        grouped = defaultdict(list)
        bounds = {}
        for item in weekly_digests:
            try:
                month_key, month_start, month_end = self._month_period_bounds(item.get("period_end"))
            except ValueError:
                # One malformed row must not abort the whole pipeline.
                self.LOG.warning(
                    f"[成员交互摘要][月摘要] 跳过无法解析日期的周摘要: group={chatroom_id}, "
                    f"wxid={wxid}, period_end={item.get('period_end')}"
                )
                continue
            grouped[month_key].append(item)
            bounds[month_key] = (month_start, month_end)

        existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly") or [])
        current_month_key, _, _ = self._month_period_bounds(datetime.now().strftime("%Y-%m-%d"))

        built = 0
        for month_key in sorted(grouped):
            items = grouped[month_key]
            if len(items) < 2:
                continue
            if not force and month_key in existing_keys and month_key != current_month_key:
                continue

            period_start, period_end = bounds[month_key]
            digest = self._build_period_digest(
                "monthly", chatroom_id, wxid, display_name, month_key, period_start, period_end, items
            )
            if digest:
                self.digest_db.save_digest(digest)
                built += 1
                self.LOG.info(
                    f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, "
                    f"month={month_key}, weeks={len(items)}"
                )

        return built

    def _build_daily_digest(self, chatroom_id: str, wxid: str, display_name: str,
                            digest_date: str, messages: List[Dict]) -> Optional[Dict]:
        """Create the digest record for one day of messages.

        Tries the AI request first and falls back to the local heuristic.

        Returns:
            The digest dict ready for ``save_digest``, or None when neither
            the AI nor the fallback produced a result.
        """
        prompt = MemberContextPromptBuilder.build_daily_digest_prompt(
            chatroom_id, wxid, display_name, digest_date, messages
        )
        parsed = self._request_ai_json(prompt, tag=f"daily:{digest_date}", chatroom_id=chatroom_id, wxid=wxid)
        if not parsed:
            parsed = self._build_daily_digest_fallback(messages)
        if not parsed:
            return None

        return {
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "digest_type": "daily",
            "period_key": digest_date,
            "period_start": f"{digest_date} 00:00:00",
            "period_end": f"{digest_date} 23:59:59",
            "display_name": display_name,
            "source_count": len(messages),
            "summary_text": parsed.get("summary_text", ""),
            "structured": parsed,
            "meta": {
                "source_type": "messages",
                "representative_messages": parsed.get("representative_messages", []),
            },
        }

    def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str,
                             period_key: str, period_start: str, period_end: str,
                             items: List[Dict]) -> Optional[Dict]:
        """Create a weekly or monthly digest record from lower-layer digests.

        Tries the AI request first and falls back to local aggregation.

        Returns:
            The digest dict ready for ``save_digest``, or None when neither
            the AI nor the fallback produced a result.
        """
        prompt = MemberContextPromptBuilder.build_period_digest_prompt(
            digest_type, chatroom_id, wxid, display_name, period_key, items
        )
        parsed = self._request_ai_json(
            prompt, tag=f"{digest_type}:{period_key}", chatroom_id=chatroom_id, wxid=wxid
        )
        if not parsed:
            parsed = self._build_period_digest_fallback(digest_type, items)
        if not parsed:
            return None

        return {
            "chatroom_id": chatroom_id,
            "wxid": wxid,
            "digest_type": digest_type,
            "period_key": period_key,
            "period_start": period_start,
            "period_end": period_end,
            "display_name": display_name,
            "source_count": len(items),
            "summary_text": parsed.get("summary_text", ""),
            "structured": parsed,
            "meta": {
                "source_keys": [item.get("period_key") for item in items],
            },
        }

    def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]:
        """POST the prompt to the configured AI endpoint and parse its JSON answer.

        Returns None when the AI is disabled or not fully configured, when
        the request fails, or when the answer cannot be parsed. Failures are
        logged as warnings and never raised, so callers can fall back.
        On success the response's ``metadata.usage`` is attached to the
        result under ``"ai_usage"``.
        """
        if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key:
            return None

        headers = {
            "Authorization": f"Bearer {self.ai_api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "inputs": {"query": prompt},
            "response_mode": "blocking",
            "user": f"member-digest:{chatroom_id}:{wxid}:{tag}",
        }
        url = f"{self.ai_base_url}/{self.ai_endpoint}"

        try:
            self.LOG.info(f"[成员交互摘要][AI] 发起摘要请求: group={chatroom_id}, wxid={wxid}, tag={tag}")
            response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout)
            response.raise_for_status()
            data = response.json()
            parsed = self._parse_ai_answer(data.get("answer", ""))
            if parsed:
                usage = (data.get("metadata") or {}).get("usage", {}) or {}
                parsed["ai_usage"] = usage
            return parsed
        except Exception as e:
            # Deliberately broad: this is the best-effort boundary around the
            # network call; any failure degrades to the local fallback.
            self.LOG.warning(f"[成员交互摘要][AI] 摘要请求失败: group={chatroom_id}, wxid={wxid}, tag={tag}, error={e}")
            return None

    def _parse_ai_answer(self, answer: str) -> Optional[Dict]:
        """Extract and normalize the JSON object contained in an AI answer.

        The outermost ``{...}`` span of the text is parsed if present,
        otherwise the whole text. List values become lists of non-empty
        stripped strings, numbers are kept, everything else is stringified.

        Returns:
            The normalized dict, or None when the answer is empty, is not
            valid JSON, or does not decode to a JSON object.
        """
        if not answer:
            return None

        text = answer.strip()
        match = re.search(r"\{.*\}", text, re.S)
        if match:
            text = match.group(0)

        try:
            data = json.loads(text)
        except Exception:
            return None
        # A bare list/string/number is valid JSON but not a usable digest.
        if not isinstance(data, dict):
            return None

        normalized = {}
        for key, value in data.items():
            if isinstance(value, list):
                normalized[key] = [str(item).strip() for item in value if str(item).strip()]
            elif isinstance(value, (int, float)):
                normalized[key] = value
            else:
                normalized[key] = str(value).strip()
        return normalized

    def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]:
        """Produce a rule-based daily digest when no AI result is available.

        Classifies the message pattern by average content length and keeps
        the first three contents (truncated to 60 characters) as samples.

        Returns:
            The fallback digest dict, or None when there is no usable content.
        """
        if not messages:
            return None

        stripped = (str(item.get("content", "")).strip() for item in messages if item.get("content"))
        # Drop whitespace-only contents so they neither skew the average
        # length nor show up as empty representative samples.
        contents = [content for content in stripped if content]
        if not contents:
            return None

        short_samples = [content[:60] for content in contents[:3]]
        avg_len = sum(len(content) for content in contents) / len(contents)
        if avg_len <= 16:
            message_pattern = "短句居多"
        elif avg_len >= 35:
            message_pattern = "表达较完整"
        else:
            message_pattern = "表达中等长度"

        return {
            "topics": [],
            "interaction_style": "自然跟随式互动",
            "message_pattern": message_pattern,
            "response_style_hint": "保持简洁自然,先回应核心点",
            "habit_signals": [],
            "engagement_traits": [],
            "reply_taboos": [],
            "temperament_signal": "当天样本有限,暂以中性沟通观察为主",
            "summary_text": f"当日消息约{len(messages)}条,{message_pattern}。",
            "representative_messages": short_samples,
            "confidence": 0.35,
        }

    @staticmethod
    def _as_list(value) -> List:
        """Coerce a structured-digest field to a list.

        Lists/tuples are copied, a non-blank string becomes a one-element
        list, anything else (including None) becomes an empty list.
        """
        if isinstance(value, (list, tuple)):
            return list(value)
        if isinstance(value, str) and value.strip():
            return [value.strip()]
        return []

    @staticmethod
    def _top_keys(counts: Dict, limit: int) -> List:
        """Return up to ``limit`` keys of ``counts`` ordered by descending count."""
        ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
        return [key for key, _ in ranked[:limit]]

    def _build_period_digest_fallback(self, digest_type: str, items: List[Dict]) -> Optional[Dict]:
        """Aggregate lower-layer digests into a weekly/monthly digest without AI.

        Counts how often each topic, trait, habit and reply preference occurs
        across the ``structured`` payloads of ``items`` and keeps the most
        frequent ones. Fields that are not lists (for example a string the AI
        returned in place of a list) are tolerated.

        Returns:
            A weekly-shaped dict when ``digest_type`` is ``"weekly"``,
            otherwise a monthly-shaped dict; None when ``items`` is empty.
        """
        if not items:
            return None

        topic_counts = defaultdict(int)
        trait_counts = defaultdict(int)
        habit_counts = defaultdict(int)
        reply_counts = defaultdict(int)
        temperament_values = []

        # Each counter merges the daily-layer and period-layer field names so
        # the same routine serves both weekly and monthly aggregation.
        field_groups = (
            (topic_counts, ("topics", "stable_topics", "long_term_topics")),
            (trait_counts, ("engagement_traits", "stable_traits")),
            (habit_counts, ("habit_signals", "habit_patterns")),
            (reply_counts, ("reply_preferences", "long_term_reply_preferences")),
        )

        for item in items:
            structured = item.get("structured") or {}
            if not isinstance(structured, dict):
                continue
            for counts, field_names in field_groups:
                for field_name in field_names:
                    for entry in self._as_list(structured.get(field_name)):
                        counts[entry] += 1
            if structured.get("temperament_signal"):
                temperament_values.append(structured.get("temperament_signal"))
            if structured.get("temperament_tendency"):
                temperament_values.append(structured.get("temperament_tendency"))

        top_topics = self._top_keys(topic_counts, 5)
        top_traits = self._top_keys(trait_counts, 5)
        top_habits = self._top_keys(habit_counts, 5)
        top_reply = self._top_keys(reply_counts, 4)
        temperament = temperament_values[0] if temperament_values else "整体保持中性沟通特征"

        if digest_type == "weekly":
            return {
                "stable_topics": top_topics,
                "stable_traits": top_traits,
                "habit_patterns": top_habits,
                "reply_preferences": top_reply,
                "recent_state": top_topics[:3],
                "temperament_tendency": temperament,
                "summary_text": "本周沟通特征已按重复信号汇总。",
                "confidence": 0.45,
            }

        return {
            "long_term_topics": top_topics,
            "stable_traits": top_traits,
            "habit_patterns": top_habits,
            "long_term_reply_preferences": top_reply,
            "phase_state": top_topics[:3],
            "temperament_tendency": temperament,
            "summary_text": "本月沟通特征已按周摘要汇总。",
            "confidence": 0.5,
        }

    @staticmethod
    def _week_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return ``(week_key, start, end)`` for the Monday-Sunday week of a date.

        Only the first 10 characters of ``date_value`` are parsed, as
        ``YYYY-MM-DD``. ``week_key`` is the Monday's date.

        Raises:
            ValueError: If the value does not start with a ``YYYY-MM-DD`` date.
        """
        target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        week_start = target_date - timedelta(days=target_date.weekday())
        week_end = week_start + timedelta(days=6)
        week_key = week_start.strftime("%Y-%m-%d")
        return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59")

    @staticmethod
    def _month_period_bounds(date_value: str) -> Tuple[str, str, str]:
        """Return ``(month_key, start, end)`` for the calendar month of a date.

        Only the first 10 characters of ``date_value`` are parsed, as
        ``YYYY-MM-DD``. ``month_key`` has the form ``YYYY-MM``.

        Raises:
            ValueError: If the value does not start with a ``YYYY-MM-DD`` date.
        """
        target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d")
        month_start = target_dt.replace(day=1)
        if month_start.month == 12:
            next_month = month_start.replace(year=month_start.year + 1, month=1, day=1)
        else:
            next_month = month_start.replace(month=month_start.month + 1, day=1)
        # Last day of the month = day before the first of the next month.
        month_end = next_month - timedelta(days=1)
        month_key = month_start.strftime("%Y-%m")
        return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59")