From 2a0cbb4e56d5f17a444c92f8ad440890a0e4aad7 Mon Sep 17 00:00:00 2001 From: liuwei Date: Tue, 7 Apr 2026 09:10:24 +0800 Subject: [PATCH] feat(message_summary): default to streaming mode --- plugins/message_summary/config.toml | 1 + plugins/message_summary/main.py | 95 ++++++++++++++++++++++++----- temp/cliproxy-src | 1 + temp/new-api-src | 1 + 4 files changed, 82 insertions(+), 16 deletions(-) create mode 160000 temp/cliproxy-src create mode 160000 temp/new-api-src diff --git a/plugins/message_summary/config.toml b/plugins/message_summary/config.toml index 0e738e2..5d9c1e7 100644 --- a/plugins/message_summary/config.toml +++ b/plugins/message_summary/config.toml @@ -6,6 +6,7 @@ enabled = true [api] api_key = "app-McGLzBhBjeBCSEi7n83MtuTo" api_url = "http://192.168.2.240/v1/chat-messages" +response_mode = "streaming" [output] output_dir = "output" diff --git a/plugins/message_summary/main.py b/plugins/message_summary/main.py index 10f9913..4316c4c 100644 --- a/plugins/message_summary/main.py +++ b/plugins/message_summary/main.py @@ -84,6 +84,7 @@ class MessageSummaryPlugin(MessagePluginInterface): api_config = self._config.get("api", {}) self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo") self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages") + self._response_mode = api_config.get("response_mode", "streaming") self.message_storage = MessageStorage() db_manager = context.get("db_manager") if db_manager: @@ -212,6 +213,68 @@ class MessageSummaryPlugin(MessagePluginInterface): sanitized_name = "群聊" return sanitized_name + async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Tuple[str, Dict[str, Any]]: + """解析 Dify 的 SSE 流式响应""" + answer_parts: List[str] = [] + metadata: Dict[str, Any] = {} + buffer = "" + + async for chunk in response.content.iter_any(): + if not chunk: + continue + + buffer += chunk.decode("utf-8", errors="ignore") + + while "\n\n" in buffer: + raw_event, buffer = buffer.split("\n\n", 1) + raw_event = raw_event.strip() + if not raw_event: + continue + + data_lines = [] + for line in raw_event.splitlines(): + line = line.strip() + if line.startswith("data:"): + data_lines.append(line[5:].strip()) + + if not data_lines: + continue + + payload_text = "\n".join(data_lines).strip() + if not payload_text or payload_text == "[DONE]": + continue + + try: + payload = json.loads(payload_text) + except json.JSONDecodeError: + self.LOG.warning(f"无法解析流式响应片段: {payload_text[:200]}") + continue + + event_name = payload.get("event", "") + if event_name in {"message", "agent_message"}: + chunk_text = payload.get("answer", "") + if chunk_text: + answer_parts.append(chunk_text) + elif event_name in {"message_end", "workflow_finished"}: + metadata = payload.get("metadata", {}) or payload.get("data", {}).get("metadata", {}) or metadata + elif event_name == "error": + raise RuntimeError(payload.get("message") or payload.get("error") or "流式总结生成失败") + + answer = "".join(answer_parts) + return answer, metadata + + def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str: + """把 token 统计追加到总结文本末尾""" + usage = metadata.get("usage", {}) if metadata else {} + if not usage: + return answer + + prompt_tokens = usage.get("prompt_tokens", 0) + completion_tokens = usage.get("completion_tokens", 0) + total_tokens = usage.get("total_tokens", 0) + tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}" + return answer + tokens_info + def _get_revoke_manager(self) -> Optional[MessageAutoRevoke]: """优先使用消息上下文中的撤回器,定时任务场景则懒初始化一个""" if self.revoke: @@ -280,7 +343,7 @@ class MessageSummaryPlugin(MessagePluginInterface): data = { "inputs": {}, "query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}", - "response_mode": "blocking", # 使用阻塞模式,直接获取完整响应 + "response_mode": self._response_mode, "conversation_id": "", "user": group_name if group_name is not None else "message_summary_bot", "files": [] # 不包含文件 @@ -290,7 +353,8 @@ class MessageSummaryPlugin(MessagePluginInterface): # 设置请求头 headers = { "Authorization": f"Bearer {self._api_key}", - "Content-Type": "application/json" + "Content-Type": "application/json", + "Accept": "text/event-stream" if self._response_mode == "streaming" else "application/json" } max_retries = 3 @@ -303,7 +367,14 @@ class MessageSummaryPlugin(MessagePluginInterface): async with aiohttp.ClientSession(connector=conn, timeout=custom_timeout) as session: async with session.post(self._api_url, headers=headers, json=data) as response: response.raise_for_status() # 检查请求是否成功 - response_data = await response.json() + if self._response_mode == "streaming": + answer, metadata = await self._parse_streaming_response(response) + response_data = { + "answer": answer, + "metadata": metadata, + } + else: + response_data = await response.json() self.LOG.info(f"Dify API响应状态码: {response.status}, attempt={attempt}") self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") @@ -315,37 +386,29 @@ class MessageSummaryPlugin(MessagePluginInterface): spath = "" # 提取token使用情况 metadata = response_data.get("metadata", {}) - usage = metadata.get("usage", {}) + answer = self._append_usage_info(answer, metadata) - if usage: - prompt_tokens = usage.get("prompt_tokens", 0) - completion_tokens = usage.get("completion_tokens", 0) - total_tokens = usage.get("total_tokens", 0) - - # 添加token信息 - tokens_info = f"\n\n【tokens】输入: {prompt_tokens} 生成: {completion_tokens} 总: {total_tokens}" - answer += tokens_info + if answer and len(answer.strip()) > 0: try: # 使用唯一文件名并指定完整路径 timestamp = int(time.time()) output_path = f"summary_{timestamp}.png" - # 构建完整的输出路径 self.LOG.info(f"开始生成图片: {output_path}") spath = await convert_md_str_to_image(answer, output_path) self.LOG.info(f"成功生成图片: {spath}") except Exception as e: self.LOG.error(f"生成图片失败: {e}", exc_info=True) - # 如果图片生成失败,尝试发送纯文本消息 try: - # 截断过长的文本,避免消息太长 max_length = 2000 if len(answer) > max_length: answer = answer[:max_length] + "\n\n... (内容过长,已截断)" self.LOG.info("图片生成失败,将发送文本消息作为备选方案") - spath = None # 设置为None,让调用方知道需要发送文本 + spath = None except Exception as fallback_error: self.LOG.error(f"备选文本发送也失败: {fallback_error}") spath = None + else: + spath = None # 返回文本内容和图片路径 return answer, spath diff --git a/temp/cliproxy-src b/temp/cliproxy-src new file mode 160000 index 0000000..ab9ebea --- /dev/null +++ b/temp/cliproxy-src @@ -0,0 +1 @@ +Subproject commit ab9ebea5925d6c5b06d65d67f422eb3ad446f7ed diff --git a/temp/new-api-src b/temp/new-api-src new file mode 160000 index 0000000..c9611c4 --- /dev/null +++ b/temp/new-api-src @@ -0,0 +1 @@ +Subproject commit c9611c493f3112448331b43cf007ae1fc75217f5