diff --git a/plugins/message_summary/config.toml b/plugins/message_summary/config.toml index 3201a5d..92df869 100644 --- a/plugins/message_summary/config.toml +++ b/plugins/message_summary/config.toml @@ -4,9 +4,12 @@ enabled = true [api] -api_key = "app-McGLzBhBjeBCSEi7n83MtuTo" -api_url = "http://192.168.2.240/v1/chat-messages" -response_mode = "blocking" +api_key = "app-shCA6bo5l2VDmnvhg2BtuJbk" +api_base_url = "http://192.168.2.240/v1" +mode = "workflow" +endpoint = "workflows/run" +workflow_output_key = "text" +response_mode = "streaming" connect_timeout_seconds = 10 request_timeout_seconds = 180 retry_delays_seconds = [10, 20] diff --git a/plugins/message_summary/main.py b/plugins/message_summary/main.py index 5f185bb..7e705d4 100644 --- a/plugins/message_summary/main.py +++ b/plugins/message_summary/main.py @@ -83,7 +83,12 @@ class MessageSummaryPlugin(MessagePluginInterface): # 从插件配置中获取API密钥和URL api_config = self._config.get("api", {}) self._api_key = api_config.get("api_key", "app-McGLzBhBjeBCSEi7n83MtuTo") - self._api_url = api_config.get("api_url", "http://192.168.2.240/v1/chat-messages") + self._api_base_url = str(api_config.get("api_base_url", "http://192.168.2.240/v1")).rstrip("/") + self._api_mode = str(api_config.get("mode", "workflow")).strip().lower() + default_endpoint = "workflows/run" if self._api_mode == "workflow" else "chat-messages" + self._api_endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/") + self._api_url = api_config.get("api_url", f"{self._api_base_url}/{self._api_endpoint}") + self._workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip() self._response_mode = api_config.get("response_mode", "blocking") self._connect_timeout_seconds = int(api_config.get("connect_timeout_seconds", 10)) self._request_timeout_seconds = int(api_config.get("request_timeout_seconds", 180)) @@ -216,10 +221,11 @@ class MessageSummaryPlugin(MessagePluginInterface): sanitized_name = "群聊" return sanitized_name - async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Tuple[str, Dict[str, Any]]: + async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]: """解析 Dify 的 SSE 流式响应""" answer_parts: List[str] = [] metadata: Dict[str, Any] = {} + final_payload: Dict[str, Any] = {} buffer = "" async for chunk in response.content.iter_any(): @@ -253,18 +259,42 @@ class MessageSummaryPlugin(MessagePluginInterface): self.LOG.warning(f"无法解析流式响应片段: {payload_text[:200]}") continue - event_name = payload.get("event", "") + event_name = str(payload.get("event", "")).strip() if event_name in {"message", "agent_message"}: chunk_text = payload.get("answer", "") if chunk_text: answer_parts.append(chunk_text) elif event_name in {"message_end", "workflow_finished"}: + final_payload = payload + if self._api_mode == "workflow": + payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {} + outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {} + if outputs: + for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]): + if outputs.get(key) is not None: + answer_parts = [self._stringify_output(outputs.get(key))] + break metadata = payload.get("metadata", {}) or payload.get("data", {}).get("metadata", {}) or metadata elif event_name == "error": raise RuntimeError(payload.get("message") or payload.get("error") or "流式总结生成失败") + else: + if self._api_mode == "workflow": + payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {} + outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {} + for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]): + if outputs.get(key) is not None: + chunk_text = self._stringify_output(outputs.get(key)) + if chunk_text: + answer_parts.append(chunk_text) + break answer = "".join(answer_parts) - return answer, metadata + return { + "answer": answer, + "metadata": metadata, + "data": final_payload.get("data", {}) if isinstance(final_payload, dict) else {}, + "event": final_payload.get("event", "") if isinstance(final_payload, dict) else "", + } def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str: """把 token 统计追加到总结文本末尾""" @@ -288,6 +318,46 @@ class MessageSummaryPlugin(MessagePluginInterface): cleaned = re.sub(r'\n{3,}', '\n\n', cleaned).strip() return cleaned + def _stringify_output(self, value: Any) -> str: + """把 workflow 输出统一转成文本""" + if value is None: + return "" + if isinstance(value, str): + return value.strip() + if isinstance(value, (dict, list)): + return json.dumps(value, ensure_ascii=False) + return str(value).strip() + + def _parse_workflow_response(self, response_data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: + """解析 Dify workflow 响应""" + payload = (response_data or {}).get("data", {}) or {} + outputs = payload.get("outputs", {}) or {} + answer = "" + + if self._workflow_output_key and outputs.get(self._workflow_output_key) is not None: + answer = self._stringify_output(outputs.get(self._workflow_output_key)) + elif outputs.get("text") is not None: + answer = self._stringify_output(outputs.get("text")) + elif outputs.get("answer") is not None: + answer = self._stringify_output(outputs.get("answer")) + elif outputs.get("result_json") is not None: + answer = self._stringify_output(outputs.get("result_json")) + elif outputs.get("result") is not None: + answer = self._stringify_output(outputs.get("result")) + else: + for value in outputs.values(): + answer = self._stringify_output(value) + if answer: + break + + metadata = { + "usage": { + "total_tokens": payload.get("total_tokens", 0), + "latency": payload.get("elapsed_time", 0), + } + } + return answer, metadata + def _get_revoke_manager(self) -> Optional[MessageAutoRevoke]: """优先使用消息上下文中的撤回器,定时任务场景则懒初始化一个""" if self.revoke: @@ -352,15 +422,26 @@ class MessageSummaryPlugin(MessagePluginInterface): except Exception as e: self.LOG.error(f"压缩内容失败:{e}") - # 准备请求数据 - data = { - "inputs": {}, - "query": f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}", - "response_mode": self._response_mode, - "conversation_id": "", - "user": group_name if group_name is not None else "message_summary_bot", - "files": [] # 不包含文件 - } + prompt = f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}" + if self._api_mode == "workflow": + data = { + "inputs": { + "query": prompt, + "group_name": group_name, + "chat_content": content_compress, + }, + "response_mode": self._response_mode, + "user": group_name if group_name is not None else "message_summary_bot", + } + else: + data = { + "inputs": {}, + "query": prompt, + "response_mode": self._response_mode, + "conversation_id": "", + "user": group_name if group_name is not None else "message_summary_bot", + "files": [] + } self.LOG.info(f"群聊总结内容:{data}") # 设置请求头 @@ -384,23 +465,21 @@ class MessageSummaryPlugin(MessagePluginInterface): async with session.post(self._api_url, headers=headers, json=data) as response: response.raise_for_status() # 检查请求是否成功 if self._response_mode == "streaming": - answer, metadata = await self._parse_streaming_response(response) - response_data = { - "answer": answer, - "metadata": metadata, - } + response_data = await self._parse_streaming_response(response) else: response_data = await response.json() self.LOG.info(f"Dify API响应状态码: {response.status}, attempt={attempt}") self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") - # 提取回答内容 - answer = response_data.get("answer", "") + if self._api_mode == "workflow": + answer, metadata = self._parse_workflow_response(response_data) + else: + answer = response_data.get("answer", "") + metadata = response_data.get("metadata", {}) + answer = self._clean_summary_output(answer) spath = "" - # 提取token使用情况 - metadata = response_data.get("metadata", {}) answer = self._append_usage_info(answer, metadata) if answer and len(answer.strip()) > 0: