50 lines
1.7 KiB
Python
50 lines
1.7 KiB
Python
from datetime import datetime
|
|
|
|
from sqlalchemy import and_, or_, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.entities import PricingRule, ProviderModel, VideoModel, VideoModelSupplierBinding
|
|
|
|
|
|
class VideoModelsRepository:
    """Read-only query helpers for video models, supplier bindings and pricing rules.

    Every method runs against the SQLAlchemy session passed to the constructor;
    nothing here adds, modifies, commits or deletes rows.
    """

    def __init__(self, db: Session) -> None:
        """Keep a reference to the session used by all queries on this repository."""
        self.db = db

    def list_video_models(self):
        """Return a query over all video models, ordered by ``sort_order`` then ``id``.

        The query object itself is returned (it is not executed here), so the
        caller chooses how to consume or further refine it.
        """
        return self.db.query(VideoModel).order_by(
            VideoModel.sort_order.asc(),
            VideoModel.id.asc(),
        )

    def get_video_model(self, model_id: int) -> VideoModel | None:
        """Return the video model whose primary key is ``model_id``, or ``None``."""
        return self.db.scalar(select(VideoModel).where(VideoModel.id == model_id))

    def list_bindings(self):
        """Return a query over all supplier bindings.

        Rows are ordered by ``video_model_id`` and then by ``routing_priority``
        (both ascending). As with ``list_video_models``, the un-executed query
        object is returned.
        """
        return self.db.query(VideoModelSupplierBinding).order_by(
            VideoModelSupplierBinding.video_model_id.asc(),
            VideoModelSupplierBinding.routing_priority.asc(),
        )

    def get_binding(self, binding_id: int) -> VideoModelSupplierBinding | None:
        """Return the supplier binding whose primary key is ``binding_id``, or ``None``."""
        return self.db.scalar(
            select(VideoModelSupplierBinding).where(VideoModelSupplierBinding.id == binding_id)
        )

    def active_pricing_rule(self, video_model_id: int) -> PricingRule | None:
        """Return the pricing rule currently in force for a video model.

        A rule qualifies when it belongs to ``video_model_id``, has
        ``status == 1``, has ``effective_at`` at or before the current time, and
        either has no ``expired_at`` or an ``expired_at`` strictly in the future.
        When several rules qualify, the one with the highest ``version_no`` wins,
        with the highest ``id`` as tie-breaker. Returns ``None`` when no rule
        qualifies.

        NOTE(review): ``status == 1`` presumably means "enabled" -- confirm
        against the ``PricingRule`` model.
        """
        # Local import: only this method needs ``timezone``.
        from datetime import timezone

        # Naive UTC timestamp -- the same value ``datetime.utcnow()`` produced,
        # without relying on that call (deprecated since Python 3.12).
        # NOTE(review): this assumes the timestamp columns hold naive UTC values,
        # as the original ``utcnow()`` comparison did.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        not_expired = or_(PricingRule.expired_at.is_(None), PricingRule.expired_at > now)
        stmt = (
            select(PricingRule)
            .where(
                PricingRule.video_model_id == video_model_id,
                PricingRule.status == 1,
                PricingRule.effective_at <= now,
                not_expired,
            )
            .order_by(PricingRule.version_no.desc(), PricingRule.id.desc())
            # Only the first row is consumed by ``scalar``; let the database
            # stop after one row instead of returning every matching rule.
            .limit(1)
        )
        return self.db.scalar(stmt)

    def provider_models(self) -> dict[int, ProviderModel]:
        """Load every provider model and return them keyed by their ``id``."""
        rows = self.db.scalars(select(ProviderModel)).all()
        return {row.id: row for row in rows}
|
|
|