70 lines
2.2 KiB
Python
70 lines
2.2 KiB
Python
from typing import Literal
|
|
|
|
from fastapi import Cookie, Depends, Header
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.common.db.session import get_db
|
|
from app.common.errors.app_error import AuthenticationError, AuthorizationError
|
|
from app.common.security.jwt import decode_access_token
|
|
from app.models.entities import AdminUser, User
|
|
|
|
|
|
def _extract_token(
|
|
authorization: str | None,
|
|
cookie_token: str | None,
|
|
) -> str:
|
|
if authorization and authorization.startswith("Bearer "):
|
|
return authorization.split(" ", 1)[1].strip()
|
|
if cookie_token:
|
|
return cookie_token
|
|
raise AuthenticationError()
|
|
|
|
|
|
def get_current_user(
    db: Session = Depends(get_db),
    authorization: str | None = Header(default=None),
    user_access_token: str | None = Cookie(default=None),
) -> User:
    """Resolve the ``User`` that the current request is authenticated as.

    The token is taken from the ``Authorization`` header or, failing that,
    the ``user_access_token`` cookie. It must decode successfully, carry
    ``scope == "user"`` and a ``sub`` claim matching a ``User.public_id``.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        authorization: Value of the ``Authorization`` request header, if any.
        user_access_token: Value of the ``user_access_token`` cookie, if any.

    Returns:
        The matching ``User`` whose ``status`` is ``1``.

    Raises:
        AuthenticationError: If no token is supplied, the token cannot be
            decoded, its scope is not ``"user"``, it has no ``sub`` claim,
            or no user matches the subject.
        AuthorizationError: If the user exists but ``status`` is not ``1``.
    """
    token = _extract_token(authorization, user_access_token)
    try:
        payload = decode_access_token(token)
    except Exception as exc:  # noqa: BLE001
        # Any decoding failure is reported uniformly as an auth failure.
        raise AuthenticationError() from exc
    if payload.get("scope") != "user":
        raise AuthenticationError()
    # "sub" is an optional JWT claim: a validly signed token may omit it.
    # Reject that as an authentication failure instead of letting a KeyError
    # escape, and never query with a null subject (it would compare as
    # ``public_id IS NULL``).
    subject = payload.get("sub")
    if subject is None:
        raise AuthenticationError()
    user = db.scalar(select(User).where(User.public_id == subject))
    if not user:
        raise AuthenticationError()
    if user.status != 1:
        raise AuthorizationError("user disabled")
    return user
|
|
|
|
|
|
def get_current_admin(
    db: Session = Depends(get_db),
    authorization: str | None = Header(default=None),
    admin_access_token: str | None = Cookie(default=None),
) -> AdminUser:
    """Resolve the ``AdminUser`` that the current request is authenticated as.

    The token is taken from the ``Authorization`` header or, failing that,
    the ``admin_access_token`` cookie. It must decode successfully, carry
    ``scope == "admin"`` and a ``sub`` claim matching an
    ``AdminUser.username``.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        authorization: Value of the ``Authorization`` request header, if any.
        admin_access_token: Value of the ``admin_access_token`` cookie, if any.

    Returns:
        The matching ``AdminUser`` whose ``status`` is ``1``.

    Raises:
        AuthenticationError: If no token is supplied, the token cannot be
            decoded, its scope is not ``"admin"``, it has no ``sub`` claim,
            or no admin matches the subject.
        AuthorizationError: If the admin exists but ``status`` is not ``1``.
    """
    token = _extract_token(authorization, admin_access_token)
    try:
        payload = decode_access_token(token)
    except Exception as exc:  # noqa: BLE001
        # Any decoding failure is reported uniformly as an auth failure.
        raise AuthenticationError() from exc
    if payload.get("scope") != "admin":
        raise AuthenticationError()
    # "sub" is an optional JWT claim: a validly signed token may omit it.
    # Reject that as an authentication failure instead of letting a KeyError
    # escape, and never query with a null subject (it would compare as
    # ``username IS NULL``).
    subject = payload.get("sub")
    if subject is None:
        raise AuthenticationError()
    admin = db.scalar(select(AdminUser).where(AdminUser.username == subject))
    if not admin:
        raise AuthenticationError()
    if admin.status != 1:
        raise AuthorizationError("admin disabled")
    return admin
|
|
|
|
|
|
def require_admin_permission(_permission: Literal["any"] = "any"):
    """Build a dependency that yields the authenticated admin.

    Args:
        _permission: Accepted for the call signature only; the value is not
            consulted, and ``"any"`` is the sole permitted literal.

    Returns:
        A callable suitable for ``Depends`` that returns the ``AdminUser``
        produced by ``get_current_admin``.
    """

    def _authenticated_admin(
        admin: AdminUser = Depends(get_current_admin),
    ) -> AdminUser:
        """Hand back the admin resolved by ``get_current_admin`` unchanged."""
        return admin

    return _authenticated_admin
|
|
|