接入dify工作流内容
This commit is contained in:
@@ -64,11 +64,12 @@ class DifyChat:
|
||||
# 如果没有查询内容,返回使用提示
|
||||
if len(parts) < 2 or not parts[1].strip():
|
||||
self.wcf.send_text(self.command_tip,
|
||||
(message.roomid if message.from_group() else message.sender))
|
||||
(message.roomid if message.from_group() else message.sender))
|
||||
return
|
||||
|
||||
# 检查权限
|
||||
if message.from_group() and self.gbm.get_group_permission(message.roomid, Feature.AI_CAPABILITY) == PermissionStatus.DISABLED:
|
||||
if message.from_group() and self.gbm.get_group_permission(message.roomid,
|
||||
Feature.AI_CAPABILITY) == PermissionStatus.DISABLED:
|
||||
return
|
||||
|
||||
# 获取查询内容
|
||||
@@ -102,13 +103,13 @@ class DifyChat:
|
||||
# 发送回复
|
||||
if response:
|
||||
self.wcf.send_text(response,
|
||||
(message.roomid if message.from_group() else message.sender),
|
||||
message.sender if message.from_group() else "")
|
||||
(message.roomid if message.from_group() else message.sender),
|
||||
message.sender if message.from_group() else "")
|
||||
except Exception as e:
|
||||
self.LOG.error(f"Dify聊天出错:{e}")
|
||||
self.wcf.send_text(f"-----Bot-----\n❌请求出错:{str(e)}",
|
||||
(message.roomid if message.from_group() else message.sender),
|
||||
message.sender if message.from_group() else "")
|
||||
(message.roomid if message.from_group() else message.sender),
|
||||
message.sender if message.from_group() else "")
|
||||
|
||||
def chat_with_dify(self, session_id: str, user_id: str, query: str) -> Optional[str]:
|
||||
"""
|
||||
@@ -124,35 +125,35 @@ class DifyChat:
|
||||
"""
|
||||
# 清理过期会话
|
||||
self._cleanup_expired_conversations()
|
||||
|
||||
|
||||
# 更新最后活动时间
|
||||
self.last_activity[session_id] = time.time()
|
||||
|
||||
|
||||
# 初始化会话历史
|
||||
if session_id not in self.conversations:
|
||||
self.conversations[session_id] = []
|
||||
|
||||
|
||||
# 准备请求头
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "text/event-stream" # 指定接受事件流
|
||||
}
|
||||
|
||||
|
||||
# 准备请求数据
|
||||
data = {
|
||||
"query": query,
|
||||
"sys.files": [],
|
||||
"user": user_id,
|
||||
"inputs":{},
|
||||
"inputs": { "query": query},
|
||||
"response_mode": "streaming", # 使用流式响应
|
||||
"conversation_id": session_id # 使用会话ID保持上下文
|
||||
}
|
||||
|
||||
|
||||
# 添加历史记录
|
||||
if self.conversations[session_id]:
|
||||
data["conversation_history"] = self.conversations[session_id]
|
||||
|
||||
|
||||
# 设置代理
|
||||
proxies = None
|
||||
if self.http_proxy:
|
||||
@@ -160,89 +161,90 @@ class DifyChat:
|
||||
"http": self.http_proxy,
|
||||
"https": self.http_proxy
|
||||
}
|
||||
|
||||
|
||||
# 发送请求
|
||||
url = f"{self.base_url}/workflows/run"
|
||||
|
||||
|
||||
try:
|
||||
# 使用流式请求
|
||||
with requests.post(url, headers=headers, json=data, proxies=proxies, stream=True) as response:
|
||||
if response.status_code != 200:
|
||||
self.LOG.error(f"Dify API请求失败: {response.status_code} {response.text}")
|
||||
return f"请求失败,状态码: {response.status_code}"
|
||||
|
||||
|
||||
answer = ""
|
||||
total_tokens = 0
|
||||
|
||||
|
||||
# 处理流式响应
|
||||
for line in response.iter_lines():
|
||||
if not line:
|
||||
continue
|
||||
|
||||
|
||||
# 解析事件流数据
|
||||
line_text = line.decode('utf-8')
|
||||
if not line_text.startswith('data: '):
|
||||
continue
|
||||
|
||||
|
||||
# 提取JSON数据部分
|
||||
json_str = line_text[6:] # 去掉 "data: " 前缀
|
||||
|
||||
|
||||
try:
|
||||
event_data = json.loads(json_str)
|
||||
event_type = event_data.get("event")
|
||||
|
||||
|
||||
# 处理不同类型的事件
|
||||
if event_type == "workflow_finished":
|
||||
# 工作流完成事件,可以获取总token数
|
||||
data_obj = event_data.get("data", {})
|
||||
total_tokens = data_obj.get("total_tokens", 0)
|
||||
|
||||
|
||||
elif event_type == "node_finished":
|
||||
# 节点完成事件,可以获取节点输出和token使用情况
|
||||
data_obj = event_data.get("data", {})
|
||||
outputs = data_obj.get("outputs", {})
|
||||
|
||||
|
||||
# 从outputs中提取回答内容
|
||||
if outputs and isinstance(outputs, dict):
|
||||
for key, value in outputs.items():
|
||||
if isinstance(value, str) and value.strip():
|
||||
answer += value
|
||||
|
||||
|
||||
# 获取token使用情况
|
||||
execution_metadata = data_obj.get("execution_metadata", {})
|
||||
if "total_tokens" in execution_metadata:
|
||||
total_tokens = execution_metadata.get("total_tokens", 0)
|
||||
|
||||
|
||||
except json.JSONDecodeError:
|
||||
self.LOG.error(f"解析事件流数据失败: {line_text}")
|
||||
continue
|
||||
|
||||
|
||||
# 更新会话历史
|
||||
self.conversations[session_id].append({
|
||||
"role": "user",
|
||||
"content": query
|
||||
})
|
||||
|
||||
|
||||
self.conversations[session_id].append({
|
||||
"role": "assistant",
|
||||
"content": answer
|
||||
})
|
||||
|
||||
|
||||
# 限制会话历史长度
|
||||
if len(self.conversations[session_id]) > self.max_history_length * 2:
|
||||
self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:]
|
||||
|
||||
|
||||
# 统计token使用情况
|
||||
if total_tokens > 0:
|
||||
if user_id in self.token_usage:
|
||||
self.token_usage[user_id] += total_tokens
|
||||
else:
|
||||
self.token_usage[user_id] = total_tokens
|
||||
|
||||
self.LOG.info(f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens")
|
||||
|
||||
|
||||
self.LOG.info(
|
||||
f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens")
|
||||
|
||||
return answer
|
||||
|
||||
|
||||
except Exception as e:
|
||||
self.LOG.error(f"处理Dify流式响应时出错: {str(e)}")
|
||||
return f"处理响应时出错: {str(e)}"
|
||||
@@ -289,4 +291,4 @@ class DifyChat:
|
||||
def reset_all_conversations(self) -> None:
|
||||
"""重置所有会话上下文"""
|
||||
self.conversations.clear()
|
||||
self.last_activity.clear()
|
||||
self.last_activity.clear()
|
||||
|
||||
Reference in New Issue
Block a user