diff --git a/dify/dify_chat.py b/dify/dify_chat.py index 9803880..37acd04 100644 --- a/dify/dify_chat.py +++ b/dify/dify_chat.py @@ -144,12 +144,12 @@ class DifyChat: "query": query, "conversation_id": session_id } - # 准备请求数据 + # 准备请求数据 data = { "sys.files": [], "user": user_id, "inputs": inputs_params, - "response_mode": "streaming" # 使用流式响应 + "response_mode": "blocking" # 使用阻塞响应模式 } # 添加历史记录 @@ -171,111 +171,80 @@ class DifyChat: self.LOG.info(f"请求数据: {json.dumps(data, ensure_ascii=False)}") try: - # 使用流式请求 - with requests.post(url, headers=headers, json=data, proxies=proxies, stream=True) as response: - if response.status_code != 200: - self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}") - return f"请求失败,状态码: {response.status_code}" + # 使用普通请求(非流式) + response = requests.post(url, headers=headers, json=data, proxies=proxies) + + if response.status_code != 200: + self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}") + return f"请求失败,状态码: {response.status_code}" - answer = "" - total_tokens = 0 + # 解析响应 + response_data = response.json() + self.LOG.info(f"收到Dify API响应: {json.dumps(response_data, ensure_ascii=False)}") + + # 提取回答内容 + answer = "" + total_tokens = 0 + + # 获取输出内容 + outputs = response_data.get("data", {}).get("outputs", {}) + if outputs: + # 尝试从text字段获取回答 + if "text" in outputs and isinstance(outputs["text"], str): + answer = outputs["text"] + # 如果没有text字段,尝试从其他字段获取 + else: + for key, value in outputs.items(): + if isinstance(value, str) and value.strip(): + answer += value + elif isinstance(value, dict): + # 处理嵌套字典的情况 + for sub_key, sub_value in value.items(): + if isinstance(sub_value, str) and sub_value.strip(): + answer += sub_value + elif isinstance(value, list): + # 处理列表的情况 + for item in value: + if isinstance(item, str) and item.strip(): + answer += item + elif isinstance(item, dict): + # 处理列表中的字典 + for item_key, item_value in item.items(): + if isinstance(item_value, str) and item_value.strip(): + answer += item_value + + # 获取token使用情况 + total_tokens = response_data.get("data", {}).get("total_tokens", 0) - # 处理流式响应 - for line in response.iter_lines(): - if not line: - continue + # 更新会话历史 + self.conversations[session_id].append({ + "role": "user", + "content": query + }) - # 解析事件流数据 - line_text = line.decode('utf-8') - self.LOG.info(f"收到事件流数据: {line_text}") - if not line_text.startswith('data: '): - continue + self.conversations[session_id].append({ + "role": "assistant", + "content": answer + }) - # 提取JSON数据部分 - json_str = line_text[6:] # 去掉 "data: " 前缀 + # 限制会话历史长度 + if len(self.conversations[session_id]) > self.max_history_length * 2: + self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:] - try: - event_data = json.loads(json_str) - if not event_data: - continue - - event_type = event_data.get("event") - if not event_type: - continue - - # 处理不同类型的事件 - if event_type == "workflow_finished": - # 工作流完成事件,可以获取总token数 - data_obj = event_data.get("data", {}) - if data_obj: - total_tokens = data_obj.get("total_tokens", 0) + # 统计token使用情况 + if total_tokens > 0: + if user_id in self.token_usage: + self.token_usage[user_id] += total_tokens + else: + self.token_usage[user_id] = total_tokens - elif event_type == "node_finished": - # 节点完成事件,可以获取节点输出和token使用情况 - data_obj = event_data.get("data", {}) - if not data_obj: - continue - - outputs = data_obj.get("outputs", {}) - # 从outputs中提取回答内容 - if outputs and isinstance(outputs, dict): - for key, value in outputs.items(): - if isinstance(value, str) and value.strip(): - answer += value - elif isinstance(value, dict): - # 处理嵌套字典的情况 - for sub_key, sub_value in value.items(): - if isinstance(sub_value, str) and sub_value.strip(): - answer += sub_value - elif isinstance(value, list): - # 处理列表的情况 - for item in value: - if isinstance(item, str) and item.strip(): - answer += item - elif isinstance(item, dict): - # 处理列表中的字典 - for item_key, item_value in item.items(): - if isinstance(item_value, str) and item_value.strip(): - answer += item_value + self.LOG.info( + f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens") - # 获取token使用情况 - execution_metadata = data_obj.get("execution_metadata", {}) - if "total_tokens" in execution_metadata: - total_tokens = execution_metadata.get("total_tokens", 0) - - except json.JSONDecodeError: - self.LOG.error(f"解析事件流数据失败: {line_text}") - continue - - # 更新会话历史 - self.conversations[session_id].append({ - "role": "user", - "content": query - }) - - self.conversations[session_id].append({ - "role": "assistant", - "content": answer - }) - - # 限制会话历史长度 - if len(self.conversations[session_id]) > self.max_history_length * 2: - self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:] - - # 统计token使用情况 - if total_tokens > 0: - if user_id in self.token_usage: - self.token_usage[user_id] += total_tokens - else: - self.token_usage[user_id] = total_tokens - - self.LOG.info( - f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens") - - return answer + return answer except Exception as e: - self.LOG.error(f"处理Dify流式响应时出错: {str(e)}") + self.LOG.error(f"处理Dify响应时出错: {str(e)}") return f"处理响应时出错: {str(e)}" def _cleanup_expired_conversations(self) -> None: