""" Modulo sicurezza – cifratura AES-256-GCM, hashing password, JWT utilities. ADR-002: Le credenziali IMAP/SMTP vengono cifrate con AES-256-GCM prima di essere scritte in DB. La chiave è in variabile d'ambiente (ENCRYPTION_KEY). Formato storage: base64(nonce_12byte || ciphertext || tag_16byte) """ import base64 import os from datetime import UTC, datetime, timedelta from typing import Any from uuid import UUID import bcrypt as _bcrypt from cryptography.hazmat.primitives.ciphers.aead import AESGCM from jose import JWTError, jwt from app.config import get_settings settings = get_settings() # ─── Password hashing (bcrypt diretto, compatibile con bcrypt 4.x/5.x) ─────── _BCRYPT_ROUNDS = 12 def hash_password(password: str) -> str: """Genera hash bcrypt della password (work factor 12).""" pwd_bytes = password.encode("utf-8") salt = _bcrypt.gensalt(rounds=_BCRYPT_ROUNDS) return _bcrypt.hashpw(pwd_bytes, salt).decode("utf-8") def verify_password(plain_password: str, hashed_password: str) -> bool: """Verifica password contro il suo hash.""" try: return _bcrypt.checkpw( plain_password.encode("utf-8"), hashed_password.encode("utf-8"), ) except Exception: return False # ─── JWT ────────────────────────────────────────────────────────────────────── def create_access_token( subject: str | UUID, tenant_id: str | UUID, role: str, extra_claims: dict[str, Any] | None = None, ) -> str: """ Crea un JWT access token con scadenza configurabile. Claims standard: - sub: user_id (string) - tid: tenant_id - role: ruolo utente - exp: scadenza - iat: emesso a """ now = datetime.now(UTC) expire = now + timedelta(minutes=settings.access_token_expire_minutes) payload: dict[str, Any] = { "sub": str(subject), "tid": str(tenant_id), "role": role, "iat": now, "exp": expire, "type": "access", } if extra_claims: payload.update(extra_claims) return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm) def create_refresh_token(subject: str | UUID, tenant_id: str | UUID) -> str: """ Crea un JWT refresh token con scadenza lunga (30 giorni default). Non contiene il ruolo – viene rivalutato a ogni refresh. """ now = datetime.now(UTC) expire = now + timedelta(days=settings.refresh_token_expire_days) payload: dict[str, Any] = { "sub": str(subject), "tid": str(tenant_id), "iat": now, "exp": expire, "type": "refresh", } return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm) def decode_token(token: str) -> dict[str, Any]: """ Decodifica e valida un JWT token. Solleva JWTError se il token è invalido o scaduto. """ return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) def is_token_valid(token: str, expected_type: str = "access") -> bool: """Verifica rapidamente la validità del token senza sollevare eccezioni.""" try: payload = decode_token(token) return payload.get("type") == expected_type except JWTError: return False # ─── AES-256-GCM cifratura credenziali (ADR-002) ───────────────────────────── def encrypt_credential(plaintext: str) -> str: """ Cifra una stringa con AES-256-GCM usando la chiave applicativa. Formato output: base64(nonce_12byte || ciphertext || tag_16byte) Il tag GCM (16 byte) è automaticamente concatenato al ciphertext da AESGCM. """ key = settings.encryption_key_bytes aesgcm = AESGCM(key) nonce = os.urandom(12) # 12 byte nonce raccomandato per GCM # AESGCM.encrypt() restituisce ciphertext + tag concatenati ciphertext_with_tag = aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None) # Concatena nonce + ciphertext_with_tag e codifica in base64 raw = nonce + ciphertext_with_tag return base64.b64encode(raw).decode("ascii") def decrypt_credential(encrypted: str) -> str: """ Decifra una stringa cifrata con encrypt_credential(). Solleva ValueError se la decifratura fallisce (chiave errata o dati corrotti). """ key = settings.encryption_key_bytes aesgcm = AESGCM(key) try: raw = base64.b64decode(encrypted.encode("ascii")) nonce = raw[:12] ciphertext_with_tag = raw[12:] plaintext_bytes = aesgcm.decrypt(nonce, ciphertext_with_tag, None) return plaintext_bytes.decode("utf-8") except Exception as e: raise ValueError(f"Decifratura fallita: {e}") from e # ─── Hash sicuro per refresh token storage ──────────────────────────────────── import hashlib def hash_token(token: str) -> str: """SHA-256 del token raw per storage sicuro in DB.""" return hashlib.sha256(token.encode("utf-8")).hexdigest()