refactor: centralize llm backend configuration

This commit is contained in:
liuwei
2026-04-08 13:43:41 +08:00
parent df1939d60b
commit aecb62cb4d
19 changed files with 945 additions and 792 deletions

View File

@@ -9,17 +9,7 @@ max_reply_sentences = 3
familiarity_hint = "有熟悉感,但不过度装熟"
[api]
provider = "openai_compatible"
api_base_url = "http://192.168.2.240:3000/v1"
endpoint = "chat/completions"
api_key = "sk-hC6WMLAsTdItpywyrYdxT6pQ4E7NARGbUKuPWRH0zMheen9e"
model = "gpt-5.4"
timeout_seconds = 45
temperature = 0.35
max_tokens = 120
stream = true
max_retries = 3
retry_delay_seconds = 1.0
backend = "openai_compatible_ai_auto_response"
[mode]
group_default_mode = "social"

View File

@@ -1,199 +1,6 @@
from __future__ import annotations
import json
import time
from typing import Dict, List, Optional
import requests
from utils.ai.unified_llm import UnifiedLLMClient
class LLMClient:
def __init__(self, config: Dict):
self.config = config or {}
self.provider = self.config.get("provider", "openai_compatible")
self.base_url = str(self.config.get("api_base_url", "")).rstrip("/")
self.endpoint = str(self.config.get("endpoint", "chat/completions")).lstrip("/")
self.api_key = self.config.get("api_key", "")
self.model = self.config.get("model", "")
self.timeout_seconds = int(self.config.get("timeout_seconds", 45))
self.temperature = float(self.config.get("temperature", 0.7))
self.max_tokens = int(self.config.get("max_tokens", 500))
self.stream = bool(self.config.get("stream", True))
self.max_retries = max(int(self.config.get("max_retries", 3) or 3), 1)
self.retry_delay_seconds = float(self.config.get("retry_delay_seconds", 1.0) or 1.0)
self.last_error = ""
class LLMClient(UnifiedLLMClient):
"""兼容旧调用方式的统一 LLM 客户端别名。"""
def chat(
self,
system_prompt: str,
user_prompt: str,
user_id: str,
image_urls: Optional[List[str]] = None,
) -> str:
self.last_error = ""
if not self.base_url:
self.last_error = "empty_base_url"
return ""
if self.provider == "openai_compatible":
return self._chat_openai_compatible(system_prompt, user_prompt, user_id, image_urls or [])
self.last_error = f"unsupported_provider:{self.provider}"
return ""
def _chat_openai_compatible(
self,
system_prompt: str,
user_prompt: str,
user_id: str,
image_urls: List[str],
) -> str:
if not self.model:
return ""
payload = {
"model": self.model,
"messages": self._build_messages(system_prompt, user_prompt, image_urls),
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"user": user_id,
}
if self.stream:
payload["stream"] = True
headers = {
"Content-Type": "application/json",
}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
for attempt in range(1, self.max_retries + 1):
try:
if self.stream:
text = self._chat_streaming(payload, headers)
else:
text = self._chat_non_streaming(payload, headers)
if text:
return text
except Exception as exc:
self.last_error = f"request_failed:attempt_{attempt}:{exc}"
if attempt < self.max_retries:
time.sleep(self.retry_delay_seconds * attempt)
return ""
def _chat_non_streaming(self, payload: Dict, headers: Dict[str, str]) -> str:
response = requests.post(
f"{self.base_url}/{self.endpoint}",
json=payload,
headers=headers,
timeout=self.timeout_seconds,
)
response.raise_for_status()
data = response.json()
text = self._extract_text(data)
if text:
return text
self.last_error = f"empty_model_output:{self.model}"
return ""
def _chat_streaming(self, payload: Dict, headers: Dict[str, str]) -> str:
chunks: List[str] = []
with requests.post(
f"{self.base_url}/{self.endpoint}",
json=payload,
headers=headers,
timeout=self.timeout_seconds,
stream=True,
) as response:
response.raise_for_status()
buffer = b""
for part in response.iter_content(chunk_size=None):
if not part:
continue
buffer += part
while b"\n\n" in buffer:
event, buffer = buffer.split(b"\n\n", 1)
try:
event_text = event.decode("utf-8")
except UnicodeDecodeError:
buffer = event + b"\n\n" + buffer
break
text_piece, done = self._parse_sse_event(event_text)
if text_piece:
chunks.append(text_piece)
if done:
final_text = "".join(chunks).strip()
if final_text:
return final_text
self.last_error = f"empty_stream_output:{self.model}"
return ""
final_text = "".join(chunks).strip()
if final_text:
return final_text
self.last_error = f"empty_stream_output:{self.model}"
return ""
@staticmethod
def _build_messages(system_prompt: str, user_prompt: str, image_urls: List[str]) -> List[Dict]:
user_content: str | List[Dict[str, object]]
if image_urls:
content_parts: List[Dict[str, object]] = [{"type": "text", "text": user_prompt}]
for image_url in image_urls:
if image_url:
content_parts.append({"type": "image_url", "image_url": {"url": image_url}})
user_content = content_parts
else:
user_content = user_prompt
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content},
]
@staticmethod
def _extract_text(data: Dict) -> str:
choices = data.get("choices") or []
if choices:
message = choices[0].get("message", {}) or {}
content = message.get("content")
if isinstance(content, str) and content.strip():
return content.strip()
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict):
text = item.get("text") or item.get("content")
if isinstance(text, str) and text.strip():
parts.append(text.strip())
if parts:
return "\n".join(parts).strip()
for key in ("reasoning_content", "text", "output_text"):
value = message.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
for key in ("output_text", "text", "answer", "response"):
value = data.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return ""
@classmethod
def _parse_sse_event(cls, event_text: str) -> tuple[str, bool]:
lines = [line.strip() for line in event_text.splitlines() if line.strip()]
data_lines = [line[5:].strip() for line in lines if line.startswith("data:")]
if not data_lines:
return "", False
data = "\n".join(data_lines)
if data == "[DONE]":
return "", True
obj = json.loads(data)
choice = (obj.get("choices") or [{}])[0]
delta = choice.get("delta") or {}
content = delta.get("content")
if isinstance(content, str):
return content, False
if isinstance(content, list):
parts = []
for item in content:
if isinstance(item, dict):
text = item.get("text") or item.get("content")
if isinstance(text, str):
parts.append(text)
return "".join(parts), False
return "", False

View File

@@ -1,8 +1,6 @@
[Dify]
enable = true
api-key = "app-u5EnYq3ill19bm6pWJwGkY4D" # Dify的API Key
base-url = "http://192.168.2.240/v1" #Dify API接口base url
backend = "dify_workflow_chat"
commands = ["聊天"]
command-tip = """
@@ -17,4 +15,4 @@ http-proxy = ""
# 管理员和白名单用户是否免费使用
admin_ignore = true
whitelist_ignore = true
whitelist_ignore = true

View File

@@ -22,6 +22,7 @@ from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotMan
from utils.decorator.points_decorator import plugin_points_cost
from utils.media_downloader import MediaDownloader
from utils.string_utils import remove_reasoning_content, remove_trailing_content, remove_grok_render_tags
from utils.ai.unified_llm import UnifiedLLMClient
from wechat_ipad import WechatAPIClient
from wechat_ipad.models.message import MessageType
import aiohttp
@@ -97,12 +98,25 @@ class DifyPlugin(MessagePluginInterface):
self._commands = dify_config.get("commands", ["ai", "dify", "聊天", "AI"])
self.command_format = dify_config.get("command-tip", "聊天 请求内容")
self.enable = dify_config.get("enable", True)
self.api_key = dify_config.get("api-key", "")
self.base_url = dify_config.get("base-url", "")
self.price = dify_config.get("price", 0)
self.admin_ignore = dify_config.get("admin_ignore", False)
self.whitelist_ignore = dify_config.get("whitelist_ignore", False)
self.http_proxy = dify_config.get("http-proxy", "")
llm_config = dify_config.get("llm", {}) or {}
if not llm_config:
llm_config = {
"backend": dify_config.get("backend", ""),
"provider": "dify",
"mode": "workflow",
"api-key": self.api_key,
"base-url": self.base_url,
"endpoint": "workflows/run",
"response_mode": "blocking",
"request_timeout": 40,
}
self.llm_client = UnifiedLLMClient(llm_config)
self.api_key = self.llm_client.api_key
self.base_url = self.llm_client.base_url
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
@@ -445,13 +459,6 @@ class DifyPlugin(MessagePluginInterface):
if session_id not in self.conversations:
self.conversations[session_id] = []
# 准备请求头
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"Accept": "text/event-stream" # 指定接受事件流
}
# 准备历史记录
history_text = ""
if self.conversations[session_id]:
@@ -471,122 +478,72 @@ class DifyPlugin(MessagePluginInterface):
# 如果有历史记录添加到inputs_params中
if history_text:
inputs_params["history"] = history_text
if self.conversations[session_id]:
inputs_params["conversation_history"] = self.conversations[session_id]
if files is None:
files = []
self.LOG.debug(f"Dify请求准备: files={len(files)}")
# 准备请求数据
data = {
"files": files,
"user": user_id,
"inputs": inputs_params,
"response_mode": "blocking" # 使用阻塞响应模式
}
# 如果有历史记录同时添加到conversation_history中
if self.conversations[session_id]:
data["conversation_history"] = self.conversations[session_id]
# 设置代理
proxy = self.http_proxy if self.http_proxy else None
# 发送请求
url = f"{self.base_url}/workflows/run"
self.LOG.info(f"发送请求到Dify API: {url}")
self.LOG.info(f"请求数据: {json.dumps(data, ensure_ascii=False)}")
self.LOG.info(f"Dify请求准备: session_id={session_id}, query_len={len(query)}, files={len(files)}")
try:
async with aiohttp.ClientSession() as session:
response = await session.post(url, headers=headers, json=data, proxy=proxy, timeout=40)
if response.status != 200:
error_text = await response.text()
self.LOG.error(f"Dify API请求失败: {response.status} {error_text}")
return False, f"请求失败,状态码: {response.status}"
response = await asyncio.to_thread(
self.llm_client.generate,
query,
user_id,
inputs_params,
f"dify:{session_id}",
"",
"",
None,
files,
)
if not response:
self.LOG.error(f"Dify API请求失败: {self.llm_client.last_error}")
return False, "请求失败"
# 解析响应
response_data = await response.json()
self.LOG.info(f"收到Dify API响应: {json.dumps(response_data, ensure_ascii=False)}")
answer = response.get("text", "") or ""
total_tokens = int((response.get("usage", {}) or {}).get("total_tokens") or 0)
raw_data = response.get("raw", {}) or {}
outputs = ((raw_data.get("data") or {}).get("outputs") or {}) if isinstance(raw_data, dict) else {}
# 提取回答内容
answer = ""
total_tokens = 0
if outputs and "result" in outputs and "type" in outputs:
if outputs["type"] in {"image", "video"}:
downloader = MediaDownloader()
media_path = await downloader.download_media(outputs["result"])
answer = media_path
# 获取输出内容
outputs = response_data.get("data", {}).get("outputs", {})
if outputs:
# 处理媒体类型返回
if "result" in outputs and "type" in outputs:
if outputs["type"] == "image":
downloader = MediaDownloader()
image_url = outputs["result"]
image_path = await downloader.download_media(image_url)
answer = image_path
if outputs["type"] == "video":
downloader = MediaDownloader()
image_url = outputs["result"]
image_path = await downloader.download_media(image_url)
answer = image_path
# 处理文本类型返回
elif "text" in outputs and isinstance(outputs["text"], str):
answer = outputs["text"]
# 兼容旧版处理逻辑
else:
for key, value in outputs.items():
if isinstance(value, str) and value.strip():
answer += value
elif isinstance(value, dict):
# 处理嵌套字典的情况
for sub_key, sub_value in value.items():
if isinstance(sub_value, str) and sub_value.strip():
answer += sub_value
elif isinstance(value, list):
# 处理列表的情况
for item in value:
if isinstance(item, str) and item.strip():
answer += item
elif isinstance(item, dict):
# 处理列表中的字典
for item_key, item_value in item.items():
if isinstance(item_value, str) and item_value.strip():
answer += item_value
if answer and not os.path.isfile(answer):
answer = remove_reasoning_content(answer)
answer = remove_trailing_content(answer)
answer = remove_grok_render_tags(answer)
answer = re.sub(r'\n{3,}', '\n\n', answer).strip()
# 获取token使用情况
total_tokens = response_data.get("data", {}).get("total_tokens", 0)
# 更新会话历史
self.conversations[session_id].append({
"role": "user",
"content": query
})
if answer and not os.path.isfile(answer):
answer = remove_reasoning_content(answer)
answer = remove_trailing_content(answer)
answer = remove_grok_render_tags(answer)
answer = re.sub(r'\n{3,}', '\n\n', answer).strip()
self.conversations[session_id].append({
"role": "assistant",
"content": answer
})
# 更新会话历史
self.conversations[session_id].append({
"role": "user",
"content": query
})
# 限制会话历史长度
if len(self.conversations[session_id]) > self.max_history_length * 2:
self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:]
self.conversations[session_id].append({
"role": "assistant",
"content": answer
})
# 统计token使用情况
if total_tokens > 0:
if user_id in self.token_usage:
self.token_usage[user_id] += total_tokens
else:
self.token_usage[user_id] = total_tokens
# 限制会话历史长度
if len(self.conversations[session_id]) > self.max_history_length * 2:
self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:]
self.LOG.info(
f"用户 {user_id} 本次消耗 {total_tokens} tokens累计 {self.token_usage[user_id]} tokens")
# 统计token使用情况
if total_tokens > 0:
if user_id in self.token_usage:
self.token_usage[user_id] += total_tokens
else:
self.token_usage[user_id] = total_tokens
self.LOG.info(
f"用户 {user_id} 本次消耗 {total_tokens} tokens累计 {self.token_usage[user_id]} tokens")
return True, answer
return True, answer
except Exception as e:
self.LOG.error(f"处理Dify响应时出错: {str(e)}")

View File

@@ -1,5 +1,6 @@
[GameTask]
enable = true
backend = "openai_compatible_game_task"
command = ["/t", "/a", "/s", "/r", "/l", "/h"]
command-format = """
🎮 百科问答指令:
@@ -10,8 +11,3 @@ command-format = """
/l - 查看活跃任务
/h - 查看未完成任务
"""
# AI获取题目确认答案用的配置信息
authorization = "Bearer b8586595-eb81-483d-8e91-a35cc789729e" # 请替换为真实的Authorization token
url = 'https://ark.cn-beijing.volces.com/api/v3/chat/completions'
model = "doubao-1-5-lite-32k-250115"

View File

@@ -12,8 +12,8 @@ from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotMan
from utils.decorator.points_decorator import points_reward_decorator
from db.connection import DBConnectionManager
from db.encyclopedia import EncyclopediaDB
import requests
import json
from utils.ai.unified_llm import UnifiedLLMClient
class GameTaskPlugin(MessagePluginInterface):
@@ -81,11 +81,25 @@ class GameTaskPlugin(MessagePluginInterface):
/l - 查看活跃任务
/h - 查看未完成任务
""")
self.authorization = self._config.get("GameTask", {}).get("authorization", "")
self.url = self._config.get("GameTask", {}).get("url", "")
self.model = self._config.get("GameTask", {}).get("model", "")
plugin_config = self._config.get("GameTask", {})
self.authorization = plugin_config.get("authorization", "")
self.url = plugin_config.get("url", "")
self.model = plugin_config.get("model", "")
llm_config = plugin_config.get("llm", {}) or {}
if not llm_config:
llm_config = {
"backend": plugin_config.get("backend", ""),
"provider": "openai_compatible",
"authorization": self.authorization,
"url": self.url,
"model": self.model,
"stream": False,
"temperature": 0.2,
"max_tokens": 1000,
}
self.llm_client = UnifiedLLMClient(llm_config)
self.enable = self._config.get("GameTask", {}).get("enable", True)
self.enable = plugin_config.get("enable", True)
# 初始化数据库连接
self.db_manager = DBConnectionManager.get_instance()
@@ -584,40 +598,14 @@ class GameTaskPlugin(MessagePluginInterface):
return None
def message_task_json(self, prompt, content):
# 设置Authorization和URL
authorization = self.authorization # 请替换为真实的Authorization token
url = self.url
data = {
# "stream": True,
"model": self.model,
"messages": [
{
"role": "system",
"content": f"{prompt}"
},
{
"role": "user",
"content": f"{content}"
}
]
}
# 设置请求头
headers = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": authorization
}
# 发送POST请求
response = requests.post(url, headers=headers, data=json.dumps(data), )
response.encoding = 'utf-8'
# 输出响应内容
print(response.status_code)
print(response.text)
return json.loads(self.extract_content(response.text))
response = self.llm_client.generate(
system_prompt=prompt,
user_prompt=str(content),
user="game_task_bot",
)
if not response or not response.get("text"):
raise RuntimeError(f"LLM 调用失败: {self.llm_client.last_error}")
return json.loads(response["text"])
def game_question_json(self, question):
fields = [

View File

@@ -1,11 +1,8 @@
[GlobalNews]
enable = true
command = ["全球新闻", "国际新闻", "环球新闻", "政经新闻", "政治经济新闻"]
backend = "dify_chat_global_news"
command-format = """
🌍全球新闻指令:
全球新闻 - 获取最新的全球政治经济新闻
"""
authorization = "Bearer app-rhhKkbvHd2IAQoGX7xTzXZJj" # 请替换为真实的Authorization token
url = 'http://192.168.2.240/v1/chat-messages'

View File

@@ -1,8 +1,6 @@
import asyncio
import json
import threading
import time # 添加这一行
import aiohttp
from typing import Dict, Any, List, Optional, Tuple
from base.plugin_common.message_plugin_interface import MessagePluginInterface
@@ -11,6 +9,7 @@ from utils.decorator.plugin_decorators import plugin_stats_decorator
from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager
from utils.decorator.points_decorator import plugin_points_cost
from utils.markdown_to_image import convert_md_str_to_image
from utils.ai.unified_llm import UnifiedLLMClient
from wechat_ipad import WechatAPIClient
# 导入新闻抓取函数
@@ -75,9 +74,19 @@ class GlobalNewsPlugin(MessagePluginInterface):
["全球新闻", "国际新闻", "环球新闻", "政经新闻"])
self.command_format = self._config.get("GlobalNews", {}).get("command-format",
"全球新闻 - 获取最新的全球政治经济新闻")
self.enable = self._config.get("GlobalNews", {}).get("enable", True)
self._key = self._config.get("GlobalNews", {}).get("authorization", "")
self._url = self._config.get("GlobalNews", {}).get("url", "")
plugin_config = self._config.get("GlobalNews", {})
self.enable = plugin_config.get("enable", True)
llm_config = plugin_config.get("llm", {}) or {}
if not llm_config:
llm_config = {
"backend": plugin_config.get("backend", ""),
"provider": "dify",
"mode": "chat",
"authorization": plugin_config.get("authorization", ""),
"url": plugin_config.get("url", ""),
"response_mode": "blocking",
}
self.llm_client = UnifiedLLMClient(llm_config)
self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}")
return True
@@ -186,9 +195,7 @@ class GlobalNewsPlugin(MessagePluginInterface):
news_titles = "\n".join(results)
# 使用AI分析新闻
markdown_news = await self._run_in_executor(
self.dify_news_title_analyze, news_titles
)
markdown_news = await self._run_in_executor(self.analyze_news_titles, news_titles)
# 转换为图片
image_path = await self._run_in_executor(
@@ -205,61 +212,15 @@ class GlobalNewsPlugin(MessagePluginInterface):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, func, *args)
async def dify_news_title_analyze(self, content: str) -> str:
"""步分析新闻标题
Args:
content: 新闻标题内容
Returns:
str: 分析后的内容
"""
# 设置Authorization和URL
data = {
"response_mode": "blocking",
"conversation_id": "",
"inputs": {},
"query": content,
"user": "a-bot-global_news"
}
# 设置请求头
headers = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": self._key
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(self._url, headers=headers, json=data) as response:
if response.status != 200:
self.LOG.error(f"新闻分析请求失败: {response.status}")
return None
response_data = await response.json()
self.LOG.debug(f"新闻分析响应: {response_data}")
return self.extract_content(response_data)
except Exception as e:
self.LOG.error(f"新闻分析请求出错: {e}")
return None
def extract_content(self, data):
"""解析API响应内容
Args:
data: API返回的响应数据可以是字典或字符串
Returns:
str: 提取的answer内容
"""
try:
# 如果是字符串,尝试解析为字典
if isinstance(data, str):
data = json.loads(data)
# 如果是字典直接获取answer
if isinstance(data, dict):
answer = data.get('answer', '')
if answer:
return answer
return None
except Exception as e:
self.LOG.error(f"解析响应失败: {str(e)}")
def analyze_news_titles(self, content: str) -> Optional[str]:
"""步分析新闻标题,便于在线程池中复用。"""
response = self.llm_client.run(
prompt=content,
user="a-bot-global_news",
inputs={"query": content},
tag="global_news",
)
if not response:
self.LOG.error(f"新闻分析请求失败: {self.llm_client.last_error}")
return None
return response.get("text") or None

View File

@@ -3,12 +3,7 @@ enable = true
[api]
enable = true
base_url = "http://192.168.2.240/v1"
api_key = "app-b2cj03DipGCIAmgBfcx7SKsT"
mode = "workflow"
endpoint = "workflows/run"
workflow_output_key = "text"
response_mode = "streaming"
backend = "dify_workflow_member_context"
request_timeout = 240
[profile]

View File

@@ -1,187 +1,6 @@
# -*- coding: utf-8 -*-
import json
from typing import Dict, Optional
import requests
from loguru import logger
from utils.ai.unified_llm import UnifiedLLMClient
class DifyClient:
"""Dify completion/workflow 通用调用客户端"""
class DifyClient(UnifiedLLMClient):
"""兼容旧 DifyClient 接口的统一客户端封装。"""
def __init__(self, api_config: Optional[Dict] = None):
api_config = api_config or {}
self.LOG = logger
self.enabled = bool(api_config.get("enable", api_config.get("enabled", False)))
self.base_url = (api_config.get("base_url") or "").rstrip("/")
self.api_key = api_config.get("api_key", "")
self.timeout = int(api_config.get("request_timeout", 60))
self.mode = str(api_config.get("mode", "completion")).strip().lower()
default_endpoint = "workflows/run" if self.mode == "workflow" else "completion-messages"
self.endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/")
self.workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip()
self.response_mode = str(api_config.get("response_mode", "blocking")).strip().lower()
def is_available(self) -> bool:
return self.enabled and bool(self.base_url and self.api_key)
def run(self, prompt: str, user: str, inputs: Optional[Dict] = None,
tag: str = "") -> Optional[Dict]:
if not self.is_available():
return None
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload_inputs = dict(inputs or {})
if self.mode == "completion":
payload_inputs.setdefault("query", prompt)
elif prompt and "query" not in payload_inputs:
payload_inputs["query"] = prompt
payload = {
"inputs": payload_inputs,
"response_mode": self.response_mode,
"user": user,
}
url = f"{self.base_url}/{self.endpoint}"
try:
self.LOG.info(
f"[成员交互摘要][Dify] 发起请求: mode={self.mode}, response_mode={self.response_mode}, "
f"endpoint={self.endpoint}, tag={tag}"
)
if self.response_mode == "streaming":
parsed = self._run_streaming(url, headers, payload, tag)
else:
response = requests.post(url, headers=headers, json=payload, timeout=self.timeout)
response.raise_for_status()
data = response.json()
parsed = self._parse_response(data)
if parsed is not None:
return parsed
self.LOG.warning(f"[成员交互摘要][Dify] 响应内容为空: mode={self.mode}, tag={tag}")
return None
except Exception as e:
self.LOG.warning(f"[成员交互摘要][Dify] 请求失败: mode={self.mode}, tag={tag}, error={e}")
return None
def _run_streaming(self, url: str, headers: Dict, payload: Dict, tag: str) -> Optional[Dict]:
with requests.post(url, headers=headers, json=payload, timeout=self.timeout, stream=True) as response:
response.raise_for_status()
event_name = ""
text_fragments = []
final_payload = None
for raw_line in response.iter_lines(decode_unicode=True):
if raw_line is None:
continue
line = str(raw_line).strip()
if not line:
continue
if line.startswith("event:"):
event_name = line[6:].strip()
continue
if not line.startswith("data:"):
continue
data_text = line[5:].strip()
if not data_text or data_text == "[DONE]":
continue
try:
chunk = json.loads(data_text)
except Exception:
continue
candidate_text = self._extract_stream_text(chunk)
if candidate_text:
text_fragments.append(candidate_text)
chunk_event = str(chunk.get("event") or event_name or "").strip()
if chunk_event in {"workflow_finished", "message_end"}:
final_payload = chunk
if final_payload:
parsed = self._parse_response(final_payload)
if parsed and parsed.get("text"):
return parsed
text = "".join(fragment for fragment in text_fragments if fragment)
if text:
return {
"text": text.strip(),
"usage": {},
"raw": final_payload or {},
}
self.LOG.warning(f"[成员交互摘要][Dify] 流式响应未产出有效内容: tag={tag}")
return None
def _parse_response(self, data: Dict) -> Optional[Dict]:
if self.mode == "workflow":
return self._parse_workflow_response(data)
answer = data.get("answer", "")
usage = (data.get("metadata") or {}).get("usage", {}) or {}
return {
"text": str(answer or "").strip(),
"usage": usage,
"raw": data,
}
def _parse_workflow_response(self, data: Dict) -> Optional[Dict]:
payload = (data or {}).get("data", {}) or {}
outputs = payload.get("outputs", {}) or {}
text = ""
if self.workflow_output_key and outputs.get(self.workflow_output_key) is not None:
value = outputs.get(self.workflow_output_key)
text = self._stringify_output(value)
elif outputs.get("text") is not None:
text = self._stringify_output(outputs.get("text"))
elif outputs.get("answer") is not None:
text = self._stringify_output(outputs.get("answer"))
elif outputs.get("result_json") is not None:
text = self._stringify_output(outputs.get("result_json"))
elif outputs.get("result") is not None:
text = self._stringify_output(outputs.get("result"))
else:
for value in outputs.values():
text = self._stringify_output(value)
if text:
break
usage = {
"total_tokens": payload.get("total_tokens"),
"latency": payload.get("elapsed_time"),
}
return {
"text": str(text or "").strip(),
"usage": usage,
"raw": data,
}
def _extract_stream_text(self, chunk: Dict) -> str:
if not isinstance(chunk, dict):
return ""
payload = (chunk.get("data") or {}) if isinstance(chunk.get("data"), dict) else {}
outputs = payload.get("outputs", {}) if isinstance(payload.get("outputs"), dict) else {}
for key in filter(None, [self.workflow_output_key, "text", "answer", "result_json", "result"]):
if outputs.get(key) is not None:
return self._stringify_output(outputs.get(key))
for key in ("text", "answer"):
if chunk.get(key) is not None:
return self._stringify_output(chunk.get(key))
return ""
@staticmethod
def _stringify_output(value) -> str:
if value is None:
return ""
if isinstance(value, str):
return value.strip()
if isinstance(value, (dict, list)):
return json.dumps(value, ensure_ascii=False)
return str(value).strip()

View File

@@ -513,7 +513,7 @@ class MemberContextService:
usage = response.get("usage", {}) or {}
parsed_meta = parsed.get("meta", {}) or {}
parsed_meta.update({
"ai_provider": "dify",
"ai_provider": self.dify_client.provider,
"ai_mode": self.dify_client.mode,
"ai_tokens": usage.get("total_tokens"),
"ai_latency": usage.get("latency"),

View File

@@ -4,14 +4,8 @@
enabled = true
[api]
api_key = "app-shCA6bo5l2VDmnvhg2BtuJbk"
api_base_url = "http://192.168.2.240/v1"
mode = "workflow"
endpoint = "workflows/run"
workflow_output_key = "text"
response_mode = "streaming"
backend = "dify_workflow_message_summary"
connect_timeout_seconds = 10
request_timeout_seconds = 180
retry_delays_seconds = [10, 20]
[output]

View File

@@ -6,8 +6,6 @@ from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Tuple, Optional, List
import aiohttp
from aiohttp import ClientTimeout
from loguru import logger
from base.plugin_common.message_plugin_interface import MessagePluginInterface
@@ -22,6 +20,7 @@ from utils.markdown_to_image import convert_md_str_to_image
from utils.revoke.message_auto_revoke import MessageAutoRevoke
from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus
from utils.string_utils import remove_reasoning_content, remove_trailing_content
from utils.ai.unified_llm import UnifiedLLMClient
from utils.wechat.contact_manager import ContactManager
from utils.wechat.message_to_db import MessageStorage
from wechat_ipad import WechatAPIClient
@@ -93,6 +92,10 @@ class MessageSummaryPlugin(MessagePluginInterface):
self._connect_timeout_seconds = int(api_config.get("connect_timeout_seconds", 10))
self._request_timeout_seconds = int(api_config.get("request_timeout_seconds", 180))
self._retry_delays_seconds = api_config.get("retry_delays_seconds", [10, 20])
self.llm_client = UnifiedLLMClient(api_config)
self._api_mode = self.llm_client.mode or self._api_mode
self._response_mode = self.llm_client.response_mode or self._response_mode
self._workflow_output_key = self.llm_client.workflow_output_key or self._workflow_output_key
self.message_storage = MessageStorage()
db_manager = context.get("db_manager")
if db_manager:
@@ -221,81 +224,6 @@ class MessageSummaryPlugin(MessagePluginInterface):
sanitized_name = "群聊"
return sanitized_name
async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]:
"""解析 Dify 的 SSE 流式响应"""
answer_parts: List[str] = []
metadata: Dict[str, Any] = {}
final_payload: Dict[str, Any] = {}
buffer = ""
async for chunk in response.content.iter_any():
if not chunk:
continue
buffer += chunk.decode("utf-8", errors="ignore")
while "\n\n" in buffer:
raw_event, buffer = buffer.split("\n\n", 1)
raw_event = raw_event.strip()
if not raw_event:
continue
data_lines = []
for line in raw_event.splitlines():
line = line.strip()
if line.startswith("data:"):
data_lines.append(line[5:].strip())
if not data_lines:
continue
payload_text = "\n".join(data_lines).strip()
if not payload_text or payload_text == "[DONE]":
continue
try:
payload = json.loads(payload_text)
except json.JSONDecodeError:
self.LOG.warning(f"无法解析流式响应片段: {payload_text[:200]}")
continue
event_name = str(payload.get("event", "")).strip()
if event_name in {"message", "agent_message"}:
chunk_text = payload.get("answer", "")
if chunk_text:
answer_parts.append(chunk_text)
elif event_name in {"message_end", "workflow_finished"}:
final_payload = payload
if self._api_mode == "workflow":
payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {}
outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {}
if outputs:
for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]):
if outputs.get(key) is not None:
answer_parts = [self._stringify_output(outputs.get(key))]
break
metadata = payload.get("metadata", {}) or payload.get("data", {}).get("metadata", {}) or metadata
elif event_name == "error":
raise RuntimeError(payload.get("message") or payload.get("error") or "流式总结生成失败")
else:
if self._api_mode == "workflow":
payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {}
outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {}
for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]):
if outputs.get(key) is not None:
chunk_text = self._stringify_output(outputs.get(key))
if chunk_text:
answer_parts.append(chunk_text)
break
answer = "".join(answer_parts)
return {
"answer": answer,
"metadata": metadata,
"data": final_payload.get("data", {}) if isinstance(final_payload, dict) else {},
"event": final_payload.get("event", "") if isinstance(final_payload, dict) else "",
}
def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str:
"""把 token 统计追加到总结文本末尾"""
if not answer or not answer.strip():
@@ -440,7 +368,6 @@ class MessageSummaryPlugin(MessagePluginInterface):
async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]:
"""生成总结"""
# Dify API配置
content_compress = chat_content
try:
content_compress = compress_chat_data(chat_content)
@@ -449,96 +376,50 @@ class MessageSummaryPlugin(MessagePluginInterface):
self.LOG.error(f"压缩内容失败:{e}")
prompt = f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}"
if self._api_mode == "workflow":
data = {
"inputs": {
"query": prompt,
"group_name": group_name,
"chat_content": content_compress,
},
"response_mode": self._response_mode,
"user": group_name if group_name is not None else "message_summary_bot",
}
else:
data = {
"inputs": {},
"query": prompt,
"response_mode": self._response_mode,
"conversation_id": "",
"user": group_name if group_name is not None else "message_summary_bot",
"files": []
}
self.LOG.info(f"群聊总结内容:{data}")
# 设置请求头
headers = {
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json",
"Accept": "text/event-stream" if self._response_mode == "streaming" else "application/json"
inputs = {
"query": prompt,
"group_name": group_name,
"chat_content": content_compress,
}
self.LOG.info(f"群聊总结请求准备: group={group_name}, mode={self._api_mode}, response_mode={self._response_mode}")
max_retries = len(self._retry_delays_seconds) + 1
for attempt in range(1, max_retries + 1):
try:
custom_timeout = ClientTimeout(
total=None,
connect=self._connect_timeout_seconds,
sock_read=self._request_timeout_seconds
response = await asyncio.to_thread(
self.llm_client.run,
prompt,
group_name if group_name is not None else "message_summary_bot",
inputs,
f"message_summary:{group_name}",
)
conn = aiohttp.TCPConnector(keepalive_timeout=60) # 保持连接活跃
async with aiohttp.ClientSession(connector=conn, timeout=custom_timeout) as session:
async with session.post(self._api_url, headers=headers, json=data) as response:
response.raise_for_status() # 检查请求是否成功
if self._response_mode == "streaming":
response_data = await self._parse_streaming_response(response)
else:
response_data = await response.json()
if not response or not response.get("text"):
raise RuntimeError(self.llm_client.last_error or "LLM 未返回有效总结内容")
self.LOG.info(f"Dify API响应状态码: {response.status}, attempt={attempt}")
self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}")
answer = self._clean_summary_output(response.get("text", ""))
metadata = {"usage": response.get("usage", {}) or {}}
spath = ""
answer = self._append_usage_info(answer, metadata)
if self._api_mode == "workflow":
answer, metadata = self._parse_workflow_response(response_data)
else:
answer = response_data.get("answer", "")
metadata = response_data.get("metadata", {})
if answer and len(answer.strip()) > 0:
try:
timestamp = int(time.time())
output_path = f"summary_{timestamp}.png"
self.LOG.info(f"开始生成图片: {output_path}")
spath = await convert_md_str_to_image(answer, output_path)
self.LOG.info(f"成功生成图片: {spath}")
except Exception as e:
self.LOG.error(f"生成图片失败: {e}", exc_info=True)
max_length = 2000
if len(answer) > max_length:
answer = answer[:max_length] + "\n\n... (内容过长,已截断)"
self.LOG.info("图片生成失败,将发送文本消息作为备选方案")
spath = None
else:
spath = None
return answer, spath
if not answer or not answer.strip():
raise RuntimeError("Dify 未返回有效总结内容")
answer = self._clean_summary_output(answer)
spath = ""
answer = self._append_usage_info(answer, metadata)
if answer and len(answer.strip()) > 0:
try:
# 使用唯一文件名并指定完整路径
timestamp = int(time.time())
output_path = f"summary_{timestamp}.png"
self.LOG.info(f"开始生成图片: {output_path}")
spath = await convert_md_str_to_image(answer, output_path)
self.LOG.info(f"成功生成图片: {spath}")
except Exception as e:
self.LOG.error(f"生成图片失败: {e}", exc_info=True)
try:
max_length = 2000
if len(answer) > max_length:
answer = answer[:max_length] + "\n\n... (内容过长,已截断)"
self.LOG.info("图片生成失败,将发送文本消息作为备选方案")
spath = None
except Exception as fallback_error:
self.LOG.error(f"备选文本发送也失败: {fallback_error}")
spath = None
else:
spath = None
# 返回文本内容和图片路径
return answer, spath
except aiohttp.ClientError as e:
self.LOG.error(f"请求Dify API时出错: attempt={attempt}/{max_retries}, error={e}")
except json.JSONDecodeError as e:
self.LOG.error(f"解析Dify API响应时出错: attempt={attempt}/{max_retries}, error={e}")
except Exception as e:
self.LOG.error(f"处理总结时出现未知错误: attempt={attempt}/{max_retries}, error={e}")