接入dify工作流内容

This commit is contained in:
liuwei
2025-03-13 15:46:53 +08:00
parent 52c4db469c
commit 3dbb08ee7c

View File

@@ -124,20 +124,21 @@ class DifyChat:
""" """
# 清理过期会话 # 清理过期会话
self._cleanup_expired_conversations() self._cleanup_expired_conversations()
# 更新最后活动时间 # 更新最后活动时间
self.last_activity[session_id] = time.time() self.last_activity[session_id] = time.time()
# 初始化会话历史 # 初始化会话历史
if session_id not in self.conversations: if session_id not in self.conversations:
self.conversations[session_id] = [] self.conversations[session_id] = []
# 准备请求头 # 准备请求头
headers = { headers = {
"Authorization": f"Bearer {self.api_key}", "Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json" "Content-Type": "application/json",
"Accept": "text/event-stream" # 指定接受事件流
} }
# 准备请求数据 # 准备请求数据
data = { data = {
"query": query, "query": query,
@@ -146,11 +147,11 @@ class DifyChat:
"response_mode": "streaming", # 使用流式响应 "response_mode": "streaming", # 使用流式响应
"conversation_id": session_id # 使用会话ID保持上下文 "conversation_id": session_id # 使用会话ID保持上下文
} }
# 添加历史记录 # 添加历史记录
if self.conversations[session_id]: if self.conversations[session_id]:
data["conversation_history"] = self.conversations[session_id] data["conversation_history"] = self.conversations[session_id]
# 设置代理 # 设置代理
proxies = None proxies = None
if self.http_proxy: if self.http_proxy:
@@ -158,47 +159,92 @@ class DifyChat:
"http": self.http_proxy, "http": self.http_proxy,
"https": self.http_proxy "https": self.http_proxy
} }
# 发送请求 # 发送请求
url = f"{self.base_url}/chat-messages" url = f"{self.base_url}/chat-messages"
response = requests.post(url, headers=headers, json=data, proxies=proxies)
try:
if response.status_code != 200: # 使用流式请求
self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}") with requests.post(url, headers=headers, json=data, proxies=proxies, stream=True) as response:
return f"请求失败,状态码: {response.status_code}" if response.status_code != 200:
self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}")
# 解析响应 return f"请求失败,状态码: {response.status_code}"
result = response.json()
answer = ""
# 提取回复内容 total_tokens = 0
answer = result.get("answer", "")
# 处理流式响应
# 更新会话历史 for line in response.iter_lines():
self.conversations[session_id].append({ if not line:
"role": "user", continue
"content": query
}) # 解析事件流数据
line_text = line.decode('utf-8')
self.conversations[session_id].append({ if not line_text.startswith('data: '):
"role": "assistant", continue
"content": answer
}) # 提取JSON数据部分
json_str = line_text[6:] # 去掉 "data: " 前缀
# 限制会话历史长度
if len(self.conversations[session_id]) > self.max_history_length * 2: try:
self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:] event_data = json.loads(json_str)
event_type = event_data.get("event")
# 统计token使用情况
if "usage" in result and "total_tokens" in result["usage"]: # 处理不同类型的事件
total_tokens = result["usage"]["total_tokens"] if event_type == "workflow_finished":
if user_id in self.token_usage: # 工作流完成事件可以获取总token数
self.token_usage[user_id] += total_tokens data_obj = event_data.get("data", {})
else: total_tokens = data_obj.get("total_tokens", 0)
self.token_usage[user_id] = total_tokens
elif event_type == "node_finished":
self.LOG.info(f"用户 {user_id} 本次消耗 {total_tokens} tokens累计 {self.token_usage[user_id]} tokens") # 节点完成事件可以获取节点输出和token使用情况
data_obj = event_data.get("data", {})
return answer outputs = data_obj.get("outputs", {})
# 从outputs中提取回答内容
if outputs and isinstance(outputs, dict):
for key, value in outputs.items():
if isinstance(value, str) and value.strip():
answer += value
# 获取token使用情况
execution_metadata = data_obj.get("execution_metadata", {})
if "total_tokens" in execution_metadata:
total_tokens = execution_metadata.get("total_tokens", 0)
except json.JSONDecodeError:
self.LOG.error(f"解析事件流数据失败: {line_text}")
continue
# 更新会话历史
self.conversations[session_id].append({
"role": "user",
"content": query
})
self.conversations[session_id].append({
"role": "assistant",
"content": answer
})
# 限制会话历史长度
if len(self.conversations[session_id]) > self.max_history_length * 2:
self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:]
# 统计token使用情况
if total_tokens > 0:
if user_id in self.token_usage:
self.token_usage[user_id] += total_tokens
else:
self.token_usage[user_id] = total_tokens
self.LOG.info(f"用户 {user_id} 本次消耗 {total_tokens} tokens累计 {self.token_usage[user_id]} tokens")
return answer
except Exception as e:
self.LOG.error(f"处理Dify流式响应时出错: {str(e)}")
return f"处理响应时出错: {str(e)}"
def _cleanup_expired_conversations(self) -> None: def _cleanup_expired_conversations(self) -> None:
"""清理过期的会话""" """清理过期的会话"""