From a4b87f4c7a6585db0ed08f5db00ea71972a212ea Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 2 Apr 2026 14:40:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=88=90=E5=91=98=E7=94=BB?= =?UTF-8?q?=E5=83=8F=E5=B7=A5=E4=BD=9C=E6=B5=81=E8=B0=83=E7=94=A8=E7=A8=B3?= =?UTF-8?q?=E5=AE=9A=E6=80=A7=E5=B9=B6=E7=A6=81=E6=AD=A2=E5=86=99=E5=85=A5?= =?UTF-8?q?=E5=85=9C=E5=BA=95=E5=9E=83=E5=9C=BE=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将 member_context 的 Dify workflow 调用响应模式切换为 streaming,提高长耗时工作流的连接稳定性 - 将成员画像工作流请求超时时间从 60 秒提升到 240 秒,适配当前群日批量提取任务的实际耗时 - 扩展 DifyClient,支持 workflow streaming 响应解析,在流式场景下尽量提取最终输出或增量文本 - 调整群日画像提取逻辑,AI 未返回成员有效结构化结果时不再写入 fallback 通用数据,而是直接跳过,等待下次任务重试 - 调整周/月周期摘要生成逻辑,AI 未返回有效结果时不再使用本地兜底拼装摘要,避免写入低质量周期画像 - 删除成员日摘要和周期摘要对应的 fallback 生成逻辑,彻底阻断这类无意义垃圾画像继续入库 - 新增跳过日志,明确标记哪些成员或周期摘要因为未提取到有效 AI 结果而未入库,便于后续诊断稳定性问题 --- plugins/member_context/config.toml | 3 +- plugins/member_context/dify_client.py | 91 ++++++++++++++++--- plugins/member_context/digest_service.py | 109 +++-------------------- 3 files changed, 93 insertions(+), 110 deletions(-) diff --git a/plugins/member_context/config.toml b/plugins/member_context/config.toml index d0093ef..da7b128 100644 --- a/plugins/member_context/config.toml +++ b/plugins/member_context/config.toml @@ -8,7 +8,8 @@ api_key = "app-b2cj03DipGCIAmgBfcx7SKsT" mode = "workflow" endpoint = "workflows/run" workflow_output_key = "text" -request_timeout = 60 +response_mode = "streaming" +request_timeout = 240 [profile] sample_days = 30 diff --git a/plugins/member_context/dify_client.py b/plugins/member_context/dify_client.py index 938fd7f..5c183c2 100644 --- a/plugins/member_context/dify_client.py +++ b/plugins/member_context/dify_client.py @@ -20,6 +20,7 @@ class DifyClient: default_endpoint = "workflows/run" if self.mode == "workflow" else "completion-messages" self.endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/") self.workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip() + self.response_mode = str(api_config.get("response_mode", "blocking")).strip().lower() def is_available(self) -> bool: return self.enabled and bool(self.base_url and self.api_key) @@ -41,27 +42,81 @@ class DifyClient: payload = { "inputs": payload_inputs, - "response_mode": "blocking", + "response_mode": self.response_mode, "user": user, } url = f"{self.base_url}/{self.endpoint}" try: - self.LOG.info(f"[成员交互摘要][Dify] 发起请求: mode={self.mode}, endpoint={self.endpoint}, tag={tag}") - response = requests.post(url, headers=headers, json=payload, timeout=self.timeout) - response.raise_for_status() - data = response.json() - parsed = self._parse_response(data) + self.LOG.info( + f"[成员交互摘要][Dify] 发起请求: mode={self.mode}, response_mode={self.response_mode}, " + f"endpoint={self.endpoint}, tag={tag}" + ) + if self.response_mode == "streaming": + parsed = self._run_streaming(url, headers, payload, tag) + else: + response = requests.post(url, headers=headers, json=payload, timeout=self.timeout) + response.raise_for_status() + data = response.json() + parsed = self._parse_response(data) if parsed is not None: return parsed - self.LOG.warning( - f"[成员交互摘要][Dify] 响应内容为空: mode={self.mode}, tag={tag}, " - f"response_preview={(response.text or '')[:300]}" - ) + self.LOG.warning(f"[成员交互摘要][Dify] 响应内容为空: mode={self.mode}, tag={tag}") return None except Exception as e: self.LOG.warning(f"[成员交互摘要][Dify] 请求失败: mode={self.mode}, tag={tag}, error={e}") return None + def _run_streaming(self, url: str, headers: Dict, payload: Dict, tag: str) -> Optional[Dict]: + with requests.post(url, headers=headers, json=payload, timeout=self.timeout, stream=True) as response: + response.raise_for_status() + event_name = "" + text_fragments = [] + final_payload = None + + for raw_line in response.iter_lines(decode_unicode=True): + if raw_line is None: + continue + line = str(raw_line).strip() + if not line: + continue + if line.startswith("event:"): + event_name = line[6:].strip() + continue + if not line.startswith("data:"): + continue + + data_text = line[5:].strip() + if not data_text or data_text == "[DONE]": + continue + try: + chunk = json.loads(data_text) + except Exception: + continue + + candidate_text = self._extract_stream_text(chunk) + if candidate_text: + text_fragments.append(candidate_text) + + chunk_event = str(chunk.get("event") or event_name or "").strip() + if chunk_event in {"workflow_finished", "message_end"}: + final_payload = chunk + + if final_payload: + parsed = self._parse_response(final_payload) + if parsed and parsed.get("text"): + return parsed + + text = "".join(fragment for fragment in text_fragments if fragment) + if text: + return { + "text": text.strip(), + "usage": {}, + "raw": final_payload or {}, + } + + self.LOG.warning(f"[成员交互摘要][Dify] 流式响应未产出有效内容: tag={tag}") + return None + def _parse_response(self, data: Dict) -> Optional[Dict]: if self.mode == "workflow": return self._parse_workflow_response(data) @@ -105,6 +160,22 @@ class DifyClient: "raw": data, } + def _extract_stream_text(self, chunk: Dict) -> str: + if not isinstance(chunk, dict): + return "" + payload = (chunk.get("data") or {}) if isinstance(chunk.get("data"), dict) else {} + outputs = payload.get("outputs", {}) if isinstance(payload.get("outputs"), dict) else {} + + for key in filter(None, [self.workflow_output_key, "text", "answer", "result_json", "result"]): + if outputs.get(key) is not None: + return self._stringify_output(outputs.get(key)) + + for key in ("text", "answer"): + if chunk.get(key) is not None: + return self._stringify_output(chunk.get(key)) + + return "" + @staticmethod def _stringify_output(value) -> str: if value is None: diff --git a/plugins/member_context/digest_service.py b/plugins/member_context/digest_service.py index 5a4b6e0..d90ed18 100644 --- a/plugins/member_context/digest_service.py +++ b/plugins/member_context/digest_service.py @@ -276,8 +276,13 @@ class MemberDigestService: digests = [] for wxid in pending_wxids: - parsed = parsed_map.get(wxid) or self._build_daily_digest_fallback(sender_messages.get(wxid, [])) + parsed = parsed_map.get(wxid) if not parsed: + self.LOG.warning( + f"[成员交互摘要][群日批处理] 跳过成员(未提取到有效结果): " + f"group={chatroom_id}, date={digest_date}, wxid={wxid}, " + f"source_count={len(sender_messages.get(wxid, []))}" + ) continue parsed = self._normalize_profile_item(parsed) digests.append({ @@ -310,8 +315,10 @@ class MemberDigestService: ) parsed = self._request_ai_json(prompt, tag=f"{digest_type}:{period_key}", chatroom_id=chatroom_id, wxid=wxid) if not parsed: - parsed = self._build_period_digest_fallback(digest_type, items) - if not parsed: + self.LOG.warning( + f"[成员交互摘要][{digest_type}] 跳过周期摘要(未提取到有效结果): " + f"group={chatroom_id}, wxid={wxid}, period={period_key}, source_count={len(items)}" + ) return None return { @@ -438,102 +445,6 @@ class MemberDigestService: pass return score - def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]: - if not messages: - return None - contents = [str(item.get("content", "")).strip() for item in messages if item.get("content")] - if not contents: - return None - short_samples = [content[:60] for content in contents[:3]] - avg_len = sum(len(content) for content in contents) / max(len(contents), 1) - message_pattern = "短句居多" if avg_len <= 16 else "表达较完整" if avg_len >= 35 else "表达中等长度" - return { - "topics": [], - "identity_clues": [], - "skill_signals": [], - "family_signals": [], - "life_stage_signals": [], - "value_preferences": [], - "interaction_style": "自然跟随式互动", - "message_pattern": message_pattern, - "response_style_hint": "保持简洁自然,先回应核心点", - "habit_signals": [], - "engagement_traits": [], - "decision_style": "", - "social_role": "", - "reply_taboos": [], - "temperament_signal": "当天样本有限,暂以中性沟通观察为主", - "summary_text": f"当日消息约{len(messages)}条,{message_pattern}。", - "representative_messages": short_samples, - "confidence": 0.35, - } - - def _build_period_digest_fallback(self, digest_type: str, items: List[Dict]) -> Optional[Dict]: - if not items: - return None - topic_counts = defaultdict(int) - trait_counts = defaultdict(int) - habit_counts = defaultdict(int) - reply_counts = defaultdict(int) - temperament_values = [] - for item in items: - structured = item.get("structured", {}) or {} - for topic in structured.get("topics", []) + structured.get("stable_topics", []) + structured.get("long_term_topics", []): - topic_counts[topic] += 1 - for trait in structured.get("engagement_traits", []) + structured.get("stable_traits", []): - trait_counts[trait] += 1 - for habit in structured.get("habit_signals", []) + structured.get("habit_patterns", []): - habit_counts[habit] += 1 - for pref in structured.get("reply_preferences", []) + structured.get("long_term_reply_preferences", []): - reply_counts[pref] += 1 - if structured.get("temperament_signal"): - temperament_values.append(structured.get("temperament_signal")) - if structured.get("temperament_tendency"): - temperament_values.append(structured.get("temperament_tendency")) - - top_topics = [key for key, _ in sorted(topic_counts.items(), key=lambda item: item[1], reverse=True)[:5]] - top_traits = [key for key, _ in sorted(trait_counts.items(), key=lambda item: item[1], reverse=True)[:5]] - top_habits = [key for key, _ in sorted(habit_counts.items(), key=lambda item: item[1], reverse=True)[:5]] - top_reply = [key for key, _ in sorted(reply_counts.items(), key=lambda item: item[1], reverse=True)[:4]] - temperament = temperament_values[0] if temperament_values else "整体保持中性沟通特征" - - if digest_type == "weekly": - return { - "stable_topics": top_topics, - "identity_traits": [], - "skill_profile": [], - "family_profile": [], - "life_stage_profile": [], - "value_profile": [], - "stable_traits": top_traits, - "habit_patterns": top_habits, - "reply_preferences": top_reply, - "group_role": "", - "decision_profile": "", - "recent_state": top_topics[:3], - "temperament_tendency": temperament, - "summary_text": "本周沟通特征已按重复信号汇总。", - "confidence": 0.45, - } - - return { - "long_term_topics": top_topics, - "identity_traits": [], - "skill_profile": [], - "family_profile": [], - "life_stage_profile": [], - "value_profile": [], - "stable_traits": top_traits, - "habit_patterns": top_habits, - "long_term_reply_preferences": top_reply, - "group_role": "", - "decision_profile": "", - "phase_state": top_topics[:3], - "temperament_tendency": temperament, - "summary_text": "本月沟通特征已按周摘要汇总。", - "confidence": 0.5, - } - def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str: if not messages: return ""