Files
PecHub/backend/app/core/security.py
T
2026-06-18 15:14:10 +02:00

164 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Security module: AES-256-GCM encryption, password hashing, and JWT utilities.
ADR-002: IMAP/SMTP credentials are encrypted with AES-256-GCM before being
written to the DB. The key is read from an environment variable (ENCRYPTION_KEY).
Storage format: base64(nonce_12byte || ciphertext || tag_16byte)
"""
import base64
import os
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID
import bcrypt as _bcrypt
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from jose import JWTError, jwt
from app.config import get_settings
settings = get_settings()
# ─── Password hashing (bcrypt diretto, compatibile con bcrypt 4.x/5.x) ───────
_BCRYPT_ROUNDS = 12
# bcrypt only uses the first 72 bytes of its input. bcrypt 4.x silently
# truncates longer passwords, while bcrypt 5.x raises ValueError instead.
# Truncating explicitly keeps behavior (and stored hashes) identical on both.
_BCRYPT_MAX_PASSWORD_BYTES = 72


def _bcrypt_input(password: str) -> bytes:
    """Return the UTF-8 bytes of *password*, capped at bcrypt's 72-byte limit."""
    return password.encode("utf-8")[:_BCRYPT_MAX_PASSWORD_BYTES]


def hash_password(password: str) -> str:
    """Return the bcrypt hash of *password* as a UTF-8 string.

    A fresh salt is generated with a work factor of ``_BCRYPT_ROUNDS`` (12).
    Only the first 72 bytes of the UTF-8 encoded password are hashed, which
    matches what bcrypt 4.x did implicitly and avoids the ValueError that
    bcrypt 5.x raises for longer inputs.

    Args:
        password: The plain-text password to hash.

    Returns:
        The bcrypt hash (salt included), decoded as UTF-8.
    """
    salt = _bcrypt.gensalt(rounds=_BCRYPT_ROUNDS)
    return _bcrypt.hashpw(_bcrypt_input(password), salt).decode("utf-8")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against a stored bcrypt hash.

    The candidate password is truncated to 72 bytes exactly as in
    ``hash_password`` so both functions agree on every bcrypt version.

    Args:
        plain_password: The candidate plain-text password.
        hashed_password: The stored bcrypt hash to compare against.

    Returns:
        True if the password matches the hash. False if it does not match,
        or if the comparison raised for any reason (e.g. a malformed hash).
    """
    try:
        return _bcrypt.checkpw(
            _bcrypt_input(plain_password),
            hashed_password.encode("utf-8"),
        )
    except Exception:
        # Deliberate best-effort: any failure is reported as "no match".
        return False
# ─── JWT ──────────────────────────────────────────────────────────────────────
def create_access_token(
    subject: str | UUID,
    tenant_id: str | UUID,
    role: str,
    extra_claims: dict[str, Any] | None = None,
) -> str:
    """Build a signed JWT access token.

    The token lifetime is ``settings.access_token_expire_minutes`` minutes,
    counted from the current UTC time.

    Claims written by this function:
        - sub: the subject (user id), as a string
        - tid: the tenant id, as a string
        - role: the user's role
        - iat: issue time
        - exp: expiry time
        - type: always "access"

    Args:
        subject: Identifier stored in the ``sub`` claim.
        tenant_id: Identifier stored in the ``tid`` claim.
        role: Value stored in the ``role`` claim.
        extra_claims: Optional additional claims. They are merged last, so
            a key that collides with one of the claims above replaces it.

    Returns:
        The encoded token, signed with ``settings.secret_key`` using
        ``settings.algorithm``.
    """
    issued_at = datetime.now(UTC)
    lifetime = timedelta(minutes=settings.access_token_expire_minutes)
    claims: dict[str, Any] = dict(
        sub=str(subject),
        tid=str(tenant_id),
        role=role,
        iat=issued_at,
        exp=issued_at + lifetime,
        type="access",
    )
    if extra_claims:
        claims |= extra_claims
    return jwt.encode(claims, settings.secret_key, algorithm=settings.algorithm)
def create_refresh_token(subject: str | UUID, tenant_id: str | UUID) -> str:
    """Build a signed JWT refresh token.

    The token lifetime is ``settings.refresh_token_expire_days`` days from
    the current UTC time. No ``role`` claim is written. Each token carries a
    random ``jti`` (JWT ID) so that two tokens issued for the same user in
    the same second are still distinct and do not produce duplicate hashes
    in the DB.

    Args:
        subject: Identifier stored in the ``sub`` claim.
        tenant_id: Identifier stored in the ``tid`` claim.

    Returns:
        The encoded token, signed with ``settings.secret_key`` using
        ``settings.algorithm``.
    """
    import uuid as _uuid

    issued_at = datetime.now(UTC)
    expires_at = issued_at + timedelta(days=settings.refresh_token_expire_days)
    claims: dict[str, Any] = dict(
        sub=str(subject),
        tid=str(tenant_id),
        jti=str(_uuid.uuid4()),  # unique token id
        iat=issued_at,
        exp=expires_at,
        type="refresh",
    )
    return jwt.encode(claims, settings.secret_key, algorithm=settings.algorithm)
def decode_token(token: str) -> dict[str, Any]:
    """Decode and validate a JWT.

    Args:
        token: The encoded JWT string.

    Returns:
        The token's claims as a dictionary.

    Raises:
        JWTError: If the token is invalid or has expired.
    """
    signing_key = settings.secret_key
    accepted_algorithms = [settings.algorithm]
    claims = jwt.decode(token, signing_key, algorithms=accepted_algorithms)
    return claims
def is_token_valid(token: str, expected_type: str = "access") -> bool:
    """Report whether *token* decodes cleanly and has the expected type.

    Args:
        token: The encoded JWT string.
        expected_type: Value the token's ``type`` claim must equal.

    Returns:
        True if the token decodes without a ``JWTError`` and its ``type``
        claim equals *expected_type*; False otherwise.
    """
    try:
        claims = decode_token(token)
    except JWTError:
        return False
    return claims.get("type") == expected_type
# ─── AES-256-GCM cifratura credenziali (ADR-002) ─────────────────────────────
def encrypt_credential(plaintext: str) -> str:
    """Encrypt a string with AES-GCM using the application key.

    The key comes from ``settings.encryption_key_bytes`` (AES-256 per
    ADR-002, assuming that key is 32 bytes long). A fresh random 12-byte
    nonce is drawn for every call.

    Output format: base64(nonce_12byte || ciphertext || tag_16byte).
    The 16-byte GCM tag is appended to the ciphertext by ``AESGCM.encrypt``.

    Args:
        plaintext: The text to encrypt; it is UTF-8 encoded first.

    Returns:
        The base64-encoded nonce + ciphertext + tag, as an ASCII string.
    """
    cipher = AESGCM(settings.encryption_key_bytes)
    iv = os.urandom(12)  # 12 bytes is the recommended nonce size for GCM
    sealed = cipher.encrypt(iv, plaintext.encode("utf-8"), None)
    # The nonce is stored in front of the ciphertext so decryption can find it.
    return base64.b64encode(iv + sealed).decode("ascii")
def decrypt_credential(encrypted: str) -> str:
    """Decrypt a string produced by ``encrypt_credential()``.

    Args:
        encrypted: base64 text whose decoded form is a 12-byte nonce
            followed by the ciphertext with its GCM tag.

    Returns:
        The decrypted plaintext, decoded as UTF-8.

    Raises:
        ValueError: If decoding or decryption fails for any reason (for
            example a wrong key or corrupted data). The original exception
            is chained as the cause.
    """
    cipher = AESGCM(settings.encryption_key_bytes)
    try:
        blob = base64.b64decode(encrypted.encode("ascii"))
        # First 12 bytes are the nonce; the rest is ciphertext + tag.
        iv, sealed = blob[:12], blob[12:]
        return cipher.decrypt(iv, sealed, None).decode("utf-8")
    except Exception as e:
        raise ValueError(f"Decifratura fallita: {e}") from e
# ─── Hash sicuro per refresh token storage ────────────────────────────────────
import hashlib
def hash_token(token: str) -> str:
    """Return the SHA-256 hex digest of the raw token, for storage in the DB.

    Args:
        token: The raw token string; it is UTF-8 encoded before hashing.

    Returns:
        The 64-character lowercase hexadecimal digest.
    """
    digest = hashlib.sha256()
    digest.update(token.encode("utf-8"))
    return digest.hexdigest()