"""WebUI multi-purpose panel.

Features: live log viewer + configuration editor + plugin management.
Front-end static files live in the utils/webui_static/ directory.
"""
import asyncio
import base64
import collections
import copy
import hashlib
import hmac
import secrets
import time
import tomllib
from pathlib import Path

from aiohttp import web, WSMsgType
from loguru import logger

try:
    import tomli_w
except ImportError:
    tomli_w = None


class LogBuffer:
    """Ring buffer of recent log lines that also pushes new lines to WebSocket clients."""

    def __init__(self, maxlen: int = 500):
        """Create an empty buffer that keeps at most ``maxlen`` lines."""
        self._buffer = collections.deque(maxlen=maxlen)
        self._clients: set[web.WebSocketResponse] = set()

    def append(self, line: str):
        """Store ``line`` and schedule it to be sent to every open client.

        The line is always recorded in the history buffer. Live delivery is
        only attempted when this is called from inside a running event loop;
        otherwise (e.g. the sink fires from a worker thread) the broadcast is
        skipped instead of raising, and the line stays available via
        ``get_history()``.
        """
        self._buffer.append(line)

        # Iterate over a snapshot so pruning does not mutate the set mid-loop.
        live_clients = []
        for ws in tuple(self._clients):
            if ws.closed:
                self._clients.discard(ws)
            else:
                live_clients.append(ws)
        if not live_clients:
            return

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so tasks cannot be scheduled.
            return

        for ws in live_clients:
            task = loop.create_task(ws.send_str(line))
            task.add_done_callback(lambda t, w=ws: self._on_send_done(t, w))

    def _on_send_done(self, task, ws):
        """Drop ``ws`` from the client set when its send task failed."""
        # Future.exception() raises CancelledError on a cancelled task, so
        # the cancelled state has to be checked first.
        if task.cancelled():
            return
        if task.exception() is not None:
            self._clients.discard(ws)

    def get_history(self) -> list[str]:
        """Return a copy of the buffered lines, oldest first."""
        return list(self._buffer)

    def add_client(self, ws: web.WebSocketResponse):
        """Register a WebSocket that should receive newly appended lines."""
        self._clients.add(ws)

    def remove_client(self, ws: web.WebSocketResponse):
        """Unregister a WebSocket; unknown sockets are ignored."""
        self._clients.discard(ws)


# Module-wide buffer shared by the loguru sink and the WebSocket handler.
_log_buffer = LogBuffer()


def get_log_buffer() -> LogBuffer:
    """Return the module-wide log buffer instance."""
    return _log_buffer


def loguru_sink(message):
    """Loguru sink: strip trailing newlines and append the text to the shared buffer."""
    text = str(message).rstrip("\n")
    _log_buffer.append(text)


# Display labels for the sections of the main configuration file.
SECTION_LABELS = {
    "HttpHook": "Hook 连接",
    "Bot": "机器人",
    "Database": "数据库",
    "Performance": "性能",
    "Queue": "消息队列",
    "Concurrency": "并发控制",
    "Scheduler": "定时任务",
    "WebUI": "WebUI",
}

# Display labels for plugin configuration sections, keyed by plugin name.
PLUGIN_CONFIG_LABELS = {
    "AIChat": {
        "plugin": "插件基本信息",
        "api": "AI API 配置",
        "proxy": "代理配置",
        "prompt": "人设配置",
        "output": "输出后处理",
        "behavior": "触发行为",
        "memory": "对话记忆",
        "history": "群组历史记录",
        "image_description": "图片描述",
        "video_recognition": "视频识别",
        "rate_limit": "限流配置",
        "redis": "Redis 存储",
        "tools": "LLM 工具",
        "tools.timeout": "工具超时",
        "tools.concurrency": "工具并发",
        "vector_memory": "向量长期记忆",
    },
}

# Directory holding the front-end static files.
_STATIC_DIR = Path(__file__).parent / "webui_static"

SESSION_COOKIE_NAME = "whb_webui_session"
PBKDF2_ROUNDS = 240000
DEFAULT_WEBUI_USERNAME = "admin"
DEFAULT_WEBUI_PASSWORD = "admin123456"
DEFAULT_SESSION_TIMEOUT_SECONDS = 8 * 60 * 60
# WebUI config keys that only the change-credentials endpoint may modify.
AUTH_MANAGED_KEYS = {"auth_username", "auth_password_hash"}


def hash_password(password: str, *, salt: str | None = None) -> str:
    """Return a PBKDF2-HMAC-SHA256 hash of ``password``.

    The result has the form ``pbkdf2_sha256$<rounds>$<salt>$<digest>`` where
    the digest is URL-safe base64 without padding.

    Args:
        password: Non-empty plaintext password.
        salt: Optional salt; a random 16-byte hex salt is generated when omitted.

    Raises:
        ValueError: If the password is empty or not a string, or if the salt
            contains ``$`` (the field separator), which would produce a hash
            that can never be verified.
    """
    if not isinstance(password, str) or password == "":
        raise ValueError("密码不能为空")
    if salt is None:
        salt = secrets.token_hex(16)
    elif "$" in salt:
        raise ValueError("salt 不能包含 '$'")
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt.encode("utf-8"),
        PBKDF2_ROUNDS,
    )
    encoded = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
    return f"pbkdf2_sha256${PBKDF2_ROUNDS}${salt}${encoded}"


def verify_password(password: str, stored_hash: str) -> bool:
    """Return True when ``password`` matches ``stored_hash``.

    Malformed hashes, unknown algorithms, non-positive round counts and
    inputs that cannot be encoded all yield False instead of raising.
    """
    if not isinstance(password, str) or not isinstance(stored_hash, str):
        return False
    try:
        algo, rounds, salt, digest = stored_hash.split("$", 3)
        rounds_int = int(rounds)
    except (ValueError, TypeError):
        return False
    if algo != "pbkdf2_sha256" or rounds_int <= 0:
        return False
    try:
        calculated = hashlib.pbkdf2_hmac(
            "sha256",
            password.encode("utf-8"),
            salt.encode("utf-8"),
            rounds_int,
        )
        expected = digest.encode("utf-8")
    except (ValueError, OverflowError):
        # UnicodeEncodeError (lone surrogates) is a ValueError; OverflowError
        # covers an absurdly large round count in the stored hash.
        return False
    calculated_encoded = base64.urlsafe_b64encode(calculated).rstrip(b"=")
    # Compare bytes: compare_digest() rejects non-ASCII str arguments.
    return hmac.compare_digest(calculated_encoded, expected)


class WebUIServer:
    """aiohttp-based HTTP server for the WebUI (auth, logs, config, plugins)."""

    def __init__(self, host: str = "0.0.0.0", port: int = 5001, config_path: str = "main_config.toml"):
        """Build the application, register routes and make sure credentials exist.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
            config_path: Path of the main TOML configuration file.
        """
        self.host = host
        self.port = port
        self.config_path = Path(config_path)
        self.app = web.Application(middlewares=[self._auth_middleware])
        self.runner = None
        # token -> {"username": str, "expires_at": float}
        self._sessions: dict[str, dict[str, float | str]] = {}
        self._auth_update_lock = asyncio.Lock()
        self._setup_routes()
        self._ensure_auth_initialized()

    def _setup_routes(self):
        """Register every HTTP/WebSocket route and the static file directory."""
        router = self.app.router
        router.add_get("/", self._handle_index)
        router.add_get("/api/auth/status", self._handle_auth_status)
        router.add_post("/api/auth/login", self._handle_auth_login)
        router.add_post("/api/auth/logout", self._handle_auth_logout)
        router.add_post("/api/auth/change-credentials", self._handle_change_credentials)
        router.add_get("/ws", self._handle_ws)
        router.add_get("/api/config", self._handle_get_config)
        router.add_post("/api/config", self._handle_save_config)
        router.add_get("/api/plugins", self._handle_get_plugins)
        router.add_post("/api/plugins/toggle", self._handle_toggle_plugin)
        router.add_get("/api/plugins/{name}/config", self._handle_get_plugin_config)
        router.add_post("/api/plugins/{name}/config", self._handle_save_plugin_config)
        router.add_static("/static/", path=_STATIC_DIR, name="static")

    @staticmethod
    def _normalize_session_timeout(value) -> int:
        """Coerce ``value`` to an int timeout of at least 300 seconds.

        Values that cannot be converted fall back to the default timeout.
        """
        try:
            timeout = int(value)
        except (TypeError, ValueError, OverflowError):
            return DEFAULT_SESSION_TIMEOUT_SECONDS
        return max(300, timeout)

    @staticmethod
    def _write_toml(path: Path, data: dict):
        """Serialise ``data`` to TOML and write it to ``path``.

        Serialisation happens before the file is opened, so a value that
        cannot be encoded raises without truncating the existing file.
        """
        payload = tomli_w.dumps(data).encode("utf-8")
        path.write_bytes(payload)

    def _load_main_config(self) -> dict:
        """Parse and return the main configuration file."""
        with open(self.config_path, "rb") as f:
            return tomllib.load(f)

    def _save_main_config(self, data: dict):
        """Write ``data`` to the main configuration file.

        Raises:
            RuntimeError: If tomli_w is not installed.
        """
        if tomli_w is None:
            raise RuntimeError("tomli_w 未安装,无法写入 main_config.toml")
        self._write_toml(self.config_path, data)

    def _ensure_auth_initialized(self):
        """Fill in missing WebUI auth settings with defaults and persist them.

        Failures to read or write the configuration are logged, not raised.
        """
        try:
            data = self._load_main_config()
        except Exception as e:
            logger.error(f"读取配置失败,无法初始化 WebUI 认证: {e}")
            return

        webui_cfg = data.setdefault("WebUI", {})
        changed = False

        username = str(webui_cfg.get("auth_username", "")).strip()
        if not username:
            webui_cfg["auth_username"] = DEFAULT_WEBUI_USERNAME
            changed = True

        pwd_hash = str(webui_cfg.get("auth_password_hash", "")).strip()
        if not pwd_hash:
            webui_cfg["auth_password_hash"] = hash_password(DEFAULT_WEBUI_PASSWORD)
            changed = True
            logger.warning(
                "WebUI 未配置管理员账号密码,已初始化默认账号: "
                f"{DEFAULT_WEBUI_USERNAME} / {DEFAULT_WEBUI_PASSWORD},请尽快在 WebUI 的安全页面修改。"
            )

        normalized_timeout = self._normalize_session_timeout(
            webui_cfg.get("session_timeout_seconds", DEFAULT_SESSION_TIMEOUT_SECONDS)
        )
        if webui_cfg.get("session_timeout_seconds") != normalized_timeout:
            webui_cfg["session_timeout_seconds"] = normalized_timeout
            changed = True

        if changed:
            try:
                self._save_main_config(data)
            except Exception as e:
                logger.error(f"写入 WebUI 认证初始化配置失败: {e}")

    def _get_auth_settings(self) -> tuple[str, str, int]:
        """Return ``(username, password_hash, session_timeout_seconds)``.

        The values are read from the configuration file on every call; when
        that fails or the hash is missing, the default credentials are used.
        """
        try:
            data = self._load_main_config()
            webui_cfg = data.get("WebUI", {})
            username = str(webui_cfg.get("auth_username", DEFAULT_WEBUI_USERNAME)).strip() or DEFAULT_WEBUI_USERNAME
            pwd_hash = str(webui_cfg.get("auth_password_hash", "")).strip()
            timeout = self._normalize_session_timeout(
                webui_cfg.get("session_timeout_seconds", DEFAULT_SESSION_TIMEOUT_SECONDS)
            )
            if not pwd_hash:
                # Fallback for a broken config so login is not lost entirely.
                pwd_hash = hash_password(DEFAULT_WEBUI_PASSWORD)
            return username, pwd_hash, timeout
        except Exception as e:
            logger.error(f"读取 WebUI 认证配置失败,使用默认兜底: {e}")
            return DEFAULT_WEBUI_USERNAME, hash_password(DEFAULT_WEBUI_PASSWORD), DEFAULT_SESSION_TIMEOUT_SECONDS

    def _cleanup_sessions(self):
        """Remove every session whose expiry time has passed."""
        now = time.time()
        expired_tokens = [
            token
            for token, session in self._sessions.items()
            if float(session.get("expires_at", 0)) <= now
        ]
        for token in expired_tokens:
            self._sessions.pop(token, None)

    def _create_session(self, username: str, timeout_seconds: int) -> str:
        """Create a session for ``username`` and return its random token."""
        self._cleanup_sessions()
        token = secrets.token_urlsafe(32)
        self._sessions[token] = {
            "username": username,
            "expires_at": time.time() + timeout_seconds,
        }
        return token

    def _invalidate_session(self, token: str):
        """Forget the session identified by ``token`` (no-op for empty/unknown tokens)."""
        if token:
            self._sessions.pop(token, None)

    def _set_session_cookie(self, response: web.Response, request: web.Request, token: str, timeout_seconds: int):
        """Attach the session cookie to ``response``.

        The cookie is HttpOnly, SameSite=Lax, and marked Secure only when the
        request itself arrived over a secure connection.
        """
        response.set_cookie(
            SESSION_COOKIE_NAME,
            token,
            max_age=timeout_seconds,
            path="/",
            httponly=True,
            secure=request.secure,
            samesite="Lax",
        )

    def _get_session_username(self, request: web.Request, *, refresh: bool = True) -> str | None:
        """Return the username of the request's valid session, or None.

        Args:
            request: Incoming request whose session cookie is inspected.
            refresh: When True, extend the session's expiry by the configured timeout.
        """
        token = request.cookies.get(SESSION_COOKIE_NAME, "")
        if not token:
            return None

        self._cleanup_sessions()
        session = self._sessions.get(token)
        if not session:
            return None

        now = time.time()
        expires_at = float(session.get("expires_at", 0))
        if expires_at <= now:
            self._sessions.pop(token, None)
            return None

        username = str(session.get("username", "")).strip()
        if not username:
            self._sessions.pop(token, None)
            return None

        if refresh:
            _, _, timeout = self._get_auth_settings()
            session["expires_at"] = now + timeout
        return username

    @staticmethod
    def _is_public_path(path: str) -> bool:
        """Return True for paths that may be accessed without a session."""
        return (
            path == "/"
            or path.startswith("/static/")
            or path == "/favicon.ico"
            or path == "/api/auth/status"
            or path == "/api/auth/login"
        )

    @web.middleware
    async def _auth_middleware(self, request: web.Request, handler):
        """Reject unauthenticated access to non-public paths.

        API and WebSocket paths get a JSON 401; anything else is redirected
        to the index page. The username is stored in ``request["webui_user"]``.
        """
        path = request.path
        if self._is_public_path(path):
            return await handler(request)

        username = self._get_session_username(request)
        if not username:
            if path.startswith("/api/") or path == "/ws":
                return web.json_response({"ok": False, "error": "未登录或会话已过期"}, status=401)
            raise web.HTTPFound("/")

        request["webui_user"] = username
        return await handler(request)

    async def _handle_auth_status(self, request: web.Request) -> web.Response:
        """Report whether the caller is logged in, without refreshing the session."""
        configured_username, _, _ = self._get_auth_settings()
        current_username = self._get_session_username(request, refresh=False)
        return web.json_response({
            "ok": True,
            "authenticated": bool(current_username),
            "username": current_username or configured_username,
        })

    async def _handle_auth_login(self, request: web.Request) -> web.Response:
        """Check the submitted credentials and start a session on success."""
        try:
            body = await request.json()
        except Exception:
            body = {}
        if not isinstance(body, dict):
            # Valid JSON that is not an object (list, string, ...) has no fields.
            body = {}

        username = str(body.get("username", "")).strip()
        password = str(body.get("password", ""))
        configured_username, configured_hash, timeout = self._get_auth_settings()

        # Evaluate both checks unconditionally so the response time does not
        # reveal whether the username alone was correct.
        username_ok = hmac.compare_digest(
            username.encode("utf-8", "surrogatepass"),
            configured_username.encode("utf-8", "surrogatepass"),
        )
        password_ok = verify_password(password, configured_hash)
        if not (username_ok and password_ok):
            return web.json_response({"ok": False, "error": "用户名或密码错误"}, status=401)

        token = self._create_session(configured_username, timeout)
        response = web.json_response({"ok": True, "username": configured_username})
        self._set_session_cookie(response, request, token, timeout)
        return response

    async def _handle_auth_logout(self, request: web.Request) -> web.Response:
        """Invalidate the caller's session and clear the session cookie."""
        token = request.cookies.get(SESSION_COOKIE_NAME, "")
        if token:
            self._invalidate_session(token)
        response = web.json_response({"ok": True})
        response.del_cookie(SESSION_COOKIE_NAME, path="/")
        return response

    async def _handle_change_credentials(self, request: web.Request) -> web.Response:
        """Change the admin username/password after verifying the current password.

        On success all existing sessions are dropped and a fresh session is
        issued to the caller.
        """
        if tomli_w is None:
            return web.json_response({"ok": False, "error": "tomli_w 未安装,无法保存认证配置"})
        try:
            body = await request.json()
            current_password = str(body.get("current_password", ""))
            new_username = str(body.get("new_username", "")).strip()
            new_password = str(body.get("new_password", ""))
        except Exception:
            return web.json_response({"ok": False, "error": "请求参数格式错误"})

        if not current_password:
            return web.json_response({"ok": False, "error": "请输入当前密码"})
        if not new_username:
            return web.json_response({"ok": False, "error": "账号不能为空"})
        if len(new_password) < 8:
            return web.json_response({"ok": False, "error": "新密码长度至少 8 位"})

        # Serialise read-modify-write of the config across concurrent requests.
        async with self._auth_update_lock:
            try:
                data = self._load_main_config()
                webui_cfg = data.setdefault("WebUI", {})
                stored_hash = str(webui_cfg.get("auth_password_hash", "")).strip()
                if not stored_hash:
                    stored_hash = hash_password(DEFAULT_WEBUI_PASSWORD)
                if not verify_password(current_password, stored_hash):
                    return web.json_response({"ok": False, "error": "当前密码错误"})

                webui_cfg["auth_username"] = new_username
                webui_cfg["auth_password_hash"] = hash_password(new_password)
                webui_cfg["session_timeout_seconds"] = self._normalize_session_timeout(
                    webui_cfg.get("session_timeout_seconds", DEFAULT_SESSION_TIMEOUT_SECONDS)
                )
                self._save_main_config(data)

                timeout = self._normalize_session_timeout(webui_cfg["session_timeout_seconds"])
                self._sessions.clear()
                new_token = self._create_session(new_username, timeout)
                response = web.json_response({"ok": True, "username": new_username})
                self._set_session_cookie(response, request, new_token, timeout)
                return response
            except Exception as e:
                return web.json_response({"ok": False, "error": str(e)})

    async def _handle_index(self, request: web.Request) -> web.Response:
        """Serve the front-end entry page."""
        index_file = _STATIC_DIR / "index.html"
        return web.FileResponse(index_file)

    async def _handle_ws(self, request: web.Request) -> web.WebSocketResponse:
        """Stream logs over a WebSocket: buffered history first, then live lines."""
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        _log_buffer.add_client(ws)
        try:
            # Replaying history is inside the try so a failed send still
            # unregisters the socket.
            for line in _log_buffer.get_history():
                await ws.send_str(line)
            async for msg in ws:
                if msg.type == WSMsgType.ERROR:
                    break
        finally:
            _log_buffer.remove_client(ws)
        return ws

    async def _handle_get_config(self, request: web.Request) -> web.Response:
        """Return the main configuration with the auth-managed keys removed."""
        try:
            data = self._load_main_config()
            safe_data = copy.deepcopy(data)
            webui_cfg = safe_data.get("WebUI")
            if isinstance(webui_cfg, dict):
                for key in AUTH_MANAGED_KEYS:
                    webui_cfg.pop(key, None)
            return web.json_response({"ok": True, "data": safe_data, "labels": SECTION_LABELS})
        except Exception as e:
            return web.json_response({"ok": False, "error": str(e)})

    async def _handle_save_config(self, request: web.Request) -> web.Response:
        """Replace the main configuration with the submitted data.

        Auth-managed keys always keep their current on-disk values; they can
        only be changed through the change-credentials endpoint.
        """
        if tomli_w is None:
            return web.json_response({"ok": False, "error": "tomli_w 未安装"})
        try:
            body = await request.json()
            data = body.get("data") if isinstance(body, dict) else None
            if not isinstance(data, dict):
                # A missing payload would otherwise wipe the whole config.
                return web.json_response({"ok": False, "error": "请求参数格式错误"})

            current = self._load_main_config()
            current_webui = current.get("WebUI", {})
            if not isinstance(current_webui, dict):
                current_webui = {}
            new_webui = data.setdefault("WebUI", {})
            if not isinstance(new_webui, dict):
                return web.json_response({"ok": False, "error": "请求参数格式错误"})

            # Never trust client-supplied auth fields: force the stored value,
            # or drop the key when nothing is stored.
            for key in AUTH_MANAGED_KEYS:
                if key in current_webui:
                    new_webui[key] = current_webui[key]
                else:
                    new_webui.pop(key, None)

            self._save_main_config(data)
            self._ensure_auth_initialized()
            return web.json_response({"ok": True})
        except Exception as e:
            return web.json_response({"ok": False, "error": str(e)})

    async def _handle_get_plugins(self, request: web.Request) -> web.Response:
        """List the plugins known to PluginManager, sorted by name."""
        try:
            from utils.plugin_manager import PluginManager
            pm = PluginManager()
            plugins = []
            for name, info in pm.plugin_info.items():
                directory = info.get("directory", name)
                cfg_path = Path("plugins") / directory / "config.toml"
                plugins.append({
                    "name": name,
                    "description": info.get("description", ""),
                    "author": info.get("author", ""),
                    "version": info.get("version", ""),
                    "directory": directory,
                    "enabled": info.get("enabled", False),
                    "has_config": cfg_path.exists(),
                })
            plugins.sort(key=lambda p: p["name"])
            return web.json_response({"ok": True, "plugins": plugins})
        except Exception as e:
            return web.json_response({"ok": False, "error": str(e)})

    async def _handle_toggle_plugin(self, request: web.Request) -> web.Response:
        """Load or unload the named plugin; ManagePlugin is refused."""
        try:
            from utils.plugin_manager import PluginManager
            body = await request.json()
            name = body.get("name", "")
            enable = body.get("enable", False)
            if name == "ManagePlugin":
                return web.json_response({"ok": False, "error": "ManagePlugin 不可禁用"})
            pm = PluginManager()
            if enable:
                ok = await pm.load_plugin(name)
                if ok:
                    return web.json_response({"ok": True})
                return web.json_response({"ok": False, "error": f"启用 {name} 失败"})
            else:
                ok = await pm.unload_plugin(name)
                if ok:
                    return web.json_response({"ok": True})
                return web.json_response({"ok": False, "error": f"禁用 {name} 失败"})
        except Exception as e:
            return web.json_response({"ok": False, "error": str(e)})

    async def _handle_get_plugin_config(self, request: web.Request) -> web.Response:
        """Return the parsed config.toml of the named plugin plus its section labels."""
        name = request.match_info["name"]
        try:
            from utils.plugin_manager import PluginManager
            pm = PluginManager()
            info = pm.plugin_info.get(name)
            if not info:
                return web.json_response({"ok": False, "error": f"插件 {name} 不存在"})
            directory = info.get("directory", name)
            cfg_path = Path("plugins") / directory / "config.toml"
            if not cfg_path.exists():
                return web.json_response({"ok": False, "error": "该插件无配置文件"})
            with open(cfg_path, "rb") as f:
                data = tomllib.load(f)
            labels = PLUGIN_CONFIG_LABELS.get(name, {})
            return web.json_response({"ok": True, "data": data, "labels": labels})
        except Exception as e:
            return web.json_response({"ok": False, "error": str(e)})

    async def _handle_save_plugin_config(self, request: web.Request) -> web.Response:
        """Overwrite the named plugin's config.toml with the submitted data."""
        if tomli_w is None:
            return web.json_response({"ok": False, "error": "tomli_w 未安装"})
        name = request.match_info["name"]
        try:
            from utils.plugin_manager import PluginManager
            pm = PluginManager()
            info = pm.plugin_info.get(name)
            if not info:
                return web.json_response({"ok": False, "error": f"插件 {name} 不存在"})
            directory = info.get("directory", name)
            cfg_path = Path("plugins") / directory / "config.toml"
            body = await request.json()
            data = body.get("data") if isinstance(body, dict) else None
            if not isinstance(data, dict):
                # A missing payload would otherwise blank the plugin config.
                return web.json_response({"ok": False, "error": "请求参数格式错误"})
            self._write_toml(cfg_path, data)
            return web.json_response({"ok": True})
        except Exception as e:
            return web.json_response({"ok": False, "error": str(e)})

    async def start(self):
        """Start serving on ``host``:``port``."""
        self.runner = web.AppRunner(self.app)
        await self.runner.setup()
        site = web.TCPSite(self.runner, self.host, self.port)
        await site.start()
        logger.success(f"WebUI 已启动: http://{self.host}:{self.port}")

    async def stop(self):
        """Shut the server down if it was started."""
        if self.runner:
            await self.runner.cleanup()
            logger.info("WebUI 已停止")