from typing import Literal from fastapi import Cookie, Depends, Header from sqlalchemy import select from sqlalchemy.orm import Session from app.common.db.session import get_db from app.common.errors.app_error import AuthenticationError, AuthorizationError from app.common.security.jwt import decode_access_token from app.models.entities import AdminUser, User def _extract_token( authorization: str | None, cookie_token: str | None, ) -> str: if authorization and authorization.startswith("Bearer "): return authorization.split(" ", 1)[1].strip() if cookie_token: return cookie_token raise AuthenticationError() def get_current_user( db: Session = Depends(get_db), authorization: str | None = Header(default=None), user_access_token: str | None = Cookie(default=None), ) -> User: token = _extract_token(authorization, user_access_token) try: payload = decode_access_token(token) except Exception as exc: # noqa: BLE001 raise AuthenticationError() from exc if payload.get("scope") != "user": raise AuthenticationError() user = db.scalar(select(User).where(User.public_id == payload["sub"])) if not user: raise AuthenticationError() if user.status != 1: raise AuthorizationError("user disabled") return user def get_current_admin( db: Session = Depends(get_db), authorization: str | None = Header(default=None), admin_access_token: str | None = Cookie(default=None), ) -> AdminUser: token = _extract_token(authorization, admin_access_token) try: payload = decode_access_token(token) except Exception as exc: # noqa: BLE001 raise AuthenticationError() from exc if payload.get("scope") != "admin": raise AuthenticationError() admin = db.scalar(select(AdminUser).where(AdminUser.username == payload["sub"])) if not admin: raise AuthenticationError() if admin.status != 1: raise AuthorizationError("admin disabled") return admin def require_admin_permission(_permission: Literal["any"] = "any"): def dependency(admin: AdminUser = Depends(get_current_admin)) -> AdminUser: return admin return dependency