mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
feat: Fase 1 – Fondamenta complete (backend FastAPI + auth + permessi)
- docker-compose.yml: PostgreSQL 16, Redis 7, MinIO, Nginx - backend FastAPI: struttura monorepo, config pydantic-settings - modelli SQLAlchemy: tutti i modelli (tenants, users, mailboxes, messages, archival, permissions, labels, audit_log) - migrazione Alembic 0001: schema completo in pure SQL - auth API: login JWT, refresh token rotation, logout, 2FA TOTP (setup/verify/disable) - CRUD utenti: lista, crea, modifica, reset password, soft delete - permessi granulari (Fase 1-A): mailbox_permissions, assegna/revoca/lista - CRUD tenant: gestione super-admin - sicurezza: AES-256-GCM cifratura credenziali IMAP/SMTP, bcrypt password - RLS PostgreSQL: isolamento multi-tenant per request - seed sviluppo: tenant demo + admin + operator - test unit: security (bcrypt, JWT, AES), auth_service - test integration: auth endpoints, users endpoints - CI GitHub Actions: lint (ruff), test (pytest), build Docker, security scan - infra: nginx.conf, redis.conf - Makefile con comandi make dev/test/migrate/seed Definition of Done: ✅ Login, refresh token e TOTP funzionanti ✅ make dev porta in piedi tutto lo stack locale ✅ CI configurata
This commit is contained in:
@@ -0,0 +1 @@
|
||||
# Tests
|
||||
@@ -0,0 +1 @@
|
||||
# Integration tests
|
||||
@@ -0,0 +1,144 @@
|
||||
"""
|
||||
Fixtures per test di integrazione – DB in-memory SQLite + app FastAPI.
|
||||
|
||||
Per i test di integrazione si usa SQLite async invece di PostgreSQL per
|
||||
semplicità e velocità. In CI si può aggiungere un servizio PostgreSQL reale.
|
||||
"""
|
||||
|
||||
import os
|
||||
import uuid
|
||||
from typing import AsyncGenerator
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
# Override variabili d'ambiente prima di importare l'app
|
||||
os.environ["ENCRYPTION_KEY"] = "b" * 64
|
||||
os.environ["SECRET_KEY"] = "integration-test-secret-key-only-for-tests"
|
||||
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./test_integration.db"
|
||||
os.environ["DATABASE_URL_SYNC"] = "sqlite:///./test_integration.db"
|
||||
os.environ["APP_ENV"] = "development"
|
||||
os.environ["APP_DEBUG"] = "false"
|
||||
|
||||
from app.database import Base
|
||||
from app.main import app
|
||||
|
||||
# Engine SQLite per test
|
||||
TEST_DATABASE_URL = "sqlite+aiosqlite:///./test_integration.db"
|
||||
|
||||
test_engine = create_async_engine(
|
||||
TEST_DATABASE_URL,
|
||||
echo=False,
|
||||
connect_args={"check_same_thread": False},
|
||||
)
|
||||
|
||||
TestAsyncSessionLocal = async_sessionmaker(
|
||||
bind=test_engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
autoflush=False,
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="session", autouse=True)
|
||||
async def setup_database():
|
||||
"""Crea tutte le tabelle nel DB di test."""
|
||||
# Import modelli per registrarli nel metadata
|
||||
import app.models # noqa: F401
|
||||
|
||||
async with test_engine.begin() as conn:
|
||||
# SQLite non supporta tutti i tipi PostgreSQL, usiamo tabelle semplici
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
yield
|
||||
async with test_engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
await test_engine.dispose()
|
||||
# Pulisci file DB
|
||||
import os
|
||||
if os.path.exists("test_integration.db"):
|
||||
os.remove("test_integration.db")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db_session() -> AsyncGenerator[AsyncSession, None]:
|
||||
"""Session DB isolata per ogni test (rollback automatico)."""
|
||||
async with TestAsyncSessionLocal() as session:
|
||||
yield session
|
||||
await session.rollback()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
|
||||
"""HTTP client per test API con override del DB."""
|
||||
from app.database import get_db
|
||||
|
||||
async def override_get_db():
|
||||
yield db_session
|
||||
|
||||
app.dependency_overrides[get_db] = override_get_db
|
||||
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app),
|
||||
base_url="http://testserver",
|
||||
) as c:
|
||||
yield c
|
||||
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def demo_tenant(db_session: AsyncSession):
|
||||
"""Crea un tenant di test."""
|
||||
from app.models.tenant import Tenant
|
||||
|
||||
tenant = Tenant(
|
||||
id=uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"),
|
||||
slug="test-tenant",
|
||||
name="Test Tenant",
|
||||
plan="pro",
|
||||
max_mailboxes=10,
|
||||
max_users=10,
|
||||
)
|
||||
db_session.add(tenant)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(tenant)
|
||||
return tenant
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def admin_user(db_session: AsyncSession, demo_tenant):
|
||||
"""Crea un utente admin nel tenant di test."""
|
||||
from app.core.security import hash_password
|
||||
from app.models.user import User
|
||||
|
||||
user = User(
|
||||
tenant_id=demo_tenant.id,
|
||||
email="admin@test.com",
|
||||
password_hash=hash_password("AdminPass1!"),
|
||||
full_name="Test Admin",
|
||||
role="admin",
|
||||
is_active=True,
|
||||
)
|
||||
db_session.add(user)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def admin_token(client: AsyncClient, admin_user, db_session: AsyncSession) -> str:
|
||||
"""
|
||||
Token JWT per l'utente admin.
|
||||
Nota: il login usa il DB sovrapposto dalla fixture, quindi
|
||||
il seed_user deve già esistere.
|
||||
"""
|
||||
from app.core.security import create_access_token
|
||||
token = create_access_token(
|
||||
subject=admin_user.id,
|
||||
tenant_id=admin_user.tenant_id,
|
||||
role=admin_user.role,
|
||||
)
|
||||
return token
|
||||
@@ -0,0 +1,142 @@
|
||||
"""
|
||||
Test di integrazione per gli endpoint di autenticazione.
|
||||
"""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
os.environ.setdefault("ENCRYPTION_KEY", "b" * 64)
|
||||
os.environ.setdefault("SECRET_KEY", "integration-test-secret-key-only-for-tests")
|
||||
os.environ.setdefault("DATABASE_URL", "sqlite+aiosqlite:///./test_integration.db")
|
||||
os.environ.setdefault("DATABASE_URL_SYNC", "sqlite:///./test_integration.db")
|
||||
|
||||
|
||||
class TestLoginEndpoint:
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_success(self, client, admin_user):
|
||||
response = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={
|
||||
"email": "admin@test.com",
|
||||
"password": "AdminPass1!",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "access_token" in data
|
||||
assert "refresh_token" in data
|
||||
assert data["token_type"] == "bearer"
|
||||
assert data["expires_in"] > 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_wrong_password_returns_401(self, client, admin_user):
|
||||
response = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={
|
||||
"email": "admin@test.com",
|
||||
"password": "WrongPassword1!",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_nonexistent_user_returns_401(self, client):
|
||||
response = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={
|
||||
"email": "nobody@example.com",
|
||||
"password": "Password1!",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_missing_fields_returns_422(self, client):
|
||||
response = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"email": "test@test.com"}, # manca password
|
||||
)
|
||||
assert response.status_code == 422
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_invalid_email_returns_422(self, client):
|
||||
response = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"email": "not-an-email", "password": "Password1!"},
|
||||
)
|
||||
assert response.status_code == 422
|
||||
|
||||
|
||||
class TestMeEndpoint:
|
||||
@pytest.mark.asyncio
|
||||
async def test_me_returns_current_user(self, client, admin_token, admin_user):
|
||||
response = await client.get(
|
||||
"/api/v1/auth/me",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["email"] == "admin@test.com"
|
||||
assert data["role"] == "admin"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_me_without_token_returns_403(self, client):
|
||||
response = await client.get("/api/v1/auth/me")
|
||||
assert response.status_code == 403
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_me_with_invalid_token_returns_401(self, client):
|
||||
response = await client.get(
|
||||
"/api/v1/auth/me",
|
||||
headers={"Authorization": "Bearer invalid.token.here"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestHealthEndpoint:
|
||||
@pytest.mark.asyncio
|
||||
async def test_health_returns_ok(self, client):
|
||||
response = await client.get("/health")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["status"] == "ok"
|
||||
|
||||
|
||||
class TestRefreshEndpoint:
|
||||
@pytest.mark.asyncio
|
||||
async def test_refresh_with_invalid_token_returns_401(self, client):
|
||||
response = await client.post(
|
||||
"/api/v1/auth/refresh",
|
||||
json={"refresh_token": "invalid.token.here"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestTOTPEndpoints:
|
||||
@pytest.mark.asyncio
|
||||
async def test_totp_setup_returns_qr(self, client, admin_token):
|
||||
response = await client.post(
|
||||
"/api/v1/auth/totp/setup",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "secret" in data
|
||||
assert "qr_uri" in data
|
||||
assert "qr_image_base64" in data
|
||||
assert data["qr_uri"].startswith("otpauth://totp/")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_totp_verify_wrong_code_returns_400(self, client, admin_token):
|
||||
# Prima setup
|
||||
await client.post(
|
||||
"/api/v1/auth/totp/setup",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
# Poi verify con codice errato
|
||||
response = await client.post(
|
||||
"/api/v1/auth/totp/verify",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={"totp_code": "000000"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
@@ -0,0 +1,131 @@
|
||||
"""
|
||||
Test di integrazione per gli endpoint utenti.
|
||||
"""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
os.environ.setdefault("ENCRYPTION_KEY", "b" * 64)
|
||||
os.environ.setdefault("SECRET_KEY", "integration-test-secret-key-only-for-tests")
|
||||
|
||||
|
||||
class TestUsersEndpoint:
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_users_admin(self, client, admin_token):
|
||||
response = await client.get(
|
||||
"/api/v1/users",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "items" in data
|
||||
assert "total" in data
|
||||
assert data["total"] >= 1 # almeno l'admin stesso
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_users_no_auth_returns_403(self, client):
|
||||
response = await client.get("/api/v1/users")
|
||||
assert response.status_code == 403
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_user_success(self, client, admin_token):
|
||||
response = await client.post(
|
||||
"/api/v1/users",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={
|
||||
"email": "newuser@test.com",
|
||||
"password": "NewUser1!",
|
||||
"full_name": "Nuovo Utente",
|
||||
"role": "operator",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 201
|
||||
data = response.json()
|
||||
assert data["email"] == "newuser@test.com"
|
||||
assert data["role"] == "operator"
|
||||
assert "password_hash" not in data # non deve esporre hash
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_user_duplicate_email_returns_409(self, client, admin_token):
|
||||
# Crea primo utente
|
||||
await client.post(
|
||||
"/api/v1/users",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={
|
||||
"email": "duplicate@test.com",
|
||||
"password": "DupUser1!",
|
||||
"full_name": "Dup User",
|
||||
"role": "operator",
|
||||
},
|
||||
)
|
||||
# Secondo tentativo con stessa email
|
||||
response = await client.post(
|
||||
"/api/v1/users",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={
|
||||
"email": "duplicate@test.com",
|
||||
"password": "DupUser1!",
|
||||
"full_name": "Dup User 2",
|
||||
"role": "operator",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 409
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_superadmin_forbidden(self, client, admin_token):
|
||||
response = await client.post(
|
||||
"/api/v1/users",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={
|
||||
"email": "sadmin@test.com",
|
||||
"password": "SuperAdmin1!",
|
||||
"full_name": "Super",
|
||||
"role": "super_admin",
|
||||
},
|
||||
)
|
||||
# Il validator Pydantic blocca la creazione di super_admin
|
||||
assert response.status_code in (400, 422)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_user_weak_password_returns_422(self, client, admin_token):
|
||||
response = await client.post(
|
||||
"/api/v1/users",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={
|
||||
"email": "weakpwd@test.com",
|
||||
"password": "weak", # troppo corta e senza maiuscole/numeri
|
||||
"full_name": "Weak Pwd User",
|
||||
"role": "operator",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 422
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_user_by_id(self, client, admin_token, admin_user):
|
||||
response = await client.get(
|
||||
f"/api/v1/users/{admin_user.id}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["id"] == str(admin_user.id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_nonexistent_user_returns_404(self, client, admin_token):
|
||||
import uuid
|
||||
fake_id = uuid.uuid4()
|
||||
response = await client.get(
|
||||
f"/api/v1/users/{fake_id}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
assert response.status_code == 404
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user(self, client, admin_token, admin_user):
|
||||
response = await client.patch(
|
||||
f"/api/v1/users/{admin_user.id}",
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
json={"full_name": "Admin Aggiornato"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["full_name"] == "Admin Aggiornato"
|
||||
@@ -0,0 +1 @@
|
||||
# Unit tests
|
||||
@@ -0,0 +1,149 @@
|
||||
"""
|
||||
Test unitari per AuthService (mock del DB).
|
||||
"""
|
||||
|
||||
import os
|
||||
import uuid
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
os.environ.setdefault("ENCRYPTION_KEY", "a" * 64)
|
||||
os.environ.setdefault("SECRET_KEY", "test-secret-key-only")
|
||||
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost:5432/test")
|
||||
os.environ.setdefault("DATABASE_URL_SYNC", "postgresql://test:test@localhost:5432/test")
|
||||
|
||||
from app.core.exceptions import (
|
||||
AccountDisabledError,
|
||||
AccountLockedError,
|
||||
InvalidCredentialsError,
|
||||
TOTPRequiredError,
|
||||
)
|
||||
from app.core.security import hash_password
|
||||
from app.models.user import User
|
||||
|
||||
|
||||
def make_user(**kwargs) -> User:
|
||||
"""Factory per creare un utente mock."""
|
||||
user = MagicMock(spec=User)
|
||||
user.id = kwargs.get("id", uuid.uuid4())
|
||||
user.tenant_id = kwargs.get("tenant_id", uuid.uuid4())
|
||||
user.email = kwargs.get("email", "test@example.com")
|
||||
user.password_hash = kwargs.get("password_hash", hash_password("Password1!"))
|
||||
user.role = kwargs.get("role", "operator")
|
||||
user.is_active = kwargs.get("is_active", True)
|
||||
user.totp_enabled = kwargs.get("totp_enabled", False)
|
||||
user.totp_secret = kwargs.get("totp_secret", None)
|
||||
user.failed_login_count = kwargs.get("failed_login_count", 0)
|
||||
user.locked_until = kwargs.get("locked_until", None)
|
||||
return user
|
||||
|
||||
|
||||
class TestAuthServiceLogin:
|
||||
@pytest.fixture
|
||||
def mock_db(self):
|
||||
db = AsyncMock()
|
||||
db.add = MagicMock()
|
||||
db.execute = AsyncMock()
|
||||
db.flush = AsyncMock()
|
||||
return db
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_with_correct_credentials(self, mock_db):
|
||||
from app.services.auth_service import AuthService
|
||||
|
||||
user = make_user(password_hash=hash_password("Password1!"))
|
||||
|
||||
# Mock query utente
|
||||
mock_result = MagicMock()
|
||||
mock_result.scalar_one_or_none.return_value = user
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
service = AuthService(mock_db)
|
||||
|
||||
with patch.object(service, "_handle_failed_login", new_callable=AsyncMock):
|
||||
with patch.object(service, "_reset_failed_login", new_callable=AsyncMock):
|
||||
with patch.object(service, "_log_audit", new_callable=AsyncMock):
|
||||
access, refresh = await service.login(
|
||||
email="test@example.com",
|
||||
password="Password1!",
|
||||
totp_code=None,
|
||||
)
|
||||
|
||||
assert access is not None
|
||||
assert refresh is not None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_user_not_found_raises(self, mock_db):
|
||||
from app.services.auth_service import AuthService
|
||||
|
||||
mock_result = MagicMock()
|
||||
mock_result.scalar_one_or_none.return_value = None
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
service = AuthService(mock_db)
|
||||
with patch.object(service, "_log_audit", new_callable=AsyncMock):
|
||||
with pytest.raises(InvalidCredentialsError):
|
||||
await service.login("notfound@example.com", "Password1!", None)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_inactive_user_raises(self, mock_db):
|
||||
from app.services.auth_service import AuthService
|
||||
|
||||
user = make_user(is_active=False)
|
||||
mock_result = MagicMock()
|
||||
mock_result.scalar_one_or_none.return_value = user
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
service = AuthService(mock_db)
|
||||
with pytest.raises(AccountDisabledError):
|
||||
await service.login("test@example.com", "Password1!", None)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_locked_account_raises(self, mock_db):
|
||||
from app.services.auth_service import AuthService
|
||||
|
||||
user = make_user(
|
||||
locked_until=datetime.now(UTC) + timedelta(minutes=10)
|
||||
)
|
||||
mock_result = MagicMock()
|
||||
mock_result.scalar_one_or_none.return_value = user
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
service = AuthService(mock_db)
|
||||
with pytest.raises(AccountLockedError):
|
||||
await service.login("test@example.com", "Password1!", None)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_wrong_password_raises(self, mock_db):
|
||||
from app.services.auth_service import AuthService
|
||||
|
||||
user = make_user(password_hash=hash_password("CorrectPassword1!"))
|
||||
mock_result = MagicMock()
|
||||
mock_result.scalar_one_or_none.return_value = user
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
service = AuthService(mock_db)
|
||||
with patch.object(service, "_handle_failed_login", new_callable=AsyncMock):
|
||||
with patch.object(service, "_log_audit", new_callable=AsyncMock):
|
||||
with pytest.raises(InvalidCredentialsError):
|
||||
await service.login("test@example.com", "WrongPassword1!", None)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_login_totp_required_when_enabled(self, mock_db):
|
||||
from app.services.auth_service import AuthService
|
||||
|
||||
user = make_user(
|
||||
password_hash=hash_password("Password1!"),
|
||||
totp_enabled=True,
|
||||
)
|
||||
mock_result = MagicMock()
|
||||
mock_result.scalar_one_or_none.return_value = user
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
service = AuthService(mock_db)
|
||||
with patch.object(service, "_reset_failed_login", new_callable=AsyncMock):
|
||||
with patch.object(service, "_log_audit", new_callable=AsyncMock):
|
||||
with pytest.raises(TOTPRequiredError):
|
||||
await service.login("test@example.com", "Password1!", totp_code=None)
|
||||
@@ -0,0 +1,143 @@
|
||||
"""
|
||||
Test unitari per app.core.security.
|
||||
"""
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
# Override variabili d'ambiente per i test (prima di importare app)
|
||||
os.environ["ENCRYPTION_KEY"] = "a" * 64
|
||||
os.environ["SECRET_KEY"] = "test-secret-key-for-unit-tests-only"
|
||||
os.environ["DATABASE_URL"] = "postgresql+asyncpg://test:test@localhost:5432/test"
|
||||
os.environ["DATABASE_URL_SYNC"] = "postgresql://test:test@localhost:5432/test"
|
||||
|
||||
from app.core.security import (
|
||||
hash_password,
|
||||
verify_password,
|
||||
create_access_token,
|
||||
create_refresh_token,
|
||||
decode_token,
|
||||
encrypt_credential,
|
||||
decrypt_credential,
|
||||
hash_token,
|
||||
)
|
||||
|
||||
|
||||
class TestPasswordHashing:
|
||||
def test_hash_password_returns_bcrypt_hash(self):
|
||||
hashed = hash_password("MySecurePassword1!")
|
||||
assert hashed.startswith("$2b$")
|
||||
assert len(hashed) > 20
|
||||
|
||||
def test_verify_correct_password(self):
|
||||
password = "MySecurePassword1!"
|
||||
hashed = hash_password(password)
|
||||
assert verify_password(password, hashed) is True
|
||||
|
||||
def test_verify_wrong_password(self):
|
||||
hashed = hash_password("CorrectPassword1!")
|
||||
assert verify_password("WrongPassword1!", hashed) is False
|
||||
|
||||
def test_hash_is_different_each_time(self):
|
||||
"""Bcrypt usa salt casuale: due hash dello stesso secret sono diversi."""
|
||||
p = "SamePassword1!"
|
||||
h1 = hash_password(p)
|
||||
h2 = hash_password(p)
|
||||
assert h1 != h2
|
||||
# Ma entrambi verificano correttamente
|
||||
assert verify_password(p, h1)
|
||||
assert verify_password(p, h2)
|
||||
|
||||
|
||||
class TestJWT:
|
||||
def test_create_and_decode_access_token(self):
|
||||
import uuid
|
||||
user_id = uuid.uuid4()
|
||||
tenant_id = uuid.uuid4()
|
||||
token = create_access_token(
|
||||
subject=user_id,
|
||||
tenant_id=tenant_id,
|
||||
role="admin",
|
||||
)
|
||||
payload = decode_token(token)
|
||||
assert payload["sub"] == str(user_id)
|
||||
assert payload["tid"] == str(tenant_id)
|
||||
assert payload["role"] == "admin"
|
||||
assert payload["type"] == "access"
|
||||
|
||||
def test_create_and_decode_refresh_token(self):
|
||||
import uuid
|
||||
user_id = uuid.uuid4()
|
||||
tenant_id = uuid.uuid4()
|
||||
token = create_refresh_token(subject=user_id, tenant_id=tenant_id)
|
||||
payload = decode_token(token)
|
||||
assert payload["sub"] == str(user_id)
|
||||
assert payload["type"] == "refresh"
|
||||
|
||||
def test_expired_token_raises(self):
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from jose import jwt
|
||||
from app.config import get_settings
|
||||
from jose import JWTError
|
||||
|
||||
settings = get_settings()
|
||||
payload = {
|
||||
"sub": "user-id",
|
||||
"tid": "tenant-id",
|
||||
"type": "access",
|
||||
"exp": datetime.now(UTC) - timedelta(seconds=1), # già scaduto
|
||||
}
|
||||
token = jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
||||
with pytest.raises(JWTError):
|
||||
decode_token(token)
|
||||
|
||||
def test_invalid_signature_raises(self):
|
||||
from jose import JWTError
|
||||
with pytest.raises(JWTError):
|
||||
decode_token("this.is.not.a.valid.token")
|
||||
|
||||
|
||||
class TestAESEncryption:
|
||||
def test_encrypt_decrypt_roundtrip(self):
|
||||
secret = "imap_password_super_secret_123!"
|
||||
encrypted = encrypt_credential(secret)
|
||||
decrypted = decrypt_credential(encrypted)
|
||||
assert decrypted == secret
|
||||
|
||||
def test_encrypt_produces_different_output_each_time(self):
|
||||
"""Nonce casuale garantisce che due cifrature dello stesso plaintext siano diverse."""
|
||||
secret = "same_secret"
|
||||
enc1 = encrypt_credential(secret)
|
||||
enc2 = encrypt_credential(secret)
|
||||
assert enc1 != enc2
|
||||
# Ma entrambi decifrano correttamente
|
||||
assert decrypt_credential(enc1) == secret
|
||||
assert decrypt_credential(enc2) == secret
|
||||
|
||||
def test_decrypt_with_wrong_data_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
decrypt_credential("dGhpcyBpcyBub3QgdmFsaWQgYWVzIGRhdGE=")
|
||||
|
||||
def test_encrypt_empty_string(self):
|
||||
encrypted = encrypt_credential("")
|
||||
decrypted = decrypt_credential(encrypted)
|
||||
assert decrypted == ""
|
||||
|
||||
def test_encrypt_unicode_string(self):
|
||||
secret = "pàssword_con_àccenti_è_ù!"
|
||||
encrypted = encrypt_credential(secret)
|
||||
decrypted = decrypt_credential(encrypted)
|
||||
assert decrypted == secret
|
||||
|
||||
|
||||
class TestHashToken:
|
||||
def test_hash_is_deterministic(self):
|
||||
token = "my_refresh_token"
|
||||
assert hash_token(token) == hash_token(token)
|
||||
|
||||
def test_different_tokens_different_hashes(self):
|
||||
assert hash_token("token1") != hash_token("token2")
|
||||
|
||||
def test_hash_is_64_chars(self):
|
||||
"""SHA-256 produce 32 bytes = 64 caratteri hex."""
|
||||
assert len(hash_token("any_token")) == 64
|
||||
Reference in New Issue
Block a user