Files
PecHub/backend/app/core/security.py
T
mgiustini 58a233236c feat: Fase 1 – Fondamenta complete (backend FastAPI + auth + permessi)
- docker-compose.yml: PostgreSQL 16, Redis 7, MinIO, Nginx
- backend FastAPI: struttura monorepo, config pydantic-settings
- modelli SQLAlchemy: tutti i modelli (tenants, users, mailboxes, messages, archival, permissions, labels, audit_log)
- migrazione Alembic 0001: schema completo in pure SQL
- auth API: login JWT, refresh token rotation, logout, 2FA TOTP (setup/verify/disable)
- CRUD utenti: lista, crea, modifica, reset password, soft delete
- permessi granulari (Fase 1-A): mailbox_permissions, assegna/revoca/lista
- CRUD tenant: gestione super-admin
- sicurezza: AES-256-GCM cifratura credenziali IMAP/SMTP, bcrypt password
- RLS PostgreSQL: isolamento multi-tenant per request
- seed sviluppo: tenant demo + admin + operator
- test unit: security (bcrypt, JWT, AES), auth_service
- test integration: auth endpoints, users endpoints
- CI GitHub Actions: lint (ruff), test (pytest), build Docker, security scan
- infra: nginx.conf, redis.conf
- Makefile con comandi make dev/test/migrate/seed

Definition of Done:
 Login, refresh token e TOTP funzionanti
 make dev porta in piedi tutto lo stack locale
 CI configurata
2026-03-18 16:42:01 +01:00

159 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Security module — AES-256-GCM encryption, password hashing, JWT utilities.
ADR-002: IMAP/SMTP credentials are encrypted with AES-256-GCM before being
written to the DB. The key is held in an environment variable (ENCRYPTION_KEY).
Storage format: base64(nonce_12byte || ciphertext || tag_16byte)
"""
import base64
import os
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID
import bcrypt as _bcrypt
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from jose import JWTError, jwt
from app.config import get_settings
# Settings object fetched once, at import time, and shared by the helpers
# in this module (JWT signing parameters, token lifetimes, encryption key).
settings = get_settings()
# ─── Password hashing (bcrypt diretto, compatibile con bcrypt 4.x/5.x) ───────
# bcrypt work factor passed to gensalt() for every new password hash.
_BCRYPT_ROUNDS = 12


def hash_password(password: str) -> str:
    """Return the bcrypt hash of *password* as a text string.

    The password is encoded as UTF-8 and hashed with a freshly generated
    salt whose work factor is ``_BCRYPT_ROUNDS`` (12).
    """
    hashed = _bcrypt.hashpw(
        password.encode("utf-8"),
        _bcrypt.gensalt(rounds=_BCRYPT_ROUNDS),
    )
    return hashed.decode("utf-8")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Report whether *plain_password* matches the bcrypt *hashed_password*.

    Both arguments are encoded as UTF-8 before the comparison. Any exception
    raised while encoding or checking is reported as a mismatch (``False``)
    rather than propagated to the caller.
    """
    try:
        candidate = plain_password.encode("utf-8")
        stored = hashed_password.encode("utf-8")
        outcome = _bcrypt.checkpw(candidate, stored)
    except Exception:
        # Deliberately broad: a failure of any kind counts as "no match".
        return False
    return outcome
# ─── JWT ──────────────────────────────────────────────────────────────────────
def create_access_token(
    subject: str | UUID,
    tenant_id: str | UUID,
    role: str,
    extra_claims: dict[str, Any] | None = None,
) -> str:
    """Build and sign a JWT access token.

    The lifetime is ``settings.access_token_expire_minutes`` minutes from
    the moment of the call.

    Claims written to the token:
    - sub: ``subject`` converted to a string (the user id)
    - tid: ``tenant_id`` converted to a string
    - role: the user's role
    - iat: issue time (timezone-aware, UTC)
    - exp: expiry time
    - type: always ``"access"``

    Entries in ``extra_claims`` are merged in last, so they are added to the
    claims above and replace any of them that share the same key.
    """
    issued_at = datetime.now(UTC)
    expires_at = issued_at + timedelta(minutes=settings.access_token_expire_minutes)
    claims: dict[str, Any] = {
        "sub": str(subject),
        "tid": str(tenant_id),
        "role": role,
        "iat": issued_at,
        "exp": expires_at,
        "type": "access",
    }
    # A missing or empty extra_claims leaves the standard claims untouched.
    claims.update(extra_claims or {})
    return jwt.encode(claims, settings.secret_key, algorithm=settings.algorithm)
def create_refresh_token(subject: str | UUID, tenant_id: str | UUID) -> str:
    """Build and sign a long-lived JWT refresh token.

    The lifetime is ``settings.refresh_token_expire_days`` days from the
    moment of the call. The token carries ``sub``, ``tid``, ``iat``, ``exp``
    and ``type`` (always ``"refresh"``). No role claim is included: the role
    is meant to be re-evaluated on every refresh.
    """
    issued_at = datetime.now(UTC)
    expires_at = issued_at + timedelta(days=settings.refresh_token_expire_days)
    claims: dict[str, Any] = dict(
        sub=str(subject),
        tid=str(tenant_id),
        iat=issued_at,
        exp=expires_at,
        type="refresh",
    )
    return jwt.encode(claims, settings.secret_key, algorithm=settings.algorithm)
def decode_token(token: str) -> dict[str, Any]:
    """Decode and validate a JWT, returning its claims.

    The token is checked against ``settings.secret_key`` and only the
    algorithm named by ``settings.algorithm`` is accepted.

    Raises:
        JWTError: if the token is invalid or expired.
    """
    claims = jwt.decode(
        token,
        settings.secret_key,
        algorithms=[settings.algorithm],
    )
    return claims
def is_token_valid(token: str, expected_type: str = "access") -> bool:
    """Check a token's validity without raising.

    Returns ``True`` only when the token decodes successfully and its
    ``type`` claim equals *expected_type*; a ``JWTError`` during decoding
    yields ``False``.
    """
    try:
        claims = decode_token(token)
    except JWTError:
        return False
    return claims.get("type") == expected_type
# ─── AES-256-GCM cifratura credenziali (ADR-002) ─────────────────────────────
def encrypt_credential(plaintext: str) -> str:
    """Encrypt *plaintext* with AES-256-GCM using the application key.

    Output format: base64(nonce_12byte || ciphertext || tag_16byte).
    The 16-byte GCM tag is appended to the ciphertext by ``AESGCM.encrypt``
    itself; this function only prepends the nonce.
    """
    cipher = AESGCM(settings.encryption_key_bytes)
    # A fresh random 12-byte nonce per call (the size recommended for GCM).
    iv = os.urandom(12)
    # No associated data is authenticated alongside the plaintext.
    sealed = cipher.encrypt(iv, plaintext.encode("utf-8"), None)
    return base64.b64encode(iv + sealed).decode("ascii")
def decrypt_credential(encrypted: str) -> str:
    """Decrypt a value produced by ``encrypt_credential()``.

    The input is base64-decoded, its first 12 bytes are taken as the nonce
    and the remainder as ciphertext plus GCM tag.

    Raises:
        ValueError: if decoding or decryption fails for any reason
            (e.g. wrong key or corrupted data).
    """
    cipher = AESGCM(settings.encryption_key_bytes)
    try:
        blob = base64.b64decode(encrypted.encode("ascii"))
        iv, sealed = blob[:12], blob[12:]
        return cipher.decrypt(iv, sealed, None).decode("utf-8")
    except Exception as exc:
        # Every failure mode is reported to callers as a ValueError.
        raise ValueError(f"Decifratura fallita: {exc}") from exc
# ─── Hash sicuro per refresh token storage ────────────────────────────────────
import hashlib
def hash_token(token: str) -> str:
    """Return the hex SHA-256 digest of the raw token, for safe DB storage."""
    hasher = hashlib.sha256()
    hasher.update(token.encode("utf-8"))
    return hasher.hexdigest()