Some checks failed
Create and publish Docker images with specific build args / build-main-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-main-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda126-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda126-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-ollama-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-ollama-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-slim-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-slim-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Python CI / Format Backend (3.11.x) (push) Has been cancelled
Python CI / Format Backend (3.12.x) (push) Has been cancelled
Frontend Build / Format & Build Frontend (push) Has been cancelled
Frontend Build / Frontend Unit Tests (push) Has been cancelled
Create and publish Docker images with specific build args / merge-main-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-cuda-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-cuda126-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-ollama-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-slim-images (push) Has been cancelled
Close inactive issues / close-issues (push) Has been cancelled
1477 lines
53 KiB
Python
1477 lines
53 KiB
Python
import re
|
|
import uuid
|
|
import time
|
|
import datetime
|
|
import logging
|
|
from decimal import Decimal
|
|
|
|
from aiohttp import ClientSession
|
|
import urllib
|
|
|
|
|
|
from open_webui.models.auths import (
|
|
AddUserForm,
|
|
ApiKey,
|
|
Auths,
|
|
Token,
|
|
LdapForm,
|
|
SigninForm,
|
|
SigninResponse,
|
|
SignupForm,
|
|
UpdatePasswordForm,
|
|
)
|
|
from open_webui.models.users import (
|
|
UserProfileImageResponse,
|
|
Users,
|
|
UserModel,
|
|
UpdateProfileForm,
|
|
UserStatus,
|
|
)
|
|
from open_webui.models.groups import Groups
|
|
from open_webui.models.subscriptions import UserSubscriptions
|
|
from open_webui.models.oauth_sessions import OAuthSessions
|
|
|
|
from open_webui.constants import ERROR_MESSAGES, WEBHOOK_MESSAGES
|
|
from open_webui.env import (
|
|
WEBUI_AUTH,
|
|
WEBUI_AUTH_TRUSTED_EMAIL_HEADER,
|
|
WEBUI_AUTH_TRUSTED_NAME_HEADER,
|
|
WEBUI_AUTH_TRUSTED_GROUPS_HEADER,
|
|
WEBUI_AUTH_COOKIE_SAME_SITE,
|
|
WEBUI_AUTH_COOKIE_SECURE,
|
|
WEBUI_AUTH_SIGNOUT_REDIRECT_URL,
|
|
ENABLE_INITIAL_ADMIN_SIGNUP,
|
|
REDIS_URL,
|
|
REDIS_SENTINEL_HOSTS,
|
|
REDIS_SENTINEL_PORT,
|
|
REDIS_CLUSTER,
|
|
)
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from fastapi.responses import RedirectResponse, Response, JSONResponse
|
|
from open_webui.config import (
|
|
OPENID_PROVIDER_URL,
|
|
ENABLE_OAUTH_SIGNUP,
|
|
ENABLE_LDAP,
|
|
ENABLE_PASSWORD_AUTH,
|
|
)
|
|
from pydantic import BaseModel, Field
|
|
|
|
from open_webui.utils.misc import parse_duration, validate_email_format
|
|
from open_webui.utils.auth import (
|
|
validate_password,
|
|
verify_password,
|
|
decode_token,
|
|
invalidate_token,
|
|
create_api_key,
|
|
create_token,
|
|
get_admin_user,
|
|
get_verified_user,
|
|
get_current_user,
|
|
get_password_hash,
|
|
get_http_authorization_cred,
|
|
send_verify_email,
|
|
verify_email_by_code,
|
|
apply_branding,
|
|
send_signup_email_code,
|
|
verify_signup_email_code,
|
|
)
|
|
from open_webui.utils.webhook import post_webhook
|
|
from open_webui.utils.access_control import get_permissions, has_permission
|
|
from open_webui.utils.groups import apply_default_group_assignment
|
|
|
|
from open_webui.utils.redis import (
|
|
get_redis_client,
|
|
get_redis_connection,
|
|
get_sentinels_from_env,
|
|
)
|
|
from open_webui.utils.rate_limit import RateLimiter
|
|
|
|
|
|
from typing import Optional, List
|
|
|
|
from ssl import CERT_NONE, CERT_REQUIRED, PROTOCOL_TLS
|
|
|
|
from ldap3 import Server, Connection, NONE, Tls
|
|
from ldap3.utils.conv import escape_filter_chars
|
|
|
|
router = APIRouter()
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
signin_rate_limiter = RateLimiter(
|
|
redis_client=get_redis_client(), limit=5 * 3, window=60 * 3
|
|
)
|
|
|
|
############################
|
|
# GetSessionUser
|
|
############################
|
|
|
|
|
|
class SessionUserResponse(Token, UserProfileImageResponse):
|
|
expires_at: Optional[int] = None
|
|
permissions: Optional[dict] = None
|
|
|
|
|
|
class SessionUserInfoResponse(SessionUserResponse, UserStatus):
|
|
bio: Optional[str] = None
|
|
gender: Optional[str] = None
|
|
date_of_birth: Optional[datetime.date] = None
|
|
|
|
|
|
@router.get("/", response_model=SessionUserInfoResponse)
|
|
async def get_session_user(
|
|
request: Request, response: Response, user: UserModel = Depends(get_current_user)
|
|
):
|
|
auth_header = request.headers.get("Authorization")
|
|
auth_token = get_http_authorization_cred(auth_header)
|
|
token = auth_token.credentials
|
|
data = decode_token(token)
|
|
|
|
expires_at = None
|
|
|
|
if data:
|
|
expires_at = data.get("exp")
|
|
|
|
if (expires_at is not None) and int(time.time()) > expires_at:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=ERROR_MESSAGES.INVALID_TOKEN,
|
|
)
|
|
|
|
# Set the cookie token
|
|
response.set_cookie(
|
|
key="token",
|
|
value=token,
|
|
expires=(
|
|
datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
|
|
if expires_at
|
|
else None
|
|
),
|
|
httponly=True, # Ensures the cookie is not accessible via JavaScript
|
|
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
|
|
secure=WEBUI_AUTH_COOKIE_SECURE,
|
|
)
|
|
|
|
user_permissions = get_permissions(
|
|
user.id, request.app.state.config.USER_PERMISSIONS
|
|
)
|
|
|
|
# init subscription for user (assigns default free plan if not exists)
|
|
UserSubscriptions.get_or_create_subscription(user.id)
|
|
|
|
return {
|
|
"token": token,
|
|
"token_type": "Bearer",
|
|
"expires_at": expires_at,
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"role": user.role,
|
|
"profile_image_url": user.profile_image_url,
|
|
"bio": user.bio,
|
|
"gender": user.gender,
|
|
"date_of_birth": user.date_of_birth,
|
|
"status_emoji": user.status_emoji,
|
|
"status_message": user.status_message,
|
|
"status_expires_at": user.status_expires_at,
|
|
"permissions": user_permissions,
|
|
}
|
|
|
|
|
|
############################
|
|
# Update Profile
|
|
############################
|
|
|
|
|
|
@router.post("/update/profile", response_model=UserProfileImageResponse)
|
|
async def update_profile(
|
|
form_data: UpdateProfileForm, session_user=Depends(get_verified_user)
|
|
):
|
|
if session_user:
|
|
user = Users.update_user_by_id(
|
|
session_user.id,
|
|
form_data.model_dump(),
|
|
)
|
|
if user:
|
|
return user
|
|
else:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.DEFAULT())
|
|
else:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
|
|
|
|
############################
|
|
# Update Password
|
|
############################
|
|
|
|
|
|
@router.post("/update/password", response_model=bool)
|
|
async def update_password(
|
|
form_data: UpdatePasswordForm, session_user=Depends(get_current_user)
|
|
):
|
|
if WEBUI_AUTH_TRUSTED_EMAIL_HEADER:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.ACTION_PROHIBITED)
|
|
if session_user:
|
|
user = Auths.authenticate_user(
|
|
session_user.email, lambda pw: verify_password(form_data.password, pw)
|
|
)
|
|
|
|
if user:
|
|
try:
|
|
validate_password(form_data.password)
|
|
except Exception as e:
|
|
raise HTTPException(400, detail=str(e))
|
|
hashed = get_password_hash(form_data.new_password)
|
|
return Auths.update_user_password_by_id(user.id, hashed)
|
|
else:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INCORRECT_PASSWORD)
|
|
else:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
|
|
|
|
############################
|
|
# LDAP Authentication
|
|
############################
|
|
@router.post("/ldap", response_model=SessionUserResponse)
|
|
async def ldap_auth(request: Request, response: Response, form_data: LdapForm):
|
|
# Security checks FIRST - before loading any config
|
|
if not request.app.state.config.ENABLE_LDAP:
|
|
raise HTTPException(400, detail="LDAP authentication is not enabled")
|
|
|
|
if not ENABLE_PASSWORD_AUTH:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=ERROR_MESSAGES.ACTION_PROHIBITED,
|
|
)
|
|
|
|
# NOW load LDAP config variables
|
|
LDAP_SERVER_LABEL = request.app.state.config.LDAP_SERVER_LABEL
|
|
LDAP_SERVER_HOST = request.app.state.config.LDAP_SERVER_HOST
|
|
LDAP_SERVER_PORT = request.app.state.config.LDAP_SERVER_PORT
|
|
LDAP_ATTRIBUTE_FOR_MAIL = request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL
|
|
LDAP_ATTRIBUTE_FOR_USERNAME = request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME
|
|
LDAP_SEARCH_BASE = request.app.state.config.LDAP_SEARCH_BASE
|
|
LDAP_SEARCH_FILTERS = request.app.state.config.LDAP_SEARCH_FILTERS
|
|
LDAP_APP_DN = request.app.state.config.LDAP_APP_DN
|
|
LDAP_APP_PASSWORD = request.app.state.config.LDAP_APP_PASSWORD
|
|
LDAP_USE_TLS = request.app.state.config.LDAP_USE_TLS
|
|
LDAP_CA_CERT_FILE = request.app.state.config.LDAP_CA_CERT_FILE
|
|
LDAP_VALIDATE_CERT = (
|
|
CERT_REQUIRED if request.app.state.config.LDAP_VALIDATE_CERT else CERT_NONE
|
|
)
|
|
LDAP_CIPHERS = (
|
|
request.app.state.config.LDAP_CIPHERS
|
|
if request.app.state.config.LDAP_CIPHERS
|
|
else "ALL"
|
|
)
|
|
|
|
try:
|
|
tls = Tls(
|
|
validate=LDAP_VALIDATE_CERT,
|
|
version=PROTOCOL_TLS,
|
|
ca_certs_file=LDAP_CA_CERT_FILE,
|
|
ciphers=LDAP_CIPHERS,
|
|
)
|
|
except Exception as e:
|
|
log.error(f"TLS configuration error: {str(e)}")
|
|
raise HTTPException(400, detail="Failed to configure TLS for LDAP connection.")
|
|
|
|
try:
|
|
server = Server(
|
|
host=LDAP_SERVER_HOST,
|
|
port=LDAP_SERVER_PORT,
|
|
get_info=NONE,
|
|
use_ssl=LDAP_USE_TLS,
|
|
tls=tls,
|
|
)
|
|
connection_app = Connection(
|
|
server,
|
|
LDAP_APP_DN,
|
|
LDAP_APP_PASSWORD,
|
|
auto_bind="NONE",
|
|
authentication="SIMPLE" if LDAP_APP_DN else "ANONYMOUS",
|
|
)
|
|
if not connection_app.bind():
|
|
raise HTTPException(400, detail="Application account bind failed")
|
|
|
|
ENABLE_LDAP_GROUP_MANAGEMENT = (
|
|
request.app.state.config.ENABLE_LDAP_GROUP_MANAGEMENT
|
|
)
|
|
ENABLE_LDAP_GROUP_CREATION = request.app.state.config.ENABLE_LDAP_GROUP_CREATION
|
|
LDAP_ATTRIBUTE_FOR_GROUPS = request.app.state.config.LDAP_ATTRIBUTE_FOR_GROUPS
|
|
|
|
search_attributes = [
|
|
f"{LDAP_ATTRIBUTE_FOR_USERNAME}",
|
|
f"{LDAP_ATTRIBUTE_FOR_MAIL}",
|
|
"cn",
|
|
]
|
|
if ENABLE_LDAP_GROUP_MANAGEMENT:
|
|
search_attributes.append(f"{LDAP_ATTRIBUTE_FOR_GROUPS}")
|
|
log.info(
|
|
f"LDAP Group Management enabled. Adding {LDAP_ATTRIBUTE_FOR_GROUPS} to search attributes"
|
|
)
|
|
log.info(f"LDAP search attributes: {search_attributes}")
|
|
|
|
search_success = connection_app.search(
|
|
search_base=LDAP_SEARCH_BASE,
|
|
search_filter=f"(&({LDAP_ATTRIBUTE_FOR_USERNAME}={escape_filter_chars(form_data.user.lower())}){LDAP_SEARCH_FILTERS})",
|
|
attributes=search_attributes,
|
|
)
|
|
if not search_success or not connection_app.entries:
|
|
raise HTTPException(400, detail="User not found in the LDAP server")
|
|
|
|
entry = connection_app.entries[0]
|
|
entry_username = entry[f"{LDAP_ATTRIBUTE_FOR_USERNAME}"].value
|
|
email = entry[
|
|
f"{LDAP_ATTRIBUTE_FOR_MAIL}"
|
|
].value # retrieve the Attribute value
|
|
|
|
username_list = [] # list of usernames from LDAP attribute
|
|
if isinstance(entry_username, list):
|
|
username_list = [str(name).lower() for name in entry_username]
|
|
else:
|
|
username_list = [str(entry_username).lower()]
|
|
|
|
# TODO: support multiple emails if LDAP returns a list
|
|
if not email:
|
|
raise HTTPException(400, "User does not have a valid email address.")
|
|
elif isinstance(email, str):
|
|
email = email.lower()
|
|
elif isinstance(email, list):
|
|
email = email[0].lower()
|
|
else:
|
|
email = str(email).lower()
|
|
|
|
cn = str(entry["cn"]) # common name
|
|
user_dn = entry.entry_dn # user distinguished name
|
|
|
|
user_groups = []
|
|
if ENABLE_LDAP_GROUP_MANAGEMENT and LDAP_ATTRIBUTE_FOR_GROUPS in entry:
|
|
group_dns = entry[LDAP_ATTRIBUTE_FOR_GROUPS]
|
|
log.info(f"LDAP raw group DNs for user {username_list}: {group_dns}")
|
|
|
|
if group_dns:
|
|
log.info(f"LDAP group_dns original: {group_dns}")
|
|
log.info(f"LDAP group_dns type: {type(group_dns)}")
|
|
log.info(f"LDAP group_dns length: {len(group_dns)}")
|
|
|
|
if hasattr(group_dns, "value"):
|
|
group_dns = group_dns.value
|
|
log.info(f"Extracted .value property: {group_dns}")
|
|
elif hasattr(group_dns, "__iter__") and not isinstance(
|
|
group_dns, (str, bytes)
|
|
):
|
|
group_dns = list(group_dns)
|
|
log.info(f"Converted to list: {group_dns}")
|
|
|
|
if isinstance(group_dns, list):
|
|
group_dns = [str(item) for item in group_dns]
|
|
else:
|
|
group_dns = [str(group_dns)]
|
|
|
|
log.info(
|
|
f"LDAP group_dns after processing - type: {type(group_dns)}, length: {len(group_dns)}"
|
|
)
|
|
|
|
for group_idx, group_dn in enumerate(group_dns):
|
|
group_dn = str(group_dn)
|
|
log.info(f"Processing group DN #{group_idx + 1}: {group_dn}")
|
|
|
|
try:
|
|
group_cn = None
|
|
|
|
for item in group_dn.split(","):
|
|
item = item.strip()
|
|
if item.upper().startswith("CN="):
|
|
group_cn = item[3:]
|
|
break
|
|
|
|
if group_cn:
|
|
user_groups.append(group_cn)
|
|
|
|
else:
|
|
log.warning(
|
|
f"Could not extract CN from group DN: {group_dn}"
|
|
)
|
|
except Exception as e:
|
|
log.warning(
|
|
f"Failed to extract group name from DN {group_dn}: {e}"
|
|
)
|
|
|
|
log.info(
|
|
f"LDAP groups for user {username_list}: {user_groups} (total: {len(user_groups)})"
|
|
)
|
|
else:
|
|
log.info(f"No groups found for user {username_list}")
|
|
elif ENABLE_LDAP_GROUP_MANAGEMENT:
|
|
log.warning(
|
|
f"LDAP Group Management enabled but {LDAP_ATTRIBUTE_FOR_GROUPS} attribute not found in user entry"
|
|
)
|
|
|
|
if username_list and form_data.user.lower() in username_list:
|
|
connection_user = Connection(
|
|
server,
|
|
user_dn,
|
|
form_data.password,
|
|
auto_bind="NONE",
|
|
authentication="SIMPLE",
|
|
)
|
|
if not connection_user.bind():
|
|
raise HTTPException(400, "Authentication failed.")
|
|
|
|
user = Users.get_user_by_email(email)
|
|
if not user:
|
|
try:
|
|
role = (
|
|
"admin"
|
|
if not Users.has_users()
|
|
else request.app.state.config.DEFAULT_USER_ROLE
|
|
)
|
|
|
|
user = Auths.insert_new_auth(
|
|
email=email,
|
|
password=str(uuid.uuid4()),
|
|
name=cn,
|
|
role=role,
|
|
)
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
500, detail=ERROR_MESSAGES.CREATE_USER_ERROR
|
|
)
|
|
|
|
apply_default_group_assignment(
|
|
request.app.state.config.DEFAULT_GROUP_ID,
|
|
user.id,
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as err:
|
|
log.error(f"LDAP user creation error: {str(err)}")
|
|
raise HTTPException(
|
|
500, detail="Internal error occurred during LDAP user creation."
|
|
)
|
|
|
|
user = Auths.authenticate_user_by_email(email)
|
|
|
|
if user:
|
|
expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
|
|
expires_at = None
|
|
if expires_delta:
|
|
expires_at = int(time.time()) + int(expires_delta.total_seconds())
|
|
|
|
token = create_token(
|
|
data={"id": user.id},
|
|
expires_delta=expires_delta,
|
|
)
|
|
|
|
# Set the cookie token
|
|
response.set_cookie(
|
|
key="token",
|
|
value=token,
|
|
expires=(
|
|
datetime.datetime.fromtimestamp(
|
|
expires_at, datetime.timezone.utc
|
|
)
|
|
if expires_at
|
|
else None
|
|
),
|
|
httponly=True, # Ensures the cookie is not accessible via JavaScript
|
|
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
|
|
secure=WEBUI_AUTH_COOKIE_SECURE,
|
|
)
|
|
|
|
user_permissions = get_permissions(
|
|
user.id, request.app.state.config.USER_PERMISSIONS
|
|
)
|
|
|
|
# init subscription for user
|
|
UserSubscriptions.get_or_create_subscription(user.id)
|
|
|
|
if (
|
|
user.role != "admin"
|
|
and ENABLE_LDAP_GROUP_MANAGEMENT
|
|
and user_groups
|
|
):
|
|
if ENABLE_LDAP_GROUP_CREATION:
|
|
Groups.create_groups_by_group_names(user.id, user_groups)
|
|
try:
|
|
Groups.sync_groups_by_group_names(user.id, user_groups)
|
|
log.info(
|
|
f"Successfully synced groups for user {user.id}: {user_groups}"
|
|
)
|
|
except Exception as e:
|
|
log.error(f"Failed to sync groups for user {user.id}: {e}")
|
|
|
|
return {
|
|
"token": token,
|
|
"token_type": "Bearer",
|
|
"expires_at": expires_at,
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"role": user.role,
|
|
"profile_image_url": user.profile_image_url,
|
|
"permissions": user_permissions,
|
|
}
|
|
else:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
else:
|
|
raise HTTPException(400, "User record mismatch.")
|
|
except Exception as e:
|
|
log.error(f"LDAP authentication error: {str(e)}")
|
|
raise HTTPException(400, detail="LDAP authentication failed.")
|
|
|
|
|
|
############################
|
|
# SignIn
|
|
############################
|
|
|
|
|
|
@router.post("/signin", response_model=SessionUserResponse)
|
|
async def signin(request: Request, response: Response, form_data: SigninForm):
|
|
if not ENABLE_PASSWORD_AUTH:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=ERROR_MESSAGES.ACTION_PROHIBITED,
|
|
)
|
|
|
|
if WEBUI_AUTH_TRUSTED_EMAIL_HEADER:
|
|
if WEBUI_AUTH_TRUSTED_EMAIL_HEADER not in request.headers:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_TRUSTED_HEADER)
|
|
|
|
email = request.headers[WEBUI_AUTH_TRUSTED_EMAIL_HEADER].lower()
|
|
name = email
|
|
|
|
if WEBUI_AUTH_TRUSTED_NAME_HEADER:
|
|
name = request.headers.get(WEBUI_AUTH_TRUSTED_NAME_HEADER, email)
|
|
try:
|
|
name = urllib.parse.unquote(name, encoding="utf-8")
|
|
except Exception as e:
|
|
pass
|
|
|
|
if not Users.get_user_by_email(email.lower()):
|
|
await signup(
|
|
request,
|
|
response,
|
|
SignupForm(email=email, password=str(uuid.uuid4()), name=name),
|
|
)
|
|
|
|
user = Auths.authenticate_user_by_email(email)
|
|
if WEBUI_AUTH_TRUSTED_GROUPS_HEADER and user and user.role != "admin":
|
|
group_names = request.headers.get(
|
|
WEBUI_AUTH_TRUSTED_GROUPS_HEADER, ""
|
|
).split(",")
|
|
group_names = [name.strip() for name in group_names if name.strip()]
|
|
|
|
if group_names:
|
|
Groups.sync_groups_by_group_names(user.id, group_names)
|
|
|
|
elif WEBUI_AUTH == False:
|
|
admin_email = "admin@localhost"
|
|
admin_password = "admin"
|
|
|
|
if Users.get_user_by_email(admin_email.lower()):
|
|
user = Auths.authenticate_user(
|
|
admin_email.lower(), lambda pw: verify_password(admin_password, pw)
|
|
)
|
|
else:
|
|
if Users.has_users():
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.EXISTING_USERS)
|
|
|
|
await signup(
|
|
request,
|
|
response,
|
|
SignupForm(email=admin_email, password=admin_password, name="User"),
|
|
)
|
|
|
|
user = Auths.authenticate_user(
|
|
admin_email.lower(), lambda pw: verify_password(admin_password, pw)
|
|
)
|
|
else:
|
|
if signin_rate_limiter.is_limited(form_data.email.lower()):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
|
|
detail=ERROR_MESSAGES.RATE_LIMIT_EXCEEDED,
|
|
)
|
|
|
|
password_bytes = form_data.password.encode("utf-8")
|
|
if len(password_bytes) > 72:
|
|
# TODO: Implement other hashing algorithms that support longer passwords
|
|
log.info("Password too long, truncating to 72 bytes for bcrypt")
|
|
password_bytes = password_bytes[:72]
|
|
|
|
# decode safely — ignore incomplete UTF-8 sequences
|
|
form_data.password = password_bytes.decode("utf-8", errors="ignore")
|
|
|
|
user = Auths.authenticate_user(
|
|
form_data.email.lower(), lambda pw: verify_password(form_data.password, pw)
|
|
)
|
|
|
|
if user:
|
|
|
|
expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
|
|
expires_at = None
|
|
if expires_delta:
|
|
expires_at = int(time.time()) + int(expires_delta.total_seconds())
|
|
|
|
token = create_token(
|
|
data={"id": user.id},
|
|
expires_delta=expires_delta,
|
|
)
|
|
|
|
datetime_expires_at = (
|
|
datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
|
|
if expires_at
|
|
else None
|
|
)
|
|
|
|
# Set the cookie token
|
|
response.set_cookie(
|
|
key="token",
|
|
value=token,
|
|
expires=datetime_expires_at,
|
|
httponly=True, # Ensures the cookie is not accessible via JavaScript
|
|
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
|
|
secure=WEBUI_AUTH_COOKIE_SECURE,
|
|
)
|
|
|
|
user_permissions = get_permissions(
|
|
user.id, request.app.state.config.USER_PERMISSIONS
|
|
)
|
|
|
|
# init subscription for user
|
|
UserSubscriptions.get_or_create_subscription(user.id)
|
|
|
|
return {
|
|
"token": token,
|
|
"token_type": "Bearer",
|
|
"expires_at": expires_at,
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"role": user.role,
|
|
"profile_image_url": user.profile_image_url,
|
|
"permissions": user_permissions,
|
|
}
|
|
else:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
|
|
|
|
############################
|
|
# Send Email Code
|
|
############################
|
|
|
|
|
|
class SendEmailCodeForm(BaseModel):
|
|
email: str
|
|
|
|
|
|
@router.post("/send_email_code")
|
|
async def send_email_code(request: Request, form_data: SendEmailCodeForm):
|
|
if not validate_email_format(form_data.email.lower()):
|
|
raise HTTPException(
|
|
status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT
|
|
)
|
|
|
|
# check if email already registered
|
|
if Users.get_user_by_email(form_data.email.lower()):
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)
|
|
|
|
try:
|
|
send_signup_email_code(form_data.email.lower())
|
|
return {"success": True, "message": "Verification code sent"}
|
|
except Exception as e:
|
|
log.error(f"Failed to send email code: {e}")
|
|
raise HTTPException(500, detail="Failed to send verification code")
|
|
|
|
|
|
############################
|
|
# SignUp
|
|
############################
|
|
|
|
|
|
@router.post("/signup", response_model=SessionUserResponse)
|
|
async def signup(request: Request, response: Response, form_data: SignupForm):
|
|
has_users = Users.has_users()
|
|
|
|
if WEBUI_AUTH:
|
|
if (
|
|
not request.app.state.config.ENABLE_SIGNUP
|
|
or not request.app.state.config.ENABLE_LOGIN_FORM
|
|
):
|
|
if has_users or not ENABLE_INITIAL_ADMIN_SIGNUP:
|
|
raise HTTPException(
|
|
status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED
|
|
)
|
|
else:
|
|
if has_users:
|
|
raise HTTPException(
|
|
status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED
|
|
)
|
|
|
|
# check for email domain whitelist
|
|
email_domain_whitelist = [
|
|
i.strip()
|
|
for i in request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST.split(",")
|
|
if i
|
|
]
|
|
if email_domain_whitelist:
|
|
domain = form_data.email.split("@")[-1]
|
|
if domain not in email_domain_whitelist:
|
|
raise HTTPException(
|
|
status.HTTP_403_FORBIDDEN,
|
|
detail=f"Only emails from {request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST} are allowed",
|
|
)
|
|
|
|
if not validate_email_format(form_data.email.lower()):
|
|
raise HTTPException(
|
|
status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT
|
|
)
|
|
|
|
if Users.get_user_by_email(form_data.email.lower()):
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)
|
|
|
|
# verify email code for non-admin signup
|
|
if has_users and form_data.email_code:
|
|
if not verify_signup_email_code(form_data.email.lower(), form_data.email_code):
|
|
raise HTTPException(400, detail="Invalid or expired verification code")
|
|
elif has_users and not form_data.email_code:
|
|
raise HTTPException(400, detail="Verification code is required")
|
|
|
|
try:
|
|
try:
|
|
validate_password(form_data.password)
|
|
except Exception as e:
|
|
raise HTTPException(400, detail=str(e))
|
|
|
|
hashed = get_password_hash(form_data.password)
|
|
|
|
if not has_users:
|
|
role = "admin"
|
|
elif request.app.state.config.ENABLE_SIGNUP_VERIFY:
|
|
role = "pending"
|
|
send_verify_email(email=form_data.email.lower())
|
|
else:
|
|
role = request.app.state.config.DEFAULT_USER_ROLE
|
|
|
|
user = Auths.insert_new_auth(
|
|
form_data.email.lower(),
|
|
hashed,
|
|
form_data.name,
|
|
form_data.profile_image_url,
|
|
role,
|
|
)
|
|
|
|
if user:
|
|
expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
|
|
expires_at = None
|
|
if expires_delta:
|
|
expires_at = int(time.time()) + int(expires_delta.total_seconds())
|
|
|
|
token = create_token(
|
|
data={"id": user.id},
|
|
expires_delta=expires_delta,
|
|
)
|
|
|
|
datetime_expires_at = (
|
|
datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
|
|
if expires_at
|
|
else None
|
|
)
|
|
|
|
# Set the cookie token
|
|
response.set_cookie(
|
|
key="token",
|
|
value=token,
|
|
expires=datetime_expires_at,
|
|
httponly=True, # Ensures the cookie is not accessible via JavaScript
|
|
samesite=WEBUI_AUTH_COOKIE_SAME_SITE,
|
|
secure=WEBUI_AUTH_COOKIE_SECURE,
|
|
)
|
|
|
|
if request.app.state.config.WEBHOOK_URL:
|
|
await post_webhook(
|
|
request.app.state.WEBUI_NAME,
|
|
request.app.state.config.WEBHOOK_URL,
|
|
WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
|
|
{
|
|
"action": "signup",
|
|
"message": WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
|
|
"user": user.model_dump_json(exclude_none=True),
|
|
},
|
|
)
|
|
|
|
user_permissions = get_permissions(
|
|
user.id, request.app.state.config.USER_PERMISSIONS
|
|
)
|
|
|
|
if not has_users:
|
|
# Disable signup after the first user is created
|
|
request.app.state.config.ENABLE_SIGNUP = False
|
|
|
|
apply_default_group_assignment(
|
|
request.app.state.config.DEFAULT_GROUP_ID,
|
|
user.id,
|
|
)
|
|
|
|
# init subscription for user
|
|
UserSubscriptions.get_or_create_subscription(user.id)
|
|
|
|
return {
|
|
"token": token,
|
|
"token_type": "Bearer",
|
|
"expires_at": expires_at,
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"role": user.role,
|
|
"profile_image_url": user.profile_image_url,
|
|
"permissions": user_permissions,
|
|
}
|
|
else:
|
|
raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR)
|
|
except Exception as err:
|
|
log.error(f"Signup error: {str(err)}")
|
|
raise HTTPException(500, detail="An internal error occurred during signup.")
|
|
|
|
|
|
@router.get("/signup_verify/{code}")
|
|
async def signup_verify(request: Request, code: str):
|
|
email = verify_email_by_code(code=code)
|
|
if not email:
|
|
raise HTTPException(403, detail="Invalid code")
|
|
|
|
user = Users.get_user_by_email(email)
|
|
if not user:
|
|
raise HTTPException(404, detail="User not found")
|
|
|
|
Users.update_user_role_by_id(user.id, "user")
|
|
return RedirectResponse(url=request.app.state.config.WEBUI_URL)
|
|
|
|
|
|
@router.get("/signout")
|
|
async def signout(request: Request, response: Response):
|
|
|
|
# get auth token from headers or cookies
|
|
token = None
|
|
auth_header = request.headers.get("Authorization")
|
|
if auth_header:
|
|
auth_cred = get_http_authorization_cred(auth_header)
|
|
token = auth_cred.credentials
|
|
else:
|
|
token = request.cookies.get("token")
|
|
|
|
if token:
|
|
await invalidate_token(request, token)
|
|
|
|
response.delete_cookie("token")
|
|
response.delete_cookie("oui-session")
|
|
response.delete_cookie("oauth_id_token")
|
|
|
|
oauth_session_id = request.cookies.get("oauth_session_id")
|
|
if oauth_session_id:
|
|
response.delete_cookie("oauth_session_id")
|
|
|
|
session = OAuthSessions.get_session_by_id(oauth_session_id)
|
|
oauth_server_metadata_url = (
|
|
request.app.state.oauth_manager.get_server_metadata_url(session.provider)
|
|
if session
|
|
else None
|
|
) or OPENID_PROVIDER_URL.value
|
|
|
|
if session and oauth_server_metadata_url:
|
|
oauth_id_token = session.token.get("id_token")
|
|
try:
|
|
async with ClientSession(trust_env=True) as session:
|
|
async with session.get(oauth_server_metadata_url) as r:
|
|
if r.status == 200:
|
|
openid_data = await r.json()
|
|
logout_url = openid_data.get("end_session_endpoint")
|
|
|
|
if logout_url:
|
|
return JSONResponse(
|
|
status_code=200,
|
|
content={
|
|
"status": True,
|
|
"redirect_url": f"{logout_url}?id_token_hint={oauth_id_token}"
|
|
+ (
|
|
f"&post_logout_redirect_uri={WEBUI_AUTH_SIGNOUT_REDIRECT_URL}"
|
|
if WEBUI_AUTH_SIGNOUT_REDIRECT_URL
|
|
else ""
|
|
),
|
|
},
|
|
headers=response.headers,
|
|
)
|
|
else:
|
|
raise Exception("Failed to fetch OpenID configuration")
|
|
|
|
except Exception as e:
|
|
log.error(f"OpenID signout error: {str(e)}")
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="Failed to sign out from the OpenID provider.",
|
|
headers=response.headers,
|
|
)
|
|
|
|
if WEBUI_AUTH_SIGNOUT_REDIRECT_URL:
|
|
return JSONResponse(
|
|
status_code=200,
|
|
content={
|
|
"status": True,
|
|
"redirect_url": WEBUI_AUTH_SIGNOUT_REDIRECT_URL,
|
|
},
|
|
headers=response.headers,
|
|
)
|
|
|
|
return JSONResponse(
|
|
status_code=200, content={"status": True}, headers=response.headers
|
|
)
|
|
|
|
|
|
############################
|
|
# AddUser
|
|
############################
|
|
|
|
|
|
@router.post("/add", response_model=SigninResponse)
|
|
async def add_user(
|
|
request: Request, form_data: AddUserForm, user=Depends(get_admin_user)
|
|
):
|
|
if not validate_email_format(form_data.email.lower()):
|
|
raise HTTPException(
|
|
status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT
|
|
)
|
|
|
|
if Users.get_user_by_email(form_data.email.lower()):
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)
|
|
|
|
try:
|
|
try:
|
|
validate_password(form_data.password)
|
|
except Exception as e:
|
|
raise HTTPException(400, detail=str(e))
|
|
|
|
hashed = get_password_hash(form_data.password)
|
|
user = Auths.insert_new_auth(
|
|
form_data.email.lower(),
|
|
hashed,
|
|
form_data.name,
|
|
form_data.profile_image_url,
|
|
form_data.role,
|
|
)
|
|
|
|
if user:
|
|
apply_default_group_assignment(
|
|
request.app.state.config.DEFAULT_GROUP_ID,
|
|
user.id,
|
|
)
|
|
|
|
token = create_token(data={"id": user.id})
|
|
return {
|
|
"token": token,
|
|
"token_type": "Bearer",
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"name": user.name,
|
|
"role": user.role,
|
|
"profile_image_url": user.profile_image_url,
|
|
}
|
|
else:
|
|
raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR)
|
|
except Exception as err:
|
|
log.error(f"Add user error: {str(err)}")
|
|
raise HTTPException(
|
|
500, detail="An internal error occurred while adding the user."
|
|
)
|
|
|
|
|
|
############################
|
|
# GetAdminDetails
|
|
############################
|
|
|
|
|
|
@router.get("/admin/details")
|
|
async def get_admin_details(request: Request, user=Depends(get_current_user)):
|
|
if request.app.state.config.SHOW_ADMIN_DETAILS:
|
|
admin_email = request.app.state.config.ADMIN_EMAIL
|
|
admin_name = None
|
|
|
|
log.info(f"Admin details - Email: {admin_email}, Name: {admin_name}")
|
|
|
|
if admin_email:
|
|
admin = Users.get_user_by_email(admin_email)
|
|
if admin:
|
|
admin_name = admin.name
|
|
else:
|
|
admin = Users.get_first_user()
|
|
if admin:
|
|
admin_email = admin.email
|
|
admin_name = admin.name
|
|
|
|
return {
|
|
"name": admin_name,
|
|
"email": admin_email,
|
|
}
|
|
else:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.ACTION_PROHIBITED)
|
|
|
|
|
|
############################
|
|
# ToggleSignUp
|
|
############################
|
|
|
|
|
|
@router.get("/admin/config")
|
|
async def get_admin_config(request: Request, user=Depends(get_admin_user)):
|
|
return {
|
|
"SHOW_ADMIN_DETAILS": request.app.state.config.SHOW_ADMIN_DETAILS,
|
|
"WEBUI_URL": request.app.state.config.WEBUI_URL,
|
|
"ENABLE_SIGNUP": request.app.state.config.ENABLE_SIGNUP,
|
|
"ENABLE_SIGNUP_VERIFY": request.app.state.config.ENABLE_SIGNUP_VERIFY,
|
|
"SIGNUP_EMAIL_DOMAIN_WHITELIST": request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST,
|
|
"SMTP_HOST": request.app.state.config.SMTP_HOST,
|
|
"SMTP_PORT": request.app.state.config.SMTP_PORT,
|
|
"SMTP_USERNAME": request.app.state.config.SMTP_USERNAME,
|
|
"SMTP_PASSWORD": request.app.state.config.SMTP_PASSWORD,
|
|
"SMTP_SENT_FROM": request.app.state.config.SMTP_SENT_FROM,
|
|
"ENABLE_API_KEYS": request.app.state.config.ENABLE_API_KEYS,
|
|
"ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS": request.app.state.config.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS,
|
|
"API_KEYS_ALLOWED_ENDPOINTS": request.app.state.config.API_KEYS_ALLOWED_ENDPOINTS,
|
|
"DEFAULT_USER_ROLE": request.app.state.config.DEFAULT_USER_ROLE,
|
|
"DEFAULT_GROUP_ID": request.app.state.config.DEFAULT_GROUP_ID,
|
|
"JWT_EXPIRES_IN": request.app.state.config.JWT_EXPIRES_IN,
|
|
"ENABLE_COMMUNITY_SHARING": request.app.state.config.ENABLE_COMMUNITY_SHARING,
|
|
"ENABLE_MESSAGE_RATING": request.app.state.config.ENABLE_MESSAGE_RATING,
|
|
"ENABLE_FOLDERS": request.app.state.config.ENABLE_FOLDERS,
|
|
"ENABLE_CHANNELS": request.app.state.config.ENABLE_CHANNELS,
|
|
"ENABLE_NOTES": request.app.state.config.ENABLE_NOTES,
|
|
"ENABLE_USER_WEBHOOKS": request.app.state.config.ENABLE_USER_WEBHOOKS,
|
|
"PENDING_USER_OVERLAY_TITLE": request.app.state.config.PENDING_USER_OVERLAY_TITLE,
|
|
"PENDING_USER_OVERLAY_CONTENT": request.app.state.config.PENDING_USER_OVERLAY_CONTENT,
|
|
"RESPONSE_WATERMARK": request.app.state.config.RESPONSE_WATERMARK,
|
|
}
|
|
|
|
|
|
class AdminConfig(BaseModel):
|
|
SHOW_ADMIN_DETAILS: bool
|
|
WEBUI_URL: str
|
|
ENABLE_SIGNUP: bool
|
|
ENABLE_SIGNUP_VERIFY: bool = Field(default=False)
|
|
SIGNUP_EMAIL_DOMAIN_WHITELIST: str = Field(default="")
|
|
SMTP_HOST: str
|
|
SMTP_PORT: str
|
|
SMTP_USERNAME: str
|
|
SMTP_PASSWORD: str
|
|
SMTP_SENT_FROM: str
|
|
ENABLE_API_KEYS: bool
|
|
ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS: bool
|
|
API_KEYS_ALLOWED_ENDPOINTS: str
|
|
DEFAULT_USER_ROLE: str
|
|
DEFAULT_GROUP_ID: str
|
|
JWT_EXPIRES_IN: str
|
|
ENABLE_COMMUNITY_SHARING: bool
|
|
ENABLE_MESSAGE_RATING: bool
|
|
ENABLE_FOLDERS: bool
|
|
ENABLE_CHANNELS: bool
|
|
ENABLE_NOTES: bool
|
|
ENABLE_USER_WEBHOOKS: bool
|
|
PENDING_USER_OVERLAY_TITLE: Optional[str] = None
|
|
PENDING_USER_OVERLAY_CONTENT: Optional[str] = None
|
|
RESPONSE_WATERMARK: Optional[str] = None
|
|
|
|
|
|
@router.post("/admin/config")
|
|
async def update_admin_config(
|
|
request: Request, form_data: AdminConfig, user=Depends(get_admin_user)
|
|
):
|
|
# verify redis status
|
|
if form_data.ENABLE_SIGNUP_VERIFY:
|
|
# check redis
|
|
_redis = get_redis_connection(
|
|
redis_url=REDIS_URL,
|
|
redis_sentinels=get_sentinels_from_env(
|
|
REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT
|
|
),
|
|
redis_cluster=REDIS_CLUSTER,
|
|
)
|
|
if not _redis:
|
|
raise HTTPException(status_code=400, detail="Redis is not configured.")
|
|
|
|
request.app.state.config.SHOW_ADMIN_DETAILS = form_data.SHOW_ADMIN_DETAILS
|
|
request.app.state.config.WEBUI_URL = form_data.WEBUI_URL
|
|
request.app.state.config.ENABLE_SIGNUP = form_data.ENABLE_SIGNUP
|
|
request.app.state.config.ENABLE_SIGNUP_VERIFY = form_data.ENABLE_SIGNUP_VERIFY
|
|
request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST = (
|
|
form_data.SIGNUP_EMAIL_DOMAIN_WHITELIST
|
|
)
|
|
request.app.state.config.SMTP_HOST = form_data.SMTP_HOST
|
|
request.app.state.config.SMTP_PORT = form_data.SMTP_PORT
|
|
request.app.state.config.SMTP_USERNAME = form_data.SMTP_USERNAME
|
|
request.app.state.config.SMTP_PASSWORD = form_data.SMTP_PASSWORD
|
|
request.app.state.config.SMTP_SENT_FROM = form_data.SMTP_SENT_FROM
|
|
|
|
request.app.state.config.ENABLE_API_KEYS = form_data.ENABLE_API_KEYS
|
|
request.app.state.config.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS = (
|
|
form_data.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS
|
|
)
|
|
request.app.state.config.API_KEYS_ALLOWED_ENDPOINTS = (
|
|
form_data.API_KEYS_ALLOWED_ENDPOINTS
|
|
)
|
|
|
|
request.app.state.config.ENABLE_FOLDERS = form_data.ENABLE_FOLDERS
|
|
request.app.state.config.ENABLE_CHANNELS = form_data.ENABLE_CHANNELS
|
|
request.app.state.config.ENABLE_NOTES = form_data.ENABLE_NOTES
|
|
|
|
if form_data.DEFAULT_USER_ROLE in ["pending", "user", "admin"]:
|
|
request.app.state.config.DEFAULT_USER_ROLE = form_data.DEFAULT_USER_ROLE
|
|
|
|
request.app.state.config.DEFAULT_GROUP_ID = form_data.DEFAULT_GROUP_ID
|
|
|
|
pattern = r"^(-1|0|(-?\d+(\.\d+)?)(ms|s|m|h|d|w))$"
|
|
|
|
# Check if the input string matches the pattern
|
|
if re.match(pattern, form_data.JWT_EXPIRES_IN):
|
|
request.app.state.config.JWT_EXPIRES_IN = form_data.JWT_EXPIRES_IN
|
|
|
|
request.app.state.config.ENABLE_COMMUNITY_SHARING = (
|
|
form_data.ENABLE_COMMUNITY_SHARING
|
|
)
|
|
request.app.state.config.ENABLE_MESSAGE_RATING = form_data.ENABLE_MESSAGE_RATING
|
|
|
|
request.app.state.config.ENABLE_USER_WEBHOOKS = form_data.ENABLE_USER_WEBHOOKS
|
|
|
|
request.app.state.config.PENDING_USER_OVERLAY_TITLE = (
|
|
form_data.PENDING_USER_OVERLAY_TITLE
|
|
)
|
|
request.app.state.config.PENDING_USER_OVERLAY_CONTENT = (
|
|
form_data.PENDING_USER_OVERLAY_CONTENT
|
|
)
|
|
|
|
request.app.state.config.RESPONSE_WATERMARK = form_data.RESPONSE_WATERMARK
|
|
|
|
return {
|
|
"SHOW_ADMIN_DETAILS": request.app.state.config.SHOW_ADMIN_DETAILS,
|
|
"WEBUI_URL": request.app.state.config.WEBUI_URL,
|
|
"ENABLE_SIGNUP": request.app.state.config.ENABLE_SIGNUP,
|
|
"ENABLE_SIGNUP_VERIFY": request.app.state.config.ENABLE_SIGNUP_VERIFY,
|
|
"SIGNUP_EMAIL_DOMAIN_WHITELIST": request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST,
|
|
"SMTP_HOST": request.app.state.config.SMTP_HOST,
|
|
"SMTP_PORT": request.app.state.config.SMTP_PORT,
|
|
"SMTP_USERNAME": request.app.state.config.SMTP_USERNAME,
|
|
"SMTP_PASSWORD": request.app.state.config.SMTP_PASSWORD,
|
|
"SMTP_SENT_FROM": request.app.state.config.SMTP_SENT_FROM,
|
|
"ENABLE_API_KEYS": request.app.state.config.ENABLE_API_KEYS,
|
|
"ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS": request.app.state.config.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS,
|
|
"API_KEYS_ALLOWED_ENDPOINTS": request.app.state.config.API_KEYS_ALLOWED_ENDPOINTS,
|
|
"DEFAULT_USER_ROLE": request.app.state.config.DEFAULT_USER_ROLE,
|
|
"DEFAULT_GROUP_ID": request.app.state.config.DEFAULT_GROUP_ID,
|
|
"JWT_EXPIRES_IN": request.app.state.config.JWT_EXPIRES_IN,
|
|
"ENABLE_COMMUNITY_SHARING": request.app.state.config.ENABLE_COMMUNITY_SHARING,
|
|
"ENABLE_MESSAGE_RATING": request.app.state.config.ENABLE_MESSAGE_RATING,
|
|
"ENABLE_FOLDERS": request.app.state.config.ENABLE_FOLDERS,
|
|
"ENABLE_CHANNELS": request.app.state.config.ENABLE_CHANNELS,
|
|
"ENABLE_NOTES": request.app.state.config.ENABLE_NOTES,
|
|
"ENABLE_USER_WEBHOOKS": request.app.state.config.ENABLE_USER_WEBHOOKS,
|
|
"PENDING_USER_OVERLAY_TITLE": request.app.state.config.PENDING_USER_OVERLAY_TITLE,
|
|
"PENDING_USER_OVERLAY_CONTENT": request.app.state.config.PENDING_USER_OVERLAY_CONTENT,
|
|
"RESPONSE_WATERMARK": request.app.state.config.RESPONSE_WATERMARK,
|
|
}
|
|
|
|
|
|
class LdapServerConfig(BaseModel):
|
|
label: str
|
|
host: str
|
|
port: Optional[int] = None
|
|
attribute_for_mail: str = "mail"
|
|
attribute_for_username: str = "uid"
|
|
app_dn: str
|
|
app_dn_password: str
|
|
search_base: str
|
|
search_filters: str = ""
|
|
use_tls: bool = True
|
|
certificate_path: Optional[str] = None
|
|
validate_cert: bool = True
|
|
ciphers: Optional[str] = "ALL"
|
|
|
|
|
|
@router.get("/admin/config/ldap/server", response_model=LdapServerConfig)
|
|
async def get_ldap_server(request: Request, user=Depends(get_admin_user)):
|
|
return {
|
|
"label": request.app.state.config.LDAP_SERVER_LABEL,
|
|
"host": request.app.state.config.LDAP_SERVER_HOST,
|
|
"port": request.app.state.config.LDAP_SERVER_PORT,
|
|
"attribute_for_mail": request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL,
|
|
"attribute_for_username": request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME,
|
|
"app_dn": request.app.state.config.LDAP_APP_DN,
|
|
"app_dn_password": request.app.state.config.LDAP_APP_PASSWORD,
|
|
"search_base": request.app.state.config.LDAP_SEARCH_BASE,
|
|
"search_filters": request.app.state.config.LDAP_SEARCH_FILTERS,
|
|
"use_tls": request.app.state.config.LDAP_USE_TLS,
|
|
"certificate_path": request.app.state.config.LDAP_CA_CERT_FILE,
|
|
"validate_cert": request.app.state.config.LDAP_VALIDATE_CERT,
|
|
"ciphers": request.app.state.config.LDAP_CIPHERS,
|
|
}
|
|
|
|
|
|
@router.post("/admin/config/ldap/server")
|
|
async def update_ldap_server(
|
|
request: Request, form_data: LdapServerConfig, user=Depends(get_admin_user)
|
|
):
|
|
required_fields = [
|
|
"label",
|
|
"host",
|
|
"attribute_for_mail",
|
|
"attribute_for_username",
|
|
"app_dn",
|
|
"app_dn_password",
|
|
"search_base",
|
|
]
|
|
for key in required_fields:
|
|
value = getattr(form_data, key)
|
|
if not value:
|
|
raise HTTPException(400, detail=f"Required field {key} is empty")
|
|
|
|
request.app.state.config.LDAP_SERVER_LABEL = form_data.label
|
|
request.app.state.config.LDAP_SERVER_HOST = form_data.host
|
|
request.app.state.config.LDAP_SERVER_PORT = form_data.port
|
|
request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL = form_data.attribute_for_mail
|
|
request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME = (
|
|
form_data.attribute_for_username
|
|
)
|
|
request.app.state.config.LDAP_APP_DN = form_data.app_dn
|
|
request.app.state.config.LDAP_APP_PASSWORD = form_data.app_dn_password
|
|
request.app.state.config.LDAP_SEARCH_BASE = form_data.search_base
|
|
request.app.state.config.LDAP_SEARCH_FILTERS = form_data.search_filters
|
|
request.app.state.config.LDAP_USE_TLS = form_data.use_tls
|
|
request.app.state.config.LDAP_CA_CERT_FILE = form_data.certificate_path
|
|
request.app.state.config.LDAP_VALIDATE_CERT = form_data.validate_cert
|
|
request.app.state.config.LDAP_CIPHERS = form_data.ciphers
|
|
|
|
return {
|
|
"label": request.app.state.config.LDAP_SERVER_LABEL,
|
|
"host": request.app.state.config.LDAP_SERVER_HOST,
|
|
"port": request.app.state.config.LDAP_SERVER_PORT,
|
|
"attribute_for_mail": request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL,
|
|
"attribute_for_username": request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME,
|
|
"app_dn": request.app.state.config.LDAP_APP_DN,
|
|
"app_dn_password": request.app.state.config.LDAP_APP_PASSWORD,
|
|
"search_base": request.app.state.config.LDAP_SEARCH_BASE,
|
|
"search_filters": request.app.state.config.LDAP_SEARCH_FILTERS,
|
|
"use_tls": request.app.state.config.LDAP_USE_TLS,
|
|
"certificate_path": request.app.state.config.LDAP_CA_CERT_FILE,
|
|
"validate_cert": request.app.state.config.LDAP_VALIDATE_CERT,
|
|
"ciphers": request.app.state.config.LDAP_CIPHERS,
|
|
}
|
|
|
|
|
|
@router.get("/admin/config/ldap")
|
|
async def get_ldap_config(request: Request, user=Depends(get_admin_user)):
|
|
return {"ENABLE_LDAP": request.app.state.config.ENABLE_LDAP}
|
|
|
|
|
|
class LdapConfigForm(BaseModel):
|
|
enable_ldap: Optional[bool] = None
|
|
|
|
|
|
@router.post("/admin/config/ldap")
|
|
async def update_ldap_config(
|
|
request: Request, form_data: LdapConfigForm, user=Depends(get_admin_user)
|
|
):
|
|
request.app.state.config.ENABLE_LDAP = form_data.enable_ldap
|
|
return {"ENABLE_LDAP": request.app.state.config.ENABLE_LDAP}
|
|
|
|
|
|
############################
|
|
# Branding Config
|
|
############################
|
|
|
|
|
|
@router.get("/admin/config/branding")
|
|
async def get_branding_config(request: Request, user=Depends(get_admin_user)):
|
|
def get_value(config_obj):
|
|
if hasattr(config_obj, "value"):
|
|
return config_obj.value
|
|
return config_obj
|
|
|
|
return {
|
|
"ORGANIZATION_NAME": get_value(request.app.state.config.BRANDING_ORGANIZATION_NAME),
|
|
"CUSTOM_NAME": get_value(request.app.state.config.BRANDING_CUSTOM_NAME),
|
|
"FAVICON_ICO": get_value(request.app.state.config.BRANDING_FAVICON_ICO),
|
|
"FAVICON_PNG": get_value(request.app.state.config.BRANDING_FAVICON_PNG),
|
|
"FAVICON_DARK_PNG": get_value(request.app.state.config.BRANDING_FAVICON_DARK_PNG),
|
|
"FAVICON_SVG": get_value(request.app.state.config.BRANDING_FAVICON_SVG),
|
|
"AUTH_LOGO_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_URL),
|
|
"SIDEBAR_LOGO_URL": get_value(request.app.state.config.BRANDING_SIDEBAR_LOGO_URL),
|
|
"AUTH_LOGO_LINK_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_LINK_URL),
|
|
}
|
|
|
|
|
|
class BrandingConfig(BaseModel):
|
|
ORGANIZATION_NAME: Optional[str] = ""
|
|
CUSTOM_NAME: Optional[str] = ""
|
|
FAVICON_ICO: Optional[str] = ""
|
|
FAVICON_PNG: Optional[str] = ""
|
|
FAVICON_DARK_PNG: Optional[str] = ""
|
|
FAVICON_SVG: Optional[str] = ""
|
|
AUTH_LOGO_URL: Optional[str] = ""
|
|
SIDEBAR_LOGO_URL: Optional[str] = ""
|
|
AUTH_LOGO_LINK_URL: Optional[str] = ""
|
|
|
|
|
|
@router.post("/admin/config/branding")
|
|
async def update_branding_config(
|
|
request: Request, form_data: BrandingConfig, user=Depends(get_admin_user)
|
|
):
|
|
# Helper function to update config value
|
|
# Must access _state directly to get PersistentConfig object (not the value)
|
|
def update_config(key: str, new_value):
|
|
config_obj = request.app.state.config._state.get(key)
|
|
if config_obj and hasattr(config_obj, "value") and hasattr(config_obj, "save"):
|
|
config_obj.value = new_value
|
|
config_obj.save()
|
|
log.info(f"Branding: Saved config {key} = {new_value}")
|
|
else:
|
|
log.warning(f"Branding: config key '{key}' not found or not PersistentConfig")
|
|
|
|
# Update config values
|
|
update_config("BRANDING_ORGANIZATION_NAME", form_data.ORGANIZATION_NAME)
|
|
update_config("BRANDING_CUSTOM_NAME", form_data.CUSTOM_NAME)
|
|
update_config("BRANDING_FAVICON_ICO", form_data.FAVICON_ICO)
|
|
update_config("BRANDING_FAVICON_PNG", form_data.FAVICON_PNG)
|
|
update_config("BRANDING_FAVICON_DARK_PNG", form_data.FAVICON_DARK_PNG)
|
|
update_config("BRANDING_FAVICON_SVG", form_data.FAVICON_SVG)
|
|
update_config("BRANDING_AUTH_LOGO_URL", form_data.AUTH_LOGO_URL)
|
|
update_config("BRANDING_SIDEBAR_LOGO_URL", form_data.SIDEBAR_LOGO_URL)
|
|
update_config("BRANDING_AUTH_LOGO_LINK_URL", form_data.AUTH_LOGO_LINK_URL)
|
|
|
|
# Apply branding changes immediately
|
|
apply_branding(request.app)
|
|
|
|
# Helper to get value
|
|
def get_value(config_obj):
|
|
if hasattr(config_obj, "value"):
|
|
return config_obj.value
|
|
return config_obj
|
|
|
|
return {
|
|
"ORGANIZATION_NAME": get_value(request.app.state.config.BRANDING_ORGANIZATION_NAME),
|
|
"CUSTOM_NAME": get_value(request.app.state.config.BRANDING_CUSTOM_NAME),
|
|
"FAVICON_ICO": get_value(request.app.state.config.BRANDING_FAVICON_ICO),
|
|
"FAVICON_PNG": get_value(request.app.state.config.BRANDING_FAVICON_PNG),
|
|
"FAVICON_DARK_PNG": get_value(request.app.state.config.BRANDING_FAVICON_DARK_PNG),
|
|
"FAVICON_SVG": get_value(request.app.state.config.BRANDING_FAVICON_SVG),
|
|
"AUTH_LOGO_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_URL),
|
|
"SIDEBAR_LOGO_URL": get_value(request.app.state.config.BRANDING_SIDEBAR_LOGO_URL),
|
|
"AUTH_LOGO_LINK_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_LINK_URL),
|
|
}
|
|
|
|
|
|
############################
|
|
# Email Templates
|
|
############################
|
|
|
|
|
|
@router.get("/admin/config/email-templates")
|
|
async def get_email_templates(request: Request, user=Depends(get_admin_user)):
|
|
from open_webui.config import (
|
|
DEFAULT_EMAIL_VERIFY_TEMPLATE,
|
|
DEFAULT_EMAIL_CODE_TEMPLATE,
|
|
)
|
|
|
|
def get_value(config_obj):
|
|
if hasattr(config_obj, "value"):
|
|
return config_obj.value
|
|
return config_obj
|
|
|
|
return {
|
|
"EMAIL_VERIFY_TEMPLATE": get_value(
|
|
request.app.state.config.EMAIL_VERIFY_TEMPLATE
|
|
),
|
|
"EMAIL_CODE_TEMPLATE": get_value(
|
|
request.app.state.config.EMAIL_CODE_TEMPLATE
|
|
),
|
|
"DEFAULT_EMAIL_VERIFY_TEMPLATE": DEFAULT_EMAIL_VERIFY_TEMPLATE,
|
|
"DEFAULT_EMAIL_CODE_TEMPLATE": DEFAULT_EMAIL_CODE_TEMPLATE,
|
|
}
|
|
|
|
|
|
class EmailTemplatesConfig(BaseModel):
|
|
EMAIL_VERIFY_TEMPLATE: Optional[str] = ""
|
|
EMAIL_CODE_TEMPLATE: Optional[str] = ""
|
|
|
|
|
|
@router.post("/admin/config/email-templates")
|
|
async def update_email_templates(
|
|
request: Request, form_data: EmailTemplatesConfig, user=Depends(get_admin_user)
|
|
):
|
|
def update_config(key: str, new_value):
|
|
config_obj = request.app.state.config._state.get(key)
|
|
if config_obj and hasattr(config_obj, "value") and hasattr(config_obj, "save"):
|
|
config_obj.value = new_value
|
|
config_obj.save()
|
|
log.info(f"Email Templates: Saved config {key}")
|
|
|
|
update_config("EMAIL_VERIFY_TEMPLATE", form_data.EMAIL_VERIFY_TEMPLATE)
|
|
update_config("EMAIL_CODE_TEMPLATE", form_data.EMAIL_CODE_TEMPLATE)
|
|
|
|
def get_value(config_obj):
|
|
if hasattr(config_obj, "value"):
|
|
return config_obj.value
|
|
return config_obj
|
|
|
|
return {
|
|
"EMAIL_VERIFY_TEMPLATE": get_value(
|
|
request.app.state.config.EMAIL_VERIFY_TEMPLATE
|
|
),
|
|
"EMAIL_CODE_TEMPLATE": get_value(
|
|
request.app.state.config.EMAIL_CODE_TEMPLATE
|
|
),
|
|
}
|
|
|
|
|
|
############################
|
|
# API Key
|
|
############################
|
|
|
|
|
|
# create api key
|
|
@router.post("/api_key", response_model=ApiKey)
|
|
async def generate_api_key(request: Request, user=Depends(get_current_user)):
|
|
if not request.app.state.config.ENABLE_API_KEYS or not has_permission(
|
|
user.id, "features.api_keys", request.app.state.config.USER_PERMISSIONS
|
|
):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=ERROR_MESSAGES.API_KEY_CREATION_NOT_ALLOWED,
|
|
)
|
|
|
|
api_key = create_api_key()
|
|
success = Users.update_user_api_key_by_id(user.id, api_key)
|
|
|
|
if success:
|
|
return {
|
|
"api_key": api_key,
|
|
}
|
|
else:
|
|
raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_API_KEY_ERROR)
|
|
|
|
|
|
# delete api key
|
|
@router.delete("/api_key", response_model=bool)
|
|
async def delete_api_key(user=Depends(get_current_user)):
|
|
return Users.delete_user_api_key_by_id(user.id)
|
|
|
|
|
|
# get api key
|
|
@router.get("/api_key", response_model=ApiKey)
|
|
async def get_api_key(user=Depends(get_current_user)):
|
|
api_key = Users.get_user_api_key_by_id(user.id)
|
|
if api_key:
|
|
return {
|
|
"api_key": api_key,
|
|
}
|
|
else:
|
|
raise HTTPException(404, detail=ERROR_MESSAGES.API_KEY_NOT_FOUND)
|