"""HTTP routes for video tasks: user-facing endpoints and admin endpoints.

Every handler in this module is a thin adapter: FastAPI resolves the
dependencies, the handler delegates to the matching ``VideoTasksService``
method, and the service result is wrapped with ``success_response``.
"""

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from app.common.db.session import get_db
from app.common.responses.api_response import success_response
from app.common.security.deps import get_current_user, require_admin_permission
from app.models.entities import User
from app.modules.video_tasks.schema import CreateVideoTaskRequest
from app.modules.video_tasks.service import VideoTasksService

# The base path is declared once via ``prefix`` (same convention as
# ``admin_router`` below) so the decorators only carry the part that differs.
# The resulting URLs are unchanged: /api/v1/video-tasks[/...].
router = APIRouter(prefix="/api/v1/video-tasks", tags=["video-tasks"])


@router.post("")
def create_task(
    payload: CreateVideoTaskRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Create a video task for the authenticated user."""
    # The service receives the caller's id, so the task is created on behalf
    # of whoever ``get_current_user`` resolved.
    return success_response(VideoTasksService(db).create_task(current_user.id, payload))


@router.get("")
def list_tasks(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List video tasks for the authenticated user."""
    return success_response(VideoTasksService(db).list_tasks(current_user.id))


@router.get("/{task_no}")
def get_task_detail(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Return the detail of one video task, looked up by its task number."""
    # Both the caller's id and the task number are passed to the service.
    return success_response(VideoTasksService(db).get_task_detail(current_user.id, task_no))


@router.post("/{task_no}/retry")
def retry_task(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Retry a video task identified by its task number."""
    return success_response(VideoTasksService(db).retry_task(current_user.id, task_no))


@router.post("/{task_no}/cancel")
def cancel_task(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Cancel a video task identified by its task number."""
    return success_response(VideoTasksService(db).cancel_task(current_user.id, task_no))


@router.delete("/{task_no}")
def delete_task(
    task_no: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Delete a video task identified by its task number."""
    return success_response(VideoTasksService(db).delete_task(current_user.id, task_no))


# Admin endpoints address tasks by integer ``task_id`` (not ``task_no``) and
# are each guarded by the dependency returned from
# ``require_admin_permission()``; its resolved value is unused, hence ``_``.
admin_router = APIRouter(prefix="/api/v1/admin/video-tasks", tags=["admin-video-tasks"])


@admin_router.get("")
def admin_list_tasks(
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """List video tasks for administrators."""
    return success_response(VideoTasksService(db).admin_list_tasks())


@admin_router.get("/{task_id}")
def admin_get_task(
    task_id: int,
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """Return one video task, looked up by its id, for administrators."""
    return success_response(VideoTasksService(db).admin_get_task(task_id))


@admin_router.post("/{task_id}/retry")
def admin_retry_task(
    task_id: int,
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """Retry a video task, identified by its id, as an administrator."""
    return success_response(VideoTasksService(db).admin_retry_task(task_id))


@admin_router.post("/{task_id}/refund")
def admin_refund_task(
    task_id: int,
    _=Depends(require_admin_permission()),
    db: Session = Depends(get_db),
):
    """Refund a video task, identified by its id, as an administrator."""
    return success_response(VideoTasksService(db).admin_refund_task(task_id))