diff --git a/plugins/member_context/config.toml b/plugins/member_context/config.toml index d0093ef..da7b128 100644 --- a/plugins/member_context/config.toml +++ b/plugins/member_context/config.toml @@ -8,7 +8,8 @@ api_key = "app-b2cj03DipGCIAmgBfcx7SKsT" mode = "workflow" endpoint = "workflows/run" workflow_output_key = "text" -request_timeout = 60 +response_mode = "streaming" +request_timeout = 240 [profile] sample_days = 30 diff --git a/plugins/member_context/dify_client.py b/plugins/member_context/dify_client.py index 938fd7f..5c183c2 100644 --- a/plugins/member_context/dify_client.py +++ b/plugins/member_context/dify_client.py @@ -20,6 +20,7 @@ class DifyClient: default_endpoint = "workflows/run" if self.mode == "workflow" else "completion-messages" self.endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/") self.workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip() + self.response_mode = str(api_config.get("response_mode", "blocking")).strip().lower() def is_available(self) -> bool: return self.enabled and bool(self.base_url and self.api_key) @@ -41,27 +42,81 @@ class DifyClient: payload = { "inputs": payload_inputs, - "response_mode": "blocking", + "response_mode": self.response_mode, "user": user, } url = f"{self.base_url}/{self.endpoint}" try: - self.LOG.info(f"[成员交互摘要][Dify] 发起请求: mode={self.mode}, endpoint={self.endpoint}, tag={tag}") - response = requests.post(url, headers=headers, json=payload, timeout=self.timeout) - response.raise_for_status() - data = response.json() - parsed = self._parse_response(data) + self.LOG.info( + f"[成员交互摘要][Dify] 发起请求: mode={self.mode}, response_mode={self.response_mode}, " + f"endpoint={self.endpoint}, tag={tag}" + ) + if self.response_mode == "streaming": + parsed = self._run_streaming(url, headers, payload, tag) + else: + response = requests.post(url, headers=headers, json=payload, timeout=self.timeout) + response.raise_for_status() + data = response.json() + parsed = self._parse_response(data) if parsed is not None: return parsed - self.LOG.warning( - f"[成员交互摘要][Dify] 响应内容为空: mode={self.mode}, tag={tag}, " - f"response_preview={(response.text or '')[:300]}" - ) + self.LOG.warning(f"[成员交互摘要][Dify] 响应内容为空: mode={self.mode}, tag={tag}") return None except Exception as e: self.LOG.warning(f"[成员交互摘要][Dify] 请求失败: mode={self.mode}, tag={tag}, error={e}") return None + def _run_streaming(self, url: str, headers: Dict, payload: Dict, tag: str) -> Optional[Dict]: + with requests.post(url, headers=headers, json=payload, timeout=self.timeout, stream=True) as response: + response.raise_for_status() + event_name = "" + text_fragments = [] + final_payload = None + + for raw_line in response.iter_lines(decode_unicode=True): + if raw_line is None: + continue + line = str(raw_line).strip() + if not line: + continue + if line.startswith("event:"): + event_name = line[6:].strip() + continue + if not line.startswith("data:"): + continue + + data_text = line[5:].strip() + if not data_text or data_text == "[DONE]": + continue + try: + chunk = json.loads(data_text) + except Exception: + continue + + candidate_text = self._extract_stream_text(chunk) + if candidate_text: + text_fragments.append(candidate_text) + + chunk_event = str(chunk.get("event") or event_name or "").strip() + if chunk_event in {"workflow_finished", "message_end"}: + final_payload = chunk + + if final_payload: + parsed = self._parse_response(final_payload) + if parsed and parsed.get("text"): + return parsed + + text = "".join(fragment for fragment in text_fragments if fragment) + if text: + return { + "text": text.strip(), + "usage": {}, + "raw": final_payload or {}, + } + + self.LOG.warning(f"[成员交互摘要][Dify] 流式响应未产出有效内容: tag={tag}") + return None + def _parse_response(self, data: Dict) -> Optional[Dict]: if self.mode == "workflow": return self._parse_workflow_response(data) @@ -105,6 +160,22 @@ class DifyClient: "raw": data, } + def _extract_stream_text(self, chunk: Dict) -> str: + if not isinstance(chunk, dict): + return "" + payload = (chunk.get("data") or {}) if isinstance(chunk.get("data"), dict) else {} + outputs = payload.get("outputs", {}) if isinstance(payload.get("outputs"), dict) else {} + + for key in filter(None, [self.workflow_output_key, "text", "answer", "result_json", "result"]): + if outputs.get(key) is not None: + return self._stringify_output(outputs.get(key)) + + for key in ("text", "answer"): + if chunk.get(key) is not None: + return self._stringify_output(chunk.get(key)) + + return "" + @staticmethod def _stringify_output(value) -> str: if value is None: diff --git a/plugins/member_context/digest_service.py b/plugins/member_context/digest_service.py index 5a4b6e0..d90ed18 100644 --- a/plugins/member_context/digest_service.py +++ b/plugins/member_context/digest_service.py @@ -276,8 +276,13 @@ class MemberDigestService: digests = [] for wxid in pending_wxids: - parsed = parsed_map.get(wxid) or self._build_daily_digest_fallback(sender_messages.get(wxid, [])) + parsed = parsed_map.get(wxid) if not parsed: + self.LOG.warning( + f"[成员交互摘要][群日批处理] 跳过成员(未提取到有效结果): " + f"group={chatroom_id}, date={digest_date}, wxid={wxid}, " + f"source_count={len(sender_messages.get(wxid, []))}" + ) continue parsed = self._normalize_profile_item(parsed) digests.append({ @@ -310,8 +315,10 @@ class MemberDigestService: ) parsed = self._request_ai_json(prompt, tag=f"{digest_type}:{period_key}", chatroom_id=chatroom_id, wxid=wxid) if not parsed: - parsed = self._build_period_digest_fallback(digest_type, items) - if not parsed: + self.LOG.warning( + f"[成员交互摘要][{digest_type}] 跳过周期摘要(未提取到有效结果): " + f"group={chatroom_id}, wxid={wxid}, period={period_key}, source_count={len(items)}" + ) return None return { @@ -438,102 +445,6 @@ class MemberDigestService: pass return score - def _build_daily_digest_fallback(self, messages: List[Dict]) -> Optional[Dict]: - if not messages: - return None - contents = [str(item.get("content", "")).strip() for item in messages if item.get("content")] - if not contents: - return None - short_samples = [content[:60] for content in contents[:3]] - avg_len = sum(len(content) for content in contents) / max(len(contents), 1) - message_pattern = "短句居多" if avg_len <= 16 else "表达较完整" if avg_len >= 35 else "表达中等长度" - return { - "topics": [], - "identity_clues": [], - "skill_signals": [], - "family_signals": [], - "life_stage_signals": [], - "value_preferences": [], - "interaction_style": "自然跟随式互动", - "message_pattern": message_pattern, - "response_style_hint": "保持简洁自然,先回应核心点", - "habit_signals": [], - "engagement_traits": [], - "decision_style": "", - "social_role": "", - "reply_taboos": [], - "temperament_signal": "当天样本有限,暂以中性沟通观察为主", - "summary_text": f"当日消息约{len(messages)}条,{message_pattern}。", - "representative_messages": short_samples, - "confidence": 0.35, - } - - def _build_period_digest_fallback(self, digest_type: str, items: List[Dict]) -> Optional[Dict]: - if not items: - return None - topic_counts = defaultdict(int) - trait_counts = defaultdict(int) - habit_counts = defaultdict(int) - reply_counts = defaultdict(int) - temperament_values = [] - for item in items: - structured = item.get("structured", {}) or {} - for topic in structured.get("topics", []) + structured.get("stable_topics", []) + structured.get("long_term_topics", []): - topic_counts[topic] += 1 - for trait in structured.get("engagement_traits", []) + structured.get("stable_traits", []): - trait_counts[trait] += 1 - for habit in structured.get("habit_signals", []) + structured.get("habit_patterns", []): - habit_counts[habit] += 1 - for pref in structured.get("reply_preferences", []) + structured.get("long_term_reply_preferences", []): - reply_counts[pref] += 1 - if structured.get("temperament_signal"): - temperament_values.append(structured.get("temperament_signal")) - if structured.get("temperament_tendency"): - temperament_values.append(structured.get("temperament_tendency")) - - top_topics = [key for key, _ in sorted(topic_counts.items(), key=lambda item: item[1], reverse=True)[:5]] - top_traits = [key for key, _ in sorted(trait_counts.items(), key=lambda item: item[1], reverse=True)[:5]] - top_habits = [key for key, _ in sorted(habit_counts.items(), key=lambda item: item[1], reverse=True)[:5]] - top_reply = [key for key, _ in sorted(reply_counts.items(), key=lambda item: item[1], reverse=True)[:4]] - temperament = temperament_values[0] if temperament_values else "整体保持中性沟通特征" - - if digest_type == "weekly": - return { - "stable_topics": top_topics, - "identity_traits": [], - "skill_profile": [], - "family_profile": [], - "life_stage_profile": [], - "value_profile": [], - "stable_traits": top_traits, - "habit_patterns": top_habits, - "reply_preferences": top_reply, - "group_role": "", - "decision_profile": "", - "recent_state": top_topics[:3], - "temperament_tendency": temperament, - "summary_text": "本周沟通特征已按重复信号汇总。", - "confidence": 0.45, - } - - return { - "long_term_topics": top_topics, - "identity_traits": [], - "skill_profile": [], - "family_profile": [], - "life_stage_profile": [], - "value_profile": [], - "stable_traits": top_traits, - "habit_patterns": top_habits, - "long_term_reply_preferences": top_reply, - "group_role": "", - "decision_profile": "", - "phase_state": top_topics[:3], - "temperament_tendency": temperament, - "summary_text": "本月沟通特征已按周摘要汇总。", - "confidence": 0.5, - } - def _format_group_messages_optimized(self, messages: List[Dict], member_name_map: Dict[str, str]) -> str: if not messages: return ""