"""Redis-backed usage statistics and per-minute rate counters."""

import asyncio
import json
import logging
from datetime import datetime, timezone

import redis.asyncio as redis

from ..core import get_settings

logger = logging.getLogger(__name__)

settings = get_settings()


class StatsService:
    """Best-effort usage counters stored in Redis.

    Hourly totals are kept in hashes keyed ``stats:{provider_id}:{date}:{hour}``
    and per-minute request/token counters in plain keys prefixed ``rpm:`` and
    ``tpm:``.  Until :meth:`connect` has been called every method is a no-op
    (or returns an empty/zeroed result).  Redis failures are logged and
    swallowed so that statistics never break the caller; task cancellation is
    always propagated.
    """

    # Hourly stats hashes are kept for 30 days.
    STATS_TTL_SECONDS = 86400 * 30
    # Per-minute counters only need to outlive the minute they describe.
    RATE_KEY_TTL_SECONDS = 120
    # Hash fields aggregated by get_stats, in output order.
    _COUNTER_FIELDS = (
        "request_count",
        "input_tokens",
        "output_tokens",
        "cached_count",
        "error_count",
    )

    def __init__(self):
        self.redis: redis.Redis | None = None

    async def connect(self):
        """Create the Redis client from ``settings.redis_url``."""
        self.redis = redis.from_url(settings.redis_url)

    async def disconnect(self):
        """Close the Redis client if one was created."""
        if self.redis is None:
            return
        # Newer redis-py deprecates close() on the asyncio client in favour of
        # aclose(); fall back to close() on versions that lack it.
        close = getattr(self.redis, "aclose", None) or self.redis.close
        await close()

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(timezone.utc)

    def _get_key(self, provider_id: int, date: str, hour: int) -> str:
        """Return the hash key holding one provider's stats for one hour."""
        return f"stats:{provider_id}:{date}:{hour}"

    @staticmethod
    def _minute_keys(provider_id: int, now: datetime) -> tuple[str, str]:
        """Return the ``(rpm_key, tpm_key)`` pair for the minute of *now*."""
        stamp = now.strftime("%Y-%m-%d-%H-%M")
        return f"rpm:{provider_id}:{stamp}", f"tpm:{provider_id}:{stamp}"

    @staticmethod
    def _field_as_int(data: dict, field: str) -> int:
        """Read an integer hash field, accepting ``bytes`` or ``str`` keys.

        Keys are ``bytes`` by default but ``str`` when the client decodes
        responses; a missing field counts as 0.
        """
        value = data.get(field.encode())
        if value is None:
            value = data.get(field, 0)
        return int(value)

    async def record_request(
        self,
        provider_id: int,
        input_tokens: int,
        output_tokens: int,
        cached: bool = False,
        error: bool = False,
    ):
        """Add one request to the current UTC hour's counters.

        Increments ``request_count`` and the token totals, plus
        ``cached_count`` / ``error_count`` when the matching flag is set, and
        refreshes the hash's expiry.  Redis errors are logged, not raised.
        """
        if self.redis is None:
            return

        now = self._utcnow()
        key = self._get_key(provider_id, now.strftime("%Y-%m-%d"), now.hour)
        try:
            pipe = self.redis.pipeline()
            pipe.hincrby(key, "request_count", 1)
            pipe.hincrby(key, "input_tokens", input_tokens)
            pipe.hincrby(key, "output_tokens", output_tokens)
            if cached:
                pipe.hincrby(key, "cached_count", 1)
            if error:
                pipe.hincrby(key, "error_count", 1)
            pipe.expire(key, self.STATS_TTL_SECONDS)
            await pipe.execute()
        except asyncio.CancelledError:
            # Never swallow cancellation: the task must be allowed to stop.
            raise
        except Exception as exc:
            logger.warning(
                "Failed to record stats for provider %s: %s", provider_id, exc
            )

    async def get_stats(self, provider_id: int, date: str) -> dict:
        """Return one day's totals and per-hour breakdown for a provider.

        Args:
            provider_id: Provider whose counters are read.
            date: Day to read, in the same ``YYYY-MM-DD`` form used when
                recording.

        Returns:
            ``{}`` when not connected; otherwise a dict with ``date``, the
            five summed counters, and ``hourly`` (24 entries, hours 0-23).
            Hours that are missing or could not be read count as zero.
        """
        if self.redis is None:
            return {}

        try:
            # One round trip for all 24 hours.  raise_on_error=False keeps a
            # failure of a single command from discarding the other hours.
            pipe = self.redis.pipeline(transaction=False)
            for hour in range(24):
                pipe.hgetall(self._get_key(provider_id, date, hour))
            replies = await pipe.execute(raise_on_error=False)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.warning(
                "Failed to read stats for provider %s on %s: %s",
                provider_id,
                date,
                exc,
            )
            replies = [None] * 24

        result = {
            "date": date,
            **{field: 0 for field in self._COUNTER_FIELDS},
            "hourly": [],
        }
        for hour, reply in enumerate(replies):
            if isinstance(reply, Exception):
                logger.warning(
                    "Failed to read stats for provider %s on %s hour %s: %s",
                    provider_id,
                    date,
                    hour,
                    reply,
                )
            data = reply if isinstance(reply, dict) else {}
            hourly = {"hour": hour}
            for field in self._COUNTER_FIELDS:
                hourly[field] = self._field_as_int(data, field)
                result[field] += hourly[field]
            result["hourly"].append(hourly)
        return result

    async def get_rpm_tpm(self, provider_id: int) -> dict:
        """Return the request and token counts for the current UTC minute.

        Returns ``{"rpm": 0, "tpm": 0}`` when not connected or when Redis
        cannot be read.
        """
        if self.redis is None:
            return {"rpm": 0, "tpm": 0}

        rpm_key, tpm_key = self._minute_keys(provider_id, self._utcnow())
        try:
            rpm_raw, tpm_raw = await self.redis.mget(rpm_key, tpm_key)
            return {"rpm": int(rpm_raw or 0), "tpm": int(tpm_raw or 0)}
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.warning(
                "Failed to read rpm/tpm for provider %s: %s", provider_id, exc
            )
            return {"rpm": 0, "tpm": 0}

    async def incr_rpm_tpm(self, provider_id: int, tokens: int):
        """Count one request and *tokens* tokens against the current UTC minute.

        Both counters get a short expiry so stale minutes clean themselves up.
        Redis errors are logged, not raised.
        """
        if self.redis is None:
            return

        rpm_key, tpm_key = self._minute_keys(provider_id, self._utcnow())
        try:
            pipe = self.redis.pipeline()
            pipe.incr(rpm_key)
            pipe.expire(rpm_key, self.RATE_KEY_TTL_SECONDS)
            pipe.incrby(tpm_key, tokens)
            pipe.expire(tpm_key, self.RATE_KEY_TTL_SECONDS)
            await pipe.execute()
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.warning(
                "Failed to increment rpm/tpm for provider %s: %s",
                provider_id,
                exc,
            )


stats_service = StatsService()