import logging import time import uuid from decimal import Decimal from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from open_webui.env import GLOBAL_LOG_LEVEL from open_webui.models.subscriptions import ( SubscriptionPlanModel, SubscriptionPlanForm, UpdateSubscriptionPlanForm, UserSubscriptionWithPlanModel, UpdateUserSubscriptionForm, SubscriptionUsageLogModel, RedemptionCodeModel, CreateRedemptionCodeForm, UpdateRedemptionCodeForm, SubscriptionPlans, UserSubscriptions, SubscriptionUsageLogs, RedemptionCodes, ) from open_webui.models.users import UserModel, Users from open_webui.utils.auth import get_verified_user, get_admin_user log = logging.getLogger(__name__) log.setLevel(GLOBAL_LOG_LEVEL) router = APIRouter() PAGE_ITEM_COUNT = 30 #################### # Subscription Plans API #################### @router.get("/plans", response_model=list[SubscriptionPlanModel]) async def get_subscription_plans( include_inactive: bool = False, user: UserModel = Depends(get_verified_user), ): """ Get all subscription plans. """ # only admin can see inactive plans if include_inactive and user.role != "admin": include_inactive = False return SubscriptionPlans.get_all_plans(include_inactive=include_inactive) @router.get("/plans/{plan_id}", response_model=SubscriptionPlanModel) async def get_subscription_plan( plan_id: str, _: UserModel = Depends(get_verified_user), ): """ Get a specific subscription plan. """ plan = SubscriptionPlans.get_plan_by_id(plan_id) if not plan: raise HTTPException(status_code=404, detail="Plan not found") return plan @router.post("/plans", response_model=SubscriptionPlanModel) async def create_subscription_plan( form_data: SubscriptionPlanForm, _: UserModel = Depends(get_admin_user), ): """ Create a new subscription plan (admin only). """ existing = SubscriptionPlans.get_plan_by_id(form_data.id) if existing: raise HTTPException(status_code=400, detail="Plan ID already exists") return SubscriptionPlans.create_plan(form_data) @router.put("/plans/{plan_id}", response_model=SubscriptionPlanModel) async def update_subscription_plan( plan_id: str, form_data: UpdateSubscriptionPlanForm, _: UserModel = Depends(get_admin_user), ): """ Update a subscription plan (admin only). """ plan = SubscriptionPlans.update_plan(plan_id, form_data) if not plan: raise HTTPException(status_code=404, detail="Plan not found") return plan @router.delete("/plans/{plan_id}") async def delete_subscription_plan( plan_id: str, _: UserModel = Depends(get_admin_user), ): """ Delete a subscription plan (admin only). """ plan = SubscriptionPlans.get_plan_by_id(plan_id) if not plan: raise HTTPException(status_code=404, detail="Plan not found") if plan.is_default: raise HTTPException(status_code=400, detail="Cannot delete default plan") if not SubscriptionPlans.delete_plan(plan_id): raise HTTPException(status_code=500, detail="Failed to delete plan") return {"success": True} #################### # User Subscription API #################### @router.get("/me", response_model=UserSubscriptionWithPlanModel) async def get_my_subscription( user: UserModel = Depends(get_verified_user), ): """ Get current user's subscription status. """ subscription = UserSubscriptions.get_with_plan(user.id) if not subscription: # initialize free subscription UserSubscriptions.init_free_subscription(user.id) subscription = UserSubscriptions.get_with_plan(user.id) if not subscription: raise HTTPException(status_code=500, detail="Failed to get subscription") return subscription class UsageResponse(BaseModel): total: int results: list[SubscriptionUsageLogModel] @router.get("/me/usage", response_model=UsageResponse) async def get_my_usage( page: int = 1, limit: int = PAGE_ITEM_COUNT, user: UserModel = Depends(get_verified_user), ): """ Get current user's usage history. """ offset = (page - 1) * limit total, logs = SubscriptionUsageLogs.get_by_user_id(user.id, offset=offset, limit=limit) return UsageResponse(total=total, results=logs) class RedeemRequest(BaseModel): code: str @router.post("/redeem", response_model=UserSubscriptionWithPlanModel) async def redeem_code( form_data: RedeemRequest, user: UserModel = Depends(get_verified_user), ): """ Redeem a redemption code. """ RedemptionCodes.redeem_code(form_data.code, user.id) return UserSubscriptions.get_with_plan(user.id) #################### # Admin Subscription Management API #################### class AdminSubscriptionsResponse(BaseModel): total: int results: list[UserSubscriptionWithPlanModel] @router.get("/admin/subscriptions", response_model=AdminSubscriptionsResponse) async def get_all_subscriptions( query: Optional[str] = None, page: int = 1, limit: int = PAGE_ITEM_COUNT, _: UserModel = Depends(get_admin_user), ): """ Get all user subscriptions (admin only). """ offset = (page - 1) * limit total, subs = UserSubscriptions.get_all_subscriptions(query=query, offset=offset, limit=limit) # add username to results user_ids = [sub.user_id for sub in subs] users = Users.get_users_by_user_ids(user_ids) user_map = {u.id: u.name for u in users} for sub in subs: setattr(sub, "username", user_map.get(sub.user_id, "")) return AdminSubscriptionsResponse(total=total, results=subs) @router.get("/admin/subscriptions/{user_id}", response_model=UserSubscriptionWithPlanModel) async def get_user_subscription( user_id: str, _: UserModel = Depends(get_admin_user), ): """ Get a specific user's subscription (admin only). """ subscription = UserSubscriptions.get_with_plan(user_id) if not subscription: raise HTTPException(status_code=404, detail="Subscription not found") return subscription @router.put("/admin/subscriptions/{user_id}", response_model=UserSubscriptionWithPlanModel) async def update_user_subscription( user_id: str, form_data: UpdateUserSubscriptionForm, _: UserModel = Depends(get_admin_user), ): """ Update a user's subscription (admin only). """ existing = UserSubscriptions.get_by_user_id(user_id) if not existing: raise HTTPException(status_code=404, detail="Subscription not found") if form_data.plan_id: plan = SubscriptionPlans.get_plan_by_id(form_data.plan_id) if not plan: raise HTTPException(status_code=400, detail="Invalid plan ID") UserSubscriptions.update_subscription(user_id, form_data) return UserSubscriptions.get_with_plan(user_id) #################### # Redemption Codes API #################### class RedemptionCodesResponse(BaseModel): total: int results: list[RedemptionCodeModel] @router.get("/redemption_codes", response_model=RedemptionCodesResponse) async def get_redemption_codes( keyword: Optional[str] = None, page: int = 1, limit: int = PAGE_ITEM_COUNT, _: UserModel = Depends(get_admin_user), ): """ Get all redemption codes (admin only). """ offset = (page - 1) * limit total, codes = RedemptionCodes.get_codes(keyword=keyword, offset=offset, limit=limit) # add username to results user_ids = {code.user_id for code in codes if code.user_id} if user_ids: users = Users.get_users_by_user_ids(user_ids) user_map = {u.id: u.name for u in users} for code in codes: if code.user_id: setattr(code, "username", user_map.get(code.user_id, "")) return RedemptionCodesResponse(total=total, results=codes) class CreateRedemptionCodesResponse(BaseModel): total: int codes: list[str] @router.post("/redemption_codes", response_model=CreateRedemptionCodesResponse) async def create_redemption_codes( form_data: CreateRedemptionCodeForm, _: UserModel = Depends(get_admin_user), ): """ Create redemption codes (admin only). """ # validate plan exists plan = SubscriptionPlans.get_plan_by_id(form_data.plan_id) if not plan: raise HTTPException(status_code=400, detail="Invalid plan ID") # validate form data if form_data.redemption_type == "duration": if not form_data.duration_days: raise HTTPException(status_code=400, detail="duration_days is required for duration type") elif form_data.redemption_type == "upgrade": if not form_data.upgrade_days: raise HTTPException(status_code=400, detail="upgrade_days is required for upgrade type") else: raise HTTPException(status_code=400, detail="Invalid redemption_type") now = int(time.time()) # check expiration if form_data.expired_at and form_data.expired_at < now: raise HTTPException(status_code=400, detail="Expiration time must be in the future") codes = [] for _ in range(form_data.count): code = RedemptionCodeModel( code=uuid.uuid4().hex[:16].upper(), purpose=form_data.purpose, redemption_type=form_data.redemption_type, plan_id=form_data.plan_id, duration_days=form_data.duration_days, upgrade_days=form_data.upgrade_days, expired_at=form_data.expired_at, created_at=now, ) codes.append(code) RedemptionCodes.insert_codes(codes) return CreateRedemptionCodesResponse( total=len(codes), codes=[code.code for code in codes] ) @router.put("/redemption_codes/{code}", response_model=RedemptionCodeModel) async def update_redemption_code( code: str, form_data: UpdateRedemptionCodeForm, _: UserModel = Depends(get_admin_user), ): """ Update a redemption code (admin only). """ existing = RedemptionCodes.get_code(code) if not existing: raise HTTPException(status_code=404, detail="Code not found") if existing.received_at: raise HTTPException(status_code=400, detail="Cannot update a code that has been used") updated = RedemptionCodes.update_code(code, form_data) if not updated: raise HTTPException(status_code=500, detail="Failed to update code") return updated @router.delete("/redemption_codes/{code}") async def delete_redemption_code( code: str, _: UserModel = Depends(get_admin_user), ): """ Delete a redemption code (admin only). """ if not RedemptionCodes.delete_code(code): raise HTTPException(status_code=404, detail="Code not found") return {"success": True} #################### # Statistics API #################### class SubscriptionStatsRequest(BaseModel): start_time: int end_time: int query: Optional[str] = None @router.post("/statistics") async def get_subscription_statistics( form_data: SubscriptionStatsRequest, _: UserModel = Depends(get_admin_user), ): """ Get subscription usage statistics (admin only). """ from collections import defaultdict # query user ids if needed user_ids = None if form_data.query: users = Users.get_users(filter={"query": form_data.query})["users"] user_map = {user.id: user.name for user in users} user_ids = list(user_map.keys()) if not user_ids: return { "total_messages": 0, "model_usage": [], "user_usage": [], } else: users = Users.get_users()["users"] user_map = {user.id: user.name for user in users} # get usage logs logs = SubscriptionUsageLogs.get_by_time_range( form_data.start_time, form_data.end_time, user_ids ) # build stats total_messages = len(logs) model_usage = defaultdict(int) user_usage = defaultdict(int) for log in logs: if log.model_id: model_usage[log.model_id] += 1 user_key = f"{log.user_id}:{user_map.get(log.user_id, log.user_id)}" user_usage[user_key] += 1 return { "total_messages": total_messages, "model_usage": [ {"name": model, "value": count} for model, count in model_usage.items() ], "user_usage": [ {"name": user.split(":", 1)[1], "value": count} for user, count in user_usage.items() ], }