From bfd0dbc15c2186bc9d939433d21f00b0199e6f03 Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 2 Apr 2026 14:25:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=85=A5=E6=88=90=E5=91=98=E7=94=BB?= =?UTF-8?q?=E5=83=8F=20Dify=20=E5=B7=A5=E4=BD=9C=E6=B5=81=E5=B9=B6?= =?UTF-8?q?=E6=B8=85=E7=90=86=E6=97=A7=E6=8F=90=E5=8F=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 member_context 专用 DifyClient,统一兼容 completion 与 workflow 两种调用模式 - 将成员画像插件默认切换到 Dify workflow 模式,配置改用新的 workflow 应用与 workflows/run 接口 - 生成可直接导入 Dify 的成员画像工作流 DSL 文件,方便后台一键导入和发布 - 补充 Dify 工作流接入说明文档,明确输入字段、输出字段、发布步骤与插件消费方式 - 清理旧的单成员日摘要提取链路,日级画像统一收敛到群日批量提取路径,减少无效分支和历史残留 - 去除 member_context 内部多处旧 requests 直连调用,统一改为通过 DifyClient 调用 AI 服务 - 优化群日批量结果解析逻辑,只按 wxid 作为唯一主键识别成员,不再依赖昵称做唯一判断 - 新增按 wxid 的结果去重与完整度评分逻辑,遇到重复成员结果时优先保留字段更完整、置信度更高的一条 - 保留现有初始化、增量、周/月聚合与最终画像生成链路,同时剔除 workflow 接入后已无效或低价值的旧逻辑 - 为后续继续收紧 fallback 标记、增强后台质量诊断和优化工作流输出稳定性打下基础 --- plugins/member_context/DIFY_WORKFLOW.md | 187 ++++++++++++ plugins/member_context/config.toml | 6 +- plugins/member_context/dify_client.py | 116 ++++++++ plugins/member_context/digest_service.py | 151 ++++------ .../member_context_workflow.yml | 271 ++++++++++++++++++ plugins/member_context/prompt_builder.py | 51 ---- plugins/member_context/service.py | 76 +++-- 7 files changed, 673 insertions(+), 185 deletions(-) create mode 100644 plugins/member_context/DIFY_WORKFLOW.md create mode 100644 plugins/member_context/dify_client.py create mode 100644 plugins/member_context/member_context_workflow.yml diff --git a/plugins/member_context/DIFY_WORKFLOW.md b/plugins/member_context/DIFY_WORKFLOW.md new file mode 100644 index 0000000..3e6deb8 --- /dev/null +++ b/plugins/member_context/DIFY_WORKFLOW.md @@ -0,0 +1,187 @@ +# 成员画像 Dify Workflow 设计 + +## 当前状态 + +- Dify app 信息已确认: + - `base_url`: `http://192.168.2.240/v1` + - `api_key`: `app-b2cj03DipGCIAmgBfcx7SKsT` + - `mode`: `workflow` +- 当前接口探测结果: + - `GET /info` 正常,说明应用存在 + - `POST /workflows/run` 返回 `Workflow not published` +- 结论: + - 插件侧已切为 workflow 调用模式 + - 你需要在 Dify 后台把工作流发布,发布后插件才能真正调用 + +## 可直接导入的 DSL + +仓库里已生成可导入文件: + +- [member_context_workflow.yml](/d:/learn/abot/plugins/member_context/member_context_workflow.yml) + +你可以直接在 Dify 后台导入这个 DSL,再检查模型可用性并发布。 + +## 目标 + +将“群某一天的成员画像提取”从单大提示词,拆成 Dify 内部轻工作流,减少结果波动: + +1. 按 `wxid` 作为唯一主键 +2. 不用昵称做唯一识别 +3. 优先提取稳定的行为信号 +4. 允许弱信号为空 +5. 输出严格 JSON 文本,供插件直接落库 + +## 建议工作流 + +### 1. Start 节点 + +输入变量建议如下: + +- `query`:文本,大段提示词正文,插件会直接传入 +- `chatroom_id`:文本 +- `digest_date`:文本,格式 `YYYY-MM-DD` +- `member_labels`:段落,候选成员列表,格式 `wxid | display_name` +- `compressed_chat`:段落,压缩后的群聊天文本 + +说明: +- 当前插件已按上述字段调用 workflow +- 为了兼容未来扩展,`query` 仍会保留 + +### 2. LLM 节点:群日画像批量提取 + +模型建议: +- 上下文长度足够大的模型 +- 响应稳定、结构化能力强的模型 + +系统提示词建议: + +```text +你是微信群后台的成员日行为证据提取器。 + +任务: +根据给定的一天群聊记录,只按 wxid 识别成员,输出每个成员当天的结构化行为观察。 + +关键规则: +1. wxid 是唯一标识。display_name 仅用于展示,不用于身份判定。 +2. 每个 wxid 最终只能输出一条记录,严禁重复输出同一个 wxid。 +3. 请先按 wxid 汇总该成员全天发言,再提取结果。 +4. 即使成员发言以短句为主,只要样本量足够,也必须尽量提炼: + - topics + - skill_signals + - value_preferences + - habit_signals + - engagement_traits + - social_role + - temperament_signal + - summary_text +5. identity_clues、family_signals、life_stage_signals 没有明确公开证据时允许为空。 +6. 不允许因为“短句较多”就统一输出空数组和通用摘要。 +7. 不做心理诊断、不做隐私猜测、不把玩笑当事实。 +8. 只能输出候选成员列表中的 wxid。 + +输出要求: +- 只输出严格 JSON,不要 markdown,不要解释。 +- 输出格式: +{ + "members": [ + { + "wxid": "成员wxid", + "display_name": "显示名", + "topics": ["主题1"], + "identity_clues": ["身份线索1"], + "skill_signals": ["技能信号1"], + "family_signals": ["家庭线索1"], + "life_stage_signals": ["阶段线索1"], + "value_preferences": ["价值偏好1"], + "interaction_style": "一句中文", + "message_pattern": "一句中文", + "response_style_hint": "一句中文", + "habit_signals": ["习惯1"], + "engagement_traits": ["参与特征1"], + "decision_style": "一句中文", + "social_role": "一句中文", + "reply_taboos": ["避坑1"], + "temperament_signal": "一句中文", + "summary_text": "不超过100字", + "representative_messages": ["原话1", "原话2"], + "confidence": 0.95 + } + ] +} +``` + +用户提示词建议: + +```text +群ID: {{chatroom_id}} +日期: {{digest_date}} + +候选成员: +{{member_labels}} + +压缩后的群聊记录: +{{compressed_chat}} +``` + +### 3. End 节点 + +输出变量建议: + +- `text`:直接输出 LLM 节点的文本结果 + +说明: +- 插件默认读取 workflow outputs 中的 `text` +- 如果你在 Dify 里把输出字段换成别的名字,例如 `result_json`,需要同步修改插件配置中的 `workflow_output_key` + +## 插件侧已对接的输入 + +当前插件发给 workflow 的 inputs 如下: + +```json +{ + "query": "完整提示词文本", + "chatroom_id": "45317011307@chatroom", + "digest_date": "2026-04-01", + "member_labels": "wxid_a | 张三\nwxid_b | 李四", + "compressed_chat": "【09:20】...\n..." +} +``` + +## 插件侧已对接的输出 + +插件会优先读取: + +1. `outputs.text` +2. `outputs.answer` +3. `outputs.result_json` +4. `outputs.result` + +如果你的工作流最终输出不是 `text`,请把 + +```toml +workflow_output_key = "text" +``` + +改成对应字段名。 + +## 为什么建议这样设计 + +相较于把所有事情都压进单次 completion: + +- workflow 模式更适合后面继续拆节点 +- 可以后续增加: + - JSON 清洗 Code 节点 + - wxid 去重 Code 节点 + - 质量分判断节点 +- 插件侧已经兼容 workflow 调用,不需要再改主逻辑 + +## 你现在要做的最后一步 + +在 Dify 后台: + +1. 打开 `member_context` +2. 按上面结构补 Start / LLM / End +3. 确保 End 输出字段叫 `text` +4. 点击 Publish 发布 + +发布完成后,插件就能直接调用。 diff --git a/plugins/member_context/config.toml b/plugins/member_context/config.toml index 413bfb8..d0093ef 100644 --- a/plugins/member_context/config.toml +++ b/plugins/member_context/config.toml @@ -4,8 +4,10 @@ enable = true [api] enable = true base_url = "http://192.168.2.240/v1" -api_key = "app-URBzTCyx2VB10cTalurJNkcz" -endpoint = "completion-messages" +api_key = "app-b2cj03DipGCIAmgBfcx7SKsT" +mode = "workflow" +endpoint = "workflows/run" +workflow_output_key = "text" request_timeout = 60 [profile] diff --git a/plugins/member_context/dify_client.py b/plugins/member_context/dify_client.py new file mode 100644 index 0000000..938fd7f --- /dev/null +++ b/plugins/member_context/dify_client.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +import json +from typing import Dict, Optional + +import requests +from loguru import logger + + +class DifyClient: + """Dify completion/workflow 通用调用客户端""" + + def __init__(self, api_config: Optional[Dict] = None): + api_config = api_config or {} + self.LOG = logger + self.enabled = bool(api_config.get("enable", api_config.get("enabled", False))) + self.base_url = (api_config.get("base_url") or "").rstrip("/") + self.api_key = api_config.get("api_key", "") + self.timeout = int(api_config.get("request_timeout", 60)) + self.mode = str(api_config.get("mode", "completion")).strip().lower() + default_endpoint = "workflows/run" if self.mode == "workflow" else "completion-messages" + self.endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/") + self.workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip() + + def is_available(self) -> bool: + return self.enabled and bool(self.base_url and self.api_key) + + def run(self, prompt: str, user: str, inputs: Optional[Dict] = None, + tag: str = "") -> Optional[Dict]: + if not self.is_available(): + return None + + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + payload_inputs = dict(inputs or {}) + if self.mode == "completion": + payload_inputs.setdefault("query", prompt) + elif prompt and "query" not in payload_inputs: + payload_inputs["query"] = prompt + + payload = { + "inputs": payload_inputs, + "response_mode": "blocking", + "user": user, + } + url = f"{self.base_url}/{self.endpoint}" + try: + self.LOG.info(f"[成员交互摘要][Dify] 发起请求: mode={self.mode}, endpoint={self.endpoint}, tag={tag}") + response = requests.post(url, headers=headers, json=payload, timeout=self.timeout) + response.raise_for_status() + data = response.json() + parsed = self._parse_response(data) + if parsed is not None: + return parsed + self.LOG.warning( + f"[成员交互摘要][Dify] 响应内容为空: mode={self.mode}, tag={tag}, " + f"response_preview={(response.text or '')[:300]}" + ) + return None + except Exception as e: + self.LOG.warning(f"[成员交互摘要][Dify] 请求失败: mode={self.mode}, tag={tag}, error={e}") + return None + + def _parse_response(self, data: Dict) -> Optional[Dict]: + if self.mode == "workflow": + return self._parse_workflow_response(data) + answer = data.get("answer", "") + usage = (data.get("metadata") or {}).get("usage", {}) or {} + return { + "text": str(answer or "").strip(), + "usage": usage, + "raw": data, + } + + def _parse_workflow_response(self, data: Dict) -> Optional[Dict]: + payload = (data or {}).get("data", {}) or {} + outputs = payload.get("outputs", {}) or {} + text = "" + + if self.workflow_output_key and outputs.get(self.workflow_output_key) is not None: + value = outputs.get(self.workflow_output_key) + text = self._stringify_output(value) + elif outputs.get("text") is not None: + text = self._stringify_output(outputs.get("text")) + elif outputs.get("answer") is not None: + text = self._stringify_output(outputs.get("answer")) + elif outputs.get("result_json") is not None: + text = self._stringify_output(outputs.get("result_json")) + elif outputs.get("result") is not None: + text = self._stringify_output(outputs.get("result")) + else: + for value in outputs.values(): + text = self._stringify_output(value) + if text: + break + + usage = { + "total_tokens": payload.get("total_tokens"), + "latency": payload.get("elapsed_time"), + } + return { + "text": str(text or "").strip(), + "usage": usage, + "raw": data, + } + + @staticmethod + def _stringify_output(value) -> str: + if value is None: + return "" + if isinstance(value, str): + return value.strip() + if isinstance(value, (dict, list)): + return json.dumps(value, ensure_ascii=False) + return str(value).strip() diff --git a/plugins/member_context/digest_service.py b/plugins/member_context/digest_service.py index 8cfa4f7..5a4b6e0 100644 --- a/plugins/member_context/digest_service.py +++ b/plugins/member_context/digest_service.py @@ -5,12 +5,12 @@ from collections import defaultdict from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple -import requests from loguru import logger from db.contacts_db import ContactsDBOperator from db.member_digest_db import MemberDigestDBOperator from db.message_storage import MessageStorageDB +from plugins.member_context.dify_client import DifyClient from plugins.member_context.prompt_builder import MemberContextPromptBuilder from utils.compress_chat_data import compress_chat_data @@ -28,12 +28,13 @@ class MemberDigestService: api_config = self.plugin_config.get("api", {}) profile_config = self.plugin_config.get("profile", {}) + self.dify_client = DifyClient(api_config) - self.ai_enabled = bool(api_config.get("enable", api_config.get("enabled", False))) - self.ai_base_url = (api_config.get("base_url") or "").rstrip("/") - self.ai_api_key = api_config.get("api_key", "") - self.ai_endpoint = str(api_config.get("endpoint", "completion-messages")).lstrip("/") - self.ai_timeout = int(api_config.get("request_timeout", 60)) + self.ai_enabled = self.dify_client.enabled + self.ai_base_url = self.dify_client.base_url + self.ai_api_key = self.dify_client.api_key + self.ai_endpoint = self.dify_client.endpoint + self.ai_timeout = self.dify_client.timeout self.bootstrap_days = int(profile_config.get("bootstrap_days", 365)) self.daily_message_limit = int(profile_config.get("daily_message_limit", 120)) @@ -301,36 +302,6 @@ class MemberDigestService: ) return digests - def _build_daily_digest(self, chatroom_id: str, wxid: str, display_name: str, - digest_date: str, messages: List[Dict]) -> Optional[Dict]: - prompt = MemberContextPromptBuilder.build_daily_digest_prompt( - chatroom_id, wxid, display_name, digest_date, messages - ) - parsed = self._request_ai_json(prompt, tag=f"daily:{digest_date}", chatroom_id=chatroom_id, wxid=wxid) - if not parsed: - parsed = self._build_daily_digest_fallback(messages) - if not parsed: - return None - - period_start = f"{digest_date} 00:00:00" - period_end = f"{digest_date} 23:59:59" - return { - "chatroom_id": chatroom_id, - "wxid": wxid, - "digest_type": "daily", - "period_key": digest_date, - "period_start": period_start, - "period_end": period_end, - "display_name": display_name, - "source_count": len(messages), - "summary_text": parsed.get("summary_text", ""), - "structured": parsed, - "meta": { - "source_type": "messages", - "representative_messages": parsed.get("representative_messages", []), - }, - } - def _build_period_digest(self, digest_type: str, chatroom_id: str, wxid: str, display_name: str, period_key: str, period_start: str, period_end: str, items: List[Dict]) -> Optional[Dict]: @@ -360,64 +331,44 @@ class MemberDigestService: } def _request_ai_json(self, prompt: str, tag: str, chatroom_id: str, wxid: str) -> Optional[Dict]: - if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key: + if not self.dify_client.is_available(): return None - headers = { - "Authorization": f"Bearer {self.ai_api_key}", - "Content-Type": "application/json", - } - payload = { - "inputs": {"query": prompt}, - "response_mode": "blocking", - "user": f"member-digest:{chatroom_id}:{wxid}:{tag}", - } - url = f"{self.ai_base_url}/{self.ai_endpoint}" - try: - self.LOG.info(f"[成员交互摘要][AI] 发起摘要请求: group={chatroom_id}, wxid={wxid}, tag={tag}") - response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout) - response.raise_for_status() - data = response.json() - parsed = self._parse_ai_answer(data.get("answer", "")) - if parsed: - usage = (data.get("metadata") or {}).get("usage", {}) or {} - parsed["ai_usage"] = usage - return parsed - except Exception as e: - self.LOG.warning(f"[成员交互摘要][AI] 摘要请求失败: group={chatroom_id}, wxid={wxid}, tag={tag}, error={e}") + response = self.dify_client.run( + prompt=prompt, + user=f"member-digest:{chatroom_id}:{wxid}:{tag}", + inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid, "tag": tag}, + tag=tag, + ) + if not response: return None + parsed = self._parse_ai_answer(response.get("text", "")) + if parsed: + parsed["ai_usage"] = response.get("usage", {}) or {} + return parsed def _request_group_daily_json(self, chatroom_id: str, digest_date: str, member_labels: List[str], compressed_chat: str) -> List[Dict]: - if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key: + if not self.dify_client.is_available(): return [] prompt = MemberContextPromptBuilder.build_group_daily_digest_prompt( chatroom_id, digest_date, member_labels, compressed_chat ) - headers = { - "Authorization": f"Bearer {self.ai_api_key}", - "Content-Type": "application/json", - } - payload = { - "inputs": {"query": prompt}, - "response_mode": "blocking", - "user": f"member-digest:{chatroom_id}:group-daily:{digest_date}", - } - url = f"{self.ai_base_url}/{self.ai_endpoint}" - try: - self.LOG.info( - f"[成员交互摘要][AI] 发起群日批量摘要请求: group={chatroom_id}, " - f"date={digest_date}, members={len(member_labels)}" - ) - response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout) - response.raise_for_status() - data = response.json() - parsed = self._parse_group_daily_answer(data.get("answer", "")) - return parsed - except Exception as e: - self.LOG.warning( - f"[成员交互摘要][AI] 群日批量摘要失败: group={chatroom_id}, date={digest_date}, error={e}" - ) + response = self.dify_client.run( + prompt=prompt, + user=f"member-digest:{chatroom_id}:group-daily:{digest_date}", + inputs={ + "query": prompt, + "chatroom_id": chatroom_id, + "digest_date": digest_date, + "member_labels": "\n".join(member_labels), + "compressed_chat": compressed_chat, + }, + tag=f"group-daily:{digest_date}", + ) + if not response: return [] + parsed = self._parse_group_daily_answer(response.get("text", "")) + return parsed def _parse_ai_answer(self, answer: str) -> Optional[Dict]: if not answer: @@ -454,14 +405,38 @@ class MemberDigestService: members = parsed.get("members", []) if not isinstance(members, list): return [] - normalized = [] + normalized_map = {} for item in members: if not isinstance(item, dict): continue normalized_item = self._normalize_profile_item(item) - if normalized_item.get("wxid"): - normalized.append(normalized_item) - return normalized + wxid = normalized_item.get("wxid") + if not wxid: + continue + existing = normalized_map.get(wxid) + if not existing or self._score_profile_item(normalized_item) > self._score_profile_item(existing): + normalized_map[wxid] = normalized_item + return list(normalized_map.values()) + + @staticmethod + def _score_profile_item(item: Dict) -> float: + if not item: + return 0.0 + score = 0.0 + for key, value in item.items(): + if key in {"wxid", "display_name"}: + continue + if isinstance(value, list): + score += len([v for v in value if str(v).strip()]) * 1.0 + elif isinstance(value, (int, float)): + score += float(value) + elif str(value).strip(): + score += 0.8 + try: + score += float(item.get("confidence", 0)) * 2 + except Exception: + pass + return score def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]: if not messages: diff --git a/plugins/member_context/member_context_workflow.yml b/plugins/member_context/member_context_workflow.yml new file mode 100644 index 0000000..89ff0bf --- /dev/null +++ b/plugins/member_context/member_context_workflow.yml @@ -0,0 +1,271 @@ +app: + description: 按群和日期提取群成员日画像,输出严格 JSON,供 member_context 插件直接消费 + icon: 🧠 + icon_background: '#E0F2FE' + mode: workflow + name: member_context + use_icon_as_answer_icon: false +dependencies: +- current_identifier: null + type: marketplace + value: + marketplace_plugin_unique_identifier: langgenius/volcengine_maas:0.0.13@d402dc32a505b1b4f27588f10e729209bf413ec263467635774d96c4345bd197 +kind: app +version: 0.3.0 +workflow: + conversation_variables: [] + environment_variables: [] + features: + file_upload: + allowed_file_extensions: + - .TXT + allowed_file_types: + - document + allowed_file_upload_methods: + - local_file + - remote_url + enabled: false + fileUploadConfig: + audio_file_size_limit: 50 + batch_count_limit: 5 + file_size_limit: 15 + image_file_size_limit: 10 + video_file_size_limit: 100 + workflow_file_upload_limit: 10 + image: + enabled: false + number_limits: 3 + transfer_methods: + - local_file + - remote_url + number_limits: 3 + opening_statement: '' + retriever_resource: + enabled: false + sensitive_word_avoidance: + enabled: false + speech_to_text: + enabled: false + suggested_questions: [] + suggested_questions_after_answer: + enabled: false + text_to_speech: + enabled: false + language: '' + voice: '' + graph: + edges: + - data: + isInIteration: false + isInLoop: false + sourceType: start + targetType: llm + id: start-source-llm-target + selected: false + source: 'start_node' + sourceHandle: source + target: 'llm_node' + targetHandle: target + type: custom + zIndex: 0 + - data: + isInIteration: false + isInLoop: false + sourceType: llm + targetType: end + id: llm-source-end-target + selected: false + source: 'llm_node' + sourceHandle: source + target: 'end_node' + targetHandle: target + type: custom + zIndex: 0 + nodes: + - data: + desc: '' + selected: false + title: 开始 + type: start + variables: + - label: query + max_length: 120000 + options: [] + required: false + type: paragraph + variable: query + - label: chatroom_id + max_length: 128 + options: [] + required: true + type: text-input + variable: chatroom_id + - label: digest_date + max_length: 32 + options: [] + required: true + type: text-input + variable: digest_date + - label: member_labels + max_length: 50000 + options: [] + required: true + type: paragraph + variable: member_labels + - label: compressed_chat + max_length: 200000 + options: [] + required: true + type: paragraph + variable: compressed_chat + height: 194 + id: 'start_node' + position: + x: -420 + y: 120 + positionAbsolute: + x: -420 + y: 120 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 244 + - data: + context: + enabled: false + variable_selector: [] + default_value: + - key: text + type: string + value: '{"members":[]}' + desc: '' + error_strategy: default-value + model: + completion_params: + temperature: 0.2 + mode: chat + name: Doubao-1.5-pro-256k + provider: langgenius/volcengine_maas/volcengine_maas + prompt_template: + - id: system_prompt_member_context + role: system + text: | + 你是微信群后台的成员日行为证据提取器。 + + 任务: + 根据给定的一天群聊记录,只按 wxid 识别成员,输出每个成员当天的结构化行为观察。 + + 关键规则: + 1. wxid 是唯一标识。display_name 仅用于展示,不用于身份判定。 + 2. 每个 wxid 最终只能输出一条记录,严禁重复输出同一个 wxid。 + 3. 请先按 wxid 汇总该成员全天发言,再提取结果。 + 4. 即使成员发言以短句为主,只要样本量足够,也必须尽量提炼: + - topics + - skill_signals + - value_preferences + - habit_signals + - engagement_traits + - social_role + - temperament_signal + - summary_text + 5. identity_clues、family_signals、life_stage_signals 没有明确公开证据时允许为空。 + 6. 不允许因为“短句较多”就统一输出空数组和通用摘要。 + 7. 不做心理诊断、不做隐私猜测、不把玩笑当事实。 + 8. 只能输出候选成员列表中的 wxid。 + 9. topics 更偏向反复出现的关注方向;skill_signals 更偏向能力表现;value_preferences 更偏向判断偏好;social_role 更偏向当天在群里的实际作用。 + 10. 输出前自行去重,同一个 wxid 只保留一条最完整结果。 + + 输出要求: + - 只输出严格 JSON,不要 markdown,不要解释,不要前后缀。 + - 输出格式: + { + "members": [ + { + "wxid": "成员wxid", + "display_name": "显示名", + "topics": ["主题1"], + "identity_clues": ["身份线索1"], + "skill_signals": ["技能信号1"], + "family_signals": ["家庭线索1"], + "life_stage_signals": ["阶段线索1"], + "value_preferences": ["价值偏好1"], + "interaction_style": "一句中文", + "message_pattern": "一句中文", + "response_style_hint": "一句中文", + "habit_signals": ["习惯1"], + "engagement_traits": ["参与特征1"], + "decision_style": "一句中文", + "social_role": "一句中文", + "reply_taboos": ["避坑1"], + "temperament_signal": "一句中文", + "summary_text": "不超过100字", + "representative_messages": ["原话1", "原话2"], + "confidence": 0.95 + } + ] + } + + 字段约束: + - topics、skill_signals、value_preferences、habit_signals、engagement_traits 最多 4 个 + - identity_clues、family_signals、life_stage_signals 最多 3 个 + - reply_taboos 最多 3 个 + - representative_messages 最多 3 条 + - 如果某成员样本明显不足,可以不输出该成员 + - id: user_prompt_member_context + role: user + text: | + 群ID: {{#start_node.chatroom_id#}} + 日期: {{#start_node.digest_date#}} + + 候选成员: + {{#start_node.member_labels#}} + + 压缩后的群聊记录: + {{#start_node.compressed_chat#}} + selected: false + title: 成员画像提取 + type: llm + variables: [] + vision: + enabled: false + height: 98 + id: 'llm_node' + position: + x: 10 + y: 140 + positionAbsolute: + x: 10 + y: 140 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 244 + - data: + desc: '' + outputs: + - value_selector: + - 'llm_node' + - text + variable: text + selected: false + title: 结束 + type: end + height: 90 + id: 'end_node' + position: + x: 430 + y: 140 + positionAbsolute: + x: 430 + y: 140 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 244 + viewport: + x: 120 + y: 180 + zoom: 0.9 diff --git a/plugins/member_context/prompt_builder.py b/plugins/member_context/prompt_builder.py index e5693a9..2eb505b 100644 --- a/plugins/member_context/prompt_builder.py +++ b/plugins/member_context/prompt_builder.py @@ -58,57 +58,6 @@ class MemberContextPromptBuilder: "压缩后的群聊记录:\n" + compressed_chat ) - @staticmethod - def build_daily_digest_prompt(chatroom_id: str, wxid: str, display_name: str, - digest_date: str, messages: List[Dict]) -> str: - lines = [] - for msg in messages[-80:]: - ts = str(msg.get("timestamp", ""))[11:16] - content = (msg.get("content") or "").replace("\n", " ").strip()[:180] - if content: - lines.append(f"[{ts}] {content}") - - return ( - "你是微信群后台的成员日观察摘要生成器。\n" - "请仅基于给定的当日公开聊天记录,提取对后续互动有帮助的中性行为观察。\n" - "不要做人格诊断、隐私猜测、负面评价,不要脑补群外信息。\n" - "这些日观察会被后续系统按周、按月持续累积,所以应优先输出长期可验证的行为线索,而不是一次性情绪。\n" - "输出严格 JSON,不要 markdown。\n" - "{" - "\"topics\":[\"主题1\"]," - "\"identity_clues\":[\"身份线索1\"]," - "\"skill_signals\":[\"技能信号1\"]," - "\"family_signals\":[\"家庭线索1\"]," - "\"life_stage_signals\":[\"阶段线索1\"]," - "\"value_preferences\":[\"价值偏好1\"]," - "\"interaction_style\":\"一句中文\"," - "\"message_pattern\":\"一句中文\"," - "\"response_style_hint\":\"一句中文\"," - "\"habit_signals\":[\"信号1\"]," - "\"engagement_traits\":[\"特征1\"]," - "\"decision_style\":\"一句中文\"," - "\"social_role\":\"一句中文\"," - "\"reply_taboos\":[\"避坑1\"]," - "\"temperament_signal\":\"一句中文,描述当天显露的沟通倾向,必须克制\"," - "\"summary_text\":\"一段不超过100字的日摘要\"," - "\"representative_messages\":[\"原话1\",\"原话2\"]," - "\"confidence\":0.0" - "}\n" - "要求:\n" - "1. topics、identity_clues、skill_signals、family_signals、life_stage_signals、value_preferences、habit_signals、engagement_traits 最多4个,reply_taboos 最多3个。\n" - "2. temperament_signal 只能写当日可观察到的沟通倾向,不可上升为长期性格判断。\n" - "3. representative_messages 保留最能代表当天风格的短句,最多3条。\n" - "4. identity_clues、family_signals、life_stage_signals 只能写线索,不可写成确定事实。\n" - "5. skill_signals 重点描述专业能力、工具熟练度、信息组织能力、问题解决能力等当天显露出的信号。\n" - "6. topics 尽量写持续关注方向,避免写一次性插话;habit_signals 只写当天已明显出现的表达或互动习惯。\n" - "7. value_preferences 只保留公开表达出的判断偏好,如效率优先、先验证再决策、重成本、重稳定。\n" - "8. summary_text 要像后台备注,不要像对话回复。\n" - f"成员: {display_name} ({wxid})\n" - f"群ID: {chatroom_id}\n" - f"日期: {digest_date}\n" - "当日消息:\n" + ("\n".join(lines) or "暂无") - ) - @staticmethod def build_period_digest_prompt(digest_type: str, chatroom_id: str, wxid: str, display_name: str, period_key: str, items: List[Dict]) -> str: diff --git a/plugins/member_context/service.py b/plugins/member_context/service.py index 087b712..73f8050 100644 --- a/plugins/member_context/service.py +++ b/plugins/member_context/service.py @@ -5,7 +5,6 @@ from collections import Counter from datetime import datetime from typing import Dict, List, Optional -import requests from loguru import logger from db.connection import DBConnectionManager @@ -14,6 +13,7 @@ from db.member_context_db import MemberContextDBOperator from db.member_digest_db import MemberDigestDBOperator from db.message_storage import MessageStorageDB from plugins.member_context.digest_service import MemberDigestService +from plugins.member_context.dify_client import DifyClient from plugins.member_context.prompt_builder import MemberContextPromptBuilder from utils.robot_cmd.robot_command import Feature, GroupBotManager, PermissionStatus @@ -45,12 +45,13 @@ class MemberContextService: api_config = self.plugin_config.get("api", {}) profile_config = self.plugin_config.get("profile", {}) schedule_config = self.plugin_config.get("schedule", {}) + self.dify_client = DifyClient(api_config) - self.ai_enabled = bool(api_config.get("enable", api_config.get("enabled", False))) - self.ai_base_url = (api_config.get("base_url") or "").rstrip("/") - self.ai_api_key = api_config.get("api_key", "") - self.ai_endpoint = str(api_config.get("endpoint", "completion-messages")).lstrip("/") - self.ai_timeout = int(api_config.get("request_timeout", 60)) + self.ai_enabled = self.dify_client.enabled + self.ai_base_url = self.dify_client.base_url + self.ai_api_key = self.dify_client.api_key + self.ai_endpoint = self.dify_client.endpoint + self.ai_timeout = self.dify_client.timeout self.sample_days = int(profile_config.get("sample_days", 30)) self.refresh_limit_per_member = int(profile_config.get("refresh_limit_per_member", 200)) @@ -470,7 +471,7 @@ class MemberContextService: def _generate_ai_context_from_digests(self, chatroom_id: str, wxid: str, display_name: str, monthly_digests: List[Dict], weekly_digests: List[Dict], daily_digests: List[Dict]) -> Optional[Dict]: - if not self.ai_enabled or not self.ai_base_url or not self.ai_api_key: + if not self.dify_client.is_available(): return None if len(daily_digests) < 2 and len(weekly_digests) < 1 and len(monthly_digests) < 1: return None @@ -478,44 +479,31 @@ class MemberContextService: prompt = MemberContextPromptBuilder.build_final_context_prompt( chatroom_id, wxid, display_name, monthly_digests, weekly_digests, daily_digests ) - headers = { - "Authorization": f"Bearer {self.ai_api_key}", - "Content-Type": "application/json", - } - payload = { - "inputs": {"query": prompt}, - "response_mode": "blocking", - "user": f"member-context-final:{chatroom_id}:{wxid}", - } - url = f"{self.ai_base_url}/{self.ai_endpoint}" - try: - self.LOG.info( - f"[成员交互摘要][AI] 发起最终画像请求: group={chatroom_id}, wxid={wxid}, " - f"monthly={len(monthly_digests)}, weekly={len(weekly_digests)}, daily={len(daily_digests)}" - ) - response = requests.post(url, headers=headers, json=payload, timeout=self.ai_timeout) - response.raise_for_status() - data = response.json() - parsed = self._parse_ai_answer(data.get("answer", "")) - if not parsed: - self.LOG.warning( - f"[成员交互摘要][AI] 最终画像JSON解析失败: group={chatroom_id}, wxid={wxid}, " - f"answer_preview={(data.get('answer', '') or '')[:200]}" - ) - return None - usage = (data.get("metadata") or {}).get("usage", {}) or {} - parsed_meta = parsed.get("meta", {}) or {} - parsed_meta.update({ - "ai_provider": "dify", - "ai_mode": "completion", - "ai_tokens": usage.get("total_tokens"), - "ai_latency": usage.get("latency"), - }) - parsed["meta"] = parsed_meta - return parsed - except Exception as e: - self.LOG.warning(f"成员交互摘要最终画像 AI 生成失败,回退到本地融合: chatroom={chatroom_id}, wxid={wxid}, error={e}") + response = self.dify_client.run( + prompt=prompt, + user=f"member-context-final:{chatroom_id}:{wxid}", + inputs={"query": prompt, "chatroom_id": chatroom_id, "wxid": wxid}, + tag=f"final:{wxid}", + ) + if not response: return None + parsed = self._parse_ai_answer(response.get("text", "")) + if not parsed: + self.LOG.warning( + f"[成员交互摘要][AI] 最终画像JSON解析失败: group={chatroom_id}, wxid={wxid}, " + f"answer_preview={(response.get('text', '') or '')[:200]}" + ) + return None + usage = response.get("usage", {}) or {} + parsed_meta = parsed.get("meta", {}) or {} + parsed_meta.update({ + "ai_provider": "dify", + "ai_mode": self.dify_client.mode, + "ai_tokens": usage.get("total_tokens"), + "ai_latency": usage.get("latency"), + }) + parsed["meta"] = parsed_meta + return parsed def _parse_ai_answer(self, answer: str) -> Optional[Dict]: if not answer: