from datetime import datetime, timezone

from sqlalchemy import and_, or_, select
from sqlalchemy.orm import Session

from app.models.entities import (
    PricingRule,
    ProviderModel,
    VideoModel,
    VideoModelSupplierBinding,
)


class VideoModelsRepository:
    """Read-side data access for video models, supplier bindings and pricing rules."""

    def __init__(self, db: Session) -> None:
        """Store the SQLAlchemy session that every query in this repository uses."""
        self.db = db

    def list_video_models(self):
        """Return an unexecuted query over all video models.

        Rows are ordered by ``sort_order`` ascending, then ``id`` ascending.
        The query object itself is returned (not a list), so the caller
        decides how to execute it.
        """
        return self.db.query(VideoModel).order_by(
            VideoModel.sort_order.asc(),
            VideoModel.id.asc(),
        )

    def get_video_model(self, model_id: int) -> VideoModel | None:
        """Return the video model whose ``id`` equals ``model_id``, or ``None``."""
        return self.db.scalar(select(VideoModel).where(VideoModel.id == model_id))

    def list_bindings(self):
        """Return an unexecuted query over all supplier bindings.

        Rows are ordered by ``video_model_id`` ascending, then
        ``routing_priority`` ascending.
        """
        return self.db.query(VideoModelSupplierBinding).order_by(
            VideoModelSupplierBinding.video_model_id.asc(),
            VideoModelSupplierBinding.routing_priority.asc(),
        )

    def get_binding(self, binding_id: int) -> VideoModelSupplierBinding | None:
        """Return the supplier binding whose ``id`` equals ``binding_id``, or ``None``."""
        return self.db.scalar(
            select(VideoModelSupplierBinding).where(
                VideoModelSupplierBinding.id == binding_id
            )
        )

    def active_pricing_rule(self, video_model_id: int) -> PricingRule | None:
        """Return the pricing rule currently in force for a video model.

        A rule qualifies when it belongs to ``video_model_id``, has
        ``status == 1``, has ``effective_at`` at or before the current time,
        and has either no ``expired_at`` or an ``expired_at`` strictly in the
        future. Among qualifying rules the one with the highest
        ``version_no`` (ties broken by highest ``id``) is returned; ``None``
        is returned when no rule qualifies.
        """
        # Naive UTC "now", the same value datetime.utcnow() produced, without
        # calling the API deprecated since Python 3.12.
        # NOTE(review): assumes effective_at / expired_at hold naive UTC
        # timestamps, as the original utcnow() comparison implied - confirm.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        not_expired = or_(
            PricingRule.expired_at.is_(None),
            PricingRule.expired_at > now,
        )
        stmt = (
            select(PricingRule)
            .where(
                PricingRule.video_model_id == video_model_id,
                # NOTE(review): 1 is presumably the "enabled" status - confirm
                # against the PricingRule model.
                PricingRule.status == 1,
                PricingRule.effective_at <= now,
                not_expired,
            )
            .order_by(PricingRule.version_no.desc(), PricingRule.id.desc())
            # Session.scalar() only consumes the first row, so do not ask the
            # database for more than that.
            .limit(1)
        )
        return self.db.scalar(stmt)

    def provider_models(self) -> dict[int, ProviderModel]:
        """Load every provider model and return them keyed by ``id``."""
        rows = self.db.scalars(select(ProviderModel)).all()
        return {row.id: row for row in rows}