# Files: 105 lines, 3.0 KiB, Python

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.common.db.session import get_db
from app.common.responses.api_response import success_response
from app.common.security.deps import get_current_user, require_admin_permission
from app.models.entities import User
from app.modules.video_tasks.schema import CreateVideoTaskRequest
from app.modules.video_tasks.service import VideoTasksService
# Router for the per-user video-task endpoints. No prefix is set here, so each
# route registered on it below spells out its full "/api/v1/video-tasks" path.
router = APIRouter(tags=["video-tasks"])
@router.post("/api/v1/video-tasks")
def create_task(
    payload: CreateVideoTaskRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Create a video task for the current user.

    Passes the current user's id and the request payload to
    ``VideoTasksService.create_task`` and returns the outcome wrapped by
    ``success_response``.
    """
    service = VideoTasksService(db)
    created = service.create_task(current_user.id, payload)
    return success_response(created)
@router.get("/api/v1/video-tasks")
def list_tasks(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List video tasks for the current user.

    Calls ``VideoTasksService.list_tasks`` with the current user's id and
    returns the outcome wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    tasks = service.list_tasks(current_user.id)
    return success_response(tasks)
@router.get("/api/v1/video-tasks/{task_no}")
def get_task_detail(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Fetch the detail of one video task, addressed by its ``task_no``.

    Calls ``VideoTasksService.get_task_detail`` with the current user's id
    and the task number, and returns the outcome wrapped by
    ``success_response``.
    """
    service = VideoTasksService(db)
    detail = service.get_task_detail(current_user.id, task_no)
    return success_response(detail)
@router.post("/api/v1/video-tasks/{task_no}/retry")
def retry_task(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Retry a video task, addressed by its ``task_no``.

    Calls ``VideoTasksService.retry_task`` with the current user's id and the
    task number, and returns the outcome wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    outcome = service.retry_task(current_user.id, task_no)
    return success_response(outcome)
@router.post("/api/v1/video-tasks/{task_no}/cancel")
def cancel_task(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Cancel a video task, addressed by its ``task_no``.

    Calls ``VideoTasksService.cancel_task`` with the current user's id and
    the task number, and returns the outcome wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    outcome = service.cancel_task(current_user.id, task_no)
    return success_response(outcome)
@router.delete("/api/v1/video-tasks/{task_no}")
def delete_task(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Delete a video task, addressed by its ``task_no``.

    Calls ``VideoTasksService.delete_task`` with the current user's id and
    the task number, and returns the outcome wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    outcome = service.delete_task(current_user.id, task_no)
    return success_response(outcome)
# Router for the admin video-task endpoints. Routes registered on it below use
# paths relative to this prefix and each declares the
# require_admin_permission() dependency.
admin_router = APIRouter(prefix="/api/v1/admin/video-tasks", tags=["admin-video-tasks"])
@admin_router.get("")
def admin_list_tasks(
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """List video tasks through the admin service call.

    Guarded by the ``require_admin_permission()`` dependency, whose value is
    not used here. Returns the outcome of
    ``VideoTasksService.admin_list_tasks`` wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    tasks = service.admin_list_tasks()
    return success_response(tasks)
@admin_router.get("/{task_id}")
def admin_get_task(
    task_id: int,
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """Fetch one video task by its integer ``task_id`` (admin endpoint).

    Guarded by the ``require_admin_permission()`` dependency, whose value is
    not used here. Returns the outcome of
    ``VideoTasksService.admin_get_task`` wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    task = service.admin_get_task(task_id)
    return success_response(task)
@admin_router.post("/{task_id}/retry")
def admin_retry_task(
    task_id: int,
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """Retry one video task by its integer ``task_id`` (admin endpoint).

    Guarded by the ``require_admin_permission()`` dependency, whose value is
    not used here. Returns the outcome of
    ``VideoTasksService.admin_retry_task`` wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    outcome = service.admin_retry_task(task_id)
    return success_response(outcome)
@admin_router.post("/{task_id}/refund")
def admin_refund_task(
    task_id: int,
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """Refund one video task by its integer ``task_id`` (admin endpoint).

    Guarded by the ``require_admin_permission()`` dependency, whose value is
    not used here. Returns the outcome of
    ``VideoTasksService.admin_refund_task`` wrapped by ``success_response``.
    """
    service = VideoTasksService(db)
    outcome = service.admin_refund_task(task_id)
    return success_response(outcome)