Some checks failed
Create and publish Docker images with specific build args / build-main-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-main-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda126-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-cuda126-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-ollama-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-ollama-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Create and publish Docker images with specific build args / build-slim-image (linux/amd64, ubuntu-latest) (push) Has been cancelled
Create and publish Docker images with specific build args / build-slim-image (linux/arm64, ubuntu-24.04-arm) (push) Has been cancelled
Python CI / Format Backend (3.11.x) (push) Has been cancelled
Python CI / Format Backend (3.12.x) (push) Has been cancelled
Frontend Build / Format & Build Frontend (push) Has been cancelled
Frontend Build / Frontend Unit Tests (push) Has been cancelled
Create and publish Docker images with specific build args / merge-main-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-cuda-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-cuda126-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-ollama-images (push) Has been cancelled
Create and publish Docker images with specific build args / merge-slim-images (push) Has been cancelled
Close inactive issues / close-issues (push) Has been cancelled
813 lines
26 KiB
Python
813 lines
26 KiB
Python
import logging
|
||
import shutil
|
||
import uuid
|
||
import jwt
|
||
import base64
|
||
import hmac
|
||
import hashlib
|
||
import requests
|
||
import os
|
||
import bcrypt
|
||
|
||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||
from cryptography.hazmat.primitives.asymmetric import ed25519
|
||
from cryptography.hazmat.primitives import serialization
|
||
import json
|
||
|
||
|
||
from datetime import datetime, timedelta
|
||
import pytz
|
||
from pytz import UTC
|
||
from typing import Optional, Union, List, Dict
|
||
|
||
from opentelemetry import trace
|
||
|
||
from open_webui.config import WEBUI_URL, EMAIL_VERIFY_TEMPLATE, EMAIL_CODE_TEMPLATE
|
||
from open_webui.utils.smtp import send_email
|
||
|
||
from open_webui.utils.access_control import has_permission
|
||
from open_webui.models.users import Users
|
||
|
||
from open_webui.constants import ERROR_MESSAGES
|
||
|
||
from open_webui.env import (
|
||
ENABLE_PASSWORD_VALIDATION,
|
||
OFFLINE_MODE,
|
||
LICENSE_BLOB,
|
||
PASSWORD_VALIDATION_REGEX_PATTERN,
|
||
REDIS_KEY_PREFIX,
|
||
pk,
|
||
WEBUI_SECRET_KEY,
|
||
TRUSTED_SIGNATURE_KEY,
|
||
STATIC_DIR,
|
||
WEBUI_AUTH_TRUSTED_EMAIL_HEADER,
|
||
FRONTEND_BUILD_DIR,
|
||
REDIS_URL,
|
||
REDIS_SENTINEL_HOSTS,
|
||
REDIS_SENTINEL_PORT,
|
||
WEBUI_NAME,
|
||
REDIS_CLUSTER,
|
||
BASE_DIR,
|
||
)
|
||
|
||
from fastapi import BackgroundTasks, Depends, HTTPException, Request, Response, status
|
||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||
|
||
from open_webui.utils.redis import get_redis_connection, get_sentinels_from_env
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
SESSION_SECRET = WEBUI_SECRET_KEY
|
||
ALGORITHM = "HS256"
|
||
|
||
|
||
##############
|
||
# Auth Utils
|
||
##############
|
||
|
||
|
||
def verify_signature(payload: str, signature: str) -> bool:
|
||
"""
|
||
Verifies the HMAC signature of the received payload.
|
||
"""
|
||
try:
|
||
expected_signature = base64.b64encode(
|
||
hmac.new(TRUSTED_SIGNATURE_KEY, payload.encode(), hashlib.sha256).digest()
|
||
).decode()
|
||
|
||
# Compare securely to prevent timing attacks
|
||
return hmac.compare_digest(expected_signature, signature)
|
||
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def override_static(path: str, content: str) -> bool:
|
||
"""Download content from URL and save to path.
|
||
|
||
Returns True if successful, False otherwise.
|
||
Does not raise exceptions to prevent app startup failure.
|
||
"""
|
||
os.makedirs(os.path.dirname(path), exist_ok=True)
|
||
|
||
try:
|
||
# try with SSL verification first, then without if it fails
|
||
try:
|
||
r = requests.get(content, stream=True, timeout=30)
|
||
except requests.exceptions.SSLError:
|
||
log.warning(f"SSL error downloading {content}, retrying without verification")
|
||
r = requests.get(content, stream=True, timeout=30, verify=False)
|
||
|
||
r.raise_for_status()
|
||
with open(path, "wb") as f:
|
||
r.raw.decode_content = True
|
||
shutil.copyfileobj(r.raw, f)
|
||
log.info(f"Successfully downloaded {content} to {path}")
|
||
return True
|
||
except Exception as e:
|
||
log.error(f"Failed to download {content}: {e}")
|
||
return False
|
||
|
||
|
||
def apply_branding(app):
|
||
"""Apply branding settings from database config or environment variables."""
|
||
custom_png = ""
|
||
custom_svg = ""
|
||
custom_ico = ""
|
||
custom_dark_png = ""
|
||
organization_name = "OpenWebui"
|
||
custom_name = ""
|
||
|
||
if hasattr(app, "state") and hasattr(app.state, "config"):
|
||
# Get config values - must access _state directly to get fresh PersistentConfig values
|
||
def get_config_value(attr_name, default=""):
|
||
# Access _state directly to get the PersistentConfig object
|
||
config_obj = app.state.config._state.get(attr_name)
|
||
if config_obj is None:
|
||
return default
|
||
# If it's a PersistentConfig object, get its value
|
||
if hasattr(config_obj, "value"):
|
||
result = config_obj.value or default
|
||
log.info(f"Branding: {attr_name}.value = {result}")
|
||
return result
|
||
return config_obj or default
|
||
|
||
custom_png = get_config_value("BRANDING_FAVICON_PNG") or os.getenv("CUSTOM_PNG", "")
|
||
custom_svg = get_config_value("BRANDING_FAVICON_SVG") or os.getenv("CUSTOM_SVG", "")
|
||
custom_ico = get_config_value("BRANDING_FAVICON_ICO") or os.getenv("CUSTOM_ICO", "")
|
||
custom_dark_png = get_config_value("BRANDING_FAVICON_DARK_PNG") or os.getenv("CUSTOM_DARK_PNG", "")
|
||
organization_name = get_config_value("BRANDING_ORGANIZATION_NAME") or os.getenv("ORGANIZATION_NAME", "OpenWebui")
|
||
custom_name = get_config_value("BRANDING_CUSTOM_NAME") or os.getenv("CUSTOM_NAME", "")
|
||
else:
|
||
custom_png = os.getenv("CUSTOM_PNG", "")
|
||
custom_svg = os.getenv("CUSTOM_SVG", "")
|
||
custom_ico = os.getenv("CUSTOM_ICO", "")
|
||
custom_dark_png = os.getenv("CUSTOM_DARK_PNG", "")
|
||
organization_name = os.getenv("ORGANIZATION_NAME", "OpenWebui")
|
||
custom_name = os.getenv("CUSTOM_NAME", "")
|
||
|
||
log.info(f"Branding: custom_name={custom_name}, custom_png={custom_png}")
|
||
|
||
# Frontend static/static directory (for dev mode)
|
||
# SvelteKit maps static/ folder to root, so /static/favicon.png = static/static/favicon.png
|
||
frontend_static_dir = BASE_DIR / "static" / "static"
|
||
|
||
# Build resources mapping for both backend and frontend static dirs
|
||
resources = []
|
||
|
||
# files to override
|
||
branding_files = [
|
||
("logo.png", custom_png),
|
||
("favicon.png", custom_png),
|
||
("favicon.svg", custom_svg),
|
||
("favicon-96x96.png", custom_png),
|
||
("apple-touch-icon.png", custom_png),
|
||
("web-app-manifest-192x192.png", custom_png),
|
||
("web-app-manifest-512x512.png", custom_png),
|
||
("splash.png", custom_png),
|
||
("favicon.ico", custom_ico),
|
||
("favicon-dark.png", custom_dark_png),
|
||
("splash-dark.png", custom_dark_png),
|
||
]
|
||
|
||
for filename, url in branding_files:
|
||
if url:
|
||
# Add backend static dir path
|
||
resources.append((os.path.join(STATIC_DIR, filename), url))
|
||
# Add frontend static/static dir path (for dev mode)
|
||
resources.append((os.path.join(frontend_static_dir, filename), url))
|
||
|
||
try:
|
||
for path, url in resources:
|
||
if url:
|
||
log.info(f"Branding: Downloading {url} to {path}")
|
||
override_static(path, url)
|
||
|
||
# set metadata
|
||
setattr(app.state, "LICENSE_METADATA", {
|
||
"type": "enterprise",
|
||
"organization_name": organization_name,
|
||
})
|
||
log.info(f"Branding: Set LICENSE_METADATA organization_name={organization_name}")
|
||
|
||
# set custom name if provided
|
||
if custom_name:
|
||
setattr(app.state, "WEBUI_NAME", custom_name)
|
||
log.info(f"Branding: Set WEBUI_NAME={custom_name}")
|
||
else:
|
||
log.info(f"Branding: custom_name is empty, WEBUI_NAME not changed")
|
||
|
||
return True
|
||
except Exception as ex:
|
||
log.exception(f"Branding: Uncaught Exception: {ex}")
|
||
return False
|
||
|
||
|
||
def get_license_data(app, key):
|
||
# get branding config from app.state.config if available, fallback to env vars
|
||
custom_png = ""
|
||
custom_svg = ""
|
||
custom_ico = ""
|
||
custom_dark_png = ""
|
||
organization_name = "OpenWebui"
|
||
|
||
if hasattr(app, "state") and hasattr(app.state, "config"):
|
||
custom_png = getattr(app.state.config, "BRANDING_FAVICON_PNG", "") or os.getenv("CUSTOM_PNG", "")
|
||
custom_svg = getattr(app.state.config, "BRANDING_FAVICON_SVG", "") or os.getenv("CUSTOM_SVG", "")
|
||
custom_ico = getattr(app.state.config, "BRANDING_FAVICON_ICO", "") or os.getenv("CUSTOM_ICO", "")
|
||
custom_dark_png = getattr(app.state.config, "BRANDING_FAVICON_DARK_PNG", "") or os.getenv("CUSTOM_DARK_PNG", "")
|
||
organization_name = getattr(app.state.config, "BRANDING_ORGANIZATION_NAME", "") or os.getenv("ORGANIZATION_NAME", "OpenWebui")
|
||
else:
|
||
custom_png = os.getenv("CUSTOM_PNG", "")
|
||
custom_svg = os.getenv("CUSTOM_SVG", "")
|
||
custom_ico = os.getenv("CUSTOM_ICO", "")
|
||
custom_dark_png = os.getenv("CUSTOM_DARK_PNG", "")
|
||
organization_name = os.getenv("ORGANIZATION_NAME", "OpenWebui")
|
||
|
||
payload = {
|
||
"resources": {
|
||
os.path.join(STATIC_DIR, "logo.png"): custom_png,
|
||
os.path.join(STATIC_DIR, "favicon.png"): custom_png,
|
||
os.path.join(STATIC_DIR, "favicon.svg"): custom_svg,
|
||
os.path.join(STATIC_DIR, "favicon-96x96.png"): custom_png,
|
||
os.path.join(STATIC_DIR, "apple-touch-icon.png"): custom_png,
|
||
os.path.join(STATIC_DIR, "web-app-manifest-192x192.png"): custom_png,
|
||
os.path.join(STATIC_DIR, "web-app-manifest-512x512.png"): custom_png,
|
||
os.path.join(STATIC_DIR, "splash.png"): custom_png,
|
||
os.path.join(STATIC_DIR, "favicon.ico"): custom_ico,
|
||
os.path.join(STATIC_DIR, "favicon-dark.png"): custom_dark_png,
|
||
os.path.join(STATIC_DIR, "splash-dark.png"): custom_dark_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "favicon.png"): custom_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/favicon.png"): custom_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/favicon.svg"): custom_svg,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/favicon-96x96.png"): custom_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/apple-touch-icon.png"): custom_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/web-app-manifest-192x192.png"): custom_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/web-app-manifest-512x512.png"): custom_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/splash.png"): custom_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/favicon.ico"): custom_ico,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/favicon-dark.png"): custom_dark_png,
|
||
os.path.join(FRONTEND_BUILD_DIR, "static/splash-dark.png"): custom_dark_png,
|
||
},
|
||
"metadata": {
|
||
"type": "enterprise",
|
||
"organization_name": organization_name,
|
||
},
|
||
}
|
||
try:
|
||
for k, v in payload.items():
|
||
if k == "resources":
|
||
for p, c in v.items():
|
||
if c:
|
||
globals().get("override_static", lambda a, b: None)(p, c)
|
||
elif k == "count":
|
||
setattr(app.state, "USER_COUNT", v)
|
||
elif k == "name":
|
||
setattr(app.state, "WEBUI_NAME", v)
|
||
elif k == "metadata":
|
||
setattr(app.state, "LICENSE_METADATA", v)
|
||
return True
|
||
except Exception as ex:
|
||
log.exception(f"License: Uncaught Exception: {ex}")
|
||
|
||
return True
|
||
|
||
|
||
bearer_security = HTTPBearer(auto_error=False)
|
||
|
||
|
||
def get_password_hash(password: str) -> str:
|
||
"""Hash a password using bcrypt"""
|
||
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
|
||
|
||
|
||
def validate_password(password: str) -> bool:
|
||
# The password passed to bcrypt must be 72 bytes or fewer. If it is longer, it will be truncated before hashing.
|
||
if len(password.encode("utf-8")) > 72:
|
||
raise Exception(
|
||
ERROR_MESSAGES.PASSWORD_TOO_LONG,
|
||
)
|
||
|
||
if ENABLE_PASSWORD_VALIDATION:
|
||
if not PASSWORD_VALIDATION_REGEX_PATTERN.match(password):
|
||
raise Exception(ERROR_MESSAGES.INVALID_PASSWORD())
|
||
|
||
return True
|
||
|
||
|
||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||
"""Verify a password against its hash"""
|
||
return (
|
||
bcrypt.checkpw(
|
||
plain_password.encode("utf-8"),
|
||
hashed_password.encode("utf-8"),
|
||
)
|
||
if hashed_password
|
||
else None
|
||
)
|
||
|
||
|
||
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
|
||
payload = data.copy()
|
||
|
||
if expires_delta:
|
||
expire = datetime.now(UTC) + expires_delta
|
||
payload.update({"exp": expire})
|
||
|
||
jti = str(uuid.uuid4())
|
||
payload.update({"jti": jti})
|
||
|
||
encoded_jwt = jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM)
|
||
return encoded_jwt
|
||
|
||
|
||
def decode_token(token: str) -> Optional[dict]:
|
||
try:
|
||
decoded = jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
|
||
return decoded
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
async def is_valid_token(request, decoded) -> bool:
|
||
# Require Redis to check revoked tokens
|
||
if request.app.state.redis:
|
||
jti = decoded.get("jti")
|
||
|
||
if jti:
|
||
revoked = await request.app.state.redis.get(
|
||
f"{REDIS_KEY_PREFIX}:auth:token:{jti}:revoked"
|
||
)
|
||
if revoked:
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
async def invalidate_token(request, token):
|
||
decoded = decode_token(token)
|
||
|
||
# Require Redis to store revoked tokens
|
||
if request.app.state.redis:
|
||
jti = decoded.get("jti")
|
||
exp = decoded.get("exp")
|
||
|
||
if jti and exp:
|
||
ttl = exp - int(
|
||
datetime.now(UTC).timestamp()
|
||
) # Calculate time-to-live for the token
|
||
|
||
if ttl > 0:
|
||
# Store the revoked token in Redis with an expiration time
|
||
await request.app.state.redis.set(
|
||
f"{REDIS_KEY_PREFIX}:auth:token:{jti}:revoked",
|
||
"1",
|
||
ex=ttl,
|
||
)
|
||
|
||
|
||
def extract_token_from_auth_header(auth_header: str):
|
||
return auth_header[len("Bearer ") :]
|
||
|
||
|
||
def create_api_key():
|
||
key = str(uuid.uuid4()).replace("-", "")
|
||
return f"sk-{key}"
|
||
|
||
|
||
def get_http_authorization_cred(auth_header: Optional[str]):
|
||
if not auth_header:
|
||
return None
|
||
try:
|
||
scheme, credentials = auth_header.split(" ")
|
||
return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
async def get_current_user(
|
||
request: Request,
|
||
response: Response,
|
||
background_tasks: BackgroundTasks,
|
||
auth_token: HTTPAuthorizationCredentials = Depends(bearer_security),
|
||
):
|
||
token = None
|
||
|
||
if auth_token is not None:
|
||
token = auth_token.credentials
|
||
|
||
if token is None and "token" in request.cookies:
|
||
token = request.cookies.get("token")
|
||
|
||
if token is None:
|
||
raise HTTPException(status_code=401, detail="Not authenticated")
|
||
|
||
# auth by api key
|
||
if token.startswith("sk-"):
|
||
user = get_current_user_by_api_key(request, token)
|
||
|
||
# Add user info to current span
|
||
current_span = trace.get_current_span()
|
||
if current_span:
|
||
current_span.set_attribute("client.user.id", user.id)
|
||
current_span.set_attribute("client.user.email", user.email)
|
||
current_span.set_attribute("client.user.role", user.role)
|
||
current_span.set_attribute("client.auth.type", "api_key")
|
||
|
||
return user
|
||
|
||
# auth by jwt token
|
||
try:
|
||
try:
|
||
data = decode_token(token)
|
||
except Exception as e:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Invalid token",
|
||
)
|
||
|
||
if data is not None and "id" in data:
|
||
if data.get("jti") and not await is_valid_token(request, data):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Invalid token",
|
||
)
|
||
|
||
user = Users.get_user_by_id(data["id"])
|
||
if user is None:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail=ERROR_MESSAGES.INVALID_TOKEN,
|
||
)
|
||
else:
|
||
if WEBUI_AUTH_TRUSTED_EMAIL_HEADER:
|
||
trusted_email = request.headers.get(
|
||
WEBUI_AUTH_TRUSTED_EMAIL_HEADER, ""
|
||
).lower()
|
||
if trusted_email and user.email != trusted_email:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="User mismatch. Please sign in again.",
|
||
)
|
||
|
||
# Add user info to current span
|
||
current_span = trace.get_current_span()
|
||
if current_span:
|
||
current_span.set_attribute("client.user.id", user.id)
|
||
current_span.set_attribute("client.user.email", user.email)
|
||
current_span.set_attribute("client.user.role", user.role)
|
||
current_span.set_attribute("client.auth.type", "jwt")
|
||
|
||
# Refresh the user's last active timestamp asynchronously
|
||
# to prevent blocking the request
|
||
if background_tasks:
|
||
background_tasks.add_task(Users.update_last_active_by_id, user.id)
|
||
return user
|
||
else:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail=ERROR_MESSAGES.UNAUTHORIZED,
|
||
)
|
||
except Exception as e:
|
||
# Delete the token cookie
|
||
if request.cookies.get("token"):
|
||
response.delete_cookie("token")
|
||
|
||
if request.cookies.get("oauth_id_token"):
|
||
response.delete_cookie("oauth_id_token")
|
||
|
||
# Delete OAuth session if present
|
||
if request.cookies.get("oauth_session_id"):
|
||
response.delete_cookie("oauth_session_id")
|
||
|
||
raise e
|
||
|
||
|
||
def get_current_user_by_api_key(request, api_key: str):
|
||
user = Users.get_user_by_api_key(api_key)
|
||
|
||
if user is None:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail=ERROR_MESSAGES.INVALID_TOKEN,
|
||
)
|
||
|
||
if not request.state.enable_api_keys or (
|
||
user.role != "admin"
|
||
and not has_permission(
|
||
user.id,
|
||
"features.api_keys",
|
||
request.app.state.config.USER_PERMISSIONS,
|
||
)
|
||
):
|
||
raise HTTPException(
|
||
status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.API_KEY_NOT_ALLOWED
|
||
)
|
||
|
||
# Add user info to current span
|
||
current_span = trace.get_current_span()
|
||
if current_span:
|
||
current_span.set_attribute("client.user.id", user.id)
|
||
current_span.set_attribute("client.user.email", user.email)
|
||
current_span.set_attribute("client.user.role", user.role)
|
||
current_span.set_attribute("client.auth.type", "api_key")
|
||
|
||
Users.update_last_active_by_id(user.id)
|
||
return user
|
||
|
||
|
||
def get_verified_user(user=Depends(get_current_user)):
|
||
if user.role not in {"user", "admin"}:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
||
)
|
||
return user
|
||
|
||
|
||
def get_admin_user(user=Depends(get_current_user)):
|
||
if user.role != "admin":
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
|
||
)
|
||
return user
|
||
|
||
|
||
verify_email_template = """<!DOCTYPE html>
|
||
<html lang="zh-CN">
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||
<title>账户激活</title>
|
||
<style>
|
||
* {
|
||
margin: 0;
|
||
padding: 0;
|
||
box-sizing: border-box;
|
||
}
|
||
|
||
body {
|
||
font-family: 'Poppins', Arial, sans-serif;
|
||
background-color: #f5f7fa;
|
||
color: #333;
|
||
line-height: 1.6;
|
||
-webkit-font-smoothing: antialiased;
|
||
}
|
||
|
||
.email-container {
|
||
max-width: 600px;
|
||
margin: 0 auto;
|
||
background-color: #ffffff;
|
||
border-radius: 16px;
|
||
overflow: hidden;
|
||
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.05);
|
||
}
|
||
|
||
.email-header {
|
||
background: linear-gradient(135deg, #6B73FF 0%%, #5DADE2 100%%);
|
||
padding: 30px;
|
||
text-align: center;
|
||
}
|
||
|
||
.email-header img {
|
||
max-width: 100px;
|
||
height: auto;
|
||
}
|
||
|
||
.email-header h1 {
|
||
color: white;
|
||
font-size: 24px;
|
||
margin-top: 15px;
|
||
font-weight: 600;
|
||
}
|
||
|
||
.email-content {
|
||
padding: 40px 30px;
|
||
text-align: center;
|
||
}
|
||
|
||
.welcome-text {
|
||
font-size: 18px;
|
||
margin-bottom: 25px;
|
||
color: #2c3e50;
|
||
}
|
||
|
||
.instruction-text {
|
||
font-size: 16px;
|
||
margin-bottom: 30px;
|
||
color: #5d6778;
|
||
}
|
||
|
||
.activate-button {
|
||
display: inline-block;
|
||
background: linear-gradient(to right, #5D8CF7, #4286F5);
|
||
color: white;
|
||
text-decoration: none;
|
||
padding: 14px 30px;
|
||
border-radius: 50px;
|
||
font-weight: 500;
|
||
font-size: 16px;
|
||
margin: 20px 0;
|
||
transition: all 0.3s ease;
|
||
box-shadow: 0 4px 10px rgba(65, 132, 234, 0.35);
|
||
}
|
||
|
||
.activate-button:hover {
|
||
transform: translateY(-2px);
|
||
box-shadow: 0 6px 15px rgba(65, 132, 234, 0.40);
|
||
}
|
||
|
||
.note {
|
||
font-size: 14px;
|
||
color: #8a94a6;
|
||
margin-top: 30px;
|
||
}
|
||
|
||
.email-footer {
|
||
background-color: #f8fafc;
|
||
padding: 20px;
|
||
text-align: center;
|
||
font-size: 13px;
|
||
color: #8a94a6;
|
||
border-top: 1px solid #eaeef3;
|
||
}
|
||
|
||
.social-links {
|
||
margin: 15px 0;
|
||
}
|
||
|
||
.social-link {
|
||
display: inline-block;
|
||
margin: 0 8px;
|
||
width: 32px;
|
||
height: 32px;
|
||
background-color: #e1e5eb;
|
||
border-radius: 50%%;
|
||
line-height: 32px;
|
||
text-align: center;
|
||
transition: background-color 0.3s ease;
|
||
}
|
||
|
||
.social-link:hover {
|
||
background-color: #d0d5dd;
|
||
}
|
||
|
||
@media only screen and (max-width: 600px) {
|
||
.email-container {
|
||
border-radius: 0;
|
||
}
|
||
|
||
.email-header, .email-content {
|
||
padding: 25px 20px;
|
||
}
|
||
|
||
.email-header h1 {
|
||
font-size: 22px;
|
||
}
|
||
|
||
.welcome-text {
|
||
font-size: 16px;
|
||
}
|
||
|
||
.instruction-text {
|
||
font-size: 15px;
|
||
}
|
||
|
||
.activate-button {
|
||
padding: 12px 25px;
|
||
font-size: 15px;
|
||
}
|
||
}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="email-container">
|
||
<div class="email-header">
|
||
<h1>%(title)s</h1>
|
||
</div>
|
||
|
||
<div class="email-content">
|
||
<p class="instruction-text">请点击下方按钮激活您的账户,开始精彩体验。激活链接有效期为24小时。</p>
|
||
<a href="%(link)s" class="activate-button">立即激活账户</a>
|
||
<p class="note">如果您没有注册我们的服务,请忽略此邮件。</p>
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>"""
|
||
|
||
|
||
def get_email_code_key(code: str) -> str:
|
||
return f"email_verify:{code}"
|
||
|
||
|
||
def send_verify_email(email: str):
|
||
redis = get_redis_connection(
|
||
redis_url=REDIS_URL,
|
||
redis_sentinels=get_sentinels_from_env(
|
||
REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT
|
||
),
|
||
redis_cluster=REDIS_CLUSTER,
|
||
)
|
||
code = f"{uuid.uuid4().hex}{uuid.uuid1().hex}"
|
||
redis.set(name=get_email_code_key(code=code), value=email, ex=timedelta(days=1))
|
||
link = f"{WEBUI_URL.value.rstrip('/')}/api/v1/auths/signup_verify/{code}"
|
||
|
||
# use template from config
|
||
template = EMAIL_VERIFY_TEMPLATE.value or verify_email_template
|
||
|
||
# use replace instead of % formatting to avoid issues with % in CSS
|
||
body = template.replace("%(title)s", f"{WEBUI_NAME} Email Verify").replace("%(link)s", link)
|
||
|
||
send_email(
|
||
receiver=email,
|
||
subject=f"{WEBUI_NAME} Email Verify",
|
||
body=body,
|
||
)
|
||
|
||
|
||
def verify_email_by_code(code: str) -> str:
|
||
redis = get_redis_connection(
|
||
redis_url=REDIS_URL,
|
||
redis_sentinels=get_sentinels_from_env(
|
||
REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT
|
||
),
|
||
redis_cluster=REDIS_CLUSTER,
|
||
)
|
||
return redis.get(name=get_email_code_key(code=code))
|
||
|
||
|
||
# email verification code template
|
||
email_code_template = """<!DOCTYPE html>
|
||
<html>
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<style>
|
||
body { font-family: Arial, sans-serif; background-color: #f5f5f5; margin: 0; padding: 20px; }
|
||
.container { max-width: 500px; margin: 0 auto; background: white; border-radius: 10px; padding: 30px; }
|
||
.code { font-size: 32px; font-weight: bold; color: #3b82f6; letter-spacing: 8px; text-align: center; padding: 20px; background: #f0f9ff; border-radius: 8px; margin: 20px 0; }
|
||
.note { color: #666; font-size: 14px; }
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="container">
|
||
<h2>%(title)s</h2>
|
||
<p>您的验证码是:</p>
|
||
<div class="code">%(code)s</div>
|
||
<p class="note">验证码有效期为10分钟,请勿泄露给他人。</p>
|
||
</div>
|
||
</body>
|
||
</html>"""
|
||
|
||
|
||
def get_signup_code_key(email: str) -> str:
|
||
return f"signup_code:{email}"
|
||
|
||
|
||
def send_signup_email_code(email: str) -> str:
|
||
"""Send a 6-digit verification code to email for signup."""
|
||
import random
|
||
|
||
redis = get_redis_connection(
|
||
redis_url=REDIS_URL,
|
||
redis_sentinels=get_sentinels_from_env(
|
||
REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT
|
||
),
|
||
redis_cluster=REDIS_CLUSTER,
|
||
)
|
||
|
||
# generate 6-digit code
|
||
code = str(random.randint(100000, 999999))
|
||
|
||
# store in redis with 10 min expiration
|
||
redis.set(name=get_signup_code_key(email.lower()), value=code, ex=timedelta(minutes=10))
|
||
|
||
# use template from config
|
||
template = EMAIL_CODE_TEMPLATE.value or email_code_template
|
||
|
||
send_email(
|
||
receiver=email,
|
||
subject=f"{WEBUI_NAME} 注册验证码",
|
||
body=template % {"title": f"{WEBUI_NAME} 注册验证码", "code": code},
|
||
)
|
||
|
||
return code
|
||
|
||
|
||
def verify_signup_email_code(email: str, code: str) -> bool:
|
||
"""Verify the signup email code."""
|
||
redis = get_redis_connection(
|
||
redis_url=REDIS_URL,
|
||
redis_sentinels=get_sentinels_from_env(
|
||
REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT
|
||
),
|
||
redis_cluster=REDIS_CLUSTER,
|
||
)
|
||
|
||
stored_code = redis.get(name=get_signup_code_key(email.lower()))
|
||
if stored_code and stored_code == code:
|
||
# delete code after successful verification
|
||
redis.delete(get_signup_code_key(email.lower()))
|
||
return True
|
||
return False
|