Files
aivideo/backend/app/core/storage.py

54 lines
1.7 KiB
Python

from __future__ import annotations
import hashlib
from pathlib import Path
from uuid import uuid4
from fastapi import UploadFile
from app.common.config.settings import get_settings
# Application settings, fetched once at import time. This module reads
# `storage_root` and `storage_public_base_url` from it.
settings = get_settings()
class LocalStorageService:
    """Store files beneath a local root directory and expose them by URL.

    Files are written under ``settings.storage_root`` using keys of the form
    ``"<folder>/<random hex><ext>"``. Public URLs are produced by joining the
    key onto ``settings.storage_public_base_url``.
    """

    # Number of bytes read per iteration when streaming an upload to disk.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self) -> None:
        """Bind to the configured storage root, creating it if missing."""
        self.root = settings.storage_root
        self.root.mkdir(parents=True, exist_ok=True)

    def build_public_url(self, storage_key: str) -> str:
        """Return the public URL for *storage_key*.

        Backslashes in the key are converted to forward slashes, and a
        trailing slash on the configured base URL is ignored, so exactly one
        slash separates the base from the key.
        """
        base = settings.storage_public_base_url.rstrip("/")
        normalized_key = storage_key.replace("\\", "/")
        return f"{base}/{normalized_key}"

    def save_upload(self, upload: "UploadFile", *, folder: str) -> dict:
        """Persist an uploaded file under *folder* and describe the result.

        The upload is streamed to disk in fixed-size chunks while its size
        and SHA-256 digest are accumulated, so the whole payload is never
        held in memory at once.

        Args:
            upload: Object exposing ``filename`` and a binary ``file`` with a
                ``read(size)`` method.
            folder: Relative folder (below the storage root) to store into.

        Returns:
            A dict with ``storage_key``, ``file_size``, ``sha256`` and
            ``public_url``.

        Raises:
            ValueError: If *folder* is empty, absolute, or contains ``..``.
        """
        # Same fallback as save_bytes: a missing name or suffix becomes ".bin".
        ext = Path(upload.filename or "").suffix or ".bin"
        storage_key, path = self._allocate(folder, ext)

        digest = hashlib.sha256()
        size = 0
        try:
            with path.open("wb") as out:
                while True:
                    chunk = upload.file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    out.write(chunk)
                    digest.update(chunk)
                    size += len(chunk)
        except BaseException:
            # Do not leave a truncated file behind; the original error is
            # what the caller needs to see, so cleanup failures are ignored.
            try:
                path.unlink()
            except OSError:
                pass
            raise

        return self._describe(storage_key, size, digest.hexdigest())

    def save_bytes(self, content: bytes, *, filename: str, folder: str) -> dict:
        """Persist *content* under *folder* and describe the result.

        Args:
            content: Raw bytes to write.
            filename: Name whose suffix is reused as the stored extension;
                ``.bin`` is used when it has none.
            folder: Relative folder (below the storage root) to store into.

        Returns:
            A dict with ``storage_key``, ``file_size``, ``sha256`` and
            ``public_url``.

        Raises:
            ValueError: If *folder* is empty, absolute, or contains ``..``.
        """
        ext = Path(filename).suffix or ".bin"
        storage_key, path = self._allocate(folder, ext)
        path.write_bytes(content)
        return self._describe(
            storage_key, len(content), hashlib.sha256(content).hexdigest()
        )

    def _allocate(self, folder: str, ext: str) -> tuple:
        """Pick a fresh key in *folder*, validate it, and create its directory.

        Returns a ``(storage_key, path)`` pair.
        """
        storage_key = f"{folder}/{uuid4().hex}{ext}"
        path = self.root / storage_key
        # An empty or absolute folder makes the join above discard the root,
        # and ".." segments climb out of it; refuse both before touching disk.
        try:
            inside_root = ".." not in path.relative_to(self.root).parts
        except ValueError:
            inside_root = False
        if not inside_root:
            raise ValueError(
                f"storage folder must be a relative path inside the storage "
                f"root: {folder!r}"
            )
        path.parent.mkdir(parents=True, exist_ok=True)
        return storage_key, path

    def _describe(self, storage_key: str, file_size: int, sha256: str) -> dict:
        """Build the metadata dict returned by the save methods."""
        return {
            "storage_key": storage_key,
            "file_size": file_size,
            "sha256": sha256,
            "public_url": self.build_public_url(storage_key),
        }
# Module-level shared instance. Constructing it here means the storage root
# directory is created (if missing) as soon as this module is imported.
storage_service = LocalStorageService()