74 lines
2.3 KiB
Python
74 lines
2.3 KiB
Python
from fastapi import APIRouter, HTTPException, Depends, Header
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from pydantic import BaseModel
|
|
|
|
from ..core import get_db
|
|
from ..models import Admin
|
|
from ..services import hash_password, verify_password, create_token, verify_token
|
|
|
|
router = APIRouter(prefix="/api/v1/admin", tags=["admin"])
|
|
|
|
|
|
async def get_current_admin(
|
|
authorization: str = Header(None),
|
|
):
|
|
if not authorization or not authorization.startswith("Bearer "):
|
|
raise HTTPException(status_code=401, detail="未授权")
|
|
token = authorization[7:]
|
|
payload = verify_token(token)
|
|
if not payload:
|
|
raise HTTPException(status_code=401, detail="Token 无效")
|
|
return payload
|
|
|
|
|
|
class AdminLoginRequest(BaseModel):
|
|
username: str
|
|
password: str
|
|
|
|
|
|
class AdminChangePasswordRequest(BaseModel):
|
|
old_password: str
|
|
new_password: str
|
|
|
|
|
|
@router.post("/login")
|
|
async def login(
|
|
data: AdminLoginRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
result = await db.execute(select(Admin).where(Admin.username == data.username))
|
|
admin = result.scalar_one_or_none()
|
|
if not admin or not verify_password(data.password, admin.password_hash):
|
|
raise HTTPException(status_code=401, detail="用户名或密码错误")
|
|
token = create_token({"sub": admin.username, "id": admin.id})
|
|
return {"token": token, "username": admin.username}
|
|
|
|
|
|
@router.get("/me")
|
|
async def me(payload: dict = Depends(get_current_admin)):
|
|
return {"id": payload.get("id"), "username": payload.get("sub"), "exp": payload.get("exp")}
|
|
|
|
|
|
@router.post("/change-password")
|
|
async def change_password(
|
|
data: AdminChangePasswordRequest,
|
|
payload: dict = Depends(get_current_admin),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
admin_id = payload.get("id")
|
|
if not admin_id:
|
|
raise HTTPException(status_code=401, detail="未授权")
|
|
|
|
result = await db.execute(select(Admin).where(Admin.id == admin_id))
|
|
admin = result.scalar_one_or_none()
|
|
if not admin or not verify_password(data.old_password, admin.password_hash):
|
|
raise HTTPException(status_code=400, detail="原密码错误")
|
|
|
|
if len(data.new_password) < 8:
|
|
raise HTTPException(status_code=400, detail="新密码至少 8 位")
|
|
|
|
admin.password_hash = hash_password(data.new_password)
|
|
await db.commit()
|
|
return {"success": True}
|