Files
GMG-Smart-Quote/backend/app/api/auth.py
T
2026-05-04 14:42:16 +02:00

70 lines
2.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.deps import get_db, get_current_user
from app.core.security import verify_password, create_access_token, create_refresh_token, decode_token
from app.models.user import User
from app.schemas.auth import LoginRequest, TokenResponse, RefreshRequest, UserOut
# Router that the authentication endpoints in this module (login, refresh, me)
# register themselves on via the @router.<method> decorators below.
router = APIRouter()
@router.post("/login", response_model=TokenResponse)
async def login(payload: LoginRequest, db: AsyncSession = Depends(get_db)):
    """Authenticate a user by email and password and issue a token pair.

    Args:
        payload: Login request body; its ``email`` and ``password`` are read.
        db: Async database session injected through ``get_db``.

    Returns:
        A ``TokenResponse`` holding a freshly created access token (carrying
        the user's ``role`` and ``group_id`` as extra claims) and a refresh
        token, both issued for the user's id.

    Raises:
        HTTPException: 401 when no user matches the email or the password
            does not verify; 403 when the matched user is not active.
    """
    lookup = select(User).where(User.email == payload.email)
    rows = await db.execute(lookup)
    user = rows.scalar_one_or_none()

    # The same 401 covers "unknown email" and "wrong password" alike.
    if not (user and verify_password(payload.password, user.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Credenziali non valide",
        )

    # The active flag is only inspected once the credentials have verified.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account disabilitato",
        )

    access = create_access_token(
        subject=user.id,
        extra_claims={"role": user.role.value, "group_id": user.group_id},
    )
    refresh = create_refresh_token(subject=user.id)
    return TokenResponse(access_token=access, refresh_token=refresh)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(payload: RefreshRequest, db: AsyncSession = Depends(get_db)):
    """Exchange a refresh token for a new access/refresh token pair.

    Args:
        payload: Request body; its ``refresh_token`` field is decoded.
        db: Async database session injected through ``get_db``.

    Returns:
        A ``TokenResponse`` with a newly created access token (carrying the
        user's ``role`` and ``group_id`` as extra claims) and a newly created
        refresh token, both issued for the user's id.

    Raises:
        HTTPException: 401 when ``decode_token`` yields a falsy result, the
            token's ``type`` claim is not ``"refresh"``, the ``sub`` claim is
            missing or not convertible to an integer, or the referenced user
            does not exist or is not active.
    """
    invalid_token = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Refresh token non valido o scaduto",
    )

    token_data = decode_token(payload.refresh_token)
    if not token_data or token_data.get("type") != "refresh":
        raise invalid_token

    # A missing or non-numeric "sub" makes the token unusable; report it as
    # 401 like any other invalid token instead of letting int() raise
    # TypeError/ValueError and surface as an internal server error.
    try:
        user_id = int(token_data.get("sub"))
    except (TypeError, ValueError):
        raise invalid_token from None

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Utente non trovato o disabilitato",
        )

    access_token = create_access_token(
        subject=user.id,
        extra_claims={"role": user.role.value, "group_id": user.group_id},
    )
    new_refresh_token = create_refresh_token(subject=user.id)
    return TokenResponse(access_token=access_token, refresh_token=new_refresh_token)
@router.get("/me", response_model=UserOut)
async def get_me(current_user: User = Depends(get_current_user)):
    """Return the user supplied by the ``get_current_user`` dependency.

    The returned object is serialized through the ``UserOut`` response model.
    """
    return current_user