接入dify工作流内容

This commit is contained in:
liuwei
2025-03-13 16:21:18 +08:00
parent dc4c8c99b6
commit c897c40370

View File

@@ -144,12 +144,12 @@ class DifyChat:
"query": query, "query": query,
"conversation_id": session_id "conversation_id": session_id
} }
# 准备请求数据 # 准备请求数据
data = { data = {
"sys.files": [], "sys.files": [],
"user": user_id, "user": user_id,
"inputs": inputs_params, "inputs": inputs_params,
"response_mode": "streaming" # 使用流式响应 "response_mode": "blocking" # 使用阻塞响应模式
} }
# 添加历史记录 # 添加历史记录
@@ -171,111 +171,80 @@ class DifyChat:
self.LOG.info(f"请求数据: {json.dumps(data, ensure_ascii=False)}") self.LOG.info(f"请求数据: {json.dumps(data, ensure_ascii=False)}")
try: try:
# 使用流式请求 # 使用普通请求(非流式)
with requests.post(url, headers=headers, json=data, proxies=proxies, stream=True) as response: response = requests.post(url, headers=headers, json=data, proxies=proxies)
if response.status_code != 200:
self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}") if response.status_code != 200:
return f"请求失败,状态码: {response.status_code}" self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}")
return f"请求失败,状态码: {response.status_code}"
answer = "" # 解析响应
total_tokens = 0 response_data = response.json()
self.LOG.info(f"收到Dify API响应: {json.dumps(response_data, ensure_ascii=False)}")
# 提取回答内容
answer = ""
total_tokens = 0
# 获取输出内容
outputs = response_data.get("data", {}).get("outputs", {})
if outputs:
# 尝试从text字段获取回答
if "text" in outputs and isinstance(outputs["text"], str):
answer = outputs["text"]
# 如果没有text字段尝试从其他字段获取
else:
for key, value in outputs.items():
if isinstance(value, str) and value.strip():
answer += value
elif isinstance(value, dict):
# 处理嵌套字典的情况
for sub_key, sub_value in value.items():
if isinstance(sub_value, str) and sub_value.strip():
answer += sub_value
elif isinstance(value, list):
# 处理列表的情况
for item in value:
if isinstance(item, str) and item.strip():
answer += item
elif isinstance(item, dict):
# 处理列表中的字典
for item_key, item_value in item.items():
if isinstance(item_value, str) and item_value.strip():
answer += item_value
# 获取token使用情况
total_tokens = response_data.get("data", {}).get("total_tokens", 0)
# 处理流式响应 # 更新会话历史
for line in response.iter_lines(): self.conversations[session_id].append({
if not line: "role": "user",
continue "content": query
})
# 解析事件流数据 self.conversations[session_id].append({
line_text = line.decode('utf-8') "role": "assistant",
self.LOG.info(f"收到事件流数据: {line_text}") "content": answer
if not line_text.startswith('data: '): })
continue
# 提取JSON数据部分 # 限制会话历史长度
json_str = line_text[6:] # 去掉 "data: " 前缀 if len(self.conversations[session_id]) > self.max_history_length * 2:
self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:]
try: # 统计token使用情况
event_data = json.loads(json_str) if total_tokens > 0:
if not event_data: if user_id in self.token_usage:
continue self.token_usage[user_id] += total_tokens
else:
event_type = event_data.get("event") self.token_usage[user_id] = total_tokens
if not event_type:
continue
# 处理不同类型的事件
if event_type == "workflow_finished":
# 工作流完成事件可以获取总token数
data_obj = event_data.get("data", {})
if data_obj:
total_tokens = data_obj.get("total_tokens", 0)
elif event_type == "node_finished": self.LOG.info(
# 节点完成事件可以获取节点输出和token使用情况 f"用户 {user_id} 本次消耗 {total_tokens} tokens累计 {self.token_usage[user_id]} tokens")
data_obj = event_data.get("data", {})
if not data_obj:
continue
outputs = data_obj.get("outputs", {})
# 从outputs中提取回答内容
if outputs and isinstance(outputs, dict):
for key, value in outputs.items():
if isinstance(value, str) and value.strip():
answer += value
elif isinstance(value, dict):
# 处理嵌套字典的情况
for sub_key, sub_value in value.items():
if isinstance(sub_value, str) and sub_value.strip():
answer += sub_value
elif isinstance(value, list):
# 处理列表的情况
for item in value:
if isinstance(item, str) and item.strip():
answer += item
elif isinstance(item, dict):
# 处理列表中的字典
for item_key, item_value in item.items():
if isinstance(item_value, str) and item_value.strip():
answer += item_value
# 获取token使用情况 return answer
execution_metadata = data_obj.get("execution_metadata", {})
if "total_tokens" in execution_metadata:
total_tokens = execution_metadata.get("total_tokens", 0)
except json.JSONDecodeError:
self.LOG.error(f"解析事件流数据失败: {line_text}")
continue
# 更新会话历史
self.conversations[session_id].append({
"role": "user",
"content": query
})
self.conversations[session_id].append({
"role": "assistant",
"content": answer
})
# 限制会话历史长度
if len(self.conversations[session_id]) > self.max_history_length * 2:
self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:]
# 统计token使用情况
if total_tokens > 0:
if user_id in self.token_usage:
self.token_usage[user_id] += total_tokens
else:
self.token_usage[user_id] = total_tokens
self.LOG.info(
f"用户 {user_id} 本次消耗 {total_tokens} tokens累计 {self.token_usage[user_id]} tokens")
return answer
except Exception as e: except Exception as e:
self.LOG.error(f"处理Dify流式响应时出错: {str(e)}") self.LOG.error(f"处理Dify响应时出错: {str(e)}")
return f"处理响应时出错: {str(e)}" return f"处理响应时出错: {str(e)}"
def _cleanup_expired_conversations(self) -> None: def _cleanup_expired_conversations(self) -> None: