Files

70 lines
2.2 KiB
Python

from typing import Literal
from fastapi import Cookie, Depends, Header
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.common.db.session import get_db
from app.common.errors.app_error import AuthenticationError, AuthorizationError
from app.common.security.jwt import decode_access_token
from app.models.entities import AdminUser, User
def _extract_token(
authorization: str | None,
cookie_token: str | None,
) -> str:
if authorization and authorization.startswith("Bearer "):
return authorization.split(" ", 1)[1].strip()
if cookie_token:
return cookie_token
raise AuthenticationError()
def get_current_user(
db: Session = Depends(get_db),
authorization: str | None = Header(default=None),
user_access_token: str | None = Cookie(default=None),
) -> User:
token = _extract_token(authorization, user_access_token)
try:
payload = decode_access_token(token)
except Exception as exc: # noqa: BLE001
raise AuthenticationError() from exc
if payload.get("scope") != "user":
raise AuthenticationError()
user = db.scalar(select(User).where(User.public_id == payload["sub"]))
if not user:
raise AuthenticationError()
if user.status != 1:
raise AuthorizationError("user disabled")
return user
def get_current_admin(
db: Session = Depends(get_db),
authorization: str | None = Header(default=None),
admin_access_token: str | None = Cookie(default=None),
) -> AdminUser:
token = _extract_token(authorization, admin_access_token)
try:
payload = decode_access_token(token)
except Exception as exc: # noqa: BLE001
raise AuthenticationError() from exc
if payload.get("scope") != "admin":
raise AuthenticationError()
admin = db.scalar(select(AdminUser).where(AdminUser.username == payload["sub"]))
if not admin:
raise AuthenticationError()
if admin.status != 1:
raise AuthorizationError("admin disabled")
return admin
def require_admin_permission(_permission: Literal["any"] = "any"):
def dependency(admin: AdminUser = Depends(get_current_admin)) -> AdminUser:
return admin
return dependency