Files
aivideo/backend/app/modules/admins/service.py

135 lines
4.8 KiB
Python

from datetime import datetime, timezone

from fastapi import Response
from sqlalchemy.orm import Session

from app.common.errors.app_error import AuthenticationError, NotFoundAppError
from app.common.security.jwt import (
    clear_auth_cookies,
    create_access_token,
    create_refresh_token,
    set_auth_cookies,
)
from app.common.security.password import verify_password
from app.models.entities import InviteRelation
from app.modules.admins.repository import AdminsRepository
from app.modules.wallets.service import WalletService


class AdminsService:
    """Admin-facing operations: login/logout, dashboard counters, users, invites.

    Reads go through ``AdminsRepository`` and wallet operations are delegated
    to ``WalletService``; both are constructed with the session passed to
    ``__init__``. Methods that write (``login``, ``manual_adjust_wallet``)
    commit that session themselves.
    """

    # Row cap applied to the unfiltered list queries (users, invite relations).
    _LIST_LIMIT = 200

    def __init__(self, db: Session) -> None:
        """Bind the service, its repository and the wallet service to ``db``."""
        self.db = db
        self.repository = AdminsRepository(db)
        self.wallet_service = WalletService(db)

    def login(self, payload, response: Response) -> dict:
        """Authenticate an admin and attach auth cookies to ``response``.

        Args:
            payload: Object exposing ``username`` and ``password`` attributes.
            response: Response that receives the access/refresh cookies.

        Returns:
            The serialized admin (see ``serialize_admin``).

        Raises:
            AuthenticationError: If no admin matches the username or the
                password does not verify against the stored hash.
        """
        admin = self.repository.get_admin_by_username(payload.username)
        if not admin or not verify_password(payload.password, admin.password_hash):
            raise AuthenticationError("invalid admin credentials")

        # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
        # tzinfo keeps the stored value a naive UTC timestamp, exactly as before.
        admin.last_login_at = datetime.now(timezone.utc).replace(tzinfo=None)

        access_token = create_access_token(admin.username, scope="admin")
        refresh_token = create_refresh_token(admin.username, scope="admin")
        set_auth_cookies(response, access_token, refresh_token, prefix="admin")

        # Persists the ``last_login_at`` update made above.
        self.db.commit()
        return self.serialize_admin(admin)

    def logout(self, response: Response) -> None:
        """Clear the admin-prefixed auth cookies on ``response``."""
        clear_auth_cookies(response, prefix="admin")

    @staticmethod
    def serialize_admin(admin) -> dict:
        """Return the camelCase dict representation of an admin record."""
        return {
            "id": admin.id,
            "username": admin.username,
            "nickname": admin.nickname,
            "isSuperAdmin": admin.is_super_admin,
        }

    def dashboard(self) -> dict:
        """Return headline counters for the admin dashboard.

        ``successRate`` is the percentage of successful tasks rounded to two
        decimals. It is always a float; ``0.0`` is reported when there are no
        tasks, which also avoids dividing by zero.
        """
        total_tasks = self.repository.count_tasks()
        success_tasks = self.repository.count_success_tasks()
        success_rate = round(success_tasks / total_tasks * 100, 2) if total_tasks else 0.0
        return {
            "users": self.repository.count_users(),
            "paidOrders": self.repository.count_paid_orders(),
            "tasks": total_tasks,
            "successRate": success_rate,
        }

    def list_users(self) -> list[dict]:
        """Return up to ``_LIST_LIMIT`` users as camelCase dicts.

        Missing ``username``/``email`` values are emitted as empty strings.
        """
        rows = self.repository.list_users().limit(self._LIST_LIMIT).all()
        return [
            {
                "id": item.id,
                "publicId": item.public_id,
                "username": item.username or "",
                "nickname": item.nickname,
                "email": item.email or "",
                "status": item.status,
                "createdAt": item.created_at.isoformat(),
            }
            for item in rows
        ]

    def get_user_detail(self, user_id: int) -> dict:
        """Return one user's profile together with their wallet summary.

        Raises:
            NotFoundAppError: If no user exists for ``user_id``.
        """
        user = self._require_user(user_id)
        wallet = self.wallet_service.get_wallet_summary(user.id)
        return {
            "id": user.id,
            "publicId": user.public_id,
            "username": user.username or "",
            "nickname": user.nickname,
            "email": user.email or "",
            "status": user.status,
            "wallet": wallet,
        }

    def manual_adjust_wallet(self, user_id: int, amount_points: int, reason: str) -> dict:
        """Apply an admin-initiated points adjustment to a user's wallet.

        The adjustment is recorded through ``WalletService.add_points`` with
        ``biz_type="manual_adjust"`` and ``operator_type="admin"``, then the
        session is committed.

        Args:
            user_id: Target user's id.
            amount_points: Points passed to ``add_points`` unchanged.
            reason: Stored as the transaction remark.

        Returns:
            Dict with the new transaction number and the adjusted amount.

        Raises:
            NotFoundAppError: If no user exists for ``user_id``.
        """
        user = self._require_user(user_id)
        tx = self.wallet_service.add_points(
            user.id,
            amount_points,
            biz_type="manual_adjust",
            related_type="user",
            related_id=user.id,
            remark=reason,
            operator_type="admin",
        )
        self.db.commit()
        return {"transactionNo": tx.transaction_no, "amountPoints": amount_points}

    def user_invite_relations(self, user_id: int) -> list[dict]:
        """Return every invite relation where the user is inviter or invitee.

        Unlike ``list_invite_relations`` this query is not capped and the
        payload carries no ``rewardedAt`` key.
        """
        involves_user = (InviteRelation.inviter_user_id == user_id) | (
            InviteRelation.invitee_user_id == user_id
        )
        rows = self.repository.invite_relations().filter(involves_user).all()
        return [self._serialize_invite_relation(item) for item in rows]

    def list_invite_relations(self) -> list[dict]:
        """Return up to ``_LIST_LIMIT`` invite relations, including ``rewardedAt``."""
        rows = self.repository.invite_relations().limit(self._LIST_LIMIT).all()
        return [
            self._serialize_invite_relation(item, include_rewarded_at=True)
            for item in rows
        ]

    def _require_user(self, user_id: int):
        """Fetch a user by id, raising ``NotFoundAppError`` when absent."""
        user = self.repository.get_user(user_id)
        if not user:
            raise NotFoundAppError("user not found", code=10020)
        return user

    @staticmethod
    def _serialize_invite_relation(relation, *, include_rewarded_at: bool = False) -> dict:
        """Serialize an invite relation to its camelCase dict form.

        ``rewardedAt`` is emitted only when ``include_rewarded_at`` is true
        (``None`` when the relation has no ``rewarded_at``); it is placed
        before ``createdAt`` so each caller keeps its existing key order.
        """
        data = {
            "id": relation.id,
            "inviterUserId": relation.inviter_user_id,
            "inviteeUserId": relation.invitee_user_id,
            "rewardStatus": relation.reward_status,
            "rewardPoints": relation.reward_points,
        }
        if include_rewarded_at:
            rewarded_at = relation.rewarded_at
            data["rewardedAt"] = rewarded_at.isoformat() if rewarded_at else None
        data["createdAt"] = relation.created_at.isoformat()
        return data