feat(message_summary): default to streaming mode
This commit is contained in:
@@ -6,6 +6,7 @@ enabled = true
|
||||
[api]
|
||||
api_key = "app-McGLzBhBjeBCSEi7n83MtuTo"
|
||||
api_url = "http://192.168.2.240/v1/chat-messages"
|
||||
response_mode = "streaming"
|
||||
|
||||
[output]
|
||||
output_dir = "output"
|
||||
|
||||
@@ -84,6 +84,7 @@ class MessageSummaryPlugin(MessagePluginInterface):
|
||||
api_config = self._config.get("api", {})
|
||||
self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo")
|
||||
self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages")
|
||||
self._response_mode = api_config.get("response_mode", "streaming")
|
||||
self.message_storage = MessageStorage()
|
||||
db_manager = context.get("db_manager")
|
||||
if db_manager:
|
||||
@@ -212,6 +213,68 @@ class MessageSummaryPlugin(MessagePluginInterface):
|
||||
sanitized_name = "群聊"
|
||||
return sanitized_name
|
||||
|
||||
async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Tuple[str, Dict[str, Any]]:
|
||||
"""解析 Dify 的 SSE 流式响应"""
|
||||
answer_parts: List[str] = []
|
||||
metadata: Dict[str, Any] = {}
|
||||
buffer = ""
|
||||
|
||||
async for chunk in response.content.iter_any():
|
||||
if not chunk:
|
||||
continue
|
||||
|
||||
buffer += chunk.decode("utf-8", errors="ignore")
|
||||
|
||||
while "\n\n" in buffer:
|
||||
raw_event, buffer = buffer.split("\n\n", 1)
|
||||
raw_event = raw_event.strip()
|
||||
if not raw_event:
|
||||
continue
|
||||
|
||||
data_lines = []
|
||||
for line in raw_event.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("data:"):
|
||||
data_lines.append(line[5:].strip())
|
||||
|
||||
if not data_lines:
|
||||
continue
|
||||
|
||||
payload_text = "\n".join(data_lines).strip()
|
||||
if not payload_text or payload_text == "[DONE]":
|
||||
continue
|
||||
|
||||
try:
|
||||
payload = json.loads(payload_text)
|
||||
except json.JSONDecodeError:
|
||||
self.LOG.warning(f"无法解析流式响应片段: {payload_text[:200]}")
|
||||
continue
|
||||
|
||||
event_name = payload.get("event", "")
|
||||
if event_name in {"message", "agent_message"}:
|
||||
chunk_text = payload.get("answer", "")
|
||||
if chunk_text:
|
||||
answer_parts.append(chunk_text)
|
||||
elif event_name in {"message_end", "workflow_finished"}:
|
||||
metadata = payload.get("metadata", {}) or payload.get("data", {}).get("metadata", {}) or metadata
|
||||
elif event_name == "error":
|
||||
raise RuntimeError(payload.get("message") or payload.get("error") or "流式总结生成失败")
|
||||
|
||||
answer = "".join(answer_parts)
|
||||
return answer, metadata
|
||||
|
||||
def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str:
|
||||
"""把 token 统计追加到总结文本末尾"""
|
||||
usage = metadata.get("usage", {}) if metadata else {}
|
||||
if not usage:
|
||||
return answer
|
||||
|
||||
prompt_tokens = usage.get("prompt_tokens", 0)
|
||||
completion_tokens = usage.get("completion_tokens", 0)
|
||||
total_tokens = usage.get("total_tokens", 0)
|
||||
tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}"
|
||||
return answer + tokens_info
|
||||
|
||||
def _get_revoke_manager(self) -> Optional[MessageAutoRevoke]:
|
||||
"""优先使用消息上下文中的撤回器,定时任务场景则懒初始化一个"""
|
||||
if self.revoke:
|
||||
@@ -280,7 +343,7 @@ class MessageSummaryPlugin(MessagePluginInterface):
|
||||
data = {
|
||||
"inputs": {},
|
||||
"query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}",
|
||||
"response_mode": "blocking", # 使用阻塞模式,直接获取完整响应
|
||||
"response_mode": self._response_mode,
|
||||
"conversation_id": "",
|
||||
"user": group_name if group_name is not None else "message_summary_bot",
|
||||
"files": [] # 不包含文件
|
||||
@@ -290,7 +353,8 @@ class MessageSummaryPlugin(MessagePluginInterface):
|
||||
# 设置请求头
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self._api_key}",
|
||||
"Content-Type": "application/json"
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "text/event-stream" if self._response_mode == "streaming" else "application/json"
|
||||
}
|
||||
|
||||
max_retries = 3
|
||||
@@ -303,7 +367,14 @@ class MessageSummaryPlugin(MessagePluginInterface):
|
||||
async with aiohttp.ClientSession(connector=conn, timeout=custom_timeout) as session:
|
||||
async with session.post(self._api_url, headers=headers, json=data) as response:
|
||||
response.raise_for_status() # 检查请求是否成功
|
||||
response_data = await response.json()
|
||||
if self._response_mode == "streaming":
|
||||
answer, metadata = await self._parse_streaming_response(response)
|
||||
response_data = {
|
||||
"answer": answer,
|
||||
"metadata": metadata,
|
||||
}
|
||||
else:
|
||||
response_data = await response.json()
|
||||
|
||||
self.LOG.info(f"Dify API响应状态码: {response.status}, attempt={attempt}")
|
||||
self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}")
|
||||
@@ -315,37 +386,29 @@ class MessageSummaryPlugin(MessagePluginInterface):
|
||||
spath = ""
|
||||
# 提取token使用情况
|
||||
metadata = response_data.get("metadata", {})
|
||||
usage = metadata.get("usage", {})
|
||||
answer = self._append_usage_info(answer, metadata)
|
||||
|
||||
if usage:
|
||||
prompt_tokens = usage.get("prompt_tokens", 0)
|
||||
completion_tokens = usage.get("completion_tokens", 0)
|
||||
total_tokens = usage.get("total_tokens", 0)
|
||||
|
||||
# 添加token信息
|
||||
tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}"
|
||||
answer += tokens_info
|
||||
if answer and len(answer.strip()) > 0:
|
||||
try:
|
||||
# 使用唯一文件名并指定完整路径
|
||||
timestamp = int(time.time())
|
||||
output_path = f"summary_{timestamp}.png"
|
||||
# 构建完整的输出路径
|
||||
self.LOG.info(f"开始生成图片: {output_path}")
|
||||
spath = await convert_md_str_to_image(answer, output_path)
|
||||
self.LOG.info(f"成功生成图片: {spath}")
|
||||
except Exception as e:
|
||||
self.LOG.error(f"生成图片失败: {e}", exc_info=True)
|
||||
# 如果图片生成失败,尝试发送纯文本消息
|
||||
try:
|
||||
# 截断过长的文本,避免消息太长
|
||||
max_length = 2000
|
||||
if len(answer) > max_length:
|
||||
answer = answer[:max_length] + "\n\n... (内容过长,已截断)"
|
||||
self.LOG.info("图片生成失败,将发送文本消息作为备选方案")
|
||||
spath = None # 设置为None,让调用方知道需要发送文本
|
||||
spath = None
|
||||
except Exception as fallback_error:
|
||||
self.LOG.error(f"备选文本发送也失败: {fallback_error}")
|
||||
spath = None
|
||||
else:
|
||||
spath = None
|
||||
# 返回文本内容和图片路径
|
||||
return answer, spath
|
||||
|
||||
|
||||
1
temp/cliproxy-src
Submodule
1
temp/cliproxy-src
Submodule
Submodule temp/cliproxy-src added at ab9ebea592
1
temp/new-api-src
Submodule
1
temp/new-api-src
Submodule
Submodule temp/new-api-src added at c9611c493f
Reference in New Issue
Block a user