From 1446bf5f39ea09e44ddd2bbacec19fd977e11d35 Mon Sep 17 00:00:00 2001 From: liuwei Date: Mon, 20 Apr 2026 14:51:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=B0=86LLM=E9=85=8D=E7=BD=AE=E4=B8=BB?= =?UTF-8?q?=E5=AD=98=E5=82=A8=E8=BF=81=E7=A7=BB=E5=88=B0MySQL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 变更项: 1) 新增 t_llm_config 数据访问层与建表逻辑。 2) Robot 启动时自动初始化并在空库时从 YAML 导入。 3) 后台 system LLM API 改为读写 MySQL。 4) LLMRegistry 改为优先 MySQL 读取并回退 YAML。 5) DashboardServer 挂载 llm_config_db 提供后台访问。 --- admin/dashboard/blueprints/system.py | 56 +++++++++--- admin/dashboard/server.py | 1 + db/llm_config_db.py | 125 +++++++++++++++++++++++++++ robot.py | 8 ++ utils/ai/llm_registry.py | 121 +++++++++++++++++++++++--- 5 files changed, 287 insertions(+), 24 deletions(-) create mode 100644 db/llm_config_db.py diff --git a/admin/dashboard/blueprints/system.py b/admin/dashboard/blueprints/system.py index dc7edf7..e099747 100644 --- a/admin/dashboard/blueprints/system.py +++ b/admin/dashboard/blueprints/system.py @@ -39,6 +39,43 @@ def _save_system_yaml(config_obj: dict) -> None: yaml.safe_dump(config_obj, f, allow_unicode=True, sort_keys=False) +def _load_llm_config_runtime() -> dict: + """读取运行时 LLM 配置。 + + 读取优先级: + 1. 优先从机器人挂载的 MySQL 配置读取(主数据源); + 2. 若数据库对象不可用或读取异常,回退到 config.yaml(兜底)。 + """ + try: + server = current_app.dashboard_server + llm_config_db = getattr(server, "llm_config_db", None) + if llm_config_db: + row = llm_config_db.get_config() or {} + if row: + return { + "default_backend": row.get("default_backend", ""), + "backends": row.get("backends", {}) or {}, + "scenes": row.get("scenes", {}) or {}, + } + except Exception as e: + logger.warning(f"从 MySQL 读取 LLM 配置失败,回退 YAML: {e}") + + config_obj = _load_system_yaml() + llm_config = config_obj.get("llm", {}) or {} + return llm_config if isinstance(llm_config, dict) else {} + + +def _save_llm_config_runtime(llm_config: dict) -> None: + """保存运行时 LLM 配置到主数据源(MySQL)。""" + server = current_app.dashboard_server + llm_config_db = getattr(server, "llm_config_db", None) + if not llm_config_db: + raise RuntimeError("llm_config_db 未初始化,无法保存 LLM 配置到 MySQL") + ok = llm_config_db.save_config(llm_config or {}, source="admin") + if not ok: + raise RuntimeError("保存 LLM 配置到 MySQL 失败") + + def _plugins_root_path() -> str: """返回插件根目录绝对路径。""" return os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'plugins')) @@ -105,8 +142,7 @@ def _scan_plugin_llm_usage() -> list: def _build_llm_topology() -> dict: """构建 LLM 拓扑视图(供后台页面直观展示依赖关系)。""" - config_obj = _load_system_yaml() - llm_config = config_obj.get("llm", {}) or {} + llm_config = _load_llm_config_runtime() scenes = llm_config.get("scenes", {}) or {} backends = llm_config.get("backends", {}) or {} default_backend = str(llm_config.get("default_backend", "") or "").strip() @@ -287,12 +323,11 @@ def get_current_user_info(): @login_required def get_system_config_raw(): try: - server = current_app.dashboard_server config_path = _system_config_path() with open(config_path, 'r', encoding='utf-8') as f: config_text = f.read() - robot_config = getattr(getattr(server, "robot", None), "config", None) - llm_config = getattr(robot_config, "llm", {}) if robot_config else {} + # 这里展示“运行时有效”的 LLM 后端列表(优先 MySQL),避免与 YAML 展示不一致。 + llm_config = _load_llm_config_runtime() llm_backends = (llm_config or {}).get("backends", {}) return jsonify({ "success": True, @@ -333,8 +368,7 @@ def update_system_config(): @login_required def get_system_llm_config(): try: - config_obj = _load_system_yaml() - llm_config = config_obj.get("llm", {}) or {} + llm_config = _load_llm_config_runtime() backends = llm_config.get("backends", {}) or {} scenes = llm_config.get("scenes", {}) or {} backend_list = [] @@ -368,7 +402,8 @@ def get_system_llm_config(): "scenes": scene_list, "topology_rows": topology.get("topology_rows", []), "plugin_usages": topology.get("plugin_usages", []), - "config_path": _system_config_path(), + # 配置来源改为 MySQL;保留 YAML 路径用于排障与一次性导入核对。 + "config_path": f"mysql:t_llm_config (fallback yaml: {_system_config_path()})", } }) except Exception as e: @@ -428,13 +463,12 @@ def update_system_llm_config(): return jsonify({"success": False, "message": f"场景 {scene_name} 绑定的后端不存在"}), 400 normalized_scenes[scene_name] = backend_name - config_obj = _load_system_yaml() - config_obj["llm"] = { + llm_config = { "default_backend": default_backend, "backends": normalized_backends, "scenes": normalized_scenes, } - _save_system_yaml(config_obj) + _save_llm_config_runtime(llm_config) if getattr(server, "robot", None) and getattr(server.robot, "config", None): server.robot.config.reload() diff --git a/admin/dashboard/server.py b/admin/dashboard/server.py index 493da51..a8f79ed 100644 --- a/admin/dashboard/server.py +++ b/admin/dashboard/server.py @@ -51,6 +51,7 @@ class DashboardServer: self.plugin_schedule_db = robot_instance.plugin_schedule_db self.plugin_schedule_manager = robot_instance.plugin_schedule_manager self.group_plugin_config_db = robot_instance.group_plugin_config_db + self.llm_config_db = robot_instance.llm_config_db self.group_plugin_config_service = robot_instance.group_plugin_config_service # 获取联系人管理器实例 self.contact_manager = robot_instance.contact_manager diff --git a/db/llm_config_db.py b/db/llm_config_db.py new file mode 100644 index 0000000..8d65b58 --- /dev/null +++ b/db/llm_config_db.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +import json +from typing import Any, Dict + +from loguru import logger + +from db.base import BaseDBOperator +from db.connection import DBConnectionManager + + +class LLMConfigDBOperator(BaseDBOperator): + """LLM 配置数据库操作器。 + + 设计目标: + 1. 把原先存放在 config.yaml 的 llm 配置迁移到 MySQL,便于后台实时维护; + 2. 采用“单行配置”模型,降低维护复杂度:一行记录保存 default_backend/backends/scenes; + 3. 支持“首次启动自动导入 YAML 配置”,保证迁移过程对线上透明。 + """ + + def __init__(self, db_manager: DBConnectionManager): + super().__init__(db_manager) + + def init_tables(self) -> bool: + """初始化 LLM 配置表。 + + 字段说明: + - id: 固定主键,当前仅使用 id=1 作为全局配置; + - default_backend: 全局默认后端; + - backends_json: 后端配置大对象(JSON 字符串); + - scenes_json: 场景路由对象(JSON 字符串); + - source: 记录当前配置来源,便于后续排障。 + """ + try: + return self.execute_update( + """ + CREATE TABLE IF NOT EXISTS t_llm_config ( + id TINYINT PRIMARY KEY, + default_backend VARCHAR(128) NOT NULL DEFAULT '', + backends_json JSON NOT NULL, + scenes_json JSON NOT NULL, + source VARCHAR(32) NOT NULL DEFAULT 'mysql', + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) + """ + ) + except Exception as e: + logger.error(f"初始化 t_llm_config 失败: {e}") + return False + + @staticmethod + def _loads_json(value: Any) -> Dict[str, Any]: + """将数据库 JSON 字段统一解析为 dict。""" + if isinstance(value, dict): + return value + if isinstance(value, str): + try: + obj = json.loads(value) + return obj if isinstance(obj, dict) else {} + except json.JSONDecodeError: + return {} + return {} + + def get_config(self) -> Dict[str, Any]: + """读取数据库中的 LLM 配置。""" + row = self.execute_query( + """ + SELECT id, default_backend, backends_json, scenes_json, source, updated_at + FROM t_llm_config + WHERE id = 1 + LIMIT 1 + """, + fetch_one=True, + ) or {} + if not row: + return {} + + return { + "default_backend": str(row.get("default_backend") or "").strip(), + "backends": self._loads_json(row.get("backends_json")), + "scenes": self._loads_json(row.get("scenes_json")), + "source": str(row.get("source") or "mysql").strip(), + "updated_at": row.get("updated_at"), + } + + def save_config(self, llm_config: Dict[str, Any], source: str = "mysql") -> bool: + """保存(覆盖)LLM 配置到数据库。""" + data = llm_config or {} + default_backend = str(data.get("default_backend") or "").strip() + backends = data.get("backends", {}) or {} + scenes = data.get("scenes", {}) or {} + if not isinstance(backends, dict): + backends = {} + if not isinstance(scenes, dict): + scenes = {} + + sql = """ + INSERT INTO t_llm_config (id, default_backend, backends_json, scenes_json, source) + VALUES (1, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + default_backend = VALUES(default_backend), + backends_json = VALUES(backends_json), + scenes_json = VALUES(scenes_json), + source = VALUES(source) + """ + params = ( + default_backend, + json.dumps(backends, ensure_ascii=False), + json.dumps(scenes, ensure_ascii=False), + str(source or "mysql"), + ) + return self.execute_update(sql, params) + + def bootstrap_from_yaml_if_empty(self, yaml_llm_config: Dict[str, Any]) -> bool: + """当数据库为空时,把 YAML 里的 llm 配置导入到数据库。 + + 迁移策略: + 1. 只在“表中无 id=1 配置”时执行,避免覆盖后台已维护的数据; + 2. 导入后标记 source=yaml_bootstrap,便于识别初始数据来源; + 3. 返回 True 表示“已有配置或导入成功”,False 表示导入失败。 + """ + existed = self.get_config() + if existed: + return True + return self.save_config(yaml_llm_config or {}, source="yaml_bootstrap") diff --git a/robot.py b/robot.py index a8b558c..9eb6ef6 100644 --- a/robot.py +++ b/robot.py @@ -18,6 +18,7 @@ from configuration import Config from db.connection import DBConnectionManager from db.contacts_db import ContactsDBOperator from db.group_plugin_config_db import GroupPluginConfigDBOperator +from db.llm_config_db import LLMConfigDBOperator from db.plugin_schedule_db import PluginScheduleDBOperator from db.system_job_db import SystemJobDBOperator from utils.system_jobs import SystemJobLoader @@ -71,9 +72,16 @@ class Robot: self.contacts_db = ContactsDBOperator(self.db_manager) self.group_plugin_config_db = GroupPluginConfigDBOperator(self.db_manager) + self.llm_config_db = LLMConfigDBOperator(self.db_manager) self.plugin_schedule_db = PluginScheduleDBOperator(self.db_manager) self.system_job_db = SystemJobDBOperator(self.db_manager) self.group_plugin_config_db.init_tables() + # LLM 配置迁移到 MySQL: + # 1. 先确保表存在; + # 2. 若库里没有配置,则从 config.yaml 的 llm 节点导入一次; + # 3. 后续运行时以数据库为准,YAML 仅作为初始导入来源与兜底。 + self.llm_config_db.init_tables() + self.llm_config_db.bootstrap_from_yaml_if_empty(self.config.llm) self.group_plugin_config_service = GroupPluginConfigService( db_operator=self.group_plugin_config_db, redis_client=self.db_manager.get_redis_connection(), diff --git a/utils/ai/llm_registry.py b/utils/ai/llm_registry.py index 772fccb..4261e41 100644 --- a/utils/ai/llm_registry.py +++ b/utils/ai/llm_registry.py @@ -1,40 +1,135 @@ from __future__ import annotations +import json +import time from pathlib import Path from typing import Any, Dict, Optional import yaml +from db.connection import DBConnectionManager + class LLMRegistry: - """从项目根 config.yaml 读取集中式 LLM 后端配置。""" + """集中式 LLM 配置注册器。 - _cache: Dict[str, Any] = {"mtime": None, "data": {}} + 读取优先级: + 1. 优先读取 MySQL(t_llm_config); + 2. MySQL 不可用或无数据时,回退读取 config.yaml 的 llm 节点。 + """ + + _cache: Dict[str, Any] = { + # cache_until: 缓存过期时间戳,避免每次调用都打数据库; + # data: 最近一次成功读取并归一化后的 llm 配置对象。 + "cache_until": 0.0, + "data": {}, + } @classmethod def get_root_config_path(cls) -> Path: return Path(__file__).resolve().parents[2] / "config.yaml" @classmethod - def load_root_config(cls) -> Dict[str, Any]: + def _load_llm_from_yaml(cls) -> Dict[str, Any]: + """从 YAML 读取 llm 配置(兜底来源)。""" path = cls.get_root_config_path() if not path.exists(): return {} - stat = path.stat() - if cls._cache["mtime"] == stat.st_mtime and cls._cache["data"]: - return cls._cache["data"] - with open(path, "r", encoding="utf-8") as fp: - data = yaml.safe_load(fp) or {} - cls._cache = {"mtime": stat.st_mtime, "data": data} - return data + root = yaml.safe_load(fp) or {} + llm_config = root.get("llm", {}) or {} + if not isinstance(llm_config, dict): + return {} + return llm_config + + @staticmethod + def _loads_json(value: Any) -> Dict[str, Any]: + """把数据库 JSON 字段统一解析为 dict。""" + if isinstance(value, dict): + return value + if isinstance(value, str): + try: + obj = json.loads(value) + return obj if isinstance(obj, dict) else {} + except json.JSONDecodeError: + return {} + return {} + + @classmethod + def _load_llm_from_mysql(cls) -> Dict[str, Any]: + """从 MySQL 读取 llm 配置。 + + 注意: + 1. 该函数必须“无副作用失败”,即任何异常都返回空 dict,交由上层做 YAML 回退; + 2. 不依赖 Robot 实例,直接走 DBConnectionManager 单例,便于在插件调用链路中复用。 + """ + try: + db_manager = DBConnectionManager.get_instance() + if not db_manager or not db_manager.mysql_pool: + return {} + + conn = db_manager.get_mysql_connection() + try: + with conn.cursor(dictionary=True) as cursor: + cursor.execute( + """ + SELECT default_backend, backends_json, scenes_json + FROM t_llm_config + WHERE id = 1 + LIMIT 1 + """ + ) + row = cursor.fetchone() or {} + finally: + conn.close() + + if not row: + return {} + + return { + "default_backend": str(row.get("default_backend") or "").strip(), + "backends": cls._loads_json(row.get("backends_json")), + "scenes": cls._loads_json(row.get("scenes_json")), + } + except Exception: + return {} + + @classmethod + def _normalize_llm_config(cls, llm_config: Dict[str, Any]) -> Dict[str, Any]: + """统一规整 llm 配置结构,避免下游出现类型分支。""" + data = llm_config if isinstance(llm_config, dict) else {} + default_backend = str(data.get("default_backend") or "").strip() + backends = data.get("backends", {}) or {} + scenes = data.get("scenes", {}) or {} + if not isinstance(backends, dict): + backends = {} + if not isinstance(scenes, dict): + scenes = {} + return { + "default_backend": default_backend, + "backends": backends, + "scenes": scenes, + } @classmethod def get_llm_config(cls) -> Dict[str, Any]: - config = cls.load_root_config() - llm_config = config.get("llm", {}) or {} - return llm_config if isinstance(llm_config, dict) else {} + """获取运行时 LLM 配置(优先 MySQL,失败回退 YAML)。""" + now = time.time() + if cls._cache.get("cache_until", 0.0) > now and cls._cache.get("data"): + return cls._cache["data"] + + llm_config = cls._load_llm_from_mysql() + if not llm_config: + llm_config = cls._load_llm_from_yaml() + + normalized = cls._normalize_llm_config(llm_config) + # 轻量缓存 3 秒:兼顾“后台编辑后较快生效”和“降低高频调用的 DB 压力”。 + cls._cache = { + "cache_until": now + 3.0, + "data": normalized, + } + return normalized @classmethod def get_default_backend(cls) -> str: