feat(message_summary): switch to streaming workflow

This commit is contained in:
liuwei
2026-04-07 09:55:17 +08:00
parent 51fe971cda
commit acc1e7f20b
2 changed files with 107 additions and 25 deletions

View File

@@ -83,7 +83,12 @@ class MessageSummaryPlugin(MessagePluginInterface):
# 从插件配置中获取API密钥和URL
api_config = self._config.get("api", {})
self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo")
self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages")
self._api_base_url = str(api_config.get("api_base_url", "http://192.168.2.240/v1")).rstrip("/")
self._api_mode = str(api_config.get("mode", "workflow")).strip().lower()
default_endpoint = "workflows/run" if self._api_mode == "workflow" else "chat-messages"
self._api_endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/")
self._api_url = api_config.get("api_url", f"{self._api_base_url}/{self._api_endpoint}")
self._workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip()
self._response_mode = api_config.get("response_mode", "blocking")
self._connect_timeout_seconds = int(api_config.get("connect_timeout_seconds", 10))
self._request_timeout_seconds = int(api_config.get("request_timeout_seconds", 180))
@@ -216,10 +221,11 @@ class MessageSummaryPlugin(MessagePluginInterface):
sanitized_name = "群聊"
return sanitized_name
async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Tuple[str, Dict[str, Any]]:
async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]:
"""解析 Dify 的 SSE 流式响应"""
answer_parts: List[str] = []
metadata: Dict[str, Any] = {}
final_payload: Dict[str, Any] = {}
buffer = ""
async for chunk in response.content.iter_any():
@@ -253,18 +259,42 @@ class MessageSummaryPlugin(MessagePluginInterface):
self.LOG.warning(f"无法解析流式响应片段: {payload_text[:200]}")
continue
event_name = payload.get("event", "")
event_name = str(payload.get("event", "")).strip()
if event_name in {"message", "agent_message"}:
chunk_text = payload.get("answer", "")
if chunk_text:
answer_parts.append(chunk_text)
elif event_name in {"message_end", "workflow_finished"}:
final_payload = payload
if self._api_mode == "workflow":
payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {}
outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {}
if outputs:
for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]):
if outputs.get(key) is not None:
answer_parts = [self._stringify_output(outputs.get(key))]
break
metadata = payload.get("metadata", {}) or payload.get("data", {}).get("metadata", {}) or metadata
elif event_name == "error":
raise RuntimeError(payload.get("message") or payload.get("error") or "流式总结生成失败")
else:
if self._api_mode == "workflow":
payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {}
outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {}
for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]):
if outputs.get(key) is not None:
chunk_text = self._stringify_output(outputs.get(key))
if chunk_text:
answer_parts.append(chunk_text)
break
answer = "".join(answer_parts)
return answer, metadata
return {
"answer": answer,
"metadata": metadata,
"data": final_payload.get("data", {}) if isinstance(final_payload, dict) else {},
"event": final_payload.get("event", "") if isinstance(final_payload, dict) else "",
}
def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str:
"""把 token 统计追加到总结文本末尾"""
@@ -288,6 +318,46 @@ class MessageSummaryPlugin(MessagePluginInterface):
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip()
return cleaned
def _stringify_output(self, value: Any) -> str:
"""把 workflow 输出统一转成文本"""
if value is None:
return ""
if isinstance(value, str):
return value.strip()
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
return str(value).strip()
def _parse_workflow_response(self, response_data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
"""解析 Dify workflow 响应"""
payload = (response_data or {}).get("data", {}) or {}
outputs = payload.get("outputs", {}) or {}
answer = ""
if self._workflow_output_key and outputs.get(self._workflow_output_key) is not None:
answer = self._stringify_output(outputs.get(self._workflow_output_key))
elif outputs.get("text") is not None:
answer = self._stringify_output(outputs.get("text"))
elif outputs.get("answer") is not None:
answer = self._stringify_output(outputs.get("answer"))
elif outputs.get("result_json") is not None:
answer = self._stringify_output(outputs.get("result_json"))
elif outputs.get("result") is not None:
answer = self._stringify_output(outputs.get("result"))
else:
for value in outputs.values():
answer = self._stringify_output(value)
if answer:
break
metadata = {
"usage": {
"total_tokens": payload.get("total_tokens", 0),
"latency": payload.get("elapsed_time", 0),
}
}
return answer, metadata
def _get_revoke_manager(self) -> Optional[MessageAutoRevoke]:
"""优先使用消息上下文中的撤回器,定时任务场景则懒初始化一个"""
if self.revoke:
@@ -352,15 +422,26 @@ class MessageSummaryPlugin(MessagePluginInterface):
except Exception as e:
self.LOG.error(f"压缩内容失败:{e}")
# 准备请求数据
data = {
"inputs": {},
"query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}",
"response_mode": self._response_mode,
"conversation_id": "",
"user": group_name if group_name is not None else "message_summary_bot",
"files": [] # 不包含文件
}
prompt = f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}"
if self._api_mode == "workflow":
data = {
"inputs": {
"query": prompt,
"group_name": group_name,
"chat_content": content_compress,
},
"response_mode": self._response_mode,
"user": group_name if group_name is not None else "message_summary_bot",
}
else:
data = {
"inputs": {},
"query": prompt,
"response_mode": self._response_mode,
"conversation_id": "",
"user": group_name if group_name is not None else "message_summary_bot",
"files": []
}
self.LOG.info(f"群聊总结内容:{data}")
# 设置请求头
@@ -384,23 +465,21 @@ class MessageSummaryPlugin(MessagePluginInterface):
async with session.post(self._api_url, headers=headers, json=data) as response:
response.raise_for_status() # 检查请求是否成功
if self._response_mode == "streaming":
answer, metadata = await self._parse_streaming_response(response)
response_data = {
"answer": answer,
"metadata": metadata,
}
response_data = await self._parse_streaming_response(response)
else:
response_data = await response.json()
self.LOG.info(f"Dify API响应状态码: {response.status}, attempt={attempt}")
self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}")
# 提取回答内容
answer = response_data.get("answer", "")
if self._api_mode == "workflow":
answer, metadata = self._parse_workflow_response(response_data)
else:
answer = response_data.get("answer", "")
metadata = response_data.get("metadata", {})
answer = self._clean_summary_output(answer)
spath = ""
# 提取token使用情况
metadata = response_data.get("metadata", {})
answer = self._append_usage_info(answer, metadata)
if answer and len(answer.strip()) > 0: