import re import uuid import time import datetime import logging from decimal import Decimal from aiohttp import ClientSession import urllib from open_webui.models.auths import ( AddUserForm, ApiKey, Auths, Token, LdapForm, SigninForm, SigninResponse, SignupForm, UpdatePasswordForm, ) from open_webui.models.users import ( UserProfileImageResponse, Users, UserModel, UpdateProfileForm, UserStatus, ) from open_webui.models.groups import Groups from open_webui.models.subscriptions import UserSubscriptions from open_webui.models.oauth_sessions import OAuthSessions from open_webui.constants import ERROR_MESSAGES, WEBHOOK_MESSAGES from open_webui.env import ( WEBUI_AUTH, WEBUI_AUTH_TRUSTED_EMAIL_HEADER, WEBUI_AUTH_TRUSTED_NAME_HEADER, WEBUI_AUTH_TRUSTED_GROUPS_HEADER, WEBUI_AUTH_COOKIE_SAME_SITE, WEBUI_AUTH_COOKIE_SECURE, WEBUI_AUTH_SIGNOUT_REDIRECT_URL, ENABLE_INITIAL_ADMIN_SIGNUP, REDIS_URL, REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT, REDIS_CLUSTER, ) from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.responses import RedirectResponse, Response, JSONResponse from open_webui.config import ( OPENID_PROVIDER_URL, ENABLE_OAUTH_SIGNUP, ENABLE_LDAP, ENABLE_PASSWORD_AUTH, ) from pydantic import BaseModel, Field from open_webui.utils.misc import parse_duration, validate_email_format from open_webui.utils.auth import ( validate_password, verify_password, decode_token, invalidate_token, create_api_key, create_token, get_admin_user, get_verified_user, get_current_user, get_password_hash, get_http_authorization_cred, send_verify_email, verify_email_by_code, apply_branding, send_signup_email_code, verify_signup_email_code, ) from open_webui.utils.webhook import post_webhook from open_webui.utils.access_control import get_permissions, has_permission from open_webui.utils.groups import apply_default_group_assignment from open_webui.utils.redis import ( get_redis_client, get_redis_connection, get_sentinels_from_env, ) from open_webui.utils.rate_limit import RateLimiter from typing import Optional, List from ssl import CERT_NONE, CERT_REQUIRED, PROTOCOL_TLS from ldap3 import Server, Connection, NONE, Tls from ldap3.utils.conv import escape_filter_chars router = APIRouter() log = logging.getLogger(__name__) signin_rate_limiter = RateLimiter( redis_client=get_redis_client(), limit=5 * 3, window=60 * 3 ) ############################ # GetSessionUser ############################ class SessionUserResponse(Token, UserProfileImageResponse): expires_at: Optional[int] = None permissions: Optional[dict] = None class SessionUserInfoResponse(SessionUserResponse, UserStatus): bio: Optional[str] = None gender: Optional[str] = None date_of_birth: Optional[datetime.date] = None @router.get("/", response_model=SessionUserInfoResponse) async def get_session_user( request: Request, response: Response, user: UserModel = Depends(get_current_user) ): auth_header = request.headers.get("Authorization") auth_token = get_http_authorization_cred(auth_header) token = auth_token.credentials data = decode_token(token) expires_at = None if data: expires_at = data.get("exp") if (expires_at is not None) and int(time.time()) > expires_at: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=ERROR_MESSAGES.INVALID_TOKEN, ) # Set the cookie token response.set_cookie( key="token", value=token, expires=( datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc) if expires_at else None ), httponly=True, # Ensures the cookie is not accessible via JavaScript samesite=WEBUI_AUTH_COOKIE_SAME_SITE, secure=WEBUI_AUTH_COOKIE_SECURE, ) user_permissions = get_permissions( user.id, request.app.state.config.USER_PERMISSIONS ) # init subscription for user (assigns default free plan if not exists) UserSubscriptions.get_or_create_subscription(user.id) return { "token": token, "token_type": "Bearer", "expires_at": expires_at, "id": user.id, "email": user.email, "name": user.name, "role": user.role, "profile_image_url": user.profile_image_url, "bio": user.bio, "gender": user.gender, "date_of_birth": user.date_of_birth, "status_emoji": user.status_emoji, "status_message": user.status_message, "status_expires_at": user.status_expires_at, "permissions": user_permissions, } ############################ # Update Profile ############################ @router.post("/update/profile", response_model=UserProfileImageResponse) async def update_profile( form_data: UpdateProfileForm, session_user=Depends(get_verified_user) ): if session_user: user = Users.update_user_by_id( session_user.id, form_data.model_dump(), ) if user: return user else: raise HTTPException(400, detail=ERROR_MESSAGES.DEFAULT()) else: raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED) ############################ # Update Password ############################ @router.post("/update/password", response_model=bool) async def update_password( form_data: UpdatePasswordForm, session_user=Depends(get_current_user) ): if WEBUI_AUTH_TRUSTED_EMAIL_HEADER: raise HTTPException(400, detail=ERROR_MESSAGES.ACTION_PROHIBITED) if session_user: user = Auths.authenticate_user( session_user.email, lambda pw: verify_password(form_data.password, pw) ) if user: try: validate_password(form_data.password) except Exception as e: raise HTTPException(400, detail=str(e)) hashed = get_password_hash(form_data.new_password) return Auths.update_user_password_by_id(user.id, hashed) else: raise HTTPException(400, detail=ERROR_MESSAGES.INCORRECT_PASSWORD) else: raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED) ############################ # LDAP Authentication ############################ @router.post("/ldap", response_model=SessionUserResponse) async def ldap_auth(request: Request, response: Response, form_data: LdapForm): # Security checks FIRST - before loading any config if not request.app.state.config.ENABLE_LDAP: raise HTTPException(400, detail="LDAP authentication is not enabled") if not ENABLE_PASSWORD_AUTH: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACTION_PROHIBITED, ) # NOW load LDAP config variables LDAP_SERVER_LABEL = request.app.state.config.LDAP_SERVER_LABEL LDAP_SERVER_HOST = request.app.state.config.LDAP_SERVER_HOST LDAP_SERVER_PORT = request.app.state.config.LDAP_SERVER_PORT LDAP_ATTRIBUTE_FOR_MAIL = request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL LDAP_ATTRIBUTE_FOR_USERNAME = request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME LDAP_SEARCH_BASE = request.app.state.config.LDAP_SEARCH_BASE LDAP_SEARCH_FILTERS = request.app.state.config.LDAP_SEARCH_FILTERS LDAP_APP_DN = request.app.state.config.LDAP_APP_DN LDAP_APP_PASSWORD = request.app.state.config.LDAP_APP_PASSWORD LDAP_USE_TLS = request.app.state.config.LDAP_USE_TLS LDAP_CA_CERT_FILE = request.app.state.config.LDAP_CA_CERT_FILE LDAP_VALIDATE_CERT = ( CERT_REQUIRED if request.app.state.config.LDAP_VALIDATE_CERT else CERT_NONE ) LDAP_CIPHERS = ( request.app.state.config.LDAP_CIPHERS if request.app.state.config.LDAP_CIPHERS else "ALL" ) try: tls = Tls( validate=LDAP_VALIDATE_CERT, version=PROTOCOL_TLS, ca_certs_file=LDAP_CA_CERT_FILE, ciphers=LDAP_CIPHERS, ) except Exception as e: log.error(f"TLS configuration error: {str(e)}") raise HTTPException(400, detail="Failed to configure TLS for LDAP connection.") try: server = Server( host=LDAP_SERVER_HOST, port=LDAP_SERVER_PORT, get_info=NONE, use_ssl=LDAP_USE_TLS, tls=tls, ) connection_app = Connection( server, LDAP_APP_DN, LDAP_APP_PASSWORD, auto_bind="NONE", authentication="SIMPLE" if LDAP_APP_DN else "ANONYMOUS", ) if not connection_app.bind(): raise HTTPException(400, detail="Application account bind failed") ENABLE_LDAP_GROUP_MANAGEMENT = ( request.app.state.config.ENABLE_LDAP_GROUP_MANAGEMENT ) ENABLE_LDAP_GROUP_CREATION = request.app.state.config.ENABLE_LDAP_GROUP_CREATION LDAP_ATTRIBUTE_FOR_GROUPS = request.app.state.config.LDAP_ATTRIBUTE_FOR_GROUPS search_attributes = [ f"{LDAP_ATTRIBUTE_FOR_USERNAME}", f"{LDAP_ATTRIBUTE_FOR_MAIL}", "cn", ] if ENABLE_LDAP_GROUP_MANAGEMENT: search_attributes.append(f"{LDAP_ATTRIBUTE_FOR_GROUPS}") log.info( f"LDAP Group Management enabled. Adding {LDAP_ATTRIBUTE_FOR_GROUPS} to search attributes" ) log.info(f"LDAP search attributes: {search_attributes}") search_success = connection_app.search( search_base=LDAP_SEARCH_BASE, search_filter=f"(&({LDAP_ATTRIBUTE_FOR_USERNAME}={escape_filter_chars(form_data.user.lower())}){LDAP_SEARCH_FILTERS})", attributes=search_attributes, ) if not search_success or not connection_app.entries: raise HTTPException(400, detail="User not found in the LDAP server") entry = connection_app.entries[0] entry_username = entry[f"{LDAP_ATTRIBUTE_FOR_USERNAME}"].value email = entry[ f"{LDAP_ATTRIBUTE_FOR_MAIL}" ].value # retrieve the Attribute value username_list = [] # list of usernames from LDAP attribute if isinstance(entry_username, list): username_list = [str(name).lower() for name in entry_username] else: username_list = [str(entry_username).lower()] # TODO: support multiple emails if LDAP returns a list if not email: raise HTTPException(400, "User does not have a valid email address.") elif isinstance(email, str): email = email.lower() elif isinstance(email, list): email = email[0].lower() else: email = str(email).lower() cn = str(entry["cn"]) # common name user_dn = entry.entry_dn # user distinguished name user_groups = [] if ENABLE_LDAP_GROUP_MANAGEMENT and LDAP_ATTRIBUTE_FOR_GROUPS in entry: group_dns = entry[LDAP_ATTRIBUTE_FOR_GROUPS] log.info(f"LDAP raw group DNs for user {username_list}: {group_dns}") if group_dns: log.info(f"LDAP group_dns original: {group_dns}") log.info(f"LDAP group_dns type: {type(group_dns)}") log.info(f"LDAP group_dns length: {len(group_dns)}") if hasattr(group_dns, "value"): group_dns = group_dns.value log.info(f"Extracted .value property: {group_dns}") elif hasattr(group_dns, "__iter__") and not isinstance( group_dns, (str, bytes) ): group_dns = list(group_dns) log.info(f"Converted to list: {group_dns}") if isinstance(group_dns, list): group_dns = [str(item) for item in group_dns] else: group_dns = [str(group_dns)] log.info( f"LDAP group_dns after processing - type: {type(group_dns)}, length: {len(group_dns)}" ) for group_idx, group_dn in enumerate(group_dns): group_dn = str(group_dn) log.info(f"Processing group DN #{group_idx + 1}: {group_dn}") try: group_cn = None for item in group_dn.split(","): item = item.strip() if item.upper().startswith("CN="): group_cn = item[3:] break if group_cn: user_groups.append(group_cn) else: log.warning( f"Could not extract CN from group DN: {group_dn}" ) except Exception as e: log.warning( f"Failed to extract group name from DN {group_dn}: {e}" ) log.info( f"LDAP groups for user {username_list}: {user_groups} (total: {len(user_groups)})" ) else: log.info(f"No groups found for user {username_list}") elif ENABLE_LDAP_GROUP_MANAGEMENT: log.warning( f"LDAP Group Management enabled but {LDAP_ATTRIBUTE_FOR_GROUPS} attribute not found in user entry" ) if username_list and form_data.user.lower() in username_list: connection_user = Connection( server, user_dn, form_data.password, auto_bind="NONE", authentication="SIMPLE", ) if not connection_user.bind(): raise HTTPException(400, "Authentication failed.") user = Users.get_user_by_email(email) if not user: try: role = ( "admin" if not Users.has_users() else request.app.state.config.DEFAULT_USER_ROLE ) user = Auths.insert_new_auth( email=email, password=str(uuid.uuid4()), name=cn, role=role, ) if not user: raise HTTPException( 500, detail=ERROR_MESSAGES.CREATE_USER_ERROR ) apply_default_group_assignment( request.app.state.config.DEFAULT_GROUP_ID, user.id, ) except HTTPException: raise except Exception as err: log.error(f"LDAP user creation error: {str(err)}") raise HTTPException( 500, detail="Internal error occurred during LDAP user creation." ) user = Auths.authenticate_user_by_email(email) if user: expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN) expires_at = None if expires_delta: expires_at = int(time.time()) + int(expires_delta.total_seconds()) token = create_token( data={"id": user.id}, expires_delta=expires_delta, ) # Set the cookie token response.set_cookie( key="token", value=token, expires=( datetime.datetime.fromtimestamp( expires_at, datetime.timezone.utc ) if expires_at else None ), httponly=True, # Ensures the cookie is not accessible via JavaScript samesite=WEBUI_AUTH_COOKIE_SAME_SITE, secure=WEBUI_AUTH_COOKIE_SECURE, ) user_permissions = get_permissions( user.id, request.app.state.config.USER_PERMISSIONS ) # init subscription for user UserSubscriptions.get_or_create_subscription(user.id) if ( user.role != "admin" and ENABLE_LDAP_GROUP_MANAGEMENT and user_groups ): if ENABLE_LDAP_GROUP_CREATION: Groups.create_groups_by_group_names(user.id, user_groups) try: Groups.sync_groups_by_group_names(user.id, user_groups) log.info( f"Successfully synced groups for user {user.id}: {user_groups}" ) except Exception as e: log.error(f"Failed to sync groups for user {user.id}: {e}") return { "token": token, "token_type": "Bearer", "expires_at": expires_at, "id": user.id, "email": user.email, "name": user.name, "role": user.role, "profile_image_url": user.profile_image_url, "permissions": user_permissions, } else: raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED) else: raise HTTPException(400, "User record mismatch.") except Exception as e: log.error(f"LDAP authentication error: {str(e)}") raise HTTPException(400, detail="LDAP authentication failed.") ############################ # SignIn ############################ @router.post("/signin", response_model=SessionUserResponse) async def signin(request: Request, response: Response, form_data: SigninForm): if not ENABLE_PASSWORD_AUTH: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACTION_PROHIBITED, ) if WEBUI_AUTH_TRUSTED_EMAIL_HEADER: if WEBUI_AUTH_TRUSTED_EMAIL_HEADER not in request.headers: raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_TRUSTED_HEADER) email = request.headers[WEBUI_AUTH_TRUSTED_EMAIL_HEADER].lower() name = email if WEBUI_AUTH_TRUSTED_NAME_HEADER: name = request.headers.get(WEBUI_AUTH_TRUSTED_NAME_HEADER, email) try: name = urllib.parse.unquote(name, encoding="utf-8") except Exception as e: pass if not Users.get_user_by_email(email.lower()): await signup( request, response, SignupForm(email=email, password=str(uuid.uuid4()), name=name), ) user = Auths.authenticate_user_by_email(email) if WEBUI_AUTH_TRUSTED_GROUPS_HEADER and user and user.role != "admin": group_names = request.headers.get( WEBUI_AUTH_TRUSTED_GROUPS_HEADER, "" ).split(",") group_names = [name.strip() for name in group_names if name.strip()] if group_names: Groups.sync_groups_by_group_names(user.id, group_names) elif WEBUI_AUTH == False: admin_email = "admin@localhost" admin_password = "admin" if Users.get_user_by_email(admin_email.lower()): user = Auths.authenticate_user( admin_email.lower(), lambda pw: verify_password(admin_password, pw) ) else: if Users.has_users(): raise HTTPException(400, detail=ERROR_MESSAGES.EXISTING_USERS) await signup( request, response, SignupForm(email=admin_email, password=admin_password, name="User"), ) user = Auths.authenticate_user( admin_email.lower(), lambda pw: verify_password(admin_password, pw) ) else: if signin_rate_limiter.is_limited(form_data.email.lower()): raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=ERROR_MESSAGES.RATE_LIMIT_EXCEEDED, ) password_bytes = form_data.password.encode("utf-8") if len(password_bytes) > 72: # TODO: Implement other hashing algorithms that support longer passwords log.info("Password too long, truncating to 72 bytes for bcrypt") password_bytes = password_bytes[:72] # decode safely — ignore incomplete UTF-8 sequences form_data.password = password_bytes.decode("utf-8", errors="ignore") user = Auths.authenticate_user( form_data.email.lower(), lambda pw: verify_password(form_data.password, pw) ) if user: expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN) expires_at = None if expires_delta: expires_at = int(time.time()) + int(expires_delta.total_seconds()) token = create_token( data={"id": user.id}, expires_delta=expires_delta, ) datetime_expires_at = ( datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc) if expires_at else None ) # Set the cookie token response.set_cookie( key="token", value=token, expires=datetime_expires_at, httponly=True, # Ensures the cookie is not accessible via JavaScript samesite=WEBUI_AUTH_COOKIE_SAME_SITE, secure=WEBUI_AUTH_COOKIE_SECURE, ) user_permissions = get_permissions( user.id, request.app.state.config.USER_PERMISSIONS ) # init subscription for user UserSubscriptions.get_or_create_subscription(user.id) return { "token": token, "token_type": "Bearer", "expires_at": expires_at, "id": user.id, "email": user.email, "name": user.name, "role": user.role, "profile_image_url": user.profile_image_url, "permissions": user_permissions, } else: raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED) ############################ # Send Email Code ############################ class SendEmailCodeForm(BaseModel): email: str @router.post("/send_email_code") async def send_email_code(request: Request, form_data: SendEmailCodeForm): if not validate_email_format(form_data.email.lower()): raise HTTPException( status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT ) # check if email already registered if Users.get_user_by_email(form_data.email.lower()): raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN) try: send_signup_email_code(form_data.email.lower()) return {"success": True, "message": "Verification code sent"} except Exception as e: log.error(f"Failed to send email code: {e}") raise HTTPException(500, detail="Failed to send verification code") ############################ # SignUp ############################ @router.post("/signup", response_model=SessionUserResponse) async def signup(request: Request, response: Response, form_data: SignupForm): has_users = Users.has_users() if WEBUI_AUTH: if ( not request.app.state.config.ENABLE_SIGNUP or not request.app.state.config.ENABLE_LOGIN_FORM ): if has_users or not ENABLE_INITIAL_ADMIN_SIGNUP: raise HTTPException( status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED ) else: if has_users: raise HTTPException( status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED ) # check for email domain whitelist email_domain_whitelist = [ i.strip() for i in request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST.split(",") if i ] if email_domain_whitelist: domain = form_data.email.split("@")[-1] if domain not in email_domain_whitelist: raise HTTPException( status.HTTP_403_FORBIDDEN, detail=f"Only emails from {request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST} are allowed", ) if not validate_email_format(form_data.email.lower()): raise HTTPException( status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT ) if Users.get_user_by_email(form_data.email.lower()): raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN) # verify email code for non-admin signup if has_users and form_data.email_code: if not verify_signup_email_code(form_data.email.lower(), form_data.email_code): raise HTTPException(400, detail="Invalid or expired verification code") elif has_users and not form_data.email_code: raise HTTPException(400, detail="Verification code is required") try: try: validate_password(form_data.password) except Exception as e: raise HTTPException(400, detail=str(e)) hashed = get_password_hash(form_data.password) if not has_users: role = "admin" elif request.app.state.config.ENABLE_SIGNUP_VERIFY: role = "pending" send_verify_email(email=form_data.email.lower()) else: role = request.app.state.config.DEFAULT_USER_ROLE user = Auths.insert_new_auth( form_data.email.lower(), hashed, form_data.name, form_data.profile_image_url, role, ) if user: expires_delta = parse_duration(request.app.state.config.JWT_EXPIRES_IN) expires_at = None if expires_delta: expires_at = int(time.time()) + int(expires_delta.total_seconds()) token = create_token( data={"id": user.id}, expires_delta=expires_delta, ) datetime_expires_at = ( datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc) if expires_at else None ) # Set the cookie token response.set_cookie( key="token", value=token, expires=datetime_expires_at, httponly=True, # Ensures the cookie is not accessible via JavaScript samesite=WEBUI_AUTH_COOKIE_SAME_SITE, secure=WEBUI_AUTH_COOKIE_SECURE, ) if request.app.state.config.WEBHOOK_URL: await post_webhook( request.app.state.WEBUI_NAME, request.app.state.config.WEBHOOK_URL, WEBHOOK_MESSAGES.USER_SIGNUP(user.name), { "action": "signup", "message": WEBHOOK_MESSAGES.USER_SIGNUP(user.name), "user": user.model_dump_json(exclude_none=True), }, ) user_permissions = get_permissions( user.id, request.app.state.config.USER_PERMISSIONS ) if not has_users: # Disable signup after the first user is created request.app.state.config.ENABLE_SIGNUP = False apply_default_group_assignment( request.app.state.config.DEFAULT_GROUP_ID, user.id, ) # init subscription for user UserSubscriptions.get_or_create_subscription(user.id) return { "token": token, "token_type": "Bearer", "expires_at": expires_at, "id": user.id, "email": user.email, "name": user.name, "role": user.role, "profile_image_url": user.profile_image_url, "permissions": user_permissions, } else: raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR) except Exception as err: log.error(f"Signup error: {str(err)}") raise HTTPException(500, detail="An internal error occurred during signup.") @router.get("/signup_verify/{code}") async def signup_verify(request: Request, code: str): email = verify_email_by_code(code=code) if not email: raise HTTPException(403, detail="Invalid code") user = Users.get_user_by_email(email) if not user: raise HTTPException(404, detail="User not found") Users.update_user_role_by_id(user.id, "user") return RedirectResponse(url=request.app.state.config.WEBUI_URL) @router.get("/signout") async def signout(request: Request, response: Response): # get auth token from headers or cookies token = None auth_header = request.headers.get("Authorization") if auth_header: auth_cred = get_http_authorization_cred(auth_header) token = auth_cred.credentials else: token = request.cookies.get("token") if token: await invalidate_token(request, token) response.delete_cookie("token") response.delete_cookie("oui-session") response.delete_cookie("oauth_id_token") oauth_session_id = request.cookies.get("oauth_session_id") if oauth_session_id: response.delete_cookie("oauth_session_id") session = OAuthSessions.get_session_by_id(oauth_session_id) oauth_server_metadata_url = ( request.app.state.oauth_manager.get_server_metadata_url(session.provider) if session else None ) or OPENID_PROVIDER_URL.value if session and oauth_server_metadata_url: oauth_id_token = session.token.get("id_token") try: async with ClientSession(trust_env=True) as session: async with session.get(oauth_server_metadata_url) as r: if r.status == 200: openid_data = await r.json() logout_url = openid_data.get("end_session_endpoint") if logout_url: return JSONResponse( status_code=200, content={ "status": True, "redirect_url": f"{logout_url}?id_token_hint={oauth_id_token}" + ( f"&post_logout_redirect_uri={WEBUI_AUTH_SIGNOUT_REDIRECT_URL}" if WEBUI_AUTH_SIGNOUT_REDIRECT_URL else "" ), }, headers=response.headers, ) else: raise Exception("Failed to fetch OpenID configuration") except Exception as e: log.error(f"OpenID signout error: {str(e)}") raise HTTPException( status_code=500, detail="Failed to sign out from the OpenID provider.", headers=response.headers, ) if WEBUI_AUTH_SIGNOUT_REDIRECT_URL: return JSONResponse( status_code=200, content={ "status": True, "redirect_url": WEBUI_AUTH_SIGNOUT_REDIRECT_URL, }, headers=response.headers, ) return JSONResponse( status_code=200, content={"status": True}, headers=response.headers ) ############################ # AddUser ############################ @router.post("/add", response_model=SigninResponse) async def add_user( request: Request, form_data: AddUserForm, user=Depends(get_admin_user) ): if not validate_email_format(form_data.email.lower()): raise HTTPException( status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT ) if Users.get_user_by_email(form_data.email.lower()): raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN) try: try: validate_password(form_data.password) except Exception as e: raise HTTPException(400, detail=str(e)) hashed = get_password_hash(form_data.password) user = Auths.insert_new_auth( form_data.email.lower(), hashed, form_data.name, form_data.profile_image_url, form_data.role, ) if user: apply_default_group_assignment( request.app.state.config.DEFAULT_GROUP_ID, user.id, ) token = create_token(data={"id": user.id}) return { "token": token, "token_type": "Bearer", "id": user.id, "email": user.email, "name": user.name, "role": user.role, "profile_image_url": user.profile_image_url, } else: raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR) except Exception as err: log.error(f"Add user error: {str(err)}") raise HTTPException( 500, detail="An internal error occurred while adding the user." ) ############################ # GetAdminDetails ############################ @router.get("/admin/details") async def get_admin_details(request: Request, user=Depends(get_current_user)): if request.app.state.config.SHOW_ADMIN_DETAILS: admin_email = request.app.state.config.ADMIN_EMAIL admin_name = None log.info(f"Admin details - Email: {admin_email}, Name: {admin_name}") if admin_email: admin = Users.get_user_by_email(admin_email) if admin: admin_name = admin.name else: admin = Users.get_first_user() if admin: admin_email = admin.email admin_name = admin.name return { "name": admin_name, "email": admin_email, } else: raise HTTPException(400, detail=ERROR_MESSAGES.ACTION_PROHIBITED) ############################ # ToggleSignUp ############################ @router.get("/admin/config") async def get_admin_config(request: Request, user=Depends(get_admin_user)): return { "SHOW_ADMIN_DETAILS": request.app.state.config.SHOW_ADMIN_DETAILS, "WEBUI_URL": request.app.state.config.WEBUI_URL, "ENABLE_SIGNUP": request.app.state.config.ENABLE_SIGNUP, "ENABLE_SIGNUP_VERIFY": request.app.state.config.ENABLE_SIGNUP_VERIFY, "SIGNUP_EMAIL_DOMAIN_WHITELIST": request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST, "SMTP_HOST": request.app.state.config.SMTP_HOST, "SMTP_PORT": request.app.state.config.SMTP_PORT, "SMTP_USERNAME": request.app.state.config.SMTP_USERNAME, "SMTP_PASSWORD": request.app.state.config.SMTP_PASSWORD, "SMTP_SENT_FROM": request.app.state.config.SMTP_SENT_FROM, "ENABLE_API_KEYS": request.app.state.config.ENABLE_API_KEYS, "ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS": request.app.state.config.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS, "API_KEYS_ALLOWED_ENDPOINTS": request.app.state.config.API_KEYS_ALLOWED_ENDPOINTS, "DEFAULT_USER_ROLE": request.app.state.config.DEFAULT_USER_ROLE, "DEFAULT_GROUP_ID": request.app.state.config.DEFAULT_GROUP_ID, "JWT_EXPIRES_IN": request.app.state.config.JWT_EXPIRES_IN, "ENABLE_COMMUNITY_SHARING": request.app.state.config.ENABLE_COMMUNITY_SHARING, "ENABLE_MESSAGE_RATING": request.app.state.config.ENABLE_MESSAGE_RATING, "ENABLE_FOLDERS": request.app.state.config.ENABLE_FOLDERS, "ENABLE_CHANNELS": request.app.state.config.ENABLE_CHANNELS, "ENABLE_NOTES": request.app.state.config.ENABLE_NOTES, "ENABLE_USER_WEBHOOKS": request.app.state.config.ENABLE_USER_WEBHOOKS, "PENDING_USER_OVERLAY_TITLE": request.app.state.config.PENDING_USER_OVERLAY_TITLE, "PENDING_USER_OVERLAY_CONTENT": request.app.state.config.PENDING_USER_OVERLAY_CONTENT, "RESPONSE_WATERMARK": request.app.state.config.RESPONSE_WATERMARK, } class AdminConfig(BaseModel): SHOW_ADMIN_DETAILS: bool WEBUI_URL: str ENABLE_SIGNUP: bool ENABLE_SIGNUP_VERIFY: bool = Field(default=False) SIGNUP_EMAIL_DOMAIN_WHITELIST: str = Field(default="") SMTP_HOST: str SMTP_PORT: str SMTP_USERNAME: str SMTP_PASSWORD: str SMTP_SENT_FROM: str ENABLE_API_KEYS: bool ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS: bool API_KEYS_ALLOWED_ENDPOINTS: str DEFAULT_USER_ROLE: str DEFAULT_GROUP_ID: str JWT_EXPIRES_IN: str ENABLE_COMMUNITY_SHARING: bool ENABLE_MESSAGE_RATING: bool ENABLE_FOLDERS: bool ENABLE_CHANNELS: bool ENABLE_NOTES: bool ENABLE_USER_WEBHOOKS: bool PENDING_USER_OVERLAY_TITLE: Optional[str] = None PENDING_USER_OVERLAY_CONTENT: Optional[str] = None RESPONSE_WATERMARK: Optional[str] = None @router.post("/admin/config") async def update_admin_config( request: Request, form_data: AdminConfig, user=Depends(get_admin_user) ): # verify redis status if form_data.ENABLE_SIGNUP_VERIFY: # check redis _redis = get_redis_connection( redis_url=REDIS_URL, redis_sentinels=get_sentinels_from_env( REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT ), redis_cluster=REDIS_CLUSTER, ) if not _redis: raise HTTPException(status_code=400, detail="Redis is not configured.") request.app.state.config.SHOW_ADMIN_DETAILS = form_data.SHOW_ADMIN_DETAILS request.app.state.config.WEBUI_URL = form_data.WEBUI_URL request.app.state.config.ENABLE_SIGNUP = form_data.ENABLE_SIGNUP request.app.state.config.ENABLE_SIGNUP_VERIFY = form_data.ENABLE_SIGNUP_VERIFY request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST = ( form_data.SIGNUP_EMAIL_DOMAIN_WHITELIST ) request.app.state.config.SMTP_HOST = form_data.SMTP_HOST request.app.state.config.SMTP_PORT = form_data.SMTP_PORT request.app.state.config.SMTP_USERNAME = form_data.SMTP_USERNAME request.app.state.config.SMTP_PASSWORD = form_data.SMTP_PASSWORD request.app.state.config.SMTP_SENT_FROM = form_data.SMTP_SENT_FROM request.app.state.config.ENABLE_API_KEYS = form_data.ENABLE_API_KEYS request.app.state.config.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS = ( form_data.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS ) request.app.state.config.API_KEYS_ALLOWED_ENDPOINTS = ( form_data.API_KEYS_ALLOWED_ENDPOINTS ) request.app.state.config.ENABLE_FOLDERS = form_data.ENABLE_FOLDERS request.app.state.config.ENABLE_CHANNELS = form_data.ENABLE_CHANNELS request.app.state.config.ENABLE_NOTES = form_data.ENABLE_NOTES if form_data.DEFAULT_USER_ROLE in ["pending", "user", "admin"]: request.app.state.config.DEFAULT_USER_ROLE = form_data.DEFAULT_USER_ROLE request.app.state.config.DEFAULT_GROUP_ID = form_data.DEFAULT_GROUP_ID pattern = r"^(-1|0|(-?\d+(\.\d+)?)(ms|s|m|h|d|w))$" # Check if the input string matches the pattern if re.match(pattern, form_data.JWT_EXPIRES_IN): request.app.state.config.JWT_EXPIRES_IN = form_data.JWT_EXPIRES_IN request.app.state.config.ENABLE_COMMUNITY_SHARING = ( form_data.ENABLE_COMMUNITY_SHARING ) request.app.state.config.ENABLE_MESSAGE_RATING = form_data.ENABLE_MESSAGE_RATING request.app.state.config.ENABLE_USER_WEBHOOKS = form_data.ENABLE_USER_WEBHOOKS request.app.state.config.PENDING_USER_OVERLAY_TITLE = ( form_data.PENDING_USER_OVERLAY_TITLE ) request.app.state.config.PENDING_USER_OVERLAY_CONTENT = ( form_data.PENDING_USER_OVERLAY_CONTENT ) request.app.state.config.RESPONSE_WATERMARK = form_data.RESPONSE_WATERMARK return { "SHOW_ADMIN_DETAILS": request.app.state.config.SHOW_ADMIN_DETAILS, "WEBUI_URL": request.app.state.config.WEBUI_URL, "ENABLE_SIGNUP": request.app.state.config.ENABLE_SIGNUP, "ENABLE_SIGNUP_VERIFY": request.app.state.config.ENABLE_SIGNUP_VERIFY, "SIGNUP_EMAIL_DOMAIN_WHITELIST": request.app.state.config.SIGNUP_EMAIL_DOMAIN_WHITELIST, "SMTP_HOST": request.app.state.config.SMTP_HOST, "SMTP_PORT": request.app.state.config.SMTP_PORT, "SMTP_USERNAME": request.app.state.config.SMTP_USERNAME, "SMTP_PASSWORD": request.app.state.config.SMTP_PASSWORD, "SMTP_SENT_FROM": request.app.state.config.SMTP_SENT_FROM, "ENABLE_API_KEYS": request.app.state.config.ENABLE_API_KEYS, "ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS": request.app.state.config.ENABLE_API_KEYS_ENDPOINT_RESTRICTIONS, "API_KEYS_ALLOWED_ENDPOINTS": request.app.state.config.API_KEYS_ALLOWED_ENDPOINTS, "DEFAULT_USER_ROLE": request.app.state.config.DEFAULT_USER_ROLE, "DEFAULT_GROUP_ID": request.app.state.config.DEFAULT_GROUP_ID, "JWT_EXPIRES_IN": request.app.state.config.JWT_EXPIRES_IN, "ENABLE_COMMUNITY_SHARING": request.app.state.config.ENABLE_COMMUNITY_SHARING, "ENABLE_MESSAGE_RATING": request.app.state.config.ENABLE_MESSAGE_RATING, "ENABLE_FOLDERS": request.app.state.config.ENABLE_FOLDERS, "ENABLE_CHANNELS": request.app.state.config.ENABLE_CHANNELS, "ENABLE_NOTES": request.app.state.config.ENABLE_NOTES, "ENABLE_USER_WEBHOOKS": request.app.state.config.ENABLE_USER_WEBHOOKS, "PENDING_USER_OVERLAY_TITLE": request.app.state.config.PENDING_USER_OVERLAY_TITLE, "PENDING_USER_OVERLAY_CONTENT": request.app.state.config.PENDING_USER_OVERLAY_CONTENT, "RESPONSE_WATERMARK": request.app.state.config.RESPONSE_WATERMARK, } class LdapServerConfig(BaseModel): label: str host: str port: Optional[int] = None attribute_for_mail: str = "mail" attribute_for_username: str = "uid" app_dn: str app_dn_password: str search_base: str search_filters: str = "" use_tls: bool = True certificate_path: Optional[str] = None validate_cert: bool = True ciphers: Optional[str] = "ALL" @router.get("/admin/config/ldap/server", response_model=LdapServerConfig) async def get_ldap_server(request: Request, user=Depends(get_admin_user)): return { "label": request.app.state.config.LDAP_SERVER_LABEL, "host": request.app.state.config.LDAP_SERVER_HOST, "port": request.app.state.config.LDAP_SERVER_PORT, "attribute_for_mail": request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL, "attribute_for_username": request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME, "app_dn": request.app.state.config.LDAP_APP_DN, "app_dn_password": request.app.state.config.LDAP_APP_PASSWORD, "search_base": request.app.state.config.LDAP_SEARCH_BASE, "search_filters": request.app.state.config.LDAP_SEARCH_FILTERS, "use_tls": request.app.state.config.LDAP_USE_TLS, "certificate_path": request.app.state.config.LDAP_CA_CERT_FILE, "validate_cert": request.app.state.config.LDAP_VALIDATE_CERT, "ciphers": request.app.state.config.LDAP_CIPHERS, } @router.post("/admin/config/ldap/server") async def update_ldap_server( request: Request, form_data: LdapServerConfig, user=Depends(get_admin_user) ): required_fields = [ "label", "host", "attribute_for_mail", "attribute_for_username", "app_dn", "app_dn_password", "search_base", ] for key in required_fields: value = getattr(form_data, key) if not value: raise HTTPException(400, detail=f"Required field {key} is empty") request.app.state.config.LDAP_SERVER_LABEL = form_data.label request.app.state.config.LDAP_SERVER_HOST = form_data.host request.app.state.config.LDAP_SERVER_PORT = form_data.port request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL = form_data.attribute_for_mail request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME = ( form_data.attribute_for_username ) request.app.state.config.LDAP_APP_DN = form_data.app_dn request.app.state.config.LDAP_APP_PASSWORD = form_data.app_dn_password request.app.state.config.LDAP_SEARCH_BASE = form_data.search_base request.app.state.config.LDAP_SEARCH_FILTERS = form_data.search_filters request.app.state.config.LDAP_USE_TLS = form_data.use_tls request.app.state.config.LDAP_CA_CERT_FILE = form_data.certificate_path request.app.state.config.LDAP_VALIDATE_CERT = form_data.validate_cert request.app.state.config.LDAP_CIPHERS = form_data.ciphers return { "label": request.app.state.config.LDAP_SERVER_LABEL, "host": request.app.state.config.LDAP_SERVER_HOST, "port": request.app.state.config.LDAP_SERVER_PORT, "attribute_for_mail": request.app.state.config.LDAP_ATTRIBUTE_FOR_MAIL, "attribute_for_username": request.app.state.config.LDAP_ATTRIBUTE_FOR_USERNAME, "app_dn": request.app.state.config.LDAP_APP_DN, "app_dn_password": request.app.state.config.LDAP_APP_PASSWORD, "search_base": request.app.state.config.LDAP_SEARCH_BASE, "search_filters": request.app.state.config.LDAP_SEARCH_FILTERS, "use_tls": request.app.state.config.LDAP_USE_TLS, "certificate_path": request.app.state.config.LDAP_CA_CERT_FILE, "validate_cert": request.app.state.config.LDAP_VALIDATE_CERT, "ciphers": request.app.state.config.LDAP_CIPHERS, } @router.get("/admin/config/ldap") async def get_ldap_config(request: Request, user=Depends(get_admin_user)): return {"ENABLE_LDAP": request.app.state.config.ENABLE_LDAP} class LdapConfigForm(BaseModel): enable_ldap: Optional[bool] = None @router.post("/admin/config/ldap") async def update_ldap_config( request: Request, form_data: LdapConfigForm, user=Depends(get_admin_user) ): request.app.state.config.ENABLE_LDAP = form_data.enable_ldap return {"ENABLE_LDAP": request.app.state.config.ENABLE_LDAP} ############################ # Branding Config ############################ @router.get("/admin/config/branding") async def get_branding_config(request: Request, user=Depends(get_admin_user)): def get_value(config_obj): if hasattr(config_obj, "value"): return config_obj.value return config_obj return { "ORGANIZATION_NAME": get_value(request.app.state.config.BRANDING_ORGANIZATION_NAME), "CUSTOM_NAME": get_value(request.app.state.config.BRANDING_CUSTOM_NAME), "FAVICON_ICO": get_value(request.app.state.config.BRANDING_FAVICON_ICO), "FAVICON_PNG": get_value(request.app.state.config.BRANDING_FAVICON_PNG), "FAVICON_DARK_PNG": get_value(request.app.state.config.BRANDING_FAVICON_DARK_PNG), "FAVICON_SVG": get_value(request.app.state.config.BRANDING_FAVICON_SVG), "AUTH_LOGO_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_URL), "SIDEBAR_LOGO_URL": get_value(request.app.state.config.BRANDING_SIDEBAR_LOGO_URL), "AUTH_LOGO_LINK_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_LINK_URL), } class BrandingConfig(BaseModel): ORGANIZATION_NAME: Optional[str] = "" CUSTOM_NAME: Optional[str] = "" FAVICON_ICO: Optional[str] = "" FAVICON_PNG: Optional[str] = "" FAVICON_DARK_PNG: Optional[str] = "" FAVICON_SVG: Optional[str] = "" AUTH_LOGO_URL: Optional[str] = "" SIDEBAR_LOGO_URL: Optional[str] = "" AUTH_LOGO_LINK_URL: Optional[str] = "" @router.post("/admin/config/branding") async def update_branding_config( request: Request, form_data: BrandingConfig, user=Depends(get_admin_user) ): # Helper function to update config value # Must access _state directly to get PersistentConfig object (not the value) def update_config(key: str, new_value): config_obj = request.app.state.config._state.get(key) if config_obj and hasattr(config_obj, "value") and hasattr(config_obj, "save"): config_obj.value = new_value config_obj.save() log.info(f"Branding: Saved config {key} = {new_value}") else: log.warning(f"Branding: config key '{key}' not found or not PersistentConfig") # Update config values update_config("BRANDING_ORGANIZATION_NAME", form_data.ORGANIZATION_NAME) update_config("BRANDING_CUSTOM_NAME", form_data.CUSTOM_NAME) update_config("BRANDING_FAVICON_ICO", form_data.FAVICON_ICO) update_config("BRANDING_FAVICON_PNG", form_data.FAVICON_PNG) update_config("BRANDING_FAVICON_DARK_PNG", form_data.FAVICON_DARK_PNG) update_config("BRANDING_FAVICON_SVG", form_data.FAVICON_SVG) update_config("BRANDING_AUTH_LOGO_URL", form_data.AUTH_LOGO_URL) update_config("BRANDING_SIDEBAR_LOGO_URL", form_data.SIDEBAR_LOGO_URL) update_config("BRANDING_AUTH_LOGO_LINK_URL", form_data.AUTH_LOGO_LINK_URL) # Apply branding changes immediately apply_branding(request.app) # Helper to get value def get_value(config_obj): if hasattr(config_obj, "value"): return config_obj.value return config_obj return { "ORGANIZATION_NAME": get_value(request.app.state.config.BRANDING_ORGANIZATION_NAME), "CUSTOM_NAME": get_value(request.app.state.config.BRANDING_CUSTOM_NAME), "FAVICON_ICO": get_value(request.app.state.config.BRANDING_FAVICON_ICO), "FAVICON_PNG": get_value(request.app.state.config.BRANDING_FAVICON_PNG), "FAVICON_DARK_PNG": get_value(request.app.state.config.BRANDING_FAVICON_DARK_PNG), "FAVICON_SVG": get_value(request.app.state.config.BRANDING_FAVICON_SVG), "AUTH_LOGO_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_URL), "SIDEBAR_LOGO_URL": get_value(request.app.state.config.BRANDING_SIDEBAR_LOGO_URL), "AUTH_LOGO_LINK_URL": get_value(request.app.state.config.BRANDING_AUTH_LOGO_LINK_URL), } ############################ # Email Templates ############################ @router.get("/admin/config/email-templates") async def get_email_templates(request: Request, user=Depends(get_admin_user)): from open_webui.config import ( DEFAULT_EMAIL_VERIFY_TEMPLATE, DEFAULT_EMAIL_CODE_TEMPLATE, ) def get_value(config_obj): if hasattr(config_obj, "value"): return config_obj.value return config_obj return { "EMAIL_VERIFY_TEMPLATE": get_value( request.app.state.config.EMAIL_VERIFY_TEMPLATE ), "EMAIL_CODE_TEMPLATE": get_value( request.app.state.config.EMAIL_CODE_TEMPLATE ), "DEFAULT_EMAIL_VERIFY_TEMPLATE": DEFAULT_EMAIL_VERIFY_TEMPLATE, "DEFAULT_EMAIL_CODE_TEMPLATE": DEFAULT_EMAIL_CODE_TEMPLATE, } class EmailTemplatesConfig(BaseModel): EMAIL_VERIFY_TEMPLATE: Optional[str] = "" EMAIL_CODE_TEMPLATE: Optional[str] = "" @router.post("/admin/config/email-templates") async def update_email_templates( request: Request, form_data: EmailTemplatesConfig, user=Depends(get_admin_user) ): def update_config(key: str, new_value): config_obj = request.app.state.config._state.get(key) if config_obj and hasattr(config_obj, "value") and hasattr(config_obj, "save"): config_obj.value = new_value config_obj.save() log.info(f"Email Templates: Saved config {key}") update_config("EMAIL_VERIFY_TEMPLATE", form_data.EMAIL_VERIFY_TEMPLATE) update_config("EMAIL_CODE_TEMPLATE", form_data.EMAIL_CODE_TEMPLATE) def get_value(config_obj): if hasattr(config_obj, "value"): return config_obj.value return config_obj return { "EMAIL_VERIFY_TEMPLATE": get_value( request.app.state.config.EMAIL_VERIFY_TEMPLATE ), "EMAIL_CODE_TEMPLATE": get_value( request.app.state.config.EMAIL_CODE_TEMPLATE ), } ############################ # API Key ############################ # create api key @router.post("/api_key", response_model=ApiKey) async def generate_api_key(request: Request, user=Depends(get_current_user)): if not request.app.state.config.ENABLE_API_KEYS or not has_permission( user.id, "features.api_keys", request.app.state.config.USER_PERMISSIONS ): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.API_KEY_CREATION_NOT_ALLOWED, ) api_key = create_api_key() success = Users.update_user_api_key_by_id(user.id, api_key) if success: return { "api_key": api_key, } else: raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_API_KEY_ERROR) # delete api key @router.delete("/api_key", response_model=bool) async def delete_api_key(user=Depends(get_current_user)): return Users.delete_user_api_key_by_id(user.id) # get api key @router.get("/api_key", response_model=ApiKey) async def get_api_key(user=Depends(get_current_user)): api_key = Users.get_user_api_key_by_id(user.id) if api_key: return { "api_key": api_key, } else: raise HTTPException(404, detail=ERROR_MESSAGES.API_KEY_NOT_FOUND)