mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 20:55:41 +02:00
58a233236c
- docker-compose.yml: PostgreSQL 16, Redis 7, MinIO, Nginx - backend FastAPI: struttura monorepo, config pydantic-settings - modelli SQLAlchemy: tutti i modelli (tenants, users, mailboxes, messages, archival, permissions, labels, audit_log) - migrazione Alembic 0001: schema completo in pure SQL - auth API: login JWT, refresh token rotation, logout, 2FA TOTP (setup/verify/disable) - CRUD utenti: lista, crea, modifica, reset password, soft delete - permessi granulari (Fase 1-A): mailbox_permissions, assegna/revoca/lista - CRUD tenant: gestione super-admin - sicurezza: AES-256-GCM cifratura credenziali IMAP/SMTP, bcrypt password - RLS PostgreSQL: isolamento multi-tenant per request - seed sviluppo: tenant demo + admin + operator - test unit: security (bcrypt, JWT, AES), auth_service - test integration: auth endpoints, users endpoints - CI GitHub Actions: lint (ruff), test (pytest), build Docker, security scan - infra: nginx.conf, redis.conf - Makefile con comandi make dev/test/migrate/seed Definition of Done: ✅ Login, refresh token e TOTP funzionanti ✅ make dev porta in piedi tutto lo stack locale ✅ CI configurata
159 lines
5.1 KiB
Python
159 lines
5.1 KiB
Python
"""
|
||
Modulo sicurezza – cifratura AES-256-GCM, hashing password, JWT utilities.
|
||
|
||
ADR-002: Le credenziali IMAP/SMTP vengono cifrate con AES-256-GCM prima di
|
||
essere scritte in DB. La chiave è in variabile d'ambiente (ENCRYPTION_KEY).
|
||
Formato storage: base64(nonce_12byte || ciphertext || tag_16byte)
|
||
"""
|
||
|
||
import base64
|
||
import os
|
||
from datetime import UTC, datetime, timedelta
|
||
from typing import Any
|
||
from uuid import UUID
|
||
|
||
import bcrypt as _bcrypt
|
||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||
from jose import JWTError, jwt
|
||
|
||
from app.config import get_settings
|
||
|
||
settings = get_settings()
|
||
|
||
# ─── Password hashing (bcrypt diretto, compatibile con bcrypt 4.x/5.x) ───────
|
||
_BCRYPT_ROUNDS = 12
|
||
|
||
|
||
def hash_password(password: str) -> str:
|
||
"""Genera hash bcrypt della password (work factor 12)."""
|
||
pwd_bytes = password.encode("utf-8")
|
||
salt = _bcrypt.gensalt(rounds=_BCRYPT_ROUNDS)
|
||
return _bcrypt.hashpw(pwd_bytes, salt).decode("utf-8")
|
||
|
||
|
||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||
"""Verifica password contro il suo hash."""
|
||
try:
|
||
return _bcrypt.checkpw(
|
||
plain_password.encode("utf-8"),
|
||
hashed_password.encode("utf-8"),
|
||
)
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
# ─── JWT ──────────────────────────────────────────────────────────────────────
|
||
def create_access_token(
|
||
subject: str | UUID,
|
||
tenant_id: str | UUID,
|
||
role: str,
|
||
extra_claims: dict[str, Any] | None = None,
|
||
) -> str:
|
||
"""
|
||
Crea un JWT access token con scadenza configurabile.
|
||
|
||
Claims standard:
|
||
- sub: user_id (string)
|
||
- tid: tenant_id
|
||
- role: ruolo utente
|
||
- exp: scadenza
|
||
- iat: emesso a
|
||
"""
|
||
now = datetime.now(UTC)
|
||
expire = now + timedelta(minutes=settings.access_token_expire_minutes)
|
||
|
||
payload: dict[str, Any] = {
|
||
"sub": str(subject),
|
||
"tid": str(tenant_id),
|
||
"role": role,
|
||
"iat": now,
|
||
"exp": expire,
|
||
"type": "access",
|
||
}
|
||
if extra_claims:
|
||
payload.update(extra_claims)
|
||
|
||
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
||
|
||
|
||
def create_refresh_token(subject: str | UUID, tenant_id: str | UUID) -> str:
|
||
"""
|
||
Crea un JWT refresh token con scadenza lunga (30 giorni default).
|
||
Non contiene il ruolo – viene rivalutato a ogni refresh.
|
||
"""
|
||
now = datetime.now(UTC)
|
||
expire = now + timedelta(days=settings.refresh_token_expire_days)
|
||
|
||
payload: dict[str, Any] = {
|
||
"sub": str(subject),
|
||
"tid": str(tenant_id),
|
||
"iat": now,
|
||
"exp": expire,
|
||
"type": "refresh",
|
||
}
|
||
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
||
|
||
|
||
def decode_token(token: str) -> dict[str, Any]:
|
||
"""
|
||
Decodifica e valida un JWT token.
|
||
Solleva JWTError se il token è invalido o scaduto.
|
||
"""
|
||
return jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
|
||
|
||
|
||
def is_token_valid(token: str, expected_type: str = "access") -> bool:
|
||
"""Verifica rapidamente la validità del token senza sollevare eccezioni."""
|
||
try:
|
||
payload = decode_token(token)
|
||
return payload.get("type") == expected_type
|
||
except JWTError:
|
||
return False
|
||
|
||
|
||
# ─── AES-256-GCM cifratura credenziali (ADR-002) ─────────────────────────────
|
||
def encrypt_credential(plaintext: str) -> str:
|
||
"""
|
||
Cifra una stringa con AES-256-GCM usando la chiave applicativa.
|
||
|
||
Formato output: base64(nonce_12byte || ciphertext || tag_16byte)
|
||
Il tag GCM (16 byte) è automaticamente concatenato al ciphertext da AESGCM.
|
||
"""
|
||
key = settings.encryption_key_bytes
|
||
aesgcm = AESGCM(key)
|
||
nonce = os.urandom(12) # 12 byte nonce raccomandato per GCM
|
||
|
||
# AESGCM.encrypt() restituisce ciphertext + tag concatenati
|
||
ciphertext_with_tag = aesgcm.encrypt(nonce, plaintext.encode("utf-8"), None)
|
||
|
||
# Concatena nonce + ciphertext_with_tag e codifica in base64
|
||
raw = nonce + ciphertext_with_tag
|
||
return base64.b64encode(raw).decode("ascii")
|
||
|
||
|
||
def decrypt_credential(encrypted: str) -> str:
|
||
"""
|
||
Decifra una stringa cifrata con encrypt_credential().
|
||
Solleva ValueError se la decifratura fallisce (chiave errata o dati corrotti).
|
||
"""
|
||
key = settings.encryption_key_bytes
|
||
aesgcm = AESGCM(key)
|
||
|
||
try:
|
||
raw = base64.b64decode(encrypted.encode("ascii"))
|
||
nonce = raw[:12]
|
||
ciphertext_with_tag = raw[12:]
|
||
plaintext_bytes = aesgcm.decrypt(nonce, ciphertext_with_tag, None)
|
||
return plaintext_bytes.decode("utf-8")
|
||
except Exception as e:
|
||
raise ValueError(f"Decifratura fallita: {e}") from e
|
||
|
||
|
||
# ─── Hash sicuro per refresh token storage ────────────────────────────────────
|
||
import hashlib
|
||
|
||
|
||
def hash_token(token: str) -> str:
|
||
"""SHA-256 del token raw per storage sicuro in DB."""
|
||
return hashlib.sha256(token.encode("utf-8")).hexdigest()
|