78 lines
2.2 KiB
Python
78 lines
2.2 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import jwt
|
|
from fastapi import Response
|
|
|
|
from app.common.config.settings import get_settings
|
|
|
|
|
|
# Application settings, resolved once when this module is imported. The token
# and cookie helpers in this module read their signing secrets, lifetimes and
# cookie domain from this object.
settings = get_settings()
|
|
|
|
|
|
def _encode(payload: dict[str, Any], secret: str, expires_delta: timedelta) -> str:
    """Sign ``payload`` as an HS256 JWT with issued-at and expiry claims.

    Args:
        payload: Claims to embed. The mapping is copied, not mutated; any
            ``iat``/``exp`` keys it already holds are overwritten.
        secret: Key passed to ``jwt.encode`` for signing.
        expires_delta: Lifetime of the token, counted from the current UTC time.

    Returns:
        The value returned by ``jwt.encode`` for the assembled claims.
    """
    issued_at = datetime.now(timezone.utc)

    # Copy first so the caller's dict is left untouched, then stamp the
    # timing claims as whole-second Unix timestamps.
    claims = dict(payload)
    claims["iat"] = int(issued_at.timestamp())
    claims["exp"] = int((issued_at + expires_delta).timestamp())

    return jwt.encode(claims, secret, algorithm="HS256")
|
|
|
|
|
|
def create_access_token(subject: str, *, scope: str) -> str:
    """Issue an access token for ``subject``.

    Args:
        subject: Value stored in the ``sub`` claim.
        scope: Value stored in the ``scope`` claim.

    Returns:
        A token signed with ``settings.jwt_secret`` whose lifetime is
        ``settings.jwt_access_expire_minutes`` minutes and whose ``type``
        claim is ``"access"``.
    """
    claims = {"sub": subject, "scope": scope, "type": "access"}
    signing_key = settings.jwt_secret
    lifetime = timedelta(minutes=settings.jwt_access_expire_minutes)
    return _encode(claims, signing_key, lifetime)
|
|
|
|
|
|
def create_refresh_token(subject: str, *, scope: str) -> str:
    """Issue a refresh token for ``subject``.

    Args:
        subject: Value stored in the ``sub`` claim.
        scope: Value stored in the ``scope`` claim.

    Returns:
        A token signed with ``settings.jwt_refresh_secret`` whose lifetime is
        ``settings.jwt_refresh_expire_days`` days and whose ``type`` claim is
        ``"refresh"``.
    """
    claims = {"sub": subject, "scope": scope, "type": "refresh"}
    signing_key = settings.jwt_refresh_secret
    lifetime = timedelta(days=settings.jwt_refresh_expire_days)
    return _encode(claims, signing_key, lifetime)
|
|
|
|
|
|
def decode_access_token(token: str) -> dict[str, Any]:
    """Decode and validate an access token.

    The token is verified by ``jwt.decode`` against ``settings.jwt_secret``
    with HS256 as the only accepted algorithm, and its ``type`` claim must be
    ``"access"``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims.

    Raises:
        jwt.InvalidTokenError: If the ``type`` claim is missing or is not
            ``"access"``. Errors raised by ``jwt.decode`` itself propagate
            unchanged.
    """
    claims = jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])

    # The signature alone does not prove this is an access token: if the
    # access and refresh secrets are ever configured to the same value, a
    # refresh token would verify here too. Enforce the type the issuer stamped.
    if claims.get("type") != "access":
        raise jwt.InvalidTokenError("Token is not an access token")

    return claims
|
|
|
|
|
|
def decode_refresh_token(token: str) -> dict[str, Any]:
    """Decode and validate a refresh token.

    The token is verified by ``jwt.decode`` against
    ``settings.jwt_refresh_secret`` with HS256 as the only accepted algorithm,
    and its ``type`` claim must be ``"refresh"``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims.

    Raises:
        jwt.InvalidTokenError: If the ``type`` claim is missing or is not
            ``"refresh"``. Errors raised by ``jwt.decode`` itself propagate
            unchanged.
    """
    claims = jwt.decode(token, settings.jwt_refresh_secret, algorithms=["HS256"])

    # The signature alone does not prove this is a refresh token: if the
    # access and refresh secrets are ever configured to the same value, an
    # access token would verify here too. Enforce the type the issuer stamped.
    if claims.get("type") != "refresh":
        raise jwt.InvalidTokenError("Token is not a refresh token")

    return claims
|
|
|
|
|
|
def set_auth_cookies(
    response: Response,
    access_token: str,
    refresh_token: str,
    *,
    prefix: str,
    secure: bool = False,
) -> None:
    """Attach the access and refresh tokens to ``response`` as HTTP-only cookies.

    Two cookies are set, named ``{prefix}_access_token`` and
    ``{prefix}_refresh_token``. Both use ``SameSite=lax`` and the domain from
    ``settings.jwt_cookie_domain`` (an empty value means no explicit domain).
    Each cookie's ``max_age`` is derived from the matching expiry setting
    (``jwt_access_expire_minutes`` / ``jwt_refresh_expire_days``).

    Args:
        response: Response object on which ``set_cookie`` is called.
        access_token: Value for the access-token cookie.
        refresh_token: Value for the refresh-token cookie.
        prefix: Prefix for both cookie names.
        secure: Whether to mark the cookies ``Secure`` so browsers only send
            them over HTTPS. Defaults to ``False``, the previously hard-coded
            value; deployments served over HTTPS should pass ``True``.
    """
    shared_attrs: dict[str, Any] = {
        "httponly": True,
        "secure": secure,
        "samesite": "lax",
        "domain": settings.jwt_cookie_domain or None,
    }
    response.set_cookie(
        key=f"{prefix}_access_token",
        value=access_token,
        max_age=settings.jwt_access_expire_minutes * 60,  # minutes -> seconds
        **shared_attrs,
    )
    response.set_cookie(
        key=f"{prefix}_refresh_token",
        value=refresh_token,
        max_age=settings.jwt_refresh_expire_days * 24 * 3600,  # days -> seconds
        **shared_attrs,
    )
|
|
|
|
|
|
def clear_auth_cookies(response: Response, *, prefix: str) -> None:
    """Ask the client to drop both auth cookies.

    Calls ``response.delete_cookie`` for ``{prefix}_access_token`` and then
    ``{prefix}_refresh_token``, using the domain from
    ``settings.jwt_cookie_domain`` (an empty value means no explicit domain).

    Args:
        response: Response object on which ``delete_cookie`` is called.
        prefix: Prefix of the cookie names to delete.
    """
    for suffix in ("access_token", "refresh_token"):
        response.delete_cookie(
            key=f"{prefix}_{suffix}",
            domain=settings.jwt_cookie_domain or None,
        )
|
|
|