feat: initialize aivideo project

This commit is contained in:
2026-04-17 18:33:05 +08:00
commit 14b18d67fe
162 changed files with 26251 additions and 0 deletions

View File

@@ -0,0 +1,69 @@
from typing import Literal
from fastapi import Cookie, Depends, Header
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.common.db.session import get_db
from app.common.errors.app_error import AuthenticationError, AuthorizationError
from app.common.security.jwt import decode_access_token
from app.models.entities import AdminUser, User
def _extract_token(
authorization: str | None,
cookie_token: str | None,
) -> str:
if authorization and authorization.startswith("Bearer "):
return authorization.split(" ", 1)[1].strip()
if cookie_token:
return cookie_token
raise AuthenticationError()
def get_current_user(
db: Session = Depends(get_db),
authorization: str | None = Header(default=None),
user_access_token: str | None = Cookie(default=None),
) -> User:
token = _extract_token(authorization, user_access_token)
try:
payload = decode_access_token(token)
except Exception as exc: # noqa: BLE001
raise AuthenticationError() from exc
if payload.get("scope") != "user":
raise AuthenticationError()
user = db.scalar(select(User).where(User.public_id == payload["sub"]))
if not user:
raise AuthenticationError()
if user.status != 1:
raise AuthorizationError("user disabled")
return user
def get_current_admin(
db: Session = Depends(get_db),
authorization: str | None = Header(default=None),
admin_access_token: str | None = Cookie(default=None),
) -> AdminUser:
token = _extract_token(authorization, admin_access_token)
try:
payload = decode_access_token(token)
except Exception as exc: # noqa: BLE001
raise AuthenticationError() from exc
if payload.get("scope") != "admin":
raise AuthenticationError()
admin = db.scalar(select(AdminUser).where(AdminUser.username == payload["sub"]))
if not admin:
raise AuthenticationError()
if admin.status != 1:
raise AuthorizationError("admin disabled")
return admin
def require_admin_permission(_permission: Literal["any"] = "any"):
def dependency(admin: AdminUser = Depends(get_current_admin)) -> AdminUser:
return admin
return dependency

View File

@@ -0,0 +1,77 @@
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from fastapi import Response
from app.common.config.settings import get_settings
settings = get_settings()
def _encode(payload: dict[str, Any], secret: str, expires_delta: timedelta) -> str:
now = datetime.now(timezone.utc)
body = {
**payload,
"iat": int(now.timestamp()),
"exp": int((now + expires_delta).timestamp()),
}
return jwt.encode(body, secret, algorithm="HS256")
def create_access_token(subject: str, *, scope: str) -> str:
return _encode(
{"sub": subject, "scope": scope, "type": "access"},
settings.jwt_secret,
timedelta(minutes=settings.jwt_access_expire_minutes),
)
def create_refresh_token(subject: str, *, scope: str) -> str:
return _encode(
{"sub": subject, "scope": scope, "type": "refresh"},
settings.jwt_refresh_secret,
timedelta(days=settings.jwt_refresh_expire_days),
)
def decode_access_token(token: str) -> dict[str, Any]:
return jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])
def decode_refresh_token(token: str) -> dict[str, Any]:
return jwt.decode(token, settings.jwt_refresh_secret, algorithms=["HS256"])
def set_auth_cookies(response: Response, access_token: str, refresh_token: str, *, prefix: str) -> None:
common_kwargs = {
"httponly": True,
"secure": False,
"samesite": "lax",
"domain": settings.jwt_cookie_domain or None,
}
response.set_cookie(
key=f"{prefix}_access_token",
value=access_token,
max_age=settings.jwt_access_expire_minutes * 60,
**common_kwargs,
)
response.set_cookie(
key=f"{prefix}_refresh_token",
value=refresh_token,
max_age=settings.jwt_refresh_expire_days * 24 * 3600,
**common_kwargs,
)
def clear_auth_cookies(response: Response, *, prefix: str) -> None:
response.delete_cookie(
key=f"{prefix}_access_token",
domain=settings.jwt_cookie_domain or None,
)
response.delete_cookie(
key=f"{prefix}_refresh_token",
domain=settings.jwt_cookie_domain or None,
)

View File

@@ -0,0 +1,9 @@
import bcrypt
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verify_password(password: str, password_hash: str) -> bool:
return bcrypt.checkpw(password.encode("utf-8"), password_hash.encode("utf-8"))