diff --git a/admin/dashboard/blueprints/system.py b/admin/dashboard/blueprints/system.py index f9afe01..62c1566 100644 --- a/admin/dashboard/blueprints/system.py +++ b/admin/dashboard/blueprints/system.py @@ -10,6 +10,7 @@ import psutil from collections import deque import gzip import json +import yaml # 创建系统信息蓝图 system_bp = Blueprint('system', __name__) @@ -156,6 +157,52 @@ def get_current_user_info(): return jsonify(result) +@system_bp.route('/api/system/config/raw', methods=['GET']) +@login_required +def get_system_config_raw(): + try: + server = current_app.dashboard_server + config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'config.yaml')) + with open(config_path, 'r', encoding='utf-8') as f: + config_text = f.read() + robot_config = getattr(getattr(server, "robot", None), "config", None) + llm_config = getattr(robot_config, "llm", {}) if robot_config else {} + llm_backends = (llm_config or {}).get("backends", {}) + return jsonify({ + "success": True, + "data": config_text, + "path": config_path, + "llm_backends": list((llm_backends or {}).keys()), + }) + except Exception as e: + logger.error(f"读取系统配置失败: {e}") + return jsonify({"success": False, "message": str(e)}), 500 + + +@system_bp.route('/api/system/config/update', methods=['POST']) +@login_required +def update_system_config(): + try: + server = current_app.dashboard_server + data = request.get_json() or {} + config_text = data.get("config_text") + if config_text is None: + return jsonify({"success": False, "message": "缺少配置内容"}), 400 + + yaml.safe_load(config_text) + config_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'config.yaml')) + with open(config_path, 'w', encoding='utf-8') as f: + f.write(config_text) + + if getattr(server, "robot", None) and getattr(server.robot, "config", None): + server.robot.config.reload() + + return jsonify({"success": True, "message": "全局配置已保存"}) + except Exception as e: + logger.error(f"保存系统配置失败: {e}") + return jsonify({"success": False, "message": str(e)}), 500 + + @system_bp.route('/api/restart_service', methods=['POST']) @login_required def restart_service(): diff --git a/admin/dashboard/templates/system_status.html b/admin/dashboard/templates/system_status.html index 8968c87..26ced73 100644 --- a/admin/dashboard/templates/system_status.html +++ b/admin/dashboard/templates/system_status.html @@ -29,6 +29,29 @@ + + +
+
+

全局配置

+

集中维护 `config.yaml`,其中 `llm.backends` 用于统一管理所有模型后端。

+
+
+ 刷新配置 + 保存配置 +
+
+
+ 配置文件:{% raw %}{{ configPath }}{% endraw %} + LLM 后端:{% raw %}{{ llmBackends.join(', ') }}{% endraw %} +
+ + +
{% endblock %} @@ -42,11 +65,15 @@ currentView: '14', showTimeRangeSelector: false, frameUrl: '{{ src_url }}', - restarting: false + restarting: false, + systemConfigText: '', + configPath: '', + llmBackends: [] } }, mounted() { this.currentView = '14'; + this.loadSystemConfig(); }, methods: { reloadIframe() { @@ -81,6 +108,35 @@ } finally { this.restarting = false; } + }, + async loadSystemConfig() { + try { + const response = await axios.get('/api/system/config/raw'); + if (response.data.success) { + this.systemConfigText = response.data.data || ''; + this.configPath = response.data.path || ''; + this.llmBackends = response.data.llm_backends || []; + } else { + this.$message.error(response.data.message || '读取全局配置失败'); + } + } catch (error) { + this.$message.error(error.response?.data?.message || '读取全局配置失败'); + } + }, + async saveSystemConfig() { + try { + const response = await axios.post('/api/system/config/update', { + config_text: this.systemConfigText + }); + if (response.data.success) { + this.$message.success(response.data.message || '保存成功'); + this.loadSystemConfig(); + } else { + this.$message.error(response.data.message || '保存失败'); + } + } catch (error) { + this.$message.error(error.response?.data?.message || '保存失败'); + } } } }); @@ -108,5 +164,7 @@ .iframe-shell-card .el-card__body { height: calc(100% - 73px); } .iframe-shell { height: 100%; border-radius: 18px; overflow: hidden; border: 1px solid rgba(148,163,184,0.12); background: rgba(248,250,252,0.82); } .iframe-shell iframe { width: 100%; height: 100%; border: none; display: block; background: #fff; } + .workspace-card .el-card__body { display: flex; flex-direction: column; gap: 12px; } + .config-meta { display: flex; justify-content: space-between; gap: 12px; color: #64748b; font-size: 12px; } {% endblock %} diff --git a/config.yaml b/config.yaml index 48a5f9c..78571bc 100644 --- a/config.yaml +++ b/config.yaml @@ -36,3 +36,62 @@ wx_config: #微信管理账号,用于接收部分管理员指令 #菜单调整和系统更新 admin: [ "Jyunere" ] + +llm: + default_backend: "dify_workflow_chat" + backends: + dify_workflow_chat: + provider: "dify" + mode: "workflow" + api_key: "app-u5EnYq3ill19bm6pWJwGkY4D" + api_base_url: "http://192.168.2.240/v1" + endpoint: "workflows/run" + response_mode: "blocking" + request_timeout: 40 + dify_workflow_member_context: + provider: "dify" + mode: "workflow" + api_key: "app-b2cj03DipGCIAmgBfcx7SKsT" + api_base_url: "http://192.168.2.240/v1" + endpoint: "workflows/run" + workflow_output_key: "text" + response_mode: "streaming" + request_timeout: 240 + dify_workflow_message_summary: + provider: "dify" + mode: "workflow" + api_key: "app-shCA6bo5l2VDmnvhg2BtuJbk" + api_base_url: "http://192.168.2.240/v1" + endpoint: "workflows/run" + workflow_output_key: "text" + response_mode: "streaming" + request_timeout: 180 + dify_chat_global_news: + provider: "dify" + mode: "chat" + api_key: "app-rhhKkbvHd2IAQoGX7xTzXZJj" + api_base_url: "http://192.168.2.240/v1" + endpoint: "chat-messages" + response_mode: "blocking" + request_timeout: 60 + openai_compatible_game_task: + provider: "openai_compatible" + api_url: "https://ark.cn-beijing.volces.com/api/v3/chat/completions" + api_key: "b8586595-eb81-483d-8e91-a35cc789729e" + model: "doubao-1-5-lite-32k-250115" + stream: false + temperature: 0.2 + max_tokens: 1000 + timeout_seconds: 60 + openai_compatible_ai_auto_response: + provider: "openai_compatible" + api_base_url: "http://192.168.2.240:3000/v1" + endpoint: "chat/completions" + api_key: "sk-hC6WMLAsTdItpywyrYdxT6pQ4E7NARGbUKuPWRH0zMheen9e" + model: "gpt-5.4" + stream: true + temperature: 0.35 + max_tokens: 120 + timeout_seconds: 45 + max_retries: 3 + retry_delay_seconds: 1.0 diff --git a/configuration.py b/configuration.py index 2c381f1..f4ad382 100644 --- a/configuration.py +++ b/configuration.py @@ -31,3 +31,5 @@ class Config(object): # wx 相关配置 self.wx_config = yconfig.get("wx_config", {}) + # LLM 集中配置 + self.llm = yconfig.get("llm", {}) diff --git a/plugins/ai_auto_response/config.toml b/plugins/ai_auto_response/config.toml index 642eb5d..c7898f1 100644 --- a/plugins/ai_auto_response/config.toml +++ b/plugins/ai_auto_response/config.toml @@ -9,17 +9,7 @@ max_reply_sentences = 3 familiarity_hint = "有熟悉感,但不过度装熟" [api] -provider = "openai_compatible" -api_base_url = "http://192.168.2.240:3000/v1" -endpoint = "chat/completions" -api_key = "sk-hC6WMLAsTdItpywyrYdxT6pQ4E7NARGbUKuPWRH0zMheen9e" -model = "gpt-5.4" -timeout_seconds = 45 -temperature = 0.35 -max_tokens = 120 -stream = true -max_retries = 3 -retry_delay_seconds = 1.0 +backend = "openai_compatible_ai_auto_response" [mode] group_default_mode = "social" diff --git a/plugins/ai_auto_response/llm_client.py b/plugins/ai_auto_response/llm_client.py index 6ab0725..e538597 100644 --- a/plugins/ai_auto_response/llm_client.py +++ b/plugins/ai_auto_response/llm_client.py @@ -1,199 +1,6 @@ -from __future__ import annotations - -import json -import time -from typing import Dict, List, Optional - -import requests +from utils.ai.unified_llm import UnifiedLLMClient -class LLMClient: - def __init__(self, config: Dict): - self.config = config or {} - self.provider = self.config.get("provider", "openai_compatible") - self.base_url = str(self.config.get("api_base_url", "")).rstrip("/") - self.endpoint = str(self.config.get("endpoint", "chat/completions")).lstrip("/") - self.api_key = self.config.get("api_key", "") - self.model = self.config.get("model", "") - self.timeout_seconds = int(self.config.get("timeout_seconds", 45)) - self.temperature = float(self.config.get("temperature", 0.7)) - self.max_tokens = int(self.config.get("max_tokens", 500)) - self.stream = bool(self.config.get("stream", True)) - self.max_retries = max(int(self.config.get("max_retries", 3) or 3), 1) - self.retry_delay_seconds = float(self.config.get("retry_delay_seconds", 1.0) or 1.0) - self.last_error = "" +class LLMClient(UnifiedLLMClient): + """兼容旧调用方式的统一 LLM 客户端别名。""" - def chat( - self, - system_prompt: str, - user_prompt: str, - user_id: str, - image_urls: Optional[List[str]] = None, - ) -> str: - self.last_error = "" - if not self.base_url: - self.last_error = "empty_base_url" - return "" - if self.provider == "openai_compatible": - return self._chat_openai_compatible(system_prompt, user_prompt, user_id, image_urls or []) - self.last_error = f"unsupported_provider:{self.provider}" - return "" - - def _chat_openai_compatible( - self, - system_prompt: str, - user_prompt: str, - user_id: str, - image_urls: List[str], - ) -> str: - if not self.model: - return "" - - payload = { - "model": self.model, - "messages": self._build_messages(system_prompt, user_prompt, image_urls), - "temperature": self.temperature, - "max_tokens": self.max_tokens, - "user": user_id, - } - if self.stream: - payload["stream"] = True - headers = { - "Content-Type": "application/json", - } - if self.api_key: - headers["Authorization"] = f"Bearer {self.api_key}" - - for attempt in range(1, self.max_retries + 1): - try: - if self.stream: - text = self._chat_streaming(payload, headers) - else: - text = self._chat_non_streaming(payload, headers) - if text: - return text - except Exception as exc: - self.last_error = f"request_failed:attempt_{attempt}:{exc}" - if attempt < self.max_retries: - time.sleep(self.retry_delay_seconds * attempt) - return "" - - def _chat_non_streaming(self, payload: Dict, headers: Dict[str, str]) -> str: - response = requests.post( - f"{self.base_url}/{self.endpoint}", - json=payload, - headers=headers, - timeout=self.timeout_seconds, - ) - response.raise_for_status() - data = response.json() - text = self._extract_text(data) - if text: - return text - self.last_error = f"empty_model_output:{self.model}" - return "" - - def _chat_streaming(self, payload: Dict, headers: Dict[str, str]) -> str: - chunks: List[str] = [] - with requests.post( - f"{self.base_url}/{self.endpoint}", - json=payload, - headers=headers, - timeout=self.timeout_seconds, - stream=True, - ) as response: - response.raise_for_status() - buffer = b"" - for part in response.iter_content(chunk_size=None): - if not part: - continue - buffer += part - while b"\n\n" in buffer: - event, buffer = buffer.split(b"\n\n", 1) - try: - event_text = event.decode("utf-8") - except UnicodeDecodeError: - buffer = event + b"\n\n" + buffer - break - text_piece, done = self._parse_sse_event(event_text) - if text_piece: - chunks.append(text_piece) - if done: - final_text = "".join(chunks).strip() - if final_text: - return final_text - self.last_error = f"empty_stream_output:{self.model}" - return "" - final_text = "".join(chunks).strip() - if final_text: - return final_text - self.last_error = f"empty_stream_output:{self.model}" - return "" - - @staticmethod - def _build_messages(system_prompt: str, user_prompt: str, image_urls: List[str]) -> List[Dict]: - user_content: str | List[Dict[str, object]] - if image_urls: - content_parts: List[Dict[str, object]] = [{"type": "text", "text": user_prompt}] - for image_url in image_urls: - if image_url: - content_parts.append({"type": "image_url", "image_url": {"url": image_url}}) - user_content = content_parts - else: - user_content = user_prompt - return [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_content}, - ] - - @staticmethod - def _extract_text(data: Dict) -> str: - choices = data.get("choices") or [] - if choices: - message = choices[0].get("message", {}) or {} - content = message.get("content") - if isinstance(content, str) and content.strip(): - return content.strip() - if isinstance(content, list): - parts = [] - for item in content: - if isinstance(item, dict): - text = item.get("text") or item.get("content") - if isinstance(text, str) and text.strip(): - parts.append(text.strip()) - if parts: - return "\n".join(parts).strip() - for key in ("reasoning_content", "text", "output_text"): - value = message.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() - for key in ("output_text", "text", "answer", "response"): - value = data.get(key) - if isinstance(value, str) and value.strip(): - return value.strip() - return "" - - @classmethod - def _parse_sse_event(cls, event_text: str) -> tuple[str, bool]: - lines = [line.strip() for line in event_text.splitlines() if line.strip()] - data_lines = [line[5:].strip() for line in lines if line.startswith("data:")] - if not data_lines: - return "", False - data = "\n".join(data_lines) - if data == "[DONE]": - return "", True - obj = json.loads(data) - choice = (obj.get("choices") or [{}])[0] - delta = choice.get("delta") or {} - content = delta.get("content") - if isinstance(content, str): - return content, False - if isinstance(content, list): - parts = [] - for item in content: - if isinstance(item, dict): - text = item.get("text") or item.get("content") - if isinstance(text, str): - parts.append(text) - return "".join(parts), False - return "", False diff --git a/plugins/dify/config.toml b/plugins/dify/config.toml index ab957e3..86f7fdb 100644 --- a/plugins/dify/config.toml +++ b/plugins/dify/config.toml @@ -1,8 +1,6 @@ [Dify] enable = true - -api-key = "app-u5EnYq3ill19bm6pWJwGkY4D" # Dify的API Key -base-url = "http://192.168.2.240/v1" #Dify API接口base url +backend = "dify_workflow_chat" commands = ["聊天"] command-tip = """ @@ -17,4 +15,4 @@ http-proxy = "" # 管理员和白名单用户是否免费使用 admin_ignore = true -whitelist_ignore = true \ No newline at end of file +whitelist_ignore = true diff --git a/plugins/dify/main.py b/plugins/dify/main.py index af9d0cf..f29d745 100644 --- a/plugins/dify/main.py +++ b/plugins/dify/main.py @@ -22,6 +22,7 @@ from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotMan from utils.decorator.points_decorator import plugin_points_cost from utils.media_downloader import MediaDownloader from utils.string_utils import remove_reasoning_content, remove_trailing_content, remove_grok_render_tags +from utils.ai.unified_llm import UnifiedLLMClient from wechat_ipad import WechatAPIClient from wechat_ipad.models.message import MessageType import aiohttp @@ -97,12 +98,25 @@ class DifyPlugin(MessagePluginInterface): self._commands = dify_config.get("commands", ["ai", "dify", "聊天", "AI"]) self.command_format = dify_config.get("command-tip", "聊天 请求内容") self.enable = dify_config.get("enable", True) - self.api_key = dify_config.get("api-key", "") - self.base_url = dify_config.get("base-url", "") self.price = dify_config.get("price", 0) self.admin_ignore = dify_config.get("admin_ignore", False) self.whitelist_ignore = dify_config.get("whitelist_ignore", False) self.http_proxy = dify_config.get("http-proxy", "") + llm_config = dify_config.get("llm", {}) or {} + if not llm_config: + llm_config = { + "backend": dify_config.get("backend", ""), + "provider": "dify", + "mode": "workflow", + "api-key": self.api_key, + "base-url": self.base_url, + "endpoint": "workflows/run", + "response_mode": "blocking", + "request_timeout": 40, + } + self.llm_client = UnifiedLLMClient(llm_config) + self.api_key = self.llm_client.api_key + self.base_url = self.llm_client.base_url self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True @@ -445,13 +459,6 @@ class DifyPlugin(MessagePluginInterface): if session_id not in self.conversations: self.conversations[session_id] = [] - # 准备请求头 - headers = { - "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json", - "Accept": "text/event-stream" # 指定接受事件流 - } - # 准备历史记录 history_text = "" if self.conversations[session_id]: @@ -471,122 +478,72 @@ class DifyPlugin(MessagePluginInterface): # 如果有历史记录,添加到inputs_params中 if history_text: inputs_params["history"] = history_text + if self.conversations[session_id]: + inputs_params["conversation_history"] = self.conversations[session_id] if files is None: files = [] self.LOG.debug(f"Dify请求准备: files={len(files)}") - - # 准备请求数据 - data = { - "files": files, - "user": user_id, - "inputs": inputs_params, - "response_mode": "blocking" # 使用阻塞响应模式 - } - - # 如果有历史记录,同时添加到conversation_history中 - if self.conversations[session_id]: - data["conversation_history"] = self.conversations[session_id] - - # 设置代理 - proxy = self.http_proxy if self.http_proxy else None - - # 发送请求 - url = f"{self.base_url}/workflows/run" - - self.LOG.info(f"发送请求到Dify API: {url}") - self.LOG.info(f"请求数据: {json.dumps(data, ensure_ascii=False)}") - + self.LOG.info(f"Dify请求准备: session_id={session_id}, query_len={len(query)}, files={len(files)}") try: - async with aiohttp.ClientSession() as session: - response = await session.post(url, headers=headers, json=data, proxy=proxy, timeout=40) - if response.status != 200: - error_text = await response.text() - self.LOG.error(f"Dify API请求失败: {response.status} {error_text}") - return False, f"请求失败,状态码: {response.status}" + response = await asyncio.to_thread( + self.llm_client.generate, + query, + user_id, + inputs_params, + f"dify:{session_id}", + "", + "", + None, + files, + ) + if not response: + self.LOG.error(f"Dify API请求失败: {self.llm_client.last_error}") + return False, "请求失败" - # 解析响应 - response_data = await response.json() - self.LOG.info(f"收到Dify API响应: {json.dumps(response_data, ensure_ascii=False)}") + answer = response.get("text", "") or "" + total_tokens = int((response.get("usage", {}) or {}).get("total_tokens") or 0) + raw_data = response.get("raw", {}) or {} + outputs = ((raw_data.get("data") or {}).get("outputs") or {}) if isinstance(raw_data, dict) else {} - # 提取回答内容 - answer = "" - total_tokens = 0 + if outputs and "result" in outputs and "type" in outputs: + if outputs["type"] in {"image", "video"}: + downloader = MediaDownloader() + media_path = await downloader.download_media(outputs["result"]) + answer = media_path - # 获取输出内容 - outputs = response_data.get("data", {}).get("outputs", {}) - if outputs: - # 处理媒体类型返回 - if "result" in outputs and "type" in outputs: - if outputs["type"] == "image": - downloader = MediaDownloader() - image_url = outputs["result"] - image_path = await downloader.download_media(image_url) - answer = image_path - if outputs["type"] == "video": - downloader = MediaDownloader() - image_url = outputs["result"] - image_path = await downloader.download_media(image_url) - answer = image_path - # 处理文本类型返回 - elif "text" in outputs and isinstance(outputs["text"], str): - answer = outputs["text"] - # 兼容旧版处理逻辑 - else: - for key, value in outputs.items(): - if isinstance(value, str) and value.strip(): - answer += value - elif isinstance(value, dict): - # 处理嵌套字典的情况 - for sub_key, sub_value in value.items(): - if isinstance(sub_value, str) and sub_value.strip(): - answer += sub_value - elif isinstance(value, list): - # 处理列表的情况 - for item in value: - if isinstance(item, str) and item.strip(): - answer += item - elif isinstance(item, dict): - # 处理列表中的字典 - for item_key, item_value in item.items(): - if isinstance(item_value, str) and item_value.strip(): - answer += item_value + if answer and not os.path.isfile(answer): + answer = remove_reasoning_content(answer) + answer = remove_trailing_content(answer) + answer = remove_grok_render_tags(answer) + answer = re.sub(r'\n{3,}', '\n\n', answer).strip() - # 获取token使用情况 - total_tokens = response_data.get("data", {}).get("total_tokens", 0) + # 更新会话历史 + self.conversations[session_id].append({ + "role": "user", + "content": query + }) - if answer and not os.path.isfile(answer): - answer = remove_reasoning_content(answer) - answer = remove_trailing_content(answer) - answer = remove_grok_render_tags(answer) - answer = re.sub(r'\n{3,}', '\n\n', answer).strip() + self.conversations[session_id].append({ + "role": "assistant", + "content": answer + }) - # 更新会话历史 - self.conversations[session_id].append({ - "role": "user", - "content": query - }) + # 限制会话历史长度 + if len(self.conversations[session_id]) > self.max_history_length * 2: + self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:] - self.conversations[session_id].append({ - "role": "assistant", - "content": answer - }) + # 统计token使用情况 + if total_tokens > 0: + if user_id in self.token_usage: + self.token_usage[user_id] += total_tokens + else: + self.token_usage[user_id] = total_tokens - # 限制会话历史长度 - if len(self.conversations[session_id]) > self.max_history_length * 2: - self.conversations[session_id] = self.conversations[session_id][-self.max_history_length * 2:] + self.LOG.info( + f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens") - # 统计token使用情况 - if total_tokens > 0: - if user_id in self.token_usage: - self.token_usage[user_id] += total_tokens - else: - self.token_usage[user_id] = total_tokens - - self.LOG.info( - f"用户 {user_id} 本次消耗 {total_tokens} tokens,累计 {self.token_usage[user_id]} tokens") - - return True, answer + return True, answer except Exception as e: self.LOG.error(f"处理Dify响应时出错: {str(e)}") diff --git a/plugins/game_task/config.toml b/plugins/game_task/config.toml index b169340..44b6d58 100644 --- a/plugins/game_task/config.toml +++ b/plugins/game_task/config.toml @@ -1,5 +1,6 @@ [GameTask] enable = true +backend = "openai_compatible_game_task" command = ["/t", "/a", "/s", "/r", "/l", "/h"] command-format = """ 🎮 百科问答指令: @@ -10,8 +11,3 @@ command-format = """ /l - 查看活跃任务 /h - 查看未完成任务 """ - -# AI获取题目确认答案用的配置信息 -authorization = "Bearer b8586595-eb81-483d-8e91-a35cc789729e" # 请替换为真实的Authorization token -url = 'https://ark.cn-beijing.volces.com/api/v3/chat/completions' -model = "doubao-1-5-lite-32k-250115" \ No newline at end of file diff --git a/plugins/game_task/main.py b/plugins/game_task/main.py index 1cb1506..9cbca33 100644 --- a/plugins/game_task/main.py +++ b/plugins/game_task/main.py @@ -12,8 +12,8 @@ from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotMan from utils.decorator.points_decorator import points_reward_decorator from db.connection import DBConnectionManager from db.encyclopedia import EncyclopediaDB -import requests import json +from utils.ai.unified_llm import UnifiedLLMClient class GameTaskPlugin(MessagePluginInterface): @@ -81,11 +81,25 @@ class GameTaskPlugin(MessagePluginInterface): /l - 查看活跃任务 /h - 查看未完成任务 """) - self.authorization = self._config.get("GameTask", {}).get("authorization", "") - self.url = self._config.get("GameTask", {}).get("url", "") - self.model = self._config.get("GameTask", {}).get("model", "") + plugin_config = self._config.get("GameTask", {}) + self.authorization = plugin_config.get("authorization", "") + self.url = plugin_config.get("url", "") + self.model = plugin_config.get("model", "") + llm_config = plugin_config.get("llm", {}) or {} + if not llm_config: + llm_config = { + "backend": plugin_config.get("backend", ""), + "provider": "openai_compatible", + "authorization": self.authorization, + "url": self.url, + "model": self.model, + "stream": False, + "temperature": 0.2, + "max_tokens": 1000, + } + self.llm_client = UnifiedLLMClient(llm_config) - self.enable = self._config.get("GameTask", {}).get("enable", True) + self.enable = plugin_config.get("enable", True) # 初始化数据库连接 self.db_manager = DBConnectionManager.get_instance() @@ -584,40 +598,14 @@ class GameTaskPlugin(MessagePluginInterface): return None def message_task_json(self, prompt, content): - # 设置Authorization和URL - authorization = self.authorization # 请替换为真实的Authorization token - url = self.url - - data = { - # "stream": True, - "model": self.model, - "messages": [ - { - "role": "system", - "content": f"{prompt}" - }, - { - "role": "user", - "content": f"{content}" - } - - ] - } - - # 设置请求头 - headers = { - "Content-Type": "application/json; charset=utf-8", - "Authorization": authorization - } - - # 发送POST请求 - response = requests.post(url, headers=headers, data=json.dumps(data), ) - response.encoding = 'utf-8' - - # 输出响应内容 - print(response.status_code) - print(response.text) - return json.loads(self.extract_content(response.text)) + response = self.llm_client.generate( + system_prompt=prompt, + user_prompt=str(content), + user="game_task_bot", + ) + if not response or not response.get("text"): + raise RuntimeError(f"LLM 调用失败: {self.llm_client.last_error}") + return json.loads(response["text"]) def game_question_json(self, question): fields = [ diff --git a/plugins/global_news/config.toml b/plugins/global_news/config.toml index 2b4e67c..4d11e61 100644 --- a/plugins/global_news/config.toml +++ b/plugins/global_news/config.toml @@ -1,11 +1,8 @@ [GlobalNews] enable = true command = ["全球新闻", "国际新闻", "环球新闻", "政经新闻", "政治经济新闻"] +backend = "dify_chat_global_news" command-format = """ 🌍全球新闻指令: 全球新闻 - 获取最新的全球政治经济新闻 """ - - -authorization = "Bearer app-rhhKkbvHd2IAQoGX7xTzXZJj" # 请替换为真实的Authorization token -url = 'http://192.168.2.240/v1/chat-messages' \ No newline at end of file diff --git a/plugins/global_news/main.py b/plugins/global_news/main.py index ecc20dc..34a96b1 100644 --- a/plugins/global_news/main.py +++ b/plugins/global_news/main.py @@ -1,8 +1,6 @@ import asyncio -import json import threading import time # 添加这一行 -import aiohttp from typing import Dict, Any, List, Optional, Tuple from base.plugin_common.message_plugin_interface import MessagePluginInterface @@ -11,6 +9,7 @@ from utils.decorator.plugin_decorators import plugin_stats_decorator from utils.robot_cmd.robot_command import Feature, PermissionStatus, GroupBotManager from utils.decorator.points_decorator import plugin_points_cost from utils.markdown_to_image import convert_md_str_to_image +from utils.ai.unified_llm import UnifiedLLMClient from wechat_ipad import WechatAPIClient # 导入新闻抓取函数 @@ -75,9 +74,19 @@ class GlobalNewsPlugin(MessagePluginInterface): ["全球新闻", "国际新闻", "环球新闻", "政经新闻"]) self.command_format = self._config.get("GlobalNews", {}).get("command-format", "全球新闻 - 获取最新的全球政治经济新闻") - self.enable = self._config.get("GlobalNews", {}).get("enable", True) - self._key = self._config.get("GlobalNews", {}).get("authorization", "") - self._url = self._config.get("GlobalNews", {}).get("url", "") + plugin_config = self._config.get("GlobalNews", {}) + self.enable = plugin_config.get("enable", True) + llm_config = plugin_config.get("llm", {}) or {} + if not llm_config: + llm_config = { + "backend": plugin_config.get("backend", ""), + "provider": "dify", + "mode": "chat", + "authorization": plugin_config.get("authorization", ""), + "url": plugin_config.get("url", ""), + "response_mode": "blocking", + } + self.llm_client = UnifiedLLMClient(llm_config) self.LOG.debug(f"[{self.name}] 插件初始化完成,指令:{self._commands}") return True @@ -186,9 +195,7 @@ class GlobalNewsPlugin(MessagePluginInterface): news_titles = "\n".join(results) # 使用AI分析新闻 - markdown_news = await self._run_in_executor( - self.dify_news_title_analyze, news_titles - ) + markdown_news = await self._run_in_executor(self.analyze_news_titles, news_titles) # 转换为图片 image_path = await self._run_in_executor( @@ -205,61 +212,15 @@ class GlobalNewsPlugin(MessagePluginInterface): loop = asyncio.get_event_loop() return await loop.run_in_executor(None, func, *args) - async def dify_news_title_analyze(self, content: str) -> str: - """异步分析新闻标题 - Args: - content: 新闻标题内容 - Returns: - str: 分析后的内容 - """ - # 设置Authorization和URL - data = { - "response_mode": "blocking", - "conversation_id": "", - "inputs": {}, - "query": content, - "user": "a-bot-global_news" - } - - # 设置请求头 - headers = { - "Content-Type": "application/json; charset=utf-8", - "Authorization": self._key - } - - try: - async with aiohttp.ClientSession() as session: - async with session.post(self._url, headers=headers, json=data) as response: - if response.status != 200: - self.LOG.error(f"新闻分析请求失败: {response.status}") - return None - - response_data = await response.json() - self.LOG.debug(f"新闻分析响应: {response_data}") - return self.extract_content(response_data) - - except Exception as e: - self.LOG.error(f"新闻分析请求出错: {e}") - return None - - def extract_content(self, data): - """解析API响应内容 - Args: - data: API返回的响应数据,可以是字典或字符串 - Returns: - str: 提取的answer内容 - """ - try: - # 如果是字符串,尝试解析为字典 - if isinstance(data, str): - data = json.loads(data) - # 如果是字典,直接获取answer - if isinstance(data, dict): - answer = data.get('answer', '') - if answer: - return answer - - return None - except Exception as e: - self.LOG.error(f"解析响应失败: {str(e)}") + def analyze_news_titles(self, content: str) -> Optional[str]: + """同步分析新闻标题,便于在线程池中复用。""" + response = self.llm_client.run( + prompt=content, + user="a-bot-global_news", + inputs={"query": content}, + tag="global_news", + ) + if not response: + self.LOG.error(f"新闻分析请求失败: {self.llm_client.last_error}") return None + return response.get("text") or None diff --git a/plugins/member_context/config.toml b/plugins/member_context/config.toml index da7b128..de9bb0b 100644 --- a/plugins/member_context/config.toml +++ b/plugins/member_context/config.toml @@ -3,12 +3,7 @@ enable = true [api] enable = true -base_url = "http://192.168.2.240/v1" -api_key = "app-b2cj03DipGCIAmgBfcx7SKsT" -mode = "workflow" -endpoint = "workflows/run" -workflow_output_key = "text" -response_mode = "streaming" +backend = "dify_workflow_member_context" request_timeout = 240 [profile] diff --git a/plugins/member_context/dify_client.py b/plugins/member_context/dify_client.py index 5c183c2..c1c138e 100644 --- a/plugins/member_context/dify_client.py +++ b/plugins/member_context/dify_client.py @@ -1,187 +1,6 @@ -# -*- coding: utf-8 -*- -import json -from typing import Dict, Optional - -import requests -from loguru import logger +from utils.ai.unified_llm import UnifiedLLMClient -class DifyClient: - """Dify completion/workflow 通用调用客户端""" +class DifyClient(UnifiedLLMClient): + """兼容旧 DifyClient 接口的统一客户端封装。""" - def __init__(self, api_config: Optional[Dict] = None): - api_config = api_config or {} - self.LOG = logger - self.enabled = bool(api_config.get("enable", api_config.get("enabled", False))) - self.base_url = (api_config.get("base_url") or "").rstrip("/") - self.api_key = api_config.get("api_key", "") - self.timeout = int(api_config.get("request_timeout", 60)) - self.mode = str(api_config.get("mode", "completion")).strip().lower() - default_endpoint = "workflows/run" if self.mode == "workflow" else "completion-messages" - self.endpoint = str(api_config.get("endpoint", default_endpoint)).lstrip("/") - self.workflow_output_key = str(api_config.get("workflow_output_key", "text")).strip() - self.response_mode = str(api_config.get("response_mode", "blocking")).strip().lower() - - def is_available(self) -> bool: - return self.enabled and bool(self.base_url and self.api_key) - - def run(self, prompt: str, user: str, inputs: Optional[Dict] = None, - tag: str = "") -> Optional[Dict]: - if not self.is_available(): - return None - - headers = { - "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json", - } - payload_inputs = dict(inputs or {}) - if self.mode == "completion": - payload_inputs.setdefault("query", prompt) - elif prompt and "query" not in payload_inputs: - payload_inputs["query"] = prompt - - payload = { - "inputs": payload_inputs, - "response_mode": self.response_mode, - "user": user, - } - url = f"{self.base_url}/{self.endpoint}" - try: - self.LOG.info( - f"[成员交互摘要][Dify] 发起请求: mode={self.mode}, response_mode={self.response_mode}, " - f"endpoint={self.endpoint}, tag={tag}" - ) - if self.response_mode == "streaming": - parsed = self._run_streaming(url, headers, payload, tag) - else: - response = requests.post(url, headers=headers, json=payload, timeout=self.timeout) - response.raise_for_status() - data = response.json() - parsed = self._parse_response(data) - if parsed is not None: - return parsed - self.LOG.warning(f"[成员交互摘要][Dify] 响应内容为空: mode={self.mode}, tag={tag}") - return None - except Exception as e: - self.LOG.warning(f"[成员交互摘要][Dify] 请求失败: mode={self.mode}, tag={tag}, error={e}") - return None - - def _run_streaming(self, url: str, headers: Dict, payload: Dict, tag: str) -> Optional[Dict]: - with requests.post(url, headers=headers, json=payload, timeout=self.timeout, stream=True) as response: - response.raise_for_status() - event_name = "" - text_fragments = [] - final_payload = None - - for raw_line in response.iter_lines(decode_unicode=True): - if raw_line is None: - continue - line = str(raw_line).strip() - if not line: - continue - if line.startswith("event:"): - event_name = line[6:].strip() - continue - if not line.startswith("data:"): - continue - - data_text = line[5:].strip() - if not data_text or data_text == "[DONE]": - continue - try: - chunk = json.loads(data_text) - except Exception: - continue - - candidate_text = self._extract_stream_text(chunk) - if candidate_text: - text_fragments.append(candidate_text) - - chunk_event = str(chunk.get("event") or event_name or "").strip() - if chunk_event in {"workflow_finished", "message_end"}: - final_payload = chunk - - if final_payload: - parsed = self._parse_response(final_payload) - if parsed and parsed.get("text"): - return parsed - - text = "".join(fragment for fragment in text_fragments if fragment) - if text: - return { - "text": text.strip(), - "usage": {}, - "raw": final_payload or {}, - } - - self.LOG.warning(f"[成员交互摘要][Dify] 流式响应未产出有效内容: tag={tag}") - return None - - def _parse_response(self, data: Dict) -> Optional[Dict]: - if self.mode == "workflow": - return self._parse_workflow_response(data) - answer = data.get("answer", "") - usage = (data.get("metadata") or {}).get("usage", {}) or {} - return { - "text": str(answer or "").strip(), - "usage": usage, - "raw": data, - } - - def _parse_workflow_response(self, data: Dict) -> Optional[Dict]: - payload = (data or {}).get("data", {}) or {} - outputs = payload.get("outputs", {}) or {} - text = "" - - if self.workflow_output_key and outputs.get(self.workflow_output_key) is not None: - value = outputs.get(self.workflow_output_key) - text = self._stringify_output(value) - elif outputs.get("text") is not None: - text = self._stringify_output(outputs.get("text")) - elif outputs.get("answer") is not None: - text = self._stringify_output(outputs.get("answer")) - elif outputs.get("result_json") is not None: - text = self._stringify_output(outputs.get("result_json")) - elif outputs.get("result") is not None: - text = self._stringify_output(outputs.get("result")) - else: - for value in outputs.values(): - text = self._stringify_output(value) - if text: - break - - usage = { - "total_tokens": payload.get("total_tokens"), - "latency": payload.get("elapsed_time"), - } - return { - "text": str(text or "").strip(), - "usage": usage, - "raw": data, - } - - def _extract_stream_text(self, chunk: Dict) -> str: - if not isinstance(chunk, dict): - return "" - payload = (chunk.get("data") or {}) if isinstance(chunk.get("data"), dict) else {} - outputs = payload.get("outputs", {}) if isinstance(payload.get("outputs"), dict) else {} - - for key in filter(None, [self.workflow_output_key, "text", "answer", "result_json", "result"]): - if outputs.get(key) is not None: - return self._stringify_output(outputs.get(key)) - - for key in ("text", "answer"): - if chunk.get(key) is not None: - return self._stringify_output(chunk.get(key)) - - return "" - - @staticmethod - def _stringify_output(value) -> str: - if value is None: - return "" - if isinstance(value, str): - return value.strip() - if isinstance(value, (dict, list)): - return json.dumps(value, ensure_ascii=False) - return str(value).strip() diff --git a/plugins/member_context/service.py b/plugins/member_context/service.py index 5a35531..181bf59 100644 --- a/plugins/member_context/service.py +++ b/plugins/member_context/service.py @@ -513,7 +513,7 @@ class MemberContextService: usage = response.get("usage", {}) or {} parsed_meta = parsed.get("meta", {}) or {} parsed_meta.update({ - "ai_provider": "dify", + "ai_provider": self.dify_client.provider, "ai_mode": self.dify_client.mode, "ai_tokens": usage.get("total_tokens"), "ai_latency": usage.get("latency"), diff --git a/plugins/message_summary/config.toml b/plugins/message_summary/config.toml index 92df869..e04cda3 100644 --- a/plugins/message_summary/config.toml +++ b/plugins/message_summary/config.toml @@ -4,14 +4,8 @@ enabled = true [api] -api_key = "app-shCA6bo5l2VDmnvhg2BtuJbk" -api_base_url = "http://192.168.2.240/v1" -mode = "workflow" -endpoint = "workflows/run" -workflow_output_key = "text" -response_mode = "streaming" +backend = "dify_workflow_message_summary" connect_timeout_seconds = 10 -request_timeout_seconds = 180 retry_delays_seconds = [10, 20] [output] diff --git a/plugins/message_summary/main.py b/plugins/message_summary/main.py index be38392..011b616 100644 --- a/plugins/message_summary/main.py +++ b/plugins/message_summary/main.py @@ -6,8 +6,6 @@ from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Any, Tuple, Optional, List -import aiohttp -from aiohttp import ClientTimeout from loguru import logger from base.plugin_common.message_plugin_interface import MessagePluginInterface @@ -22,6 +20,7 @@ from utils.markdown_to_image import convert_md_str_to_image from utils.revoke.message_auto_revoke import MessageAutoRevoke from utils.robot_cmd.robot_command import GroupBotManager, PermissionStatus from utils.string_utils import remove_reasoning_content, remove_trailing_content +from utils.ai.unified_llm import UnifiedLLMClient from utils.wechat.contact_manager import ContactManager from utils.wechat.message_to_db import MessageStorage from wechat_ipad import WechatAPIClient @@ -93,6 +92,10 @@ class MessageSummaryPlugin(MessagePluginInterface): self._connect_timeout_seconds = int(api_config.get("connect_timeout_seconds", 10)) self._request_timeout_seconds = int(api_config.get("request_timeout_seconds", 180)) self._retry_delays_seconds = api_config.get("retry_delays_seconds", [10, 20]) + self.llm_client = UnifiedLLMClient(api_config) + self._api_mode = self.llm_client.mode or self._api_mode + self._response_mode = self.llm_client.response_mode or self._response_mode + self._workflow_output_key = self.llm_client.workflow_output_key or self._workflow_output_key self.message_storage = MessageStorage() db_manager = context.get("db_manager") if db_manager: @@ -221,81 +224,6 @@ class MessageSummaryPlugin(MessagePluginInterface): sanitized_name = "群聊" return sanitized_name - async def _parse_streaming_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]: - """解析 Dify 的 SSE 流式响应""" - answer_parts: List[str] = [] - metadata: Dict[str, Any] = {} - final_payload: Dict[str, Any] = {} - buffer = "" - - async for chunk in response.content.iter_any(): - if not chunk: - continue - - buffer += chunk.decode("utf-8", errors="ignore") - - while "\n\n" in buffer: - raw_event, buffer = buffer.split("\n\n", 1) - raw_event = raw_event.strip() - if not raw_event: - continue - - data_lines = [] - for line in raw_event.splitlines(): - line = line.strip() - if line.startswith("data:"): - data_lines.append(line[5:].strip()) - - if not data_lines: - continue - - payload_text = "\n".join(data_lines).strip() - if not payload_text or payload_text == "[DONE]": - continue - - try: - payload = json.loads(payload_text) - except json.JSONDecodeError: - self.LOG.warning(f"无法解析流式响应片段: {payload_text[:200]}") - continue - - event_name = str(payload.get("event", "")).strip() - if event_name in {"message", "agent_message"}: - chunk_text = payload.get("answer", "") - if chunk_text: - answer_parts.append(chunk_text) - elif event_name in {"message_end", "workflow_finished"}: - final_payload = payload - if self._api_mode == "workflow": - payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {} - outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {} - if outputs: - for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]): - if outputs.get(key) is not None: - answer_parts = [self._stringify_output(outputs.get(key))] - break - metadata = payload.get("metadata", {}) or payload.get("data", {}).get("metadata", {}) or metadata - elif event_name == "error": - raise RuntimeError(payload.get("message") or payload.get("error") or "流式总结生成失败") - else: - if self._api_mode == "workflow": - payload_data = payload.get("data", {}) if isinstance(payload.get("data"), dict) else {} - outputs = payload_data.get("outputs", {}) if isinstance(payload_data.get("outputs"), dict) else {} - for key in filter(None, [self._workflow_output_key, "text", "answer", "result_json", "result"]): - if outputs.get(key) is not None: - chunk_text = self._stringify_output(outputs.get(key)) - if chunk_text: - answer_parts.append(chunk_text) - break - - answer = "".join(answer_parts) - return { - "answer": answer, - "metadata": metadata, - "data": final_payload.get("data", {}) if isinstance(final_payload, dict) else {}, - "event": final_payload.get("event", "") if isinstance(final_payload, dict) else "", - } - def _append_usage_info(self, answer: str, metadata: Dict[str, Any]) -> str: """把 token 统计追加到总结文本末尾""" if not answer or not answer.strip(): @@ -440,7 +368,6 @@ class MessageSummaryPlugin(MessagePluginInterface): async def _generate_summary(self, chat_content: str, group_name: str) -> Tuple[str, Optional[str]]: """生成总结""" - # Dify API配置 content_compress = chat_content try: content_compress = compress_chat_data(chat_content) @@ -449,96 +376,50 @@ class MessageSummaryPlugin(MessagePluginInterface): self.LOG.error(f"压缩内容失败:{e}") prompt = f"请根据[{group_name}]群的群聊记录生成一份总结:\n\n{content_compress}" - if self._api_mode == "workflow": - data = { - "inputs": { - "query": prompt, - "group_name": group_name, - "chat_content": content_compress, - }, - "response_mode": self._response_mode, - "user": group_name if group_name is not None else "message_summary_bot", - } - else: - data = { - "inputs": {}, - "query": prompt, - "response_mode": self._response_mode, - "conversation_id": "", - "user": group_name if group_name is not None else "message_summary_bot", - "files": [] - } - - self.LOG.info(f"群聊总结内容:{data}") - # 设置请求头 - headers = { - "Authorization": f"Bearer {self._api_key}", - "Content-Type": "application/json", - "Accept": "text/event-stream" if self._response_mode == "streaming" else "application/json" + inputs = { + "query": prompt, + "group_name": group_name, + "chat_content": content_compress, } + self.LOG.info(f"群聊总结请求准备: group={group_name}, mode={self._api_mode}, response_mode={self._response_mode}") max_retries = len(self._retry_delays_seconds) + 1 for attempt in range(1, max_retries + 1): try: - custom_timeout = ClientTimeout( - total=None, - connect=self._connect_timeout_seconds, - sock_read=self._request_timeout_seconds + response = await asyncio.to_thread( + self.llm_client.run, + prompt, + group_name if group_name is not None else "message_summary_bot", + inputs, + f"message_summary:{group_name}", ) - conn = aiohttp.TCPConnector(keepalive_timeout=60) # 保持连接活跃 - async with aiohttp.ClientSession(connector=conn, timeout=custom_timeout) as session: - async with session.post(self._api_url, headers=headers, json=data) as response: - response.raise_for_status() # 检查请求是否成功 - if self._response_mode == "streaming": - response_data = await self._parse_streaming_response(response) - else: - response_data = await response.json() + if not response or not response.get("text"): + raise RuntimeError(self.llm_client.last_error or "LLM 未返回有效总结内容") - self.LOG.info(f"Dify API响应状态码: {response.status}, attempt={attempt}") - self.LOG.debug(f"响应数据: {json.dumps(response_data, ensure_ascii=False, indent=2)}") + answer = self._clean_summary_output(response.get("text", "")) + metadata = {"usage": response.get("usage", {}) or {}} + spath = "" + answer = self._append_usage_info(answer, metadata) - if self._api_mode == "workflow": - answer, metadata = self._parse_workflow_response(response_data) - else: - answer = response_data.get("answer", "") - metadata = response_data.get("metadata", {}) + if answer and len(answer.strip()) > 0: + try: + timestamp = int(time.time()) + output_path = f"summary_{timestamp}.png" + self.LOG.info(f"开始生成图片: {output_path}") + spath = await convert_md_str_to_image(answer, output_path) + self.LOG.info(f"成功生成图片: {spath}") + except Exception as e: + self.LOG.error(f"生成图片失败: {e}", exc_info=True) + max_length = 2000 + if len(answer) > max_length: + answer = answer[:max_length] + "\n\n... (内容过长,已截断)" + self.LOG.info("图片生成失败,将发送文本消息作为备选方案") + spath = None + else: + spath = None + return answer, spath - if not answer or not answer.strip(): - raise RuntimeError("Dify 未返回有效总结内容") - - answer = self._clean_summary_output(answer) - spath = "" - answer = self._append_usage_info(answer, metadata) - - if answer and len(answer.strip()) > 0: - try: - # 使用唯一文件名并指定完整路径 - timestamp = int(time.time()) - output_path = f"summary_{timestamp}.png" - self.LOG.info(f"开始生成图片: {output_path}") - spath = await convert_md_str_to_image(answer, output_path) - self.LOG.info(f"成功生成图片: {spath}") - except Exception as e: - self.LOG.error(f"生成图片失败: {e}", exc_info=True) - try: - max_length = 2000 - if len(answer) > max_length: - answer = answer[:max_length] + "\n\n... (内容过长,已截断)" - self.LOG.info("图片生成失败,将发送文本消息作为备选方案") - spath = None - except Exception as fallback_error: - self.LOG.error(f"备选文本发送也失败: {fallback_error}") - spath = None - else: - spath = None - # 返回文本内容和图片路径 - return answer, spath - - except aiohttp.ClientError as e: - self.LOG.error(f"请求Dify API时出错: attempt={attempt}/{max_retries}, error={e}") - except json.JSONDecodeError as e: - self.LOG.error(f"解析Dify API响应时出错: attempt={attempt}/{max_retries}, error={e}") except Exception as e: self.LOG.error(f"处理总结时出现未知错误: attempt={attempt}/{max_retries}, error={e}") diff --git a/utils/ai/llm_registry.py b/utils/ai/llm_registry.py new file mode 100644 index 0000000..353fc69 --- /dev/null +++ b/utils/ai/llm_registry.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, Optional + +import yaml + + +class LLMRegistry: + """从项目根 config.yaml 读取集中式 LLM 后端配置。""" + + _cache: Dict[str, Any] = {"mtime": None, "data": {}} + + @classmethod + def get_root_config_path(cls) -> Path: + return Path(__file__).resolve().parents[2] / "config.yaml" + + @classmethod + def load_root_config(cls) -> Dict[str, Any]: + path = cls.get_root_config_path() + if not path.exists(): + return {} + + stat = path.stat() + if cls._cache["mtime"] == stat.st_mtime and cls._cache["data"]: + return cls._cache["data"] + + with open(path, "r", encoding="utf-8") as fp: + data = yaml.safe_load(fp) or {} + cls._cache = {"mtime": stat.st_mtime, "data": data} + return data + + @classmethod + def get_llm_config(cls) -> Dict[str, Any]: + config = cls.load_root_config() + llm_config = config.get("llm", {}) or {} + return llm_config if isinstance(llm_config, dict) else {} + + @classmethod + def get_backend(cls, backend_name: str) -> Dict[str, Any]: + if not backend_name: + return {} + llm_config = cls.get_llm_config() + backends = llm_config.get("backends", {}) or {} + backend = backends.get(backend_name, {}) or {} + return dict(backend) if isinstance(backend, dict) else {} + + @classmethod + def resolve(cls, local_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + local = dict(local_config or {}) + backend_name = ( + local.get("backend") + or local.get("backend_name") + or local.get("backend_ref") + or "" + ) + if not backend_name: + return local + + merged = cls.get_backend(str(backend_name).strip()) + merged.update(local) + merged["backend"] = backend_name + return merged + diff --git a/utils/ai/unified_llm.py b/utils/ai/unified_llm.py new file mode 100644 index 0000000..902b0a9 --- /dev/null +++ b/utils/ai/unified_llm.py @@ -0,0 +1,540 @@ +from __future__ import annotations + +import json +import time +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse + +import requests +from loguru import logger + +from utils.ai.llm_registry import LLMRegistry + + +class UnifiedLLMClient: + """统一的 LLM 调用客户端,兼容 OpenAI-compatible 与 Dify。""" + + def __init__(self, config: Optional[Dict[str, Any]] = None): + self.LOG = logger + self.raw_config = config or {} + self.config = self._normalize_config(self.raw_config) + self.enabled = bool(self.config.get("enabled", True)) + self.provider = str(self.config.get("provider", "openai_compatible")).strip().lower() + self.base_url = str(self.config.get("base_url", "")).rstrip("/") + self.endpoint = str(self.config.get("endpoint", "")).lstrip("/") + self.api_key = str(self.config.get("api_key", "")).strip() + self.model = str(self.config.get("model", "")).strip() + self.timeout_seconds = int(self.config.get("timeout_seconds", 60)) + self.timeout = self.timeout_seconds + self.temperature = float(self.config.get("temperature", 0.7)) + self.max_tokens = int(self.config.get("max_tokens", 1024)) + self.stream = bool(self.config.get("stream", False)) + self.max_retries = max(int(self.config.get("max_retries", 3) or 3), 1) + self.retry_delay_seconds = float(self.config.get("retry_delay_seconds", 1.0) or 1.0) + self.mode = str(self.config.get("mode", "chat")).strip().lower() + self.response_mode = str(self.config.get("response_mode", "blocking")).strip().lower() + self.workflow_output_key = str(self.config.get("workflow_output_key", "text")).strip() + self.default_system_prompt = str(self.config.get("system_prompt", "")).strip() + self.last_error = "" + + def is_available(self) -> bool: + if not self.enabled: + return False + if self.provider == "openai_compatible": + return bool(self.base_url and self.endpoint and self.model) + if self.provider == "dify": + return bool(self.base_url and self.endpoint and self.api_key) + return False + + def chat( + self, + system_prompt: str, + user_prompt: str, + user_id: str, + image_urls: Optional[List[str]] = None, + ) -> str: + result = self.generate( + system_prompt=system_prompt, + user_prompt=user_prompt, + user=user_id, + image_urls=image_urls or [], + ) + return (result or {}).get("text", "") or "" + + def run( + self, + prompt: str, + user: str, + inputs: Optional[Dict[str, Any]] = None, + tag: str = "", + ) -> Optional[Dict[str, Any]]: + if self.provider == "dify": + return self.generate(prompt=prompt, user=user, inputs=inputs or {}, tag=tag) + + effective_prompt = prompt or self._stringify_inputs(inputs or {}) + return self.generate( + system_prompt=self.default_system_prompt, + user_prompt=effective_prompt, + user=user, + inputs=inputs or {}, + tag=tag, + ) + + def generate( + self, + prompt: str = "", + user: str = "", + inputs: Optional[Dict[str, Any]] = None, + tag: str = "", + system_prompt: str = "", + user_prompt: str = "", + image_urls: Optional[List[str]] = None, + files: Optional[List[Dict[str, Any]]] = None, + ) -> Optional[Dict[str, Any]]: + self.last_error = "" + if not self.is_available(): + self.last_error = "client_unavailable" + return None + + if self.provider == "dify": + return self._generate_dify( + prompt=prompt, + user=user, + inputs=inputs or {}, + tag=tag, + files=files or [], + ) + if self.provider == "openai_compatible": + return self._generate_openai( + system_prompt=system_prompt, + user_prompt=user_prompt or prompt, + user=user, + image_urls=image_urls or [], + ) + + self.last_error = f"unsupported_provider:{self.provider}" + return None + + def _generate_openai( + self, + system_prompt: str, + user_prompt: str, + user: str, + image_urls: List[str], + ) -> Optional[Dict[str, Any]]: + payload = { + "model": self.model, + "messages": self._build_messages(system_prompt or self.default_system_prompt, user_prompt, image_urls), + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "user": user, + "stream": self.stream, + } + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["Authorization"] = self._build_auth_header(self.api_key) + + url = f"{self.base_url}/{self.endpoint}" + for attempt in range(1, self.max_retries + 1): + try: + if self.stream: + text, raw = self._request_openai_stream(url, payload, headers) + else: + text, raw = self._request_openai_json(url, payload, headers) + if text: + return { + "text": text, + "usage": self._extract_openai_usage(raw), + "raw": raw, + } + self.last_error = f"empty_model_output:{self.model}" + except Exception as exc: + self.last_error = f"request_failed:attempt_{attempt}:{exc}" + if attempt < self.max_retries: + time.sleep(self.retry_delay_seconds * attempt) + return None + + def _generate_dify( + self, + prompt: str, + user: str, + inputs: Dict[str, Any], + tag: str, + files: List[Dict[str, Any]], + ) -> Optional[Dict[str, Any]]: + headers = { + "Authorization": self._build_auth_header(self.api_key), + "Content-Type": "application/json", + } + payload_inputs = dict(inputs or {}) + if self.mode == "workflow": + if prompt and "query" not in payload_inputs: + payload_inputs["query"] = prompt + payload = { + "inputs": payload_inputs, + "response_mode": self.response_mode, + "user": user, + "files": files, + } + elif self.mode == "completion": + payload = { + "inputs": payload_inputs, + "query": prompt, + "response_mode": self.response_mode, + "user": user, + "files": files, + } + else: + payload = { + "inputs": payload_inputs, + "query": prompt, + "response_mode": self.response_mode, + "conversation_id": "", + "user": user, + "files": files, + } + + url = f"{self.base_url}/{self.endpoint}" + for attempt in range(1, self.max_retries + 1): + try: + if self.response_mode == "streaming": + parsed = self._request_dify_stream(url, payload, headers, tag) + else: + response = requests.post(url, headers=headers, json=payload, timeout=self.timeout_seconds) + response.raise_for_status() + parsed = self._parse_dify_response(response.json()) + if parsed and parsed.get("text"): + return parsed + self.last_error = f"empty_model_output:{self.mode}" + except Exception as exc: + self.last_error = f"request_failed:attempt_{attempt}:{exc}" + self.LOG.warning(f"[UnifiedLLMClient] Dify 请求失败: tag={tag}, attempt={attempt}, error={exc}") + if attempt < self.max_retries: + time.sleep(self.retry_delay_seconds * attempt) + return None + + def _request_openai_json(self, url: str, payload: Dict[str, Any], headers: Dict[str, str]) -> Tuple[str, Dict[str, Any]]: + response = requests.post(url, json=payload, headers=headers, timeout=self.timeout_seconds) + response.raise_for_status() + data = response.json() + return self._extract_openai_text(data), data + + def _request_openai_stream( + self, + url: str, + payload: Dict[str, Any], + headers: Dict[str, str], + ) -> Tuple[str, Dict[str, Any]]: + chunks: List[str] = [] + with requests.post(url, json=payload, headers=headers, timeout=self.timeout_seconds, stream=True) as response: + response.raise_for_status() + buffer = b"" + for part in response.iter_content(chunk_size=None): + if not part: + continue + buffer += part + while b"\n\n" in buffer: + event, buffer = buffer.split(b"\n\n", 1) + try: + text_piece, done = self._parse_openai_sse_event(event.decode("utf-8")) + except UnicodeDecodeError: + buffer = event + b"\n\n" + buffer + break + if text_piece: + chunks.append(text_piece) + if done: + break + return "".join(chunks).strip(), {"stream_text": "".join(chunks).strip()} + + def _request_dify_stream( + self, + url: str, + payload: Dict[str, Any], + headers: Dict[str, str], + tag: str, + ) -> Optional[Dict[str, Any]]: + with requests.post(url, headers=headers, json=payload, timeout=self.timeout_seconds, stream=True) as response: + response.raise_for_status() + event_name = "" + text_fragments: List[str] = [] + final_payload = None + + for raw_line in response.iter_lines(decode_unicode=True): + if raw_line is None: + continue + line = str(raw_line).strip() + if not line: + continue + if line.startswith("event:"): + event_name = line[6:].strip() + continue + if not line.startswith("data:"): + continue + + data_text = line[5:].strip() + if not data_text or data_text == "[DONE]": + continue + try: + chunk = json.loads(data_text) + except Exception: + continue + + candidate_text = self._extract_dify_stream_text(chunk) + if candidate_text: + text_fragments.append(candidate_text) + + chunk_event = str(chunk.get("event") or event_name or "").strip() + if chunk_event in {"workflow_finished", "message_end"}: + final_payload = chunk + + if final_payload: + parsed = self._parse_dify_response(final_payload) + if parsed and parsed.get("text"): + return parsed + + text = "".join(fragment for fragment in text_fragments if fragment).strip() + if text: + return {"text": text, "usage": {}, "raw": final_payload or {}} + + self.LOG.warning(f"[UnifiedLLMClient] Dify 流式响应未产出有效内容: tag={tag}") + return None + + @staticmethod + def _build_messages(system_prompt: str, user_prompt: str, image_urls: List[str]) -> List[Dict[str, Any]]: + user_content: str | List[Dict[str, Any]] + if image_urls: + content_parts: List[Dict[str, Any]] = [{"type": "text", "text": user_prompt}] + for image_url in image_urls: + if image_url: + content_parts.append({"type": "image_url", "image_url": {"url": image_url}}) + user_content = content_parts + else: + user_content = user_prompt + messages: List[Dict[str, Any]] = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": user_content}) + return messages + + @staticmethod + def _extract_openai_text(data: Dict[str, Any]) -> str: + choices = data.get("choices") or [] + if choices: + message = choices[0].get("message", {}) or {} + content = message.get("content") + if isinstance(content, str) and content.strip(): + return content.strip() + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, dict): + text = item.get("text") or item.get("content") + if isinstance(text, str) and text.strip(): + parts.append(text.strip()) + if parts: + return "\n".join(parts).strip() + for key in ("reasoning_content", "text", "output_text"): + value = message.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + for key in ("output_text", "text", "answer", "response"): + value = data.get(key) + if isinstance(value, str) and value.strip(): + return value.strip() + return "" + + @classmethod + def _parse_openai_sse_event(cls, event_text: str) -> Tuple[str, bool]: + lines = [line.strip() for line in event_text.splitlines() if line.strip()] + data_lines = [line[5:].strip() for line in lines if line.startswith("data:")] + if not data_lines: + return "", False + data = "\n".join(data_lines) + if data == "[DONE]": + return "", True + obj = json.loads(data) + choice = (obj.get("choices") or [{}])[0] + delta = choice.get("delta") or {} + content = delta.get("content") + if isinstance(content, str): + return content, False + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, dict): + text = item.get("text") or item.get("content") + if isinstance(text, str): + parts.append(text) + return "".join(parts), False + return "", False + + def _parse_dify_response(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + if self.mode == "workflow": + return self._parse_dify_workflow_response(data) + answer = str(data.get("answer", "") or "").strip() + usage = (data.get("metadata") or {}).get("usage", {}) or {} + return {"text": answer, "usage": usage, "raw": data} + + def _parse_dify_workflow_response(self, data: Dict[str, Any]) -> Optional[Dict[str, Any]]: + payload = (data or {}).get("data", {}) or {} + outputs = payload.get("outputs", {}) or {} + text = "" + + for key in filter(None, [self.workflow_output_key, "text", "answer", "result_json", "result"]): + if outputs.get(key) is not None: + text = self._stringify_output(outputs.get(key)) + if text: + break + + if not text: + for value in outputs.values(): + text = self._stringify_output(value) + if text: + break + + usage = { + "total_tokens": payload.get("total_tokens"), + "latency": payload.get("elapsed_time"), + } + return {"text": text.strip(), "usage": usage, "raw": data} + + def _extract_dify_stream_text(self, chunk: Dict[str, Any]) -> str: + if not isinstance(chunk, dict): + return "" + payload = (chunk.get("data") or {}) if isinstance(chunk.get("data"), dict) else {} + outputs = payload.get("outputs", {}) if isinstance(payload.get("outputs"), dict) else {} + + for key in filter(None, [self.workflow_output_key, "text", "answer", "result_json", "result"]): + if outputs.get(key) is not None: + return self._stringify_output(outputs.get(key)) + + for key in ("text", "answer"): + if chunk.get(key) is not None: + return self._stringify_output(chunk.get(key)) + + return "" + + @staticmethod + def _extract_openai_usage(data: Dict[str, Any]) -> Dict[str, Any]: + usage = data.get("usage", {}) or {} + if usage: + return usage + return {} + + @staticmethod + def _stringify_output(value: Any) -> str: + if value is None: + return "" + if isinstance(value, str): + return value.strip() + if isinstance(value, (dict, list)): + return json.dumps(value, ensure_ascii=False) + return str(value).strip() + + @classmethod + def _normalize_config(cls, config: Dict[str, Any]) -> Dict[str, Any]: + normalized = LLMRegistry.resolve(config or {}) + normalized["enabled"] = bool( + normalized.get("enabled", normalized.get("enable", True)) + ) + + if not normalized.get("provider"): + normalized["provider"] = cls._guess_provider(normalized) + + parsed_url = cls._split_url( + normalized.get("api_url") + or normalized.get("url") + ) + base_url = ( + normalized.get("base_url") + or normalized.get("api_base_url") + or parsed_url[0] + or "" + ) + endpoint = ( + normalized.get("endpoint") + or parsed_url[1] + or "" + ) + + normalized["base_url"] = str(base_url).rstrip("/") + normalized["endpoint"] = str(endpoint).lstrip("/") + normalized["api_key"] = ( + normalized.get("api_key") + or normalized.get("api-key") + or normalized.get("authorization") + or "" + ) + normalized["timeout_seconds"] = int( + normalized.get("timeout_seconds") + or normalized.get("request_timeout_seconds") + or normalized.get("request_timeout") + or 60 + ) + normalized["max_retries"] = int(normalized.get("max_retries", len(normalized.get("retry_delays_seconds", [])) + 1 or 3)) + normalized["retry_delay_seconds"] = float(normalized.get("retry_delay_seconds", 1.0)) + normalized["response_mode"] = normalized.get("response_mode", "blocking") + normalized["workflow_output_key"] = normalized.get("workflow_output_key", "text") + + if normalized["provider"] == "dify": + default_endpoint = cls._guess_dify_endpoint(normalized) + if not normalized["endpoint"]: + normalized["endpoint"] = default_endpoint + else: + if not normalized["endpoint"]: + normalized["endpoint"] = "chat/completions" + + return normalized + + @staticmethod + def _guess_provider(config: Dict[str, Any]) -> str: + api_key = str( + config.get("api_key") + or config.get("api-key") + or config.get("authorization") + or "" + ).strip() + url = str(config.get("api_url") or config.get("url") or config.get("endpoint") or "").lower() + mode = str(config.get("mode", "")).lower() + if "workflows/run" in url or "chat-messages" in url or "completion-messages" in url: + return "dify" + if api_key.startswith("app-") or mode in {"workflow", "completion"}: + return "dify" + return "openai_compatible" + + @staticmethod + def _guess_dify_endpoint(config: Dict[str, Any]) -> str: + mode = str(config.get("mode", "chat")).strip().lower() + if mode == "workflow": + return "workflows/run" + if mode == "completion": + return "completion-messages" + return "chat-messages" + + @staticmethod + def _split_url(url: Optional[str]) -> Tuple[str, str]: + if not url: + return "", "" + parsed = urlparse(str(url)) + if not parsed.scheme or not parsed.netloc: + return "", str(url) + base = f"{parsed.scheme}://{parsed.netloc}" + return base, parsed.path.lstrip("/") + + @staticmethod + def _build_auth_header(value: str) -> str: + token = str(value or "").strip() + if not token: + return "" + if token.lower().startswith("bearer "): + return token + return f"Bearer {token}" + + @staticmethod + def _stringify_inputs(inputs: Dict[str, Any]) -> str: + if not inputs: + return "" + try: + return json.dumps(inputs, ensure_ascii=False) + except Exception: + return str(inputs)