diff --git a/dify/dify_chat.py b/dify/dify_chat.py index 7a4bcc7..7806f01 100644 --- a/dify/dify_chat.py +++ b/dify/dify_chat.py @@ -124,20 +124,21 @@ class DifyChat: """ # 清理过期会话 self._cleanup_expired_conversations() - + # 更新最后活动时间 self.last_activity[session_id] = time.time() - + # 初始化会话历史 if session_id not in self.conversations: self.conversations[session_id] = [] - + # 准备请求头 headers = { "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json" + "Content-Type": "application/json", + "Accept": "text/event-stream" # 指定接受事件流 } - + # 准备请求数据 data = { "query": query, @@ -146,11 +147,11 @@ class DifyChat: "response_mode": "streaming", # 使用流式响应 "conversation_id": session_id # 使用会话ID保持上下文 } - + # 添加历史记录 if self.conversations[session_id]: data["conversation_history"] = self.conversations[session_id] - + # 设置代理 proxies = None if self.http_proxy: @@ -158,47 +159,92 @@ class DifyChat: "http": self.http_proxy, "https": self.http_proxy } - + # 发送请求 url = f"{self.base_url}/chat-messages" - response = requests.post(url, headers=headers, json=data, proxies=proxies) - - if response.status_code != 200: - self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}") - return f"请求失败,状态码: {response.status_code}" - - # 解析响应 - result = response.json() - - # 提取回复内容 - answer = result.get("answer", "") - - # 更新会话历史 - self.conversations[session_id].append({ - "role": "user", - "content": query - }) - - self.conversations[session_id].append({ - "role": "assistant", - "content": answer - }) - - # 限制会话历史长度 - if len(self.conversations[session_id]) > self.max_history_length * 2: - self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:] - - # 统计token使用情况 - if "usage" in result and "total_tokens" in result["usage"]: - total_tokens = result["usage"]["total_tokens"] - if user_id in self.token_usage: - self.token_usage[user_id] += total_tokens - else: - self.token_usage[user_id] = total_tokens - - self.LOG.info(f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens") - - return answer + + try: + # 使用流式请求 + with requests.post(url, headers=headers, json=data, proxies=proxies, stream=True) as response: + if response.status_code != 200: + self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}") + return f"请求失败,状态码: {response.status_code}" + + answer = "" + total_tokens = 0 + + # 处理流式响应 + for line in response.iter_lines(): + if not line: + continue + + # 解析事件流数据 + line_text = line.decode('utf-8') + if not line_text.startswith('data: '): + continue + + # 提取JSON数据部分 + json_str = line_text[6:] # 去掉 "data: " 前缀 + + try: + event_data = json.loads(json_str) + event_type = event_data.get("event") + + # 处理不同类型的事件 + if event_type == "workflow_finished": + # 工作流完成事件,可以获取总token数 + data_obj = event_data.get("data", {}) + total_tokens = data_obj.get("total_tokens", 0) + + elif event_type == "node_finished": + # 节点完成事件,可以获取节点输出和token使用情况 + data_obj = event_data.get("data", {}) + outputs = data_obj.get("outputs", {}) + + # 从outputs中提取回答内容 + if outputs and isinstance(outputs, dict): + for key, value in outputs.items(): + if isinstance(value, str) and value.strip(): + answer += value + + # 获取token使用情况 + execution_metadata = data_obj.get("execution_metadata", {}) + if "total_tokens" in execution_metadata: + total_tokens = execution_metadata.get("total_tokens", 0) + + except json.JSONDecodeError: + self.LOG.error(f"解析事件流数据失败: {line_text}") + continue + + # 更新会话历史 + self.conversations[session_id].append({ + "role": "user", + "content": query + }) + + self.conversations[session_id].append({ + "role": "assistant", + "content": answer + }) + + # 限制会话历史长度 + if len(self.conversations[session_id]) > self.max_history_length * 2: + self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:] + + # 统计token使用情况 + if total_tokens > 0: + if user_id in self.token_usage: + self.token_usage[user_id] += total_tokens + else: + self.token_usage[user_id] = total_tokens + + self.LOG.info(f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens") + + return answer + + except Exception as e: + self.LOG.error(f"处理Dify流式响应时出错: {str(e)}") + return f"处理响应时出错: {str(e)}" def _cleanup_expired_conversations(self) -> None: """清理过期的会话"""