mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
195 lines
5.3 KiB
Python
195 lines
5.3 KiB
Python
"""
|
||
Router autenticazione – login, refresh, logout, 2FA TOTP.
|
||
|
||
Endpoint:
|
||
POST /api/v1/auth/login → access + refresh token
|
||
POST /api/v1/auth/refresh → rinnova token
|
||
POST /api/v1/auth/logout → revoca refresh token
|
||
GET /api/v1/auth/me → utente corrente
|
||
POST /api/v1/auth/totp/setup → genera segreto TOTP + QR
|
||
POST /api/v1/auth/totp/verify → verifica e attiva TOTP
|
||
POST /api/v1/auth/totp/disable → disabilita TOTP
|
||
POST /api/v1/auth/change-password → cambio password
|
||
"""
|
||
|
||
from typing import Annotated
|
||
|
||
from fastapi import APIRouter, Depends, Request
|
||
from slowapi import Limiter
|
||
from slowapi.util import get_remote_address
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.config import get_settings
|
||
from app.core.exceptions import InvalidCredentialsError
|
||
from app.database import get_db
|
||
from app.dependencies import CurrentUser, DB
|
||
from app.schemas.auth import (
|
||
LoginRequest,
|
||
PasswordChangeRequest,
|
||
RefreshRequest,
|
||
TOTPSetupResponse,
|
||
TOTPStatusResponse,
|
||
TOTPVerifyRequest,
|
||
TokenResponse,
|
||
)
|
||
from app.schemas.user import UserResponse
|
||
from app.services.auth_service import AuthService
|
||
|
||
settings = get_settings()
|
||
router = APIRouter(prefix="/auth", tags=["Autenticazione"])
|
||
limiter = Limiter(key_func=get_remote_address)
|
||
|
||
|
||
@router.post(
|
||
"/login",
|
||
response_model=TokenResponse,
|
||
summary="Login con email e password",
|
||
description="Autentica l'utente. Se 2FA è attivo, richiede anche il codice TOTP.",
|
||
)
|
||
async def login(
|
||
request: Request,
|
||
body: LoginRequest,
|
||
db: DB,
|
||
) -> TokenResponse:
|
||
ip = request.client.host if request.client else None
|
||
ua = request.headers.get("user-agent")
|
||
|
||
service = AuthService(db)
|
||
access_token, refresh_token = await service.login(
|
||
email=body.email,
|
||
password=body.password,
|
||
totp_code=body.totp_code,
|
||
ip_address=ip,
|
||
user_agent=ua,
|
||
)
|
||
|
||
return TokenResponse(
|
||
access_token=access_token,
|
||
refresh_token=refresh_token,
|
||
expires_in=settings.access_token_expire_minutes * 60,
|
||
)
|
||
|
||
|
||
@router.post(
|
||
"/refresh",
|
||
response_model=TokenResponse,
|
||
summary="Rinnova access token",
|
||
)
|
||
async def refresh_tokens(
|
||
body: RefreshRequest,
|
||
db: DB,
|
||
) -> TokenResponse:
|
||
service = AuthService(db)
|
||
access_token, refresh_token = await service.refresh_tokens(body.refresh_token)
|
||
|
||
return TokenResponse(
|
||
access_token=access_token,
|
||
refresh_token=refresh_token,
|
||
expires_in=settings.access_token_expire_minutes * 60,
|
||
)
|
||
|
||
|
||
@router.post(
|
||
"/logout",
|
||
status_code=204,
|
||
summary="Logout – revoca refresh token",
|
||
)
|
||
async def logout(
|
||
body: RefreshRequest,
|
||
db: DB,
|
||
) -> None:
|
||
service = AuthService(db)
|
||
await service.logout(body.refresh_token)
|
||
|
||
|
||
@router.get(
|
||
"/me",
|
||
response_model=UserResponse,
|
||
summary="Utente corrente autenticato",
|
||
)
|
||
async def me(current_user: CurrentUser) -> UserResponse:
|
||
return UserResponse.model_validate(current_user)
|
||
|
||
|
||
@router.post(
|
||
"/totp/setup",
|
||
response_model=TOTPSetupResponse,
|
||
summary="Avvia setup 2FA TOTP",
|
||
description="Genera segreto TOTP e QR code. Il 2FA viene attivato solo dopo la verifica.",
|
||
)
|
||
async def totp_setup(
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> TOTPSetupResponse:
|
||
service = AuthService(db)
|
||
data = await service.setup_totp(current_user)
|
||
return TOTPSetupResponse(**data)
|
||
|
||
|
||
@router.post(
|
||
"/totp/verify",
|
||
response_model=TOTPStatusResponse,
|
||
summary="Verifica codice TOTP e attiva 2FA",
|
||
)
|
||
async def totp_verify(
|
||
body: TOTPVerifyRequest,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> TOTPStatusResponse:
|
||
service = AuthService(db)
|
||
await service.verify_and_enable_totp(current_user, body.totp_code)
|
||
return TOTPStatusResponse(totp_enabled=True)
|
||
|
||
|
||
@router.post(
|
||
"/totp/disable",
|
||
response_model=TOTPStatusResponse,
|
||
summary="Disabilita 2FA TOTP",
|
||
)
|
||
async def totp_disable(
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> TOTPStatusResponse:
|
||
service = AuthService(db)
|
||
await service.disable_totp(current_user)
|
||
return TOTPStatusResponse(totp_enabled=False)
|
||
|
||
|
||
@router.post(
|
||
"/change-password",
|
||
status_code=204,
|
||
summary="Cambio password utente corrente",
|
||
)
|
||
async def change_password(
|
||
request: Request,
|
||
body: PasswordChangeRequest,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> None:
|
||
from app.core.security import verify_password, hash_password
|
||
from app.services.audit_service import log_audit
|
||
|
||
if not verify_password(body.current_password, current_user.password_hash):
|
||
from app.services.audit_service import log_audit as _la
|
||
await _la(
|
||
db,
|
||
"auth.password_changed",
|
||
tenant_id=current_user.tenant_id,
|
||
user_id=current_user.id,
|
||
outcome="failure",
|
||
ip_address=request.client.host if request.client else None,
|
||
user_agent=request.headers.get("user-agent"),
|
||
payload={"reason": "wrong_current_password"},
|
||
)
|
||
raise InvalidCredentialsError()
|
||
|
||
current_user.password_hash = hash_password(body.new_password)
|
||
await log_audit(
|
||
db,
|
||
"auth.password_changed",
|
||
tenant_id=current_user.tenant_id,
|
||
user_id=current_user.id,
|
||
ip_address=request.client.host if request.client else None,
|
||
user_agent=request.headers.get("user-agent"),
|
||
)
|