from sqlalchemy.orm import Session

from app.common.errors.app_error import NotFoundAppError
from app.models.entities import ProviderModel, VideoModel, VideoModelSupplierBinding
from app.modules.video_models.repository import VideoModelsRepository

# Status value that ``list_public_models`` filters on.
# NOTE(review): presumably means "enabled/published" -- confirm against the
# VideoModel definition.
_PUBLIC_MODEL_STATUS = 1


class VideoModelsService:
    """Service layer for video models and their supplier bindings.

    Reads go through ``VideoModelsRepository``; writes are committed on the
    SQLAlchemy session passed to the constructor. All public methods return
    plain dicts with camelCase keys.
    """

    def __init__(self, db: Session) -> None:
        """Store the session and build the repository on top of it."""
        self.db = db
        self.repository = VideoModelsRepository(db)

    # ------------------------------------------------------------------
    # Video models
    # ------------------------------------------------------------------

    def list_public_models(self) -> list[dict]:
        """Return serialized models whose status equals the public status."""
        query = self.repository.list_video_models().filter(
            VideoModel.status == _PUBLIC_MODEL_STATUS
        )
        return [self._serialize_with_pricing(item) for item in query.all()]

    def list_admin_models(self) -> list[dict]:
        """Return every video model, serialized, regardless of status."""
        return [
            self._serialize_with_pricing(item)
            for item in self.repository.list_video_models().all()
        ]

    def create_model(self, payload) -> dict:
        """Persist a new video model built from ``payload.model_dump()``.

        Args:
            payload: Object exposing ``model_dump()`` whose keys are accepted
                by the ``VideoModel`` constructor.

        Returns:
            The serialized model. No pricing rule is looked up for a freshly
            created model, so the pricing fields are reported as 0.
        """
        item = VideoModel(**payload.model_dump())
        self.db.add(item)
        self._commit()
        self.db.refresh(item)
        return self.serialize_model(item, None)

    def update_model(self, model_id: int, payload) -> dict:
        """Overwrite the attributes of an existing video model and commit.

        Args:
            model_id: Identifier passed to ``repository.get_video_model``.
            payload: Object exposing ``model_dump()``; every dumped key is
                assigned onto the entity.

        Returns:
            The serialized model together with its active pricing rule.

        Raises:
            NotFoundAppError: If the repository returns no model (code 50001).
        """
        item = self.repository.get_video_model(model_id)
        if not item:
            raise NotFoundAppError("video model not found", code=50001)
        self._apply_payload(item, payload)
        self._commit()
        return self._serialize_with_pricing(item)

    # ------------------------------------------------------------------
    # Supplier bindings
    # ------------------------------------------------------------------

    def list_bindings(self) -> list[dict]:
        """Return every supplier binding, serialized with display names."""
        provider_models, video_models = self._binding_lookups()
        return [
            self.serialize_binding(item, provider_models, video_models)
            for item in self.repository.list_bindings().all()
        ]

    def create_binding(self, payload) -> dict:
        """Persist a new binding built from ``payload.model_dump()``.

        Args:
            payload: Object exposing ``model_dump()`` whose keys are accepted
                by the ``VideoModelSupplierBinding`` constructor.

        Returns:
            The serialized binding.
        """
        item = VideoModelSupplierBinding(**payload.model_dump())
        self.db.add(item)
        self._commit()
        self.db.refresh(item)
        return self._serialize_binding_single(item)

    def update_binding(self, binding_id: int, payload) -> dict:
        """Overwrite the attributes of an existing binding and commit.

        Args:
            binding_id: Identifier passed to ``repository.get_binding``.
            payload: Object exposing ``model_dump()``; every dumped key is
                assigned onto the entity.

        Returns:
            The serialized binding.

        Raises:
            NotFoundAppError: If the repository returns no binding
                (code 60003).
        """
        item = self.repository.get_binding(binding_id)
        if not item:
            raise NotFoundAppError("binding not found", code=60003)
        self._apply_payload(item, payload)
        self._commit()
        return self._serialize_binding_single(item)

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    @staticmethod
    def serialize_model(item: VideoModel, pricing) -> dict:
        """Convert a video model and an optional pricing rule into a dict.

        Args:
            item: The video model entity.
            pricing: Object with ``points_per_second`` and ``minimum_points``
                attributes, or a falsy value when there is no pricing rule;
                in that case both pricing fields are reported as 0.
        """
        return {
            "id": item.id,
            "modelKey": item.model_key,
            "modelName": item.model_name,
            "frontendTitle": item.frontend_title,
            "frontendDescription": item.frontend_description,
            "defaultDurationSeconds": item.default_duration_seconds,
            "defaultRatio": item.default_ratio,
            "defaultResolution": item.default_resolution,
            "status": item.status,
            "sortOrder": item.sort_order,
            "pricing": {
                "pointsPerSecond": pricing.points_per_second if pricing else 0,
                "minimumPoints": pricing.minimum_points if pricing else 0,
            },
        }

    @staticmethod
    def serialize_binding(
        item: VideoModelSupplierBinding,
        provider_models: dict[int, ProviderModel],
        video_models: dict[int, VideoModel],
    ) -> dict:
        """Convert a binding into a dict, resolving names from lookup maps.

        Args:
            item: The binding entity.
            provider_models: Provider models keyed by id.
            video_models: Video models keyed by id.

        Returns:
            The serialized binding. A name is reported as an empty string
            when the referenced id is missing from its lookup map.
        """
        provider_model = provider_models.get(item.provider_model_id)
        video_model = video_models.get(item.video_model_id)
        return {
            "id": item.id,
            "videoModelId": item.video_model_id,
            "videoModelName": video_model.model_name if video_model else "",
            "providerModelId": item.provider_model_id,
            "providerModelName": provider_model.model_name if provider_model else "",
            "routingPriority": item.routing_priority,
            "isPrimary": item.is_primary,
            "status": item.status,
            "timeoutSecondsOverride": item.timeout_seconds_override,
        }

    def _serialize_binding_single(self, item: VideoModelSupplierBinding) -> dict:
        """Serialize one binding, loading the lookup maps it needs."""
        provider_models, video_models = self._binding_lookups()
        return self.serialize_binding(item, provider_models, video_models)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _serialize_with_pricing(self, item: VideoModel) -> dict:
        """Serialize ``item`` together with its active pricing rule.

        NOTE(review): this performs one ``active_pricing_rule`` lookup per
        model; if that call issues a query, the list endpoints scale with
        the number of models -- confirm against the repository.
        """
        return self.serialize_model(item, self.repository.active_pricing_rule(item.id))

    def _binding_lookups(self) -> tuple[dict[int, ProviderModel], dict[int, VideoModel]]:
        """Load the provider-model and video-model maps used for bindings."""
        provider_models = self.repository.provider_models()
        video_models = {
            row.id: row for row in self.repository.list_video_models().all()
        }
        return provider_models, video_models

    @staticmethod
    def _apply_payload(item, payload) -> None:
        """Assign every key/value of ``payload.model_dump()`` onto ``item``."""
        for key, value in payload.model_dump().items():
            setattr(item, key, value)

    def _commit(self) -> None:
        """Commit the session, rolling it back if the commit fails.

        Without the rollback a failed flush leaves the session in a failed
        transaction state, so later use of the same session would error.
        The original exception is re-raised unchanged.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise