diff --git a/admin/dashboard/blueprints/contacts.py b/admin/dashboard/blueprints/contacts.py index 7d8f999..4a513d9 100644 --- a/admin/dashboard/blueprints/contacts.py +++ b/admin/dashboard/blueprints/contacts.py @@ -260,9 +260,13 @@ def api_group_member_context(roomid, wxid): if not server.member_context_service.is_group_enabled(roomid): return jsonify({"success": False, "error": "该群未启用成员交互摘要功能"}), 403 context = server.member_context_db.get_member_context(roomid, wxid) - if not context: - context = server.member_context_service.refresh_member_context(roomid, wxid) - return jsonify({"success": True, "data": {"context": context}}) + return jsonify({ + "success": True, + "data": { + "context": context, + "ready": bool(context) + } + }) except Exception as e: logger.error(f"获取群成员交互摘要失败: {e}") return jsonify({"success": False, "error": str(e)}), 500 diff --git a/admin/dashboard/templates/contacts_management.html b/admin/dashboard/templates/contacts_management.html index 548d97a..0d53d1c 100644 --- a/admin/dashboard/templates/contacts_management.html +++ b/admin/dashboard/templates/contacts_management.html @@ -262,19 +262,58 @@ 刷新摘要 + + + {% raw %}{{ memberContext.activity_level || '-' }}{% endraw %} {% raw %}{{ memberContext.message_pattern || '-' }}{% endraw %} {% raw %}{{ memberContext.interaction_style || '-' }}{% endraw %} + {% raw %}{{ ((memberContext.meta || {}).temperament_tendency) || '-' }}{% endraw %} {% raw %}{{ memberContext.response_style_hint || '-' }}{% endraw %} + + {% raw %}{{ ((memberContext.meta || {}).observation_days || 0) }}{% endraw %} 天 + + 已进入长期画像 + + + 仍在积累 + + {% raw %}{{ item }}{% endraw %} - + + {% raw %}{{ item }}{% endraw %} + - + + + {% raw %}{{ item }}{% endraw %} + - + + + {% raw %}{{ item }}{% endraw %} + - + {% raw %}{{ item }}{% endraw %} - + + {% raw %}{{ item }}{% endraw %} + - + {% raw %}{{ item }}{% endraw %} - @@ -285,6 +324,13 @@ {% raw %}{{ memberContext.summary_text || '-' }}{% endraw %} {% raw %}{{ memberContext.source_message_count || 0 }}{% endraw %} + {% raw %}{{ ((memberContext.meta || {}).history_message_count) || 0 }}{% endraw %} + + 日 {% raw %}{{ ((memberContext.meta || {}).digest_daily_count) || 0 }}{% endraw %} + / 周 {% raw %}{{ ((memberContext.meta || {}).digest_weekly_count) || 0 }}{% endraw %} + / 月 {% raw %}{{ ((memberContext.meta || {}).digest_monthly_count) || 0 }}{% endraw %} + + {% raw %}{{ ((memberContext.meta || {}).profile_iterations) || 0 }}{% endraw %} {% raw %}{{ memberContext.last_profiled_at || '-' }}{% endraw %} diff --git a/db/member_digest_db.py b/db/member_digest_db.py new file mode 100644 index 0000000..854a8cc --- /dev/null +++ b/db/member_digest_db.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +import json +from datetime import datetime +from typing import Dict, List, Optional + +from db.base import BaseDBOperator +from db.connection import DBConnectionManager + + +class MemberDigestDBOperator(BaseDBOperator): + """成员分层摘要数据库操作""" + + def __init__(self, db_manager: DBConnectionManager): + super().__init__(db_manager) + self._create_tables() + + def _create_tables(self): + try: + self.execute_update(""" + CREATE TABLE IF NOT EXISTS t_member_digest ( + id INT AUTO_INCREMENT PRIMARY KEY, + chatroom_id VARCHAR(64) NOT NULL COMMENT '群聊ID', + wxid VARCHAR(64) NOT NULL COMMENT '成员微信ID', + digest_type VARCHAR(16) NOT NULL COMMENT '摘要类型 daily|weekly|monthly', + period_key VARCHAR(32) NOT NULL COMMENT '周期主键', + period_start DATETIME NULL COMMENT '周期开始时间', + period_end DATETIME NULL COMMENT '周期结束时间', + display_name VARCHAR(128) COMMENT '成员展示名', + source_count INT DEFAULT 0 COMMENT '源数据条数', + summary_text TEXT COMMENT '摘要说明', + structured_json LONGTEXT COMMENT '结构化摘要JSON', + meta_json LONGTEXT COMMENT '附加元数据JSON', + last_generated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '最后生成时间', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + UNIQUE KEY idx_member_digest (chatroom_id, wxid, digest_type, period_key), + KEY idx_digest_lookup (chatroom_id, wxid, digest_type, period_end) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='成员分层摘要表'; + """) + except Exception as e: + self.LOG.error(f"创建成员分层摘要表失败: {e}") + + def save_digest(self, digest: Dict) -> bool: + try: + data = { + "chatroom_id": digest.get("chatroom_id", ""), + "wxid": digest.get("wxid", ""), + "digest_type": digest.get("digest_type", ""), + "period_key": digest.get("period_key", ""), + "period_start": digest.get("period_start"), + "period_end": digest.get("period_end"), + "display_name": digest.get("display_name", ""), + "source_count": digest.get("source_count", 0), + "summary_text": digest.get("summary_text", ""), + "structured_json": json.dumps(digest.get("structured", {}), ensure_ascii=False), + "meta_json": json.dumps(digest.get("meta", {}), ensure_ascii=False), + "last_generated_at": digest.get("last_generated_at", datetime.now().strftime("%Y-%m-%d %H:%M:%S")), + } + fields = ", ".join(data.keys()) + placeholders = ", ".join(["%s"] * len(data)) + update_clause = ", ".join( + [f"{key}=VALUES({key})" for key in data.keys() if key not in ("chatroom_id", "wxid", "digest_type", "period_key")] + ) + sql = f""" + INSERT INTO t_member_digest ({fields}) + VALUES ({placeholders}) + ON DUPLICATE KEY UPDATE {update_clause} + """ + return self.execute_update(sql, tuple(data.values())) + except Exception as e: + self.LOG.error(f"保存成员分层摘要失败: {e}") + return False + + def get_digest(self, chatroom_id: str, wxid: str, digest_type: str, period_key: str) -> Optional[Dict]: + try: + sql = """ + SELECT * + FROM t_member_digest + WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s AND period_key = %s + LIMIT 1 + """ + row = self.execute_query(sql, (chatroom_id, wxid, digest_type, period_key), fetch_one=True) + return self._deserialize_row(row) + except Exception as e: + self.LOG.error(f"获取成员分层摘要失败: {e}") + return None + + def list_digests(self, chatroom_id: str, wxid: str, digest_type: str, limit: int = 20) -> List[Dict]: + try: + sql = """ + SELECT * + FROM t_member_digest + WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s + ORDER BY period_end DESC, period_key DESC + LIMIT %s + """ + rows = self.execute_query(sql, (chatroom_id, wxid, digest_type, limit)) or [] + return [self._deserialize_row(row) for row in rows] + except Exception as e: + self.LOG.error(f"获取成员分层摘要列表失败: {e}") + return [] + + def list_digest_keys(self, chatroom_id: str, wxid: str, digest_type: str) -> List[str]: + try: + sql = """ + SELECT period_key + FROM t_member_digest + WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s + """ + rows = self.execute_query(sql, (chatroom_id, wxid, digest_type)) or [] + return [str(row.get("period_key")) for row in rows if row.get("period_key")] + except Exception as e: + self.LOG.error(f"获取成员摘要key失败: {e}") + return [] + + def list_period_digests(self, chatroom_id: str, wxid: str, digest_type: str, + period_keys: List[str]) -> List[Dict]: + try: + if not period_keys: + return [] + placeholders = ", ".join(["%s"] * len(period_keys)) + sql = f""" + SELECT * + FROM t_member_digest + WHERE chatroom_id = %s AND wxid = %s AND digest_type = %s AND period_key IN ({placeholders}) + ORDER BY period_end ASC, period_key ASC + """ + params = (chatroom_id, wxid, digest_type, *period_keys) + rows = self.execute_query(sql, params) or [] + return [self._deserialize_row(row) for row in rows] + except Exception as e: + self.LOG.error(f"批量获取成员分层摘要失败: {e}") + return [] + + @staticmethod + def _deserialize_row(row: Optional[Dict]) -> Optional[Dict]: + if not row: + return row + + for key in ("structured_json", "meta_json"): + value = row.get(key) + if not value: + row[key] = {} + continue + try: + row[key] = json.loads(value) + except Exception: + row[key] = {} + + for key in ("period_start", "period_end", "last_generated_at"): + value = row.get(key) + if isinstance(value, datetime): + row[key] = value.strftime("%Y-%m-%d %H:%M:%S") + + row["structured"] = row.get("structured_json", {}) + row["meta"] = row.get("meta_json", {}) + return row diff --git a/db/message_storage.py b/db/message_storage.py index 403c54a..22b9dd0 100644 --- a/db/message_storage.py +++ b/db/message_storage.py @@ -60,6 +60,68 @@ class MessageStorageDB(BaseDBOperator): results = self.execute_query(sql, (days, group_id, wxid, limit)) or [] return list(reversed(results)) + def get_member_messages_since(self, group_id: str, wxid: str, since_time, limit: int = 200) -> List[Dict]: + """获取指定时间之后的成员消息""" + sql = """ + SELECT timestamp, sender, content, message_type + FROM messages + WHERE timestamp > %s + AND group_id = %s + AND sender = %s + AND message_type IN (1, 49) + AND CHAR_LENGTH(content) BETWEEN 2 AND 500 + AND content NOT LIKE '/%%' + ORDER BY timestamp ASC + LIMIT %s + """ + if isinstance(since_time, datetime): + since_time = since_time.strftime("%Y-%m-%d %H:%M:%S") + return self.execute_query(sql, (since_time, group_id, wxid, limit)) or [] + + def get_member_active_dates(self, group_id: str, wxid: str, days: int = 365) -> List[Dict]: + """获取成员在指定时间窗口内的活跃日期列表""" + sql = """ + SELECT + DATE(timestamp) AS message_date, + COUNT(*) AS msg_count, + MIN(timestamp) AS first_message_time, + MAX(timestamp) AS last_message_time + FROM messages + WHERE timestamp >= DATE_SUB(NOW(), INTERVAL %s DAY) + AND group_id = %s + AND sender = %s + AND message_type IN (1, 49) + AND CHAR_LENGTH(content) BETWEEN 2 AND 500 + AND content NOT LIKE '/%%' + GROUP BY DATE(timestamp) + ORDER BY message_date ASC + """ + rows = self.execute_query(sql, (days, group_id, wxid)) or [] + for row in rows: + for key in ("message_date", "first_message_time", "last_message_time"): + value = row.get(key) + if isinstance(value, datetime): + row[key] = value.strftime("%Y-%m-%d %H:%M:%S") if key != "message_date" else value.strftime("%Y-%m-%d") + elif value: + row[key] = str(value) + return rows + + def get_member_messages_on_date(self, group_id: str, wxid: str, target_date: str, limit: int = 120) -> List[Dict]: + """获取成员在某一天的消息""" + sql = """ + SELECT timestamp, sender, content, message_type + FROM messages + WHERE DATE(timestamp) = %s + AND group_id = %s + AND sender = %s + AND message_type IN (1, 49) + AND CHAR_LENGTH(content) BETWEEN 2 AND 500 + AND content NOT LIKE '/%%' + ORDER BY timestamp ASC + LIMIT %s + """ + return self.execute_query(sql, (target_date, group_id, wxid, limit)) or [] + def get_message_count_by_date(self, date: str) -> List[Dict]: """获取指定日期的消息统计""" sql = """ diff --git a/plugins/member_context/config.toml b/plugins/member_context/config.toml index afe5477..46974ef 100644 --- a/plugins/member_context/config.toml +++ b/plugins/member_context/config.toml @@ -12,6 +12,30 @@ request_timeout = 60 sample_days = 30 sample_message_limit = 80 refresh_limit_per_member = 200 +long_term_days = 365 +long_term_message_limit = 600 +bootstrap_days = 365 +bootstrap_message_limit = 600 +incremental_message_limit = 80 +incremental_recent_days = 7 +recalibration_days = 30 +daily_message_limit = 120 +daily_digest_min_messages = 6 +max_daily_digests_per_run = 45 +weekly_digest_limit = 16 +monthly_digest_limit = 12 +final_daily_limit = 8 +final_weekly_limit = 6 +final_monthly_limit = 6 +ai_min_member_messages = 12 +active_member_hours = 72 +min_member_messages = 3 +max_members_per_group_per_run = 30 +stale_hours = 24 +stable_decay = 0.96 +stable_max_items = 6 +stable_min_score = 0.9 +stable_ready_days = 180 [schedule] refresh_times = ["04:20"] diff --git a/plugins/member_context/digest_service.py b/plugins/member_context/digest_service.py new file mode 100644 index 0000000..310a65a --- /dev/null +++ b/plugins/member_context/digest_service.py @@ -0,0 +1,373 @@ +# -*- coding: utf-8 -*- +import json +import re +from collections import defaultdict +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Tuple + +import requests +from loguru import logger + +from db.contacts_db import ContactsDBOperator +from db.member_digest_db import MemberDigestDBOperator +from db.message_storage import MessageStorageDB +from plugins.member_context.prompt_builder import MemberContextPromptBuilder + + +class MemberDigestService: + """成员分层摘要服务""" + + def __init__(self, contacts_db: ContactsDBOperator, message_db: MessageStorageDB, + digest_db: MemberDigestDBOperator, plugin_config: Optional[Dict] = None): + self.contacts_db = contacts_db + self.message_db = message_db + self.digest_db = digest_db + self.LOG = logger + self.plugin_config = plugin_config or {} + + api_config = self.plugin_config.get("api", {}) + profile_config = self.plugin_config.get("profile", {}) + + self.ai_enabled = bool(api_config.get("enable", api_config.get("enabled", False))) + self.ai_base_url = (api_config.get("base_url") or "").rstrip("/") + self.ai_api_key = api_config.get("api_key", "") + self.ai_endpoint = str(api_config.get("endpoint", "completion-messages")).lstrip("/") + self.ai_timeout = int(api_config.get("request_timeout", 60)) + + self.bootstrap_days = int(profile_config.get("bootstrap_days", 365)) + self.daily_message_limit = int(profile_config.get("daily_message_limit", 120)) + self.daily_digest_min_messages = int(profile_config.get("daily_digest_min_messages", 6)) + self.max_daily_digests_per_run = int(profile_config.get("max_daily_digests_per_run", 45)) + self.weekly_digest_limit = int(profile_config.get("weekly_digest_limit", 16)) + self.monthly_digest_limit = int(profile_config.get("monthly_digest_limit", 12)) + self.final_daily_limit = int(profile_config.get("final_daily_limit", 8)) + self.final_weekly_limit = int(profile_config.get("final_weekly_limit", 6)) + self.final_monthly_limit = int(profile_config.get("final_monthly_limit", 6)) + + def ensure_member_digest_pipeline(self, chatroom_id: str, wxid: str, force: bool = False) -> Dict: + member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {} + display_name = member.get("display_name") or member.get("nick_name") or wxid + + active_dates = self.message_db.get_member_active_dates(chatroom_id, wxid, days=self.bootstrap_days) + if not active_dates: + return { + "display_name": display_name, + "daily_digests": [], + "weekly_digests": [], + "monthly_digests": [], + "stats": {"daily": 0, "weekly": 0, "monthly": 0, "active_days": 0}, + } + + built_daily = self._ensure_daily_digests(chatroom_id, wxid, display_name, active_dates, force=force) + built_weekly = self._ensure_weekly_digests(chatroom_id, wxid, display_name, force=force) + built_monthly = self._ensure_monthly_digests(chatroom_id, wxid, display_name, force=force) + + daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=self.final_daily_limit) + weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=self.final_weekly_limit) + monthly_digests = self.digest_db.list_digests(chatroom_id, wxid, "monthly", limit=self.final_monthly_limit) + + return { + "display_name": display_name, + "daily_digests": daily_digests, + "weekly_digests": weekly_digests, + "monthly_digests": monthly_digests, + "stats": { + "daily": len(daily_digests), + "weekly": len(weekly_digests), + "monthly": len(monthly_digests), + "active_days": len(active_dates), + "built_daily": built_daily, + "built_weekly": built_weekly, + "built_monthly": built_monthly, + }, + } + + def _ensure_daily_digests(self, chatroom_id: str, wxid: str, display_name: str, + active_dates: List[Dict], force: bool = False) -> int: + existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "daily")) + built = 0 + processed = 0 + sorted_dates = sorted(active_dates, key=lambda item: str(item.get("message_date"))) + current_day = datetime.now().strftime("%Y-%m-%d") + + for item in sorted_dates: + period_key = str(item.get("message_date")) + msg_count = int(item.get("msg_count", 0)) + if msg_count < self.daily_digest_min_messages: + continue + if not force and period_key in existing_keys and period_key != current_day: + continue + messages = self.message_db.get_member_messages_on_date( + chatroom_id, wxid, period_key, limit=self.daily_message_limit + ) + if len(messages) < self.daily_digest_min_messages: + continue + digest = self._build_daily_digest(chatroom_id, wxid, display_name, period_key, messages) + if digest: + self.digest_db.save_digest(digest) + built += 1 + processed += 1 + self.LOG.info( + f"[成员交互摘要][日摘要] 完成: group={chatroom_id}, wxid={wxid}, " + f"date={period_key}, messages={len(messages)}" + ) + if not force and processed >= self.max_daily_digests_per_run: + break + return built + + def _ensure_weekly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int: + daily_digests = self.digest_db.list_digests(chatroom_id, wxid, "daily", limit=400) + grouped = defaultdict(list) + for item in daily_digests: + week_key, _, _ = self._week_period_bounds(item.get("period_key")) + grouped[week_key].append(item) + + existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "weekly")) + current_week_key, _, _ = self._week_period_bounds(datetime.now().strftime("%Y-%m-%d")) + built = 0 + for week_key, items in sorted(grouped.items()): + if len(items) < 2: + continue + if not force and week_key in existing_keys and week_key != current_week_key: + continue + period_key, period_start, period_end = self._week_period_bounds(items[0].get("period_key")) + digest = self._build_period_digest( + "weekly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items + ) + if digest: + self.digest_db.save_digest(digest) + built += 1 + self.LOG.info( + f"[成员交互摘要][周摘要] 完成: group={chatroom_id}, wxid={wxid}, " + f"week={period_key}, days={len(items)}" + ) + return built + + def _ensure_monthly_digests(self, chatroom_id: str, wxid: str, display_name: str, force: bool = False) -> int: + weekly_digests = self.digest_db.list_digests(chatroom_id, wxid, "weekly", limit=200) + grouped = defaultdict(list) + for item in weekly_digests: + month_key, _, _ = self._month_period_bounds(item.get("period_end")) + grouped[month_key].append(item) + + existing_keys = set(self.digest_db.list_digest_keys(chatroom_id, wxid, "monthly")) + current_month_key, _, _ = self._month_period_bounds(datetime.now().strftime("%Y-%m-%d")) + built = 0 + for month_key, items in sorted(grouped.items()): + if len(items) < 2: + continue + if not force and month_key in existing_keys and month_key != current_month_key: + continue + period_key, period_start, period_end = self._month_period_bounds(items[-1].get("period_end")) + digest = self._build_period_digest( + "monthly", chatroom_id, wxid, display_name, period_key, period_start, period_end, items + ) + if digest: + self.digest_db.save_digest(digest) + built += 1 + self.LOG.info( + f"[成员交互摘要][月摘要] 完成: group={chatroom_id}, wxid={wxid}, " + f"month={period_key}, weeks={len(items)}" + ) + return built + + def _build_daily_digest(self, chatroom_id: str, wxid: str, display_name: str, + digest_date: str, messages: List[Dict]) -> Optional[Dict]: + prompt = MemberContextPromptBuilder.build_daily_digest_prompt( + chatroom_id, wxid, display_name, digest_date, messages + ) + parsed = self._request_ai_json(prompt, tag=f"daily:{digest_date}", chatroom_id=chatroom_id, wxid=wxid) + if not parsed: + parsed = self._build_daily_digest_fallback(messages) + if not parsed: + return None + + period_start = f"{digest_date} 00:00:00" + period_end = f"{digest_date} 23:59:59" + return { + "chatroom_id": chatroom_id, + "wxid": wxid, + "digest_type": "daily", + "period_key": digest_date, + "period_start": period_start, + "period_end": period_end, + "display_name": display_name, + "source_count": len(messages), + "summary_text": parsed.get("summary_text", ""), + "structured": parsed, + "meta": { + "source_type": "messages", + "representative_messages": parsed.get("representative_messages", []), + }, + } + + def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str, + period_key: str, period_start: str, period_end: str, + items: List[Dict]) -> Optional[Dict]: + prompt = MemberContextPromptBuilder.build_period_digest_prompt( + digest_type, chatroom_id, wxid, display_name, period_key, items + ) + parsed = self._request_ai_json(prompt, tag=f"{digest_type}:{period_key}", chatroom_id=chatroom_id, wxid=wxid) + if not parsed: + parsed = self._build_period_digest_fallback(digest_type, items) + if not parsed: + return None + + return { + "chatroom_id": chatroom_id, + "wxid": wxid, + "digest_type": digest_type, + "period_key": period_key, + "period_start": period_start, + "period_end": period_end, + "display_name": display_name, + "source_count": len(items), + "summary_text": parsed.get("summary_text", ""), + "structured": parsed, + "meta": { + "source_keys": [item.get("period_key") for item in items], + }, + } + + def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]: + if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key: + return None + headers = { + "Authorization": f"Bearer {self.ai_api_key}", + "Content-Type": "application/json", + } + payload = { + "inputs": {"query": prompt}, + "response_mode": "blocking", + "user": f"member-digest:{chatroom_id}:{wxid}:{tag}", + } + url = f"{self.ai_base_url}/{self.ai_endpoint}" + try: + self.LOG.info(f"[成员交互摘要][AI] 发起摘要请求: group={chatroom_id}, wxid={wxid}, tag={tag}") + response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout) + response.raise_for_status() + data = response.json() + parsed = self._parse_ai_answer(data.get("answer", "")) + if parsed: + usage = (data.get("metadata") or {}).get("usage", {}) or {} + parsed["ai_usage"] = usage + return parsed + except Exception as e: + self.LOG.warning(f"[成员交互摘要][AI] 摘要请求失败: group={chatroom_id}, wxid={wxid}, tag={tag}, error={e}") + return None + + def _parse_ai_answer(self, answer: str) -> Optional[Dict]: + if not answer: + return None + text = answer.strip() + match = re.search(r"\{.*\}", text, re.S) + if match: + text = match.group(0) + try: + data = json.loads(text) + except Exception: + return None + normalized = {} + for key, value in data.items(): + if isinstance(value, list): + normalized[key] = [str(item).strip() for item in value if str(item).strip()] + elif isinstance(value, (int, float)): + normalized[key] = value + else: + normalized[key] = str(value).strip() + return normalized + + def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]: + if not messages: + return None + contents = [str(item.get("content", "")).strip() for item in messages if item.get("content")] + if not contents: + return None + short_samples = [content[:60] for content in contents[:3]] + avg_len = sum(len(content) for content in contents) / max(len(contents), 1) + message_pattern = "短句居多" if avg_len <= 16 else "表达较完整" if avg_len >= 35 else "表达中等长度" + return { + "topics": [], + "interaction_style": "自然跟随式互动", + "message_pattern": message_pattern, + "response_style_hint": "保持简洁自然,先回应核心点", + "habit_signals": [], + "engagement_traits": [], + "reply_taboos": [], + "temperament_signal": "当天样本有限,暂以中性沟通观察为主", + "summary_text": f"当日消息约{len(messages)}条,{message_pattern}。", + "representative_messages": short_samples, + "confidence": 0.35, + } + + def _build_period_digest_fallback(self, digest_type: str, items: List[Dict]) -> Optional[Dict]: + if not items: + return None + topic_counts = defaultdict(int) + trait_counts = defaultdict(int) + habit_counts = defaultdict(int) + reply_counts = defaultdict(int) + temperament_values = [] + for item in items: + structured = item.get("structured", {}) or {} + for topic in structured.get("topics", []) + structured.get("stable_topics", []) + structured.get("long_term_topics", []): + topic_counts[topic] += 1 + for trait in structured.get("engagement_traits", []) + structured.get("stable_traits", []): + trait_counts[trait] += 1 + for habit in structured.get("habit_signals", []) + structured.get("habit_patterns", []): + habit_counts[habit] += 1 + for pref in structured.get("reply_preferences", []) + structured.get("long_term_reply_preferences", []): + reply_counts[pref] += 1 + if structured.get("temperament_signal"): + temperament_values.append(structured.get("temperament_signal")) + if structured.get("temperament_tendency"): + temperament_values.append(structured.get("temperament_tendency")) + + top_topics = [key for key, _ in sorted(topic_counts.items(), key=lambda item: item[1], reverse=True)[:5]] + top_traits = [key for key, _ in sorted(trait_counts.items(), key=lambda item: item[1], reverse=True)[:5]] + top_habits = [key for key, _ in sorted(habit_counts.items(), key=lambda item: item[1], reverse=True)[:5]] + top_reply = [key for key, _ in sorted(reply_counts.items(), key=lambda item: item[1], reverse=True)[:4]] + temperament = temperament_values[0] if temperament_values else "整体保持中性沟通特征" + + if digest_type == "weekly": + return { + "stable_topics": top_topics, + "stable_traits": top_traits, + "habit_patterns": top_habits, + "reply_preferences": top_reply, + "recent_state": top_topics[:3], + "temperament_tendency": temperament, + "summary_text": "本周沟通特征已按重复信号汇总。", + "confidence": 0.45, + } + + return { + "long_term_topics": top_topics, + "stable_traits": top_traits, + "habit_patterns": top_habits, + "long_term_reply_preferences": top_reply, + "phase_state": top_topics[:3], + "temperament_tendency": temperament, + "summary_text": "本月沟通特征已按周摘要汇总。", + "confidence": 0.5, + } + + @staticmethod + def _week_period_bounds(date_value: str) -> Tuple[str, str, str]: + target_date = datetime.strptime(str(date_value)[:10], "%Y-%m-%d") + week_start = target_date - timedelta(days=target_date.weekday()) + week_end = week_start + timedelta(days=6) + week_key = f"{week_start.strftime('%Y-%m-%d')}" + return week_key, week_start.strftime("%Y-%m-%d 00:00:00"), week_end.strftime("%Y-%m-%d 23:59:59") + + @staticmethod + def _month_period_bounds(date_value: str) -> Tuple[str, str, str]: + target_dt = datetime.strptime(str(date_value)[:10], "%Y-%m-%d") + month_start = target_dt.replace(day=1) + if month_start.month == 12: + next_month = month_start.replace(year=month_start.year + 1, month=1, day=1) + else: + next_month = month_start.replace(month=month_start.month + 1, day=1) + month_end = next_month - timedelta(days=1) + month_key = month_start.strftime("%Y-%m") + return month_key, month_start.strftime("%Y-%m-%d 00:00:00"), month_end.strftime("%Y-%m-%d 23:59:59") diff --git a/plugins/member_context/prompt_builder.py b/plugins/member_context/prompt_builder.py new file mode 100644 index 0000000..1b17d90 --- /dev/null +++ b/plugins/member_context/prompt_builder.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +import json +from typing import Dict, List + + +class MemberContextPromptBuilder: + """成员分层画像提示词构建器""" + + @staticmethod + def build_daily_digest_prompt(chatroom_id: str, wxid: str, display_name: str, + digest_date: str, messages: List[Dict]) -> str: + lines = [] + for msg in messages[-80:]: + ts = str(msg.get("timestamp", ""))[11:16] + content = (msg.get("content") or "").replace("\n", " ").strip()[:180] + if content: + lines.append(f"[{ts}] {content}") + + return ( + "你是微信群后台的成员日观察摘要生成器。\n" + "请仅基于给定的当日公开聊天记录,提取对后续互动有帮助的中性行为观察。\n" + "不要做人格诊断、隐私猜测、负面评价,不要脑补群外信息。\n" + "输出严格 JSON,不要 markdown。\n" + "{" + "\"topics\":[\"主题1\"]," + "\"interaction_style\":\"一句中文\"," + "\"message_pattern\":\"一句中文\"," + "\"response_style_hint\":\"一句中文\"," + "\"habit_signals\":[\"信号1\"]," + "\"engagement_traits\":[\"特征1\"]," + "\"reply_taboos\":[\"避坑1\"]," + "\"temperament_signal\":\"一句中文,描述当天显露的沟通倾向,必须克制\"," + "\"summary_text\":\"一段不超过100字的日摘要\"," + "\"representative_messages\":[\"原话1\",\"原话2\"]," + "\"confidence\":0.0" + "}\n" + "要求:\n" + "1. topics 最多4个,habit_signals 最多4个,engagement_traits 最多4个,reply_taboos 最多3个。\n" + "2. temperament_signal 只能写当日可观察到的沟通倾向,不可上升为长期性格判断。\n" + "3. representative_messages 保留最能代表当天风格的短句,最多3条。\n" + f"成员: {display_name} ({wxid})\n" + f"群ID: {chatroom_id}\n" + f"日期: {digest_date}\n" + "当日消息:\n" + ("\n".join(lines) or "暂无") + ) + + @staticmethod + def build_period_digest_prompt(digest_type: str, chatroom_id: str, wxid: str, + display_name: str, period_key: str, items: List[Dict]) -> str: + structured_lines = [] + for item in items: + structured = item.get("structured", {}) or {} + payload = { + "period_key": item.get("period_key"), + "summary_text": item.get("summary_text", ""), + "topics": structured.get("topics") or structured.get("stable_topics") or structured.get("long_term_topics") or [], + "habit_signals": structured.get("habit_signals") or structured.get("habit_patterns") or [], + "engagement_traits": structured.get("engagement_traits") or structured.get("stable_traits") or [], + "reply_preferences": structured.get("reply_preferences") or structured.get("long_term_reply_preferences") or [], + "temperament_signal": structured.get("temperament_signal") or structured.get("temperament_tendency") or "", + "recent_state": structured.get("recent_state") or [], + } + structured_lines.append(json.dumps(payload, ensure_ascii=False)) + + if digest_type == "weekly": + schema = ( + "{" + "\"stable_topics\":[\"主题1\"]," + "\"stable_traits\":[\"特征1\"]," + "\"habit_patterns\":[\"习惯1\"]," + "\"reply_preferences\":[\"偏好1\"]," + "\"recent_state\":[\"状态1\"]," + "\"temperament_tendency\":\"一句中文\"," + "\"summary_text\":\"一段不超过120字的周摘要\"," + "\"confidence\":0.0" + "}" + ) + extra = "请从多个日摘要中提炼本周重复出现的模式,过滤单日噪音。" + else: + schema = ( + "{" + "\"long_term_topics\":[\"主题1\"]," + "\"stable_traits\":[\"特征1\"]," + "\"habit_patterns\":[\"习惯1\"]," + "\"long_term_reply_preferences\":[\"偏好1\"]," + "\"phase_state\":[\"状态1\"]," + "\"temperament_tendency\":\"一句中文\"," + "\"summary_text\":\"一段不超过140字的月摘要\"," + "\"confidence\":0.0" + "}" + ) + extra = "请从多个周摘要中提炼阶段性稳定特征,只有反复出现的模式才能进入长期层。" + + return ( + f"你是微信群后台的成员{digest_type}摘要生成器。\n" + f"{extra}\n" + "不可做心理诊断、负面评价、隐私猜测。输出严格 JSON,不要 markdown。\n" + f"{schema}\n" + "要求:\n" + "1. 所有列表字段最多5项,必须中性克制。\n" + "2. 只有多个下级摘要反复出现的特征,才允许写进 stable_traits / habit_patterns / long_term_reply_preferences。\n" + "3. recent_state / phase_state 只描述当前阶段状态,不要冒充长期人格。\n" + f"成员: {display_name} ({wxid})\n" + f"群ID: {chatroom_id}\n" + f"周期: {period_key}\n" + "下级摘要:\n" + ("\n".join(structured_lines) or "暂无") + ) + + @staticmethod + def build_final_context_prompt(chatroom_id: str, wxid: str, display_name: str, + monthly_digests: List[Dict], weekly_digests: List[Dict], + daily_digests: List[Dict]) -> str: + monthly_lines = [json.dumps(item.get("structured", {}), ensure_ascii=False) for item in monthly_digests[:6]] + weekly_lines = [json.dumps(item.get("structured", {}), ensure_ascii=False) for item in weekly_digests[:4]] + daily_lines = [json.dumps(item.get("structured", {}), ensure_ascii=False) for item in daily_digests[:6]] + + return ( + "你是微信群后台的最终成员交互画像整理器。\n" + "请结合月级、周级、日级摘要,输出一个既有长期层又有近期层的后台交互画像。\n" + "不要做敏感推断、心理诊断、隐私猜测。输出严格 JSON,不要 markdown。\n" + "{" + "\"activity_level\":\"高活跃|中活跃|低活跃|观察中\"," + "\"message_pattern\":\"一句中文\"," + "\"interaction_style\":\"一句中文\"," + "\"response_style_hint\":\"一句中文\"," + "\"topics_of_interest\":[\"主题1\"]," + "\"recent_focus\":[\"近期主题1\"]," + "\"stable_traits\":[\"长期特征1\"]," + "\"habit_patterns\":[\"习惯1\"]," + "\"long_term_reply_preferences\":[\"偏好1\"]," + "\"recent_state\":[\"近期状态1\"]," + "\"temperament_tendency\":\"一句中文\"," + "\"summary_text\":\"一段不超过150字的后台摘要\"," + "\"confidence\":0.0," + "\"engagement_traits\":[\"特征1\"]," + "\"reply_taboos\":[\"避坑1\"]" + "}\n" + "要求:\n" + "1. stable_traits、habit_patterns、long_term_reply_preferences 只从月级和多次重复证据中提取。\n" + "2. recent_focus、recent_state 更依赖最近周级和日级。\n" + "3. summary_text 要像后台备注,不要明显暴露在给用户做画像。\n" + f"成员: {display_name} ({wxid})\n" + f"群ID: {chatroom_id}\n" + "月级摘要:\n" + ("\n".join(monthly_lines) or "暂无") + + "\n周级摘要:\n" + ("\n".join(weekly_lines) or "暂无") + + "\n日级摘要:\n" + ("\n".join(daily_lines) or "暂无") + ) diff --git a/plugins/member_context/service.py b/plugins/member_context/service.py index 5feadd4..580ec1c 100644 --- a/plugins/member_context/service.py +++ b/plugins/member_context/service.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- import json -import math import re from collections import Counter from datetime import datetime @@ -12,7 +11,10 @@ from loguru import logger from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator from db.member_context_db import MemberContextDBOperator +from db.member_digest_db import MemberDigestDBOperator from db.message_storage import MessageStorageDB +from plugins.member_context.digest_service import MemberDigestService +from plugins.member_context.prompt_builder import MemberContextPromptBuilder from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus @@ -33,66 +35,122 @@ class MemberContextService: self.contacts_db = ContactsDBOperator(self.db_manager) self.message_db = MessageStorageDB(self.db_manager) self.member_context_db = MemberContextDBOperator(self.db_manager) + self.member_digest_db = MemberDigestDBOperator(self.db_manager) + self.digest_service = MemberDigestService( + self.contacts_db, self.message_db, self.member_digest_db, plugin_config or {} + ) self.LOG = logger self.plugin_config = plugin_config or {} api_config = self.plugin_config.get("api", {}) profile_config = self.plugin_config.get("profile", {}) + schedule_config = self.plugin_config.get("schedule", {}) - self.ai_enabled = bool(api_config.get("enabled", False)) + self.ai_enabled = bool(api_config.get("enable", api_config.get("enabled", False))) self.ai_base_url = (api_config.get("base_url") or "").rstrip("/") self.ai_api_key = api_config.get("api_key", "") self.ai_endpoint = str(api_config.get("endpoint", "completion-messages")).lstrip("/") self.ai_timeout = int(api_config.get("request_timeout", 60)) + self.sample_days = int(profile_config.get("sample_days", 30)) - self.ai_sample_limit = int(profile_config.get("sample_message_limit", 80)) self.refresh_limit_per_member = int(profile_config.get("refresh_limit_per_member", 200)) - self.ai_min_member_messages = int(profile_config.get("ai_min_member_messages", 12)) self.active_member_hours = int(profile_config.get("active_member_hours", 72)) self.min_member_messages = int(profile_config.get("min_member_messages", 3)) self.max_members_per_group_per_run = int(profile_config.get("max_members_per_group_per_run", 30)) self.stale_hours = int(profile_config.get("stale_hours", 24)) - schedule_config = self.plugin_config.get("schedule", {}) + self.stable_decay = float(profile_config.get("stable_decay", 0.96)) + self.stable_max_items = int(profile_config.get("stable_max_items", 6)) + self.stable_min_score = float(profile_config.get("stable_min_score", 0.9)) + self.stable_ready_days = int(profile_config.get("stable_ready_days", 180)) self.only_recent_active_groups = bool(schedule_config.get("only_recent_active_groups", False)) self.active_hours = int(schedule_config.get("active_hours", 72)) self.min_group_messages = int(schedule_config.get("min_group_messages", 20)) def build_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None, - limit: Optional[int] = None) -> Dict: + limit: Optional[int] = None, force_digest_rebuild: bool = False) -> Dict: days = days or self.sample_days limit = limit or self.refresh_limit_per_member + existing_context = self.member_context_db.get_member_context(chatroom_id, wxid) member = self.contacts_db.get_chatroom_member_info(chatroom_id, wxid) or {} - messages = self.message_db.get_member_recent_messages(chatroom_id, wxid, days=days, limit=limit) - recent_messages = self.message_db.get_member_recent_messages(chatroom_id, wxid, days=min(days, 7), limit=100) - display_name = member.get("display_name") or member.get("nick_name") or wxid - activity_level = self._calc_activity_level(len(messages), days) - message_pattern = self._build_message_pattern(messages) - response_style_hint = self._build_response_style_hint(messages) - topics = self._extract_keywords(messages, limit=5) - recent_focus = self._extract_keywords(recent_messages, limit=4) - confidence = self._calc_confidence(len(messages)) + digest_snapshot = self.digest_service.ensure_member_digest_pipeline( + chatroom_id, wxid, force=force_digest_rebuild + ) + daily_digests = digest_snapshot.get("daily_digests", []) + weekly_digests = digest_snapshot.get("weekly_digests", []) + monthly_digests = digest_snapshot.get("monthly_digests", []) + + recent_messages = self.message_db.get_member_recent_messages(chatroom_id, wxid, days=min(days, 7), limit=120) + monthly_structured = [item.get("structured", {}) or {} for item in monthly_digests] + weekly_structured = [item.get("structured", {}) or {} for item in weekly_digests] + daily_structured = [item.get("structured", {}) or {} for item in daily_digests] + + observation_days = self._calc_observation_days(daily_digests) + activity_level = self._calc_activity_level(len(recent_messages), max(min(days, 7), 1)) context = { "chatroom_id": chatroom_id, "wxid": wxid, "display_name": display_name, "activity_level": activity_level, - "message_pattern": message_pattern, - "interaction_style": self._build_interaction_style(messages), - "response_style_hint": response_style_hint, - "topics_of_interest": topics, - "recent_focus": recent_focus, - "summary_text": self._build_summary_text(activity_level, message_pattern, response_style_hint, topics, recent_focus), - "confidence": confidence, - "source_message_count": len(messages), + "message_pattern": self._best_text( + daily_structured, ["message_pattern"], default=self._build_message_pattern(recent_messages) + ), + "interaction_style": self._best_text( + daily_structured, ["interaction_style"], default=self._build_interaction_style(recent_messages) + ), + "response_style_hint": self._build_response_style_hint_from_digests( + daily_structured, weekly_structured, monthly_structured + ), + "topics_of_interest": self._extract_scored_items( + monthly_structured + weekly_structured, ["long_term_topics", "stable_topics", "topics"], limit=5 + ), + "recent_focus": self._extract_scored_items(daily_structured, ["topics"], limit=4), + "summary_text": "", + "confidence": self._calc_digest_confidence(monthly_digests, weekly_digests, daily_digests), + "source_message_count": len(recent_messages), "source_days": days, "last_profiled_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "meta": self._build_meta(messages, recent_messages), + "meta": { + "stable_traits": self._extract_scored_items( + monthly_structured + weekly_structured, ["stable_traits", "engagement_traits"], limit=self.stable_max_items + ), + "habit_patterns": self._extract_scored_items( + monthly_structured + weekly_structured + daily_structured, + ["habit_patterns", "habit_signals"], limit=self.stable_max_items + ), + "long_term_reply_preferences": self._extract_scored_items( + monthly_structured + weekly_structured, ["long_term_reply_preferences", "reply_preferences"], limit=4 + ), + "recent_state": self._extract_scored_items( + weekly_structured + daily_structured, ["recent_state", "phase_state", "topics"], limit=4 + ), + "temperament_tendency": self._best_text( + monthly_structured + weekly_structured + daily_structured, + ["temperament_tendency", "temperament_signal"], default="" + ), + "engagement_traits": self._extract_scored_items( + daily_structured + weekly_structured, ["engagement_traits", "stable_traits"], limit=4 + ), + "reply_taboos": self._extract_scored_items(daily_structured, ["reply_taboos"], limit=3), + "observation_days": observation_days, + "stable_ready": observation_days >= self.stable_ready_days, + "profile_iterations": int(((existing_context or {}).get("meta", {}) or {}).get("profile_iterations", 0)) + 1, + "history_message_count": self._sum_digest_source_count(daily_digests), + "digest_daily_count": len(daily_digests), + "digest_weekly_count": len(weekly_digests), + "digest_monthly_count": len(monthly_digests), + "last_daily_digest_at": daily_digests[0].get("last_generated_at") if daily_digests else "", + "last_weekly_digest_at": weekly_digests[0].get("last_generated_at") if weekly_digests else "", + "last_monthly_digest_at": monthly_digests[0].get("last_generated_at") if monthly_digests else "", + "refresh_mode": self._build_refresh_mode(existing_context, digest_snapshot), + }, } - ai_context = self._generate_ai_context(chatroom_id, wxid, display_name, context, messages) + ai_context = self._generate_ai_context_from_digests( + chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests + ) if ai_context: context.update({ "activity_level": ai_context.get("activity_level") or context["activity_level"], @@ -105,6 +163,9 @@ class MemberContextService: "confidence": ai_context.get("confidence", context["confidence"]), }) context["meta"].update(ai_context.get("meta", {})) + + context = self._merge_with_existing_context(existing_context, context) + context["summary_text"] = context.get("summary_text") or self._build_summary_text_from_context(context) return context def refresh_member_context(self, chatroom_id: str, wxid: str, days: Optional[int] = None, @@ -117,6 +178,10 @@ class MemberContextService: self.LOG.info( f"[成员交互摘要] 单个成员刷新完成: group={chatroom_id}, wxid={wxid}, " f"display_name={context.get('display_name', wxid)}, messages={context.get('source_message_count', 0)}, " + f"mode={context.get('meta', {}).get('refresh_mode', '')}, " + f"digests={context.get('meta', {}).get('digest_daily_count', 0)}/" + f"{context.get('meta', {}).get('digest_weekly_count', 0)}/" + f"{context.get('meta', {}).get('digest_monthly_count', 0)}, " f"ai={'yes' if context.get('meta', {}).get('ai_provider') else 'no'}" ) return context @@ -167,7 +232,7 @@ class MemberContextService: ) continue context = self.build_member_context(chatroom_id, wxid, days=days, limit=limit_per_member) - if context["source_message_count"] <= 0: + if context["source_message_count"] <= 0 and context.get("meta", {}).get("digest_daily_count", 0) <= 0: skipped += 1 self.LOG.debug( f"[成员交互摘要] 跳过成员(样本不足): group={chatroom_id}, " @@ -181,6 +246,10 @@ class MemberContextService: f"wxid={wxid}, display_name={context.get('display_name', wxid)}, " f"messages={context.get('source_message_count', 0)}, " f"activity={context.get('activity_level', '')}, " + f"mode={context.get('meta', {}).get('refresh_mode', '')}, " + f"digests={context.get('meta', {}).get('digest_daily_count', 0)}/" + f"{context.get('meta', {}).get('digest_weekly_count', 0)}/" + f"{context.get('meta', {}).get('digest_monthly_count', 0)}, " f"ai={'yes' if context.get('meta', {}).get('ai_provider') else 'no'}" ) @@ -238,7 +307,10 @@ class MemberContextService: f"skipped={result.get('skipped', 0)}, active_candidates={result.get('active_candidates', 0)}" ) - self.LOG.info(f"成员交互摘要刷新完成: 启用活跃群={group_count}, 成员={member_count}, 跳过={skipped}, 未启用群={disabled}, 非活跃群={inactive}") + self.LOG.info( + f"成员交互摘要刷新完成: 启用活跃群={group_count}, 成员={member_count}, 跳过={skipped}, " + f"未启用群={disabled}, 非活跃群={inactive}" + ) return {"groups": group_count, "members": member_count, "skipped": skipped, "disabled_groups": disabled, "inactive_groups": inactive} def is_group_enabled(self, chatroom_id: str) -> bool: @@ -247,129 +319,6 @@ class MemberContextService: return True return GroupBotManager.get_group_permission(chatroom_id, feature) == PermissionStatus.ENABLED - def _calc_activity_level(self, message_count: int, days: int) -> str: - daily_avg = message_count / max(days, 1) - if message_count >= 80 or daily_avg >= 3: - return "高活跃" - if message_count >= 25 or daily_avg >= 1: - return "中活跃" - if message_count > 0: - return "低活跃" - return "观察中" - - def _build_message_pattern(self, messages: List[Dict]) -> str: - if not messages: - return "样本较少,暂不做明显模式判断" - - contents = [m.get("content", "") for m in messages if m.get("content")] - if not contents: - return "样本较少,暂不做明显模式判断" - - avg_len = sum(len(c) for c in contents) / len(contents) - question_ratio = sum(1 for c in contents if "?" in c or "?" in c) / len(contents) - link_ratio = sum(1 for c in contents if "http://" in c or "https://" in c) / len(contents) - - traits = [] - if avg_len <= 12: - traits.append("短句居多") - elif avg_len >= 35: - traits.append("表达较完整") - else: - traits.append("表达中等长度") - - if question_ratio >= 0.35: - traits.append("问题导向明显") - elif question_ratio >= 0.15: - traits.append("偶尔连续追问") - - if link_ratio >= 0.15: - traits.append("常分享链接或资料") - - if not traits: - traits.append("发言较平稳") - return ",".join(traits) - - def _build_response_style_hint(self, messages: List[Dict]) -> str: - if not messages: - return "样本不足时保持中性、简洁、避免过度熟络" - - contents = [m.get("content", "") for m in messages if m.get("content")] - avg_len = sum(len(c) for c in contents) / max(len(contents), 1) - question_ratio = sum(1 for c in contents if "?" in c or "?" in c) / max(len(contents), 1) - - if question_ratio >= 0.35: - return "优先给明确结论,再补充步骤或依据,避免空泛回应" - if avg_len <= 12: - return "回复尽量简洁直接,先回答核心点,减少铺垫" - if avg_len >= 35: - return "可以给稍完整的解释,但保持结构清楚,避免冗长" - return "保持自然口语化,结论和解释尽量平衡" - - def _build_interaction_style(self, messages: List[Dict]) -> str: - if not messages: - return "互动样本较少" - contents = [m.get("content", "") for m in messages if m.get("content")] - question_ratio = sum(1 for c in contents if "?" in c or "?" in c) / max(len(contents), 1) - emoji_ratio = sum(1 for c in contents if re.search(r"[\U0001F300-\U0001FAFF\u2600-\u27BF]", c)) / max(len(contents), 1) - mention_ratio = sum(1 for c in contents if "@" in c) / max(len(contents), 1) - - parts = [] - if question_ratio >= 0.3: - parts.append("偏提问推进") - if emoji_ratio >= 0.15: - parts.append("表情互动感较强") - if mention_ratio >= 0.1: - parts.append("会主动点名互动") - if not parts: - parts.append("自然跟随式互动") - return ",".join(parts) - - def _extract_keywords(self, messages: List[Dict], limit: int = 5) -> List[str]: - counter = Counter() - for message in messages: - content = message.get("content", "") - for token in self._tokenize(content): - if token in self.STOPWORDS: - continue - counter[token] += 1 - return [word for word, _ in counter.most_common(limit)] - - def _tokenize(self, text: str) -> List[str]: - chinese_words = re.findall(r"[\u4e00-\u9fff]{2,6}", text) - english_words = re.findall(r"[A-Za-z][A-Za-z0-9_-]{2,20}", text) - return chinese_words + [word.lower() for word in english_words] - - def _calc_confidence(self, message_count: int) -> float: - return round(min(0.95, math.log(message_count + 1, 10)), 2) if message_count > 0 else 0.1 - - def _build_summary_text(self, activity_level: str, message_pattern: str, - response_style_hint: str, topics: List[str], recent_focus: List[str]) -> str: - parts = [ - f"近期互动强度:{activity_level}", - f"表达特征:{message_pattern}", - f"回复建议:{response_style_hint}", - ] - if topics: - parts.append(f"长期关注:{'、'.join(topics)}") - if recent_focus: - parts.append(f"近期话题:{'、'.join(recent_focus)}") - return ";".join(parts) - - def _build_meta(self, messages: List[Dict], recent_messages: List[Dict]) -> Dict: - latest_time = None - if recent_messages: - latest = recent_messages[-1].get("timestamp") - if isinstance(latest, datetime): - latest_time = latest.strftime("%Y-%m-%d %H:%M:%S") - elif latest: - latest_time = str(latest) - - return { - "message_count_30d": len(messages), - "message_count_7d": len(recent_messages), - "latest_message_time": latest_time, - } - def _get_recent_active_members(self, chatroom_id: str) -> List[Dict]: sql = """ SELECT @@ -425,7 +374,10 @@ class MemberContextService: try: return datetime.strptime(str(value), "%Y-%m-%d %H:%M:%S") except Exception: - return None + try: + return datetime.strptime(str(value)[:10], "%Y-%m-%d") + except Exception: + return None def _get_recent_active_chatrooms(self) -> set: sql = """ @@ -439,14 +391,17 @@ class MemberContextService: rows = self.message_db.execute_query(sql, ("%@chatroom", self.active_hours, self.min_group_messages)) or [] return {row.get("group_id") for row in rows if row.get("group_id")} - def _generate_ai_context(self, chatroom_id: str, wxid: str, display_name: str, - base_context: Dict, messages: List[Dict]) -> Optional[Dict]: + def _generate_ai_context_from_digests(self, chatroom_id: str, wxid: str, display_name: str, + monthly_digests: List[Dict], weekly_digests: List[Dict], + daily_digests: List[Dict]) -> Optional[Dict]: if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key: return None - if len(messages) < self.ai_min_member_messages: + if len(daily_digests) < 2 and len(weekly_digests) < 1 and len(monthly_digests) < 1: return None - prompt = self._build_ai_prompt(chatroom_id, wxid, display_name, base_context, messages[-self.ai_sample_limit:]) + prompt = MemberContextPromptBuilder.build_final_context_prompt( + chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests + ) headers = { "Authorization": f"Bearer {self.ai_api_key}", "Content-Type": "application/json", @@ -454,88 +409,38 @@ class MemberContextService: payload = { "inputs": {"query": prompt}, "response_mode": "blocking", - "user": f"member-context:{chatroom_id}:{wxid}", + "user": f"member-context-final:{chatroom_id}:{wxid}", } url = f"{self.ai_base_url}/{self.ai_endpoint}" try: + self.LOG.info( + f"[成员交互摘要][AI] 发起最终画像请求: group={chatroom_id}, wxid={wxid}, " + f"monthly={len(monthly_digests)}, weekly={len(weekly_digests)}, daily={len(daily_digests)}" + ) response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout) response.raise_for_status() - response_data = response.json() - parsed = self._parse_ai_answer(response_data.get("answer", "")) + data = response.json() + parsed = self._parse_ai_answer(data.get("answer", "")) if not parsed: + self.LOG.warning( + f"[成员交互摘要][AI] 最终画像JSON解析失败: group={chatroom_id}, wxid={wxid}, " + f"answer_preview={(data.get('answer', '') or '')[:200]}" + ) return None - usage = (response_data.get("metadata") or {}).get("usage", {}) or {} - parsed["meta"] = { + usage = (data.get("metadata") or {}).get("usage", {}) or {} + parsed_meta = parsed.get("meta", {}) or {} + parsed_meta.update({ "ai_provider": "dify", "ai_mode": "completion", "ai_tokens": usage.get("total_tokens"), "ai_latency": usage.get("latency"), - } + }) + parsed["meta"] = parsed_meta return parsed except Exception as e: - self.LOG.warning(f"成员交互摘要 AI 生成失败,回退到本地摘要: chatroom={chatroom_id}, wxid={wxid}, error={e}") + self.LOG.warning(f"成员交互摘要最终画像 AI 生成失败,回退到本地融合: chatroom={chatroom_id}, wxid={wxid}, error={e}") return None - def _build_ai_prompt(self, chatroom_id: str, wxid: str, display_name: str, - base_context: Dict, messages: List[Dict]) -> str: - message_lines = [] - for msg in messages[-40:]: - ts = msg.get("timestamp") - if isinstance(ts, datetime): - ts = ts.strftime("%m-%d %H:%M") - content = (msg.get("content") or "").replace("\n", " ").strip() - content = content[:160] - if content: - message_lines.append(f"[{ts}] {content}") - - topics = "、".join(base_context.get("topics_of_interest", [])) or "无明显长期话题" - recent_focus = "、".join(base_context.get("recent_focus", [])) or "无明显近期话题" - - return ( - "你是一个微信群运营后台的成员交互摘要提取器。\n" - "你的任务不是做人设分析,也不是性格判断,而是基于公开聊天记录,提取对后续回复策略有帮助的“交互特征摘要”。\n" - "你只能依据给定聊天样本输出保守结论,不能脑补,不能做敏感推断,不能写负面标签,不能输出隐私猜测。\n" - "请根据以下成员近30天公开发言,输出一个严格 JSON 对象,不要 markdown,不要解释,不要代码块。\n" - "JSON schema:\n" - "{" - "\"activity_level\":\"高活跃|中活跃|低活跃|观察中\"," - "\"message_pattern\":\"一句中文,描述表达特点\"," - "\"interaction_style\":\"一句中文,描述他在群里如何与人互动\"," - "\"response_style_hint\":\"一句中文,描述适合怎样回应\"," - "\"topics_of_interest\":[\"主题1\",\"主题2\"]," - "\"recent_focus\":[\"近期主题1\",\"近期主题2\"]," - "\"summary_text\":\"一段不超过120字的后台交互摘要\"," - "\"confidence\":0.0," - "\"engagement_traits\":[\"特征1\",\"特征2\"]," - "\"reply_taboos\":[\"避坑1\",\"避坑2\"]" - "}\n" - "要求:\n" - "1. 只总结群内公开行为特征,不要输出性格诊断、负面标签或敏感结论。\n" - "2. topics_of_interest 表示相对稳定的话题偏好,最多5个;recent_focus 表示近期频繁提及的话题,最多4个。\n" - "3. message_pattern 只能描述可观察到的表达方式,例如:短句居多、问题导向、爱发链接、解释较完整、常接梗互动。\n" - "4. interaction_style 要描述他在群里的参与方式,例如:偏围观后插话、喜欢接梗、会连续追问、偏一对一回应。\n" - "5. response_style_hint 只能写对回复策略有帮助的建议,例如:先给结论再补步骤、保持简洁直接、可以适度接梗;不要写成评价语。\n" - "6. engagement_traits 最多4个,写成中性的短标签,例如:节奏快、爱追问细节、接梗自然、偏结果导向。\n" - "7. reply_taboos 最多3个,只写回复时应避免的方式,例如:避免长篇铺垫、避免过度说教、避免太官方。\n" - "8. summary_text 要像后台备注,客观、中性、克制,不要让人一眼看出是在给用户贴标签。\n" - "9. confidence 取值 0 到 1;如果样本较少或不稳定,必须降低 confidence。\n" - "10. 如果证据不足,宁可输出更弱、更泛化的结论,也不要瞎猜。\n\n" - "下面是正反例参考。\n" - "坏例子:这个人情绪化、爱抬杠、虚荣、玻璃心。\n" - "好例子:常用短句直接表达观点;遇到问题时更适合先给明确结论,再补充解释。\n\n" - f"成员标识: {display_name} ({wxid})\n" - f"群ID: {chatroom_id}\n" - f"样本消息数: {base_context.get('source_message_count', 0)}\n" - f"本地活跃度估计: {base_context.get('activity_level', '')}\n" - f"本地表达特征: {base_context.get('message_pattern', '')}\n" - f"本地互动风格: {base_context.get('interaction_style', '')}\n" - f"本地回复建议: {base_context.get('response_style_hint', '')}\n" - f"本地长期关注: {topics}\n" - f"本地近期话题: {recent_focus}\n" - "最近消息样本:\n" - + "\n".join(message_lines) - ) - def _parse_ai_answer(self, answer: str) -> Optional[Dict]: if not answer: return None @@ -548,18 +453,10 @@ class MemberContextService: except Exception: return None - topics = data.get("topics_of_interest") or [] - recent_focus = data.get("recent_focus") or [] - engagement_traits = data.get("engagement_traits") or [] - reply_taboos = data.get("reply_taboos") or [] - if not isinstance(topics, list): - topics = [] - if not isinstance(recent_focus, list): - recent_focus = [] - if not isinstance(engagement_traits, list): - engagement_traits = [] - if not isinstance(reply_taboos, list): - reply_taboos = [] + def norm_list(value, limit): + if not isinstance(value, list): + return [] + return [str(item).strip() for item in value[:limit] if str(item).strip()] try: confidence = float(data.get("confidence", 0)) @@ -571,12 +468,244 @@ class MemberContextService: "message_pattern": str(data.get("message_pattern", "")).strip(), "interaction_style": str(data.get("interaction_style", "")).strip(), "response_style_hint": str(data.get("response_style_hint", "")).strip(), - "topics_of_interest": [str(item).strip() for item in topics[:5] if str(item).strip()], - "recent_focus": [str(item).strip() for item in recent_focus[:4] if str(item).strip()], + "topics_of_interest": norm_list(data.get("topics_of_interest"), 5), + "recent_focus": norm_list(data.get("recent_focus"), 4), "summary_text": str(data.get("summary_text", "")).strip(), "confidence": max(0.0, min(1.0, confidence)), "meta": { - "engagement_traits": [str(item).strip() for item in engagement_traits[:4] if str(item).strip()], - "reply_taboos": [str(item).strip() for item in reply_taboos[:3] if str(item).strip()], + "stable_traits": norm_list(data.get("stable_traits"), self.stable_max_items), + "habit_patterns": norm_list(data.get("habit_patterns"), self.stable_max_items), + "long_term_reply_preferences": norm_list(data.get("long_term_reply_preferences"), 4), + "recent_state": norm_list(data.get("recent_state"), 4), + "temperament_tendency": str(data.get("temperament_tendency", "")).strip(), + "engagement_traits": norm_list(data.get("engagement_traits"), 4), + "reply_taboos": norm_list(data.get("reply_taboos"), 3), } } + + def _merge_with_existing_context(self, existing_context: Optional[Dict], current_context: Dict) -> Dict: + existing_context = existing_context or {} + existing_meta = existing_context.get("meta", {}) or {} + meta = current_context.get("meta", {}) or {} + + observation_days = max( + int(meta.get("observation_days", 0)), + int(existing_meta.get("observation_days", 0)), + ) + meta["observation_days"] = observation_days + meta["stable_ready"] = observation_days >= self.stable_ready_days + + merged_topic_scores = self._merge_scored_items( + existing_meta.get("topic_scores", {}), + current_context.get("topics_of_interest", []), + current_context.get("confidence", 0), + ) + merged_trait_scores = self._merge_scored_items( + existing_meta.get("stable_trait_scores", {}), + meta.get("stable_traits", []), + current_context.get("confidence", 0), + ) + merged_habit_scores = self._merge_scored_items( + existing_meta.get("habit_pattern_scores", {}), + meta.get("habit_patterns", []), + current_context.get("confidence", 0), + ) + merged_reply_pref_scores = self._merge_scored_items( + existing_meta.get("long_term_reply_preference_scores", {}), + meta.get("long_term_reply_preferences", []), + current_context.get("confidence", 0), + ) + merged_temperament_scores = self._merge_scored_items( + existing_meta.get("temperament_tendency_scores", {}), + [meta.get("temperament_tendency")] if meta.get("temperament_tendency") else [], + current_context.get("confidence", 0) * 0.9, + ) + + meta["topic_scores"] = merged_topic_scores + meta["stable_trait_scores"] = merged_trait_scores + meta["habit_pattern_scores"] = merged_habit_scores + meta["long_term_reply_preference_scores"] = merged_reply_pref_scores + meta["temperament_tendency_scores"] = merged_temperament_scores + meta["stable_traits"] = self._top_scored_items(merged_trait_scores, limit=self.stable_max_items) + meta["habit_patterns"] = self._top_scored_items(merged_habit_scores, limit=self.stable_max_items) + meta["long_term_reply_preferences"] = self._top_scored_items(merged_reply_pref_scores, limit=4) + temperament = self._top_scored_items(merged_temperament_scores, limit=1) + meta["temperament_tendency"] = temperament[0] if temperament else meta.get("temperament_tendency", "") + meta["engagement_traits"] = (meta.get("engagement_traits") or existing_meta.get("engagement_traits") or [])[:4] + meta["reply_taboos"] = (meta.get("reply_taboos") or existing_meta.get("reply_taboos") or [])[:3] + meta["recent_state"] = (meta.get("recent_state") or existing_meta.get("recent_state") or [])[:4] + meta["profile_iterations"] = max( + int(meta.get("profile_iterations", 0)), + int(existing_meta.get("profile_iterations", 0)), + ) + meta["history_message_count"] = max( + int(meta.get("history_message_count", 0)), + int(existing_meta.get("history_message_count", 0)), + ) + + current_context["topics_of_interest"] = self._top_scored_items(merged_topic_scores, limit=5) or current_context.get("topics_of_interest", []) + current_context["recent_focus"] = (current_context.get("recent_focus") or existing_context.get("recent_focus") or [])[:4] + current_context["response_style_hint"] = current_context.get("response_style_hint") or existing_context.get("response_style_hint") or "" + current_context["meta"] = meta + return current_context + + def _extract_scored_items(self, items: List[Dict], keys: List[str], limit: int) -> List[str]: + scores = {} + for index, item in enumerate(items): + weight = max(0.5, 1.2 - index * 0.08) + for key in keys: + values = item.get(key, []) + if not isinstance(values, list): + continue + for value in values: + normalized = str(value).strip() + if not normalized: + continue + scores[normalized] = scores.get(normalized, 0.0) + weight + return [key for key, _ in sorted(scores.items(), key=lambda pair: pair[1], reverse=True)[:limit]] + + def _best_text(self, items: List[Dict], keys: List[str], default: str = "") -> str: + counter = Counter() + for item in items: + for key in keys: + value = str(item.get(key, "")).strip() + if value: + counter[value] += 1 + if counter: + return counter.most_common(1)[0][0] + return default + + def _build_response_style_hint_from_digests(self, daily_structured: List[Dict], + weekly_structured: List[Dict], + monthly_structured: List[Dict]) -> str: + hint = self._best_text(daily_structured, ["response_style_hint"]) + if hint: + return hint + preferences = self._extract_scored_items( + monthly_structured + weekly_structured, + ["long_term_reply_preferences", "reply_preferences"], + limit=3, + ) + if preferences: + return "更适合:" + "、".join(preferences[:3]) + return "保持自然口语化,结论和解释尽量平衡" + + def _calc_digest_confidence(self, monthly_digests: List[Dict], weekly_digests: List[Dict], + daily_digests: List[Dict]) -> float: + base = 0.25 + base += min(0.35, len(monthly_digests) * 0.08) + base += min(0.2, len(weekly_digests) * 0.04) + base += min(0.15, len(daily_digests) * 0.02) + return round(min(0.95, base), 2) + + def _calc_observation_days(self, daily_digests: List[Dict]) -> int: + if not daily_digests: + return 0 + end_dt = self._parse_datetime(daily_digests[0].get("period_end")) + start_dt = self._parse_datetime(daily_digests[-1].get("period_start")) + if not start_dt or not end_dt: + return 0 + return max(0, (end_dt - start_dt).days) + + @staticmethod + def _sum_digest_source_count(daily_digests: List[Dict]) -> int: + return sum(int(item.get("source_count", 0)) for item in daily_digests) + + def _build_refresh_mode(self, existing_context: Optional[Dict], digest_snapshot: Dict) -> str: + if not existing_context: + return "bootstrap" + if (digest_snapshot.get("stats", {}) or {}).get("built_monthly", 0) > 0: + return "recalibration" + return "incremental" + + def _build_summary_text_from_context(self, context: Dict) -> str: + meta = context.get("meta", {}) or {} + parts = [] + if meta.get("temperament_tendency"): + label = "长期沟通倾向" if meta.get("stable_ready") else "阶段性沟通倾向" + parts.append(f"{label}:{meta.get('temperament_tendency')}") + if meta.get("stable_traits"): + parts.append(f"长期特征:{'、'.join(meta.get('stable_traits')[:3])}") + if meta.get("habit_patterns"): + parts.append(f"习惯模式:{'、'.join(meta.get('habit_patterns')[:3])}") + if meta.get("recent_state"): + parts.append(f"近期状态:{'、'.join(meta.get('recent_state')[:3])}") + if context.get("response_style_hint"): + parts.append(f"回复建议:{context.get('response_style_hint')}") + return ";".join(parts[:5]) + + def _merge_scored_items(self, existing_scores: Dict, current_items: List[str], confidence: float) -> Dict[str, float]: + merged = {} + for key, value in (existing_scores or {}).items(): + try: + score = float(value) * self.stable_decay + except Exception: + continue + if score >= 0.2: + merged[str(key).strip()] = round(score, 4) + + boost = max(0.6, min(1.8, 0.8 + confidence)) + for item in current_items or []: + normalized = str(item).strip() + if not normalized: + continue + merged[normalized] = round(merged.get(normalized, 0.0) + boost, 4) + return merged + + def _top_scored_items(self, scores: Dict, limit: int) -> List[str]: + ordered = sorted( + ((str(key).strip(), float(value)) for key, value in (scores or {}).items() if str(key).strip()), + key=lambda item: item[1], + reverse=True, + ) + return [key for key, value in ordered if value >= self.stable_min_score][:limit] + + def _calc_activity_level(self, message_count: int, days: int) -> str: + daily_avg = message_count / max(days, 1) + if message_count >= 80 or daily_avg >= 3: + return "高活跃" + if message_count >= 25 or daily_avg >= 1: + return "中活跃" + if message_count > 0: + return "低活跃" + return "观察中" + + def _build_message_pattern(self, messages: List[Dict]) -> str: + if not messages: + return "样本较少,暂不做明显模式判断" + contents = [m.get("content", "") for m in messages if m.get("content")] + if not contents: + return "样本较少,暂不做明显模式判断" + avg_len = sum(len(c) for c in contents) / len(contents) + question_ratio = sum(1 for c in contents if "?" in c or "?" in c) / len(contents) + link_ratio = sum(1 for c in contents if "http://" in c or "https://" in c) / len(contents) + traits = [] + if avg_len <= 12: + traits.append("短句居多") + elif avg_len >= 35: + traits.append("表达较完整") + else: + traits.append("表达中等长度") + if question_ratio >= 0.35: + traits.append("问题导向明显") + elif question_ratio >= 0.15: + traits.append("偶尔连续追问") + if link_ratio >= 0.15: + traits.append("常分享链接或资料") + return ",".join(traits or ["发言较平稳"]) + + def _build_interaction_style(self, messages: List[Dict]) -> str: + if not messages: + return "互动样本较少" + contents = [m.get("content", "") for m in messages if m.get("content")] + question_ratio = sum(1 for c in contents if "?" in c or "?" in c) / max(len(contents), 1) + emoji_ratio = sum(1 for c in contents if re.search(r"[\U0001F300-\U0001FAFF\u2600-\u27BF]", c)) / max(len(contents), 1) + mention_ratio = sum(1 for c in contents if "@" in c) / max(len(contents), 1) + parts = [] + if question_ratio >= 0.3: + parts.append("偏提问推进") + if emoji_ratio >= 0.15: + parts.append("表情互动感较强") + if mention_ratio >= 0.1: + parts.append("会主动点名互动") + return ",".join(parts or ["自然跟随式互动"])