Files
2026-06-04 20:54:49 +02:00

196 lines
5.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Router autenticazione login, refresh, logout, 2FA TOTP.
Endpoint:
POST /api/v1/auth/login → access + refresh token
POST /api/v1/auth/refresh → rinnova token
POST /api/v1/auth/logout → revoca refresh token
GET /api/v1/auth/me → utente corrente
POST /api/v1/auth/totp/setup → genera segreto TOTP + QR
POST /api/v1/auth/totp/verify → verifica e attiva TOTP
POST /api/v1/auth/totp/disable → disabilita TOTP
POST /api/v1/auth/change-password → cambio password
"""
from typing import Annotated
from fastapi import APIRouter, Depends, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.core.exceptions import InvalidCredentialsError
from app.database import get_db
from app.dependencies import CurrentUser, DB
from app.schemas.auth import (
LoginRequest,
PasswordChangeRequest,
RefreshRequest,
TOTPSetupResponse,
TOTPStatusResponse,
TOTPVerifyRequest,
TokenResponse,
)
from app.schemas.user import UserResponse
from app.services.audit_service import get_real_ip
from app.services.auth_service import AuthService
settings = get_settings()
router = APIRouter(prefix="/auth", tags=["Autenticazione"])
limiter = Limiter(key_func=get_remote_address)
@router.post(
"/login",
response_model=TokenResponse,
summary="Login con email e password",
description="Autentica l'utente. Se 2FA è attivo, richiede anche il codice TOTP.",
)
async def login(
request: Request,
body: LoginRequest,
db: DB,
) -> TokenResponse:
ip = get_real_ip(request)
ua = request.headers.get("user-agent")
service = AuthService(db)
access_token, refresh_token = await service.login(
email=body.email,
password=body.password,
totp_code=body.totp_code,
ip_address=ip,
user_agent=ua,
)
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=settings.access_token_expire_minutes * 60,
)
@router.post(
"/refresh",
response_model=TokenResponse,
summary="Rinnova access token",
)
async def refresh_tokens(
body: RefreshRequest,
db: DB,
) -> TokenResponse:
service = AuthService(db)
access_token, refresh_token = await service.refresh_tokens(body.refresh_token)
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=settings.access_token_expire_minutes * 60,
)
@router.post(
"/logout",
status_code=204,
summary="Logout revoca refresh token",
)
async def logout(
body: RefreshRequest,
db: DB,
) -> None:
service = AuthService(db)
await service.logout(body.refresh_token)
@router.get(
"/me",
response_model=UserResponse,
summary="Utente corrente autenticato",
)
async def me(current_user: CurrentUser) -> UserResponse:
return UserResponse.model_validate(current_user)
@router.post(
"/totp/setup",
response_model=TOTPSetupResponse,
summary="Avvia setup 2FA TOTP",
description="Genera segreto TOTP e QR code. Il 2FA viene attivato solo dopo la verifica.",
)
async def totp_setup(
current_user: CurrentUser,
db: DB,
) -> TOTPSetupResponse:
service = AuthService(db)
data = await service.setup_totp(current_user)
return TOTPSetupResponse(**data)
@router.post(
"/totp/verify",
response_model=TOTPStatusResponse,
summary="Verifica codice TOTP e attiva 2FA",
)
async def totp_verify(
body: TOTPVerifyRequest,
current_user: CurrentUser,
db: DB,
) -> TOTPStatusResponse:
service = AuthService(db)
await service.verify_and_enable_totp(current_user, body.totp_code)
return TOTPStatusResponse(totp_enabled=True)
@router.post(
"/totp/disable",
response_model=TOTPStatusResponse,
summary="Disabilita 2FA TOTP",
)
async def totp_disable(
current_user: CurrentUser,
db: DB,
) -> TOTPStatusResponse:
service = AuthService(db)
await service.disable_totp(current_user)
return TOTPStatusResponse(totp_enabled=False)
@router.post(
"/change-password",
status_code=204,
summary="Cambio password utente corrente",
)
async def change_password(
request: Request,
body: PasswordChangeRequest,
current_user: CurrentUser,
db: DB,
) -> None:
from app.core.security import verify_password, hash_password
from app.services.audit_service import log_audit
if not verify_password(body.current_password, current_user.password_hash):
from app.services.audit_service import log_audit as _la
await _la(
db,
"auth.password_changed",
tenant_id=current_user.tenant_id,
user_id=current_user.id,
outcome="failure",
ip_address=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
payload={"reason": "wrong_current_password"},
)
raise InvalidCredentialsError()
current_user.password_hash = hash_password(body.new_password)
await log_audit(
db,
"auth.password_changed",
tenant_id=current_user.tenant_id,
user_id=current_user.id,
ip_address=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
)