Files
abot/utils/ai/llm_registry.py
liuwei 061f2b8084 feat: 重构LLM配置为Provider模板+Dify应用+Scene绑定
变更项:

1. 新增 LLM 目录数据层(t_llm_provider_templates/t_llm_dify_apps/t_llm_backends/t_llm_scenes/t_llm_catalog_meta),支持三层配置管理。

2. Robot 启动接入 llm_catalog_db:自动建表并从旧 llm(backends/scenes) 配置迁移初始化。

3. LLMRegistry 改为优先读取目录模型并按 scene 解析:dify_app 自动合并 Provider 模板与 app_key 差异,降低重复配置。

4. system 蓝图 /api/system/llm_config 改为目录模型读写,新增完整校验(provider引用、app_key、scene目标合法性)。

5. system_llm 页面重构为四块:Provider 模板、Dify 应用、通用 Backend、Scene 绑定,并展示插件依赖拓扑。

6. 保留 YAML 旧结构兜底展示与运行时回退,保证目录表异常时系统仍可运行。
2026-04-20 15:09:24 +08:00

378 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations

import json
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml

from db.connection import DBConnectionManager
class LLMRegistry:
    """Unified LLM routing registry: scene -> target -> final config.

    Two data sources are supported:

    1. The catalog model (recommended): MySQL tables holding provider
       templates, Dify apps, generic backends and scene bindings.
    2. The legacy model (fallback): the ``llm`` section (backends/scenes) of
       ``config.yaml``.

    Every method is a class/static method; state lives in the class-level
    ``_cache`` snapshot, which is refreshed after ``_CACHE_TTL_SECONDS``.
    """

    # How long (seconds) a loaded snapshot is reused before both sources are
    # read again.
    _CACHE_TTL_SECONDS: float = 3.0

    # Snapshot cache shared by every caller of this class.
    _cache: Dict[str, Any] = {
        "cache_until": 0.0,
        "catalog": {},
        "legacy_llm": {},
    }

    # ------------------------------------------------------------------
    # Small normalisation helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _text(value: Any) -> str:
        """Return ``value`` as a stripped string; falsy values become ``""``."""
        return str(value or "").strip()

    @staticmethod
    def _flag(value: Any) -> bool:
        """Convert an ``enabled`` column value to bool; falsy values mean False."""
        return bool(int(value or 0))

    # ------------------------------------------------------------------
    # Legacy YAML source
    # ------------------------------------------------------------------
    @classmethod
    def get_root_config_path(cls) -> Path:
        """Return the path of ``config.yaml`` two directories above this file's directory."""
        return Path(__file__).resolve().parents[2] / "config.yaml"

    @classmethod
    def _load_yaml_root(cls) -> Dict[str, Any]:
        """Read the YAML root config (used only as a migration fallback).

        Returns an empty dict when the file does not exist, is empty, or its
        top-level node is not a mapping.
        """
        path = cls.get_root_config_path()
        if not path.exists():
            return {}
        with open(path, "r", encoding="utf-8") as fp:
            data = yaml.safe_load(fp)
        # A non-mapping root (e.g. a list) would break the ``.get`` calls made
        # by callers, so treat it like an empty config.
        return data if isinstance(data, dict) else {}

    @classmethod
    def _load_yaml_legacy_llm(cls) -> Dict[str, Any]:
        """Return the legacy ``llm`` section (backends/scenes) used as fallback."""
        llm = cls._load_yaml_root().get("llm", {}) or {}
        return llm if isinstance(llm, dict) else {}

    # ------------------------------------------------------------------
    # Catalog (MySQL) source
    # ------------------------------------------------------------------
    @staticmethod
    def _loads_json(value: Any, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Safely convert a JSON column value into a dict.

        Args:
            value: An already-decoded dict, or a JSON document as ``str``,
                ``bytes`` or ``bytearray``.
            default: Returned (or ``{}`` when falsy) if ``value`` cannot be
                decoded into a dict.

        Returns:
            The decoded dict, or the fallback when ``value`` is of another
            type, is malformed, or decodes to a non-dict JSON value.
        """
        fallback = default or {}
        if isinstance(value, dict):
            return value
        if isinstance(value, (str, bytes, bytearray)):
            try:
                obj = json.loads(value)
            except ValueError:
                # Covers json.JSONDecodeError and UnicodeDecodeError (bytes input).
                return fallback
            return obj if isinstance(obj, dict) else fallback
        return fallback

    @classmethod
    def _load_catalog_from_mysql(cls) -> Dict[str, Any]:
        """Load the catalog model from MySQL.

        Returns:
            A dict with the keys ``default_scene``, ``providers``,
            ``dify_apps``, ``backends`` and ``scenes`` (each of the last four
            keyed by the row's stripped name). An empty dict is returned when
            no MySQL pool is available, when the catalog contains no scenes,
            or when any error occurs while reading -- the caller then falls
            back to the legacy YAML model.
        """
        try:
            db_manager = DBConnectionManager.get_instance()
            if not db_manager or not db_manager.mysql_pool:
                return {}

            conn = db_manager.get_mysql_connection()
            try:
                with conn.cursor(dictionary=True) as cursor:
                    cursor.execute("SELECT name, provider_type, config_json, enabled FROM t_llm_provider_templates")
                    provider_rows = cursor.fetchall() or []
                    cursor.execute(
                        """
                        SELECT name, provider_template, app_key, workflow_output_key, config_json, enabled
                        FROM t_llm_dify_apps
                        """
                    )
                    app_rows = cursor.fetchall() or []
                    cursor.execute("SELECT name, config_json, enabled FROM t_llm_backends")
                    backend_rows = cursor.fetchall() or []
                    cursor.execute("SELECT name, target_type, target_ref, enabled FROM t_llm_scenes")
                    scene_rows = cursor.fetchall() or []
                    cursor.execute(
                        "SELECT meta_value FROM t_llm_catalog_meta WHERE meta_key='default_scene' LIMIT 1"
                    )
                    meta_row = cursor.fetchone() or {}
            finally:
                # Always hand the connection back, even if a query failed.
                conn.close()

            # Rows whose name is empty after stripping are ignored everywhere.
            providers: Dict[str, Any] = {}
            for row in provider_rows:
                name = cls._text(row.get("name"))
                if not name:
                    continue
                providers[name] = {
                    "name": name,
                    "provider_type": cls._text(row.get("provider_type")).lower(),
                    "enabled": cls._flag(row.get("enabled")),
                    "config": cls._loads_json(row.get("config_json"), {}),
                }

            dify_apps: Dict[str, Any] = {}
            for row in app_rows:
                name = cls._text(row.get("name"))
                if not name:
                    continue
                dify_apps[name] = {
                    "name": name,
                    "provider_template": cls._text(row.get("provider_template")),
                    "app_key": cls._text(row.get("app_key")),
                    # Missing or blank output key falls back to "text".
                    "workflow_output_key": cls._text(row.get("workflow_output_key")) or "text",
                    "enabled": cls._flag(row.get("enabled")),
                    "config": cls._loads_json(row.get("config_json"), {}),
                }

            backends: Dict[str, Any] = {}
            for row in backend_rows:
                name = cls._text(row.get("name"))
                if not name:
                    continue
                backends[name] = {
                    "name": name,
                    "enabled": cls._flag(row.get("enabled")),
                    "config": cls._loads_json(row.get("config_json"), {}),
                }

            scenes: Dict[str, Any] = {}
            for row in scene_rows:
                name = cls._text(row.get("name"))
                if not name:
                    continue
                scenes[name] = {
                    "name": name,
                    "target_type": cls._text(row.get("target_type")).lower(),
                    "target_ref": cls._text(row.get("target_ref")),
                    "enabled": cls._flag(row.get("enabled")),
                }

            catalog = {
                "default_scene": cls._text(meta_row.get("meta_value")),
                "providers": providers,
                "dify_apps": dify_apps,
                "backends": backends,
                "scenes": scenes,
            }
            # The catalog model counts as available as soon as it has any scene.
            return catalog if scenes else {}
        except Exception as exc:
            # Deliberate best-effort: any failure means "catalog unavailable"
            # so the legacy YAML model takes over. Log it so the fallback is
            # not silent.
            logging.getLogger(__name__).warning(
                "LLM catalog could not be loaded from MySQL, falling back to legacy config: %r",
                exc,
            )
            return {}

    # ------------------------------------------------------------------
    # Snapshot cache
    # ------------------------------------------------------------------
    @classmethod
    def _load_runtime_snapshot(cls) -> Dict[str, Any]:
        """Return the runtime snapshot (catalog model first, legacy fallback).

        The snapshot is a dict with the keys ``catalog`` and ``legacy_llm``.
        A cached snapshot is reused until its ``cache_until`` timestamp has
        passed; afterwards both sources are read again and the cache is
        replaced.
        """
        now = time.time()
        cached = cls._cache  # read once so one consistent snapshot is used
        if cached.get("cache_until", 0.0) > now and cached.get("catalog") is not None:
            return {
                "catalog": cached.get("catalog", {}) or {},
                "legacy_llm": cached.get("legacy_llm", {}) or {},
            }

        catalog = cls._load_catalog_from_mysql() or {}
        legacy_llm = cls._load_yaml_legacy_llm() or {}
        cls._cache = {
            "cache_until": now + cls._CACHE_TTL_SECONDS,
            "catalog": catalog,
            "legacy_llm": legacy_llm,
        }
        return {"catalog": catalog, "legacy_llm": legacy_llm}

    # ------------------------------------------------------------------
    # Scene resolution
    # ------------------------------------------------------------------
    @classmethod
    def _merge_dify_provider_and_app(cls, provider: Dict[str, Any], app: Dict[str, Any]) -> Dict[str, Any]:
        """Merge a Dify provider template with a Dify app into one call config.

        The provider's ``config`` is the base, the app's ``config`` overrides
        it, and the fields ``provider``, ``api_key`` and
        ``workflow_output_key`` are then always overwritten from the app.
        """
        app = app or {}
        merged = dict((provider or {}).get("config", {}) or {})
        merged.update(app.get("config", {}) or {})
        # Force the fields Dify requires, regardless of what the configs held.
        merged["provider"] = "dify"
        merged["api_key"] = cls._text(app.get("app_key"))
        merged["workflow_output_key"] = cls._text(app.get("workflow_output_key")) or "text"
        return merged

    @classmethod
    def _resolve_scene_with_catalog(cls, scene_name: str, catalog: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve ``scene_name`` against the catalog model.

        Returns the final config (with ``backend`` and ``scene`` added), or an
        empty dict when the scene, its target, or (for Dify apps) the
        referenced provider is missing or disabled, or when the target type is
        neither ``backend`` nor ``dify_app``.
        """
        catalog = catalog or {}
        scene = (catalog.get("scenes", {}) or {}).get(scene_name, {}) or {}
        if not scene or not scene.get("enabled", True):
            return {}

        target_type = cls._text(scene.get("target_type")).lower()
        target_ref = cls._text(scene.get("target_ref"))
        if not target_ref:
            return {}

        if target_type == "backend":
            backend = (catalog.get("backends", {}) or {}).get(target_ref, {}) or {}
            if not backend or not backend.get("enabled", True):
                return {}
            config = dict(backend.get("config", {}) or {})
            config["backend"] = target_ref
            config["scene"] = scene_name
            return config

        if target_type == "dify_app":
            app = (catalog.get("dify_apps", {}) or {}).get(target_ref, {}) or {}
            if not app or not app.get("enabled", True):
                return {}
            provider_name = cls._text(app.get("provider_template"))
            provider = (catalog.get("providers", {}) or {}).get(provider_name, {}) or {}
            if not provider or not provider.get("enabled", True):
                return {}
            config = cls._merge_dify_provider_and_app(provider, app)
            # Stable synthetic backend id, handy for log tracing.
            config["backend"] = f"dify_app::{target_ref}"
            config["scene"] = scene_name
            return config

        return {}

    @classmethod
    def _resolve_scene_with_legacy(cls, scene_name: str, legacy_llm: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve ``scene_name`` against the legacy ``llm`` structure (fallback).

        The scene's backend name is looked up in ``scenes``; when absent the
        ``default_backend`` is used. Returns a copy of that backend's config
        with ``backend`` and ``scene`` added, or an empty dict when no backend
        name can be determined or the backend entry is not a dict.
        """
        llm = legacy_llm if isinstance(legacy_llm, dict) else {}
        backends = llm.get("backends", {}) or {}
        scenes = llm.get("scenes", {}) or {}
        if not isinstance(backends, dict):
            backends = {}
        if not isinstance(scenes, dict):
            scenes = {}

        backend_name = cls._text(scenes.get(scene_name)) or cls._text(llm.get("default_backend"))
        if not backend_name:
            return {}

        backend = backends.get(backend_name, {}) or {}
        if not isinstance(backend, dict):
            return {}
        config = dict(backend)
        config["backend"] = backend_name
        config["scene"] = scene_name
        return config

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @classmethod
    def get_catalog(cls) -> Dict[str, Any]:
        """Return the runtime catalog (empty dict when the catalog is unavailable)."""
        return cls._load_runtime_snapshot().get("catalog", {}) or {}

    @classmethod
    def get_llm_config(cls) -> Dict[str, Any]:
        """Return a legacy-shaped ``llm`` view (``default_backend``/``backends``/``scenes``).

        1. When the catalog model is available, the view is built from it:
           enabled backends are copied, every enabled Dify app whose provider
           is enabled is expanded into a synthetic ``dify_app::<name>``
           backend, and enabled scenes are mapped to backend names.
        2. Otherwise the legacy YAML structure is returned as is.
        """
        snapshot = cls._load_runtime_snapshot()
        catalog = snapshot.get("catalog", {}) or {}
        if not catalog:
            llm = snapshot.get("legacy_llm", {}) or {}
            return llm if isinstance(llm, dict) else {}

        backends: Dict[str, Any] = {}
        for backend_name, backend in (catalog.get("backends", {}) or {}).items():
            backend = backend or {}
            if backend.get("enabled", True):
                backends[backend_name] = dict(backend.get("config", {}) or {})

        providers = catalog.get("providers", {}) or {}
        for app_name, app in (catalog.get("dify_apps", {}) or {}).items():
            app = app or {}
            if not app.get("enabled", True):
                continue
            provider = providers.get(cls._text(app.get("provider_template")), {}) or {}
            if not provider or not provider.get("enabled", True):
                continue
            backends[f"dify_app::{app_name}"] = cls._merge_dify_provider_and_app(provider, app)

        # NOTE(review): a scene is mapped even when its target is disabled or
        # missing from ``backends`` above; resolve_by_scene() treats such a
        # scene as unresolved instead. Confirm whether that is intended.
        scenes_map: Dict[str, str] = {}
        for scene_name, scene in (catalog.get("scenes", {}) or {}).items():
            scene = scene or {}
            if not scene.get("enabled", True):
                continue
            target_type = cls._text(scene.get("target_type")).lower()
            target_ref = cls._text(scene.get("target_ref"))
            if not target_ref:
                continue
            if target_type == "dify_app":
                scenes_map[scene_name] = f"dify_app::{target_ref}"
            elif target_type == "backend":
                scenes_map[scene_name] = target_ref

        default_scene = cls._text(catalog.get("default_scene"))
        return {
            "default_backend": cls._text(scenes_map.get(default_scene)),
            "backends": backends,
            "scenes": scenes_map,
        }

    @classmethod
    def get_default_backend(cls) -> str:
        """Return the default backend name from the legacy-shaped view (may be empty)."""
        return cls._text(cls.get_llm_config().get("default_backend", ""))

    @classmethod
    def get_backend(cls, backend_name: str) -> Dict[str, Any]:
        """Return a copy of the named backend's config, or ``{}`` when unknown."""
        if not backend_name:
            return {}
        backends = cls.get_llm_config().get("backends", {}) or {}
        backend = backends.get(backend_name, {}) or {}
        return dict(backend) if isinstance(backend, dict) else {}

    @classmethod
    def get_scenes(cls) -> Dict[str, str]:
        """Return the scene -> backend-name map with names stripped.

        Entries whose scene or backend name is empty after stripping are
        dropped.
        """
        raw_scenes = cls.get_llm_config().get("scenes", {}) or {}
        if not isinstance(raw_scenes, dict):
            return {}
        normalized: Dict[str, str] = {}
        for raw_scene, raw_backend in raw_scenes.items():
            scene_name = cls._text(raw_scene)
            backend_name = cls._text(raw_backend)
            if scene_name and backend_name:
                normalized[scene_name] = backend_name
        return normalized

    @classmethod
    def get_scene_backend_name(cls, scene_name: str) -> str:
        """Return the backend name bound to ``scene_name``.

        Falls back to the default backend when the scene name is empty or has
        no binding.
        """
        name = cls._text(scene_name)
        if not name:
            return cls.get_default_backend()
        return cls._text(cls.get_scenes().get(name, "")) or cls.get_default_backend()

    @classmethod
    def resolve_by_scene(cls, scene_name: str) -> Dict[str, Any]:
        """Resolve the final config for a scene (catalog model first).

        Returns ``{}`` for an empty scene name. When the catalog is available
        and resolves the scene, that result is returned; otherwise the legacy
        model is consulted.
        """
        name = cls._text(scene_name)
        if not name:
            return {}
        snapshot = cls._load_runtime_snapshot()
        catalog = snapshot.get("catalog", {}) or {}
        if catalog:
            resolved = cls._resolve_scene_with_catalog(name, catalog)
            if resolved:
                return resolved
        return cls._resolve_scene_with_legacy(name, snapshot.get("legacy_llm", {}) or {})

    @classmethod
    def resolve(cls, local_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Merge a caller's local config with the config routed for its scene.

        Args:
            local_config: Caller-side settings; its ``scene`` key selects the
                route. The dict itself is never mutated.

        Returns:
            Without a scene, a copy of ``local_config``. With a scene, the
            routed config plus any local keys the route does not define;
            routed values win on conflict so callers cannot override routing
            fields, and ``scene`` is always the stripped scene name.
        """
        local = dict(local_config or {})
        scene_name = cls._text(local.get("scene"))
        if not scene_name:
            return local

        # Resolve exactly once so the whole result stems from one snapshot.
        merged = dict(cls.resolve_by_scene(scene_name))
        for key, value in local.items():
            merged.setdefault(key, value)
        merged["scene"] = scene_name
        return merged

    @classmethod
    def get_scene_names(cls) -> List[str]:
        """Return the sorted scene names (for admin dropdowns or validation).

        Uses every scene key of the catalog when it is available, otherwise
        the scenes of the legacy-shaped view.
        """
        catalog = cls.get_catalog()
        if catalog:
            return sorted(catalog.get("scenes", {}) or {})
        return sorted(cls.get_scenes())