mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 20:55:41 +02:00
58a233236c
- docker-compose.yml: PostgreSQL 16, Redis 7, MinIO, Nginx - backend FastAPI: struttura monorepo, config pydantic-settings - modelli SQLAlchemy: tutti i modelli (tenants, users, mailboxes, messages, archival, permissions, labels, audit_log) - migrazione Alembic 0001: schema completo in pure SQL - auth API: login JWT, refresh token rotation, logout, 2FA TOTP (setup/verify/disable) - CRUD utenti: lista, crea, modifica, reset password, soft delete - permessi granulari (Fase 1-A): mailbox_permissions, assegna/revoca/lista - CRUD tenant: gestione super-admin - sicurezza: AES-256-GCM cifratura credenziali IMAP/SMTP, bcrypt password - RLS PostgreSQL: isolamento multi-tenant per request - seed sviluppo: tenant demo + admin + operator - test unit: security (bcrypt, JWT, AES), auth_service - test integration: auth endpoints, users endpoints - CI GitHub Actions: lint (ruff), test (pytest), build Docker, security scan - infra: nginx.conf, redis.conf - Makefile con comandi make dev/test/migrate/seed Definition of Done: ✅ Login, refresh token e TOTP funzionanti ✅ make dev porta in piedi tutto lo stack locale ✅ CI configurata
150 lines
5.5 KiB
Python
150 lines
5.5 KiB
Python
"""
|
|
Test unitari per AuthService (mock del DB).
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
from datetime import UTC, datetime, timedelta
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# Placeholder settings for the test run. They are applied before the
# ``app.*`` imports further down -- presumably because the application
# configuration is read at import time (TODO confirm). ``setdefault`` leaves
# any value already present in the environment untouched.
_TEST_ENV_DEFAULTS = {
    "ENCRYPTION_KEY": "a" * 64,
    "SECRET_KEY": "test-secret-key-only",
    "DATABASE_URL": "postgresql+asyncpg://test:test@localhost:5432/test",
    "DATABASE_URL_SYNC": "postgresql://test:test@localhost:5432/test",
}
for _name, _value in _TEST_ENV_DEFAULTS.items():
    os.environ.setdefault(_name, _value)
|
|
|
|
from app.core.exceptions import (
|
|
AccountDisabledError,
|
|
AccountLockedError,
|
|
InvalidCredentialsError,
|
|
TOTPRequiredError,
|
|
)
|
|
from app.core.security import hash_password
|
|
from app.models.user import User
|
|
|
|
|
|
def make_user(**kwargs) -> User:
    """Build a ``MagicMock`` constrained to the ``User`` spec for use in tests.

    Recognised keyword overrides (any other keyword is ignored):
    ``id``, ``tenant_id``, ``email``, ``password_hash``, ``role``,
    ``is_active``, ``totp_enabled``, ``totp_secret``,
    ``failed_login_count`` and ``locked_until``.

    Defaults describe an active ``"operator"`` account with e-mail
    ``test@example.com``, random UUIDs for ``id`` and ``tenant_id``, the hash
    of ``"Password1!"`` as password, TOTP disabled with no secret, zero
    failed logins and no lock.

    Returns:
        The configured mock, typed as ``User`` for the benefit of callers.
    """
    user = MagicMock(spec=User)
    user.id = kwargs.get("id", uuid.uuid4())
    user.tenant_id = kwargs.get("tenant_id", uuid.uuid4())
    user.email = kwargs.get("email", "test@example.com")
    # ``dict.get`` evaluates its default eagerly, so the default hash used to
    # be computed on every call, even when the caller passed its own
    # ``password_hash``. Password hashing is presumably slow by design (the
    # project notes mention bcrypt), so only compute it when it is needed.
    if "password_hash" in kwargs:
        user.password_hash = kwargs["password_hash"]
    else:
        user.password_hash = hash_password("Password1!")
    user.role = kwargs.get("role", "operator")
    user.is_active = kwargs.get("is_active", True)
    user.totp_enabled = kwargs.get("totp_enabled", False)
    user.totp_secret = kwargs.get("totp_secret", None)
    user.failed_login_count = kwargs.get("failed_login_count", 0)
    user.locked_until = kwargs.get("locked_until", None)
    return user
|
|
|
|
|
|
class TestAuthServiceLogin:
    """Unit tests for ``AuthService.login`` run against a mocked DB session."""

    @pytest.fixture
    def mock_db(self):
        """Return an ``AsyncMock`` session with a synchronous ``add``."""
        session = AsyncMock()
        session.add = MagicMock()
        session.execute = AsyncMock()
        session.flush = AsyncMock()
        return session

    @staticmethod
    def _service_with_lookup(session, looked_up_user):
        """Create an ``AuthService`` whose user query yields ``looked_up_user``.

        ``session.execute`` is stubbed to return a result object whose
        ``scalar_one_or_none()`` gives back ``looked_up_user`` (``None``
        simulates an unknown e-mail).
        """
        from app.services.auth_service import AuthService

        # Mock the user query
        query_result = MagicMock()
        query_result.scalar_one_or_none.return_value = looked_up_user
        session.execute.return_value = query_result
        return AuthService(session)

    @pytest.mark.asyncio
    async def test_login_with_correct_credentials(self, mock_db):
        """A matching password yields both an access and a refresh value."""
        account = make_user(password_hash=hash_password("Password1!"))
        service = self._service_with_lookup(mock_db, account)

        with (
            patch.object(service, "_handle_failed_login", new_callable=AsyncMock),
            patch.object(service, "_reset_failed_login", new_callable=AsyncMock),
            patch.object(service, "_log_audit", new_callable=AsyncMock),
        ):
            access, refresh = await service.login(
                email="test@example.com",
                password="Password1!",
                totp_code=None,
            )

        assert access is not None
        assert refresh is not None

    @pytest.mark.asyncio
    async def test_login_user_not_found_raises(self, mock_db):
        """An unknown e-mail is reported as ``InvalidCredentialsError``."""
        service = self._service_with_lookup(mock_db, None)

        with (
            patch.object(service, "_log_audit", new_callable=AsyncMock),
            pytest.raises(InvalidCredentialsError),
        ):
            await service.login("notfound@example.com", "Password1!", None)

    @pytest.mark.asyncio
    async def test_login_inactive_user_raises(self, mock_db):
        """A user with ``is_active=False`` triggers ``AccountDisabledError``."""
        account = make_user(is_active=False)
        service = self._service_with_lookup(mock_db, account)

        with pytest.raises(AccountDisabledError):
            await service.login("test@example.com", "Password1!", None)

    @pytest.mark.asyncio
    async def test_login_locked_account_raises(self, mock_db):
        """A lock expiring ten minutes from now triggers ``AccountLockedError``."""
        lock_expiry = datetime.now(UTC) + timedelta(minutes=10)
        account = make_user(locked_until=lock_expiry)
        service = self._service_with_lookup(mock_db, account)

        with pytest.raises(AccountLockedError):
            await service.login("test@example.com", "Password1!", None)

    @pytest.mark.asyncio
    async def test_login_wrong_password_raises(self, mock_db):
        """A non-matching password is reported as ``InvalidCredentialsError``."""
        account = make_user(password_hash=hash_password("CorrectPassword1!"))
        service = self._service_with_lookup(mock_db, account)

        with (
            patch.object(service, "_handle_failed_login", new_callable=AsyncMock),
            patch.object(service, "_log_audit", new_callable=AsyncMock),
            pytest.raises(InvalidCredentialsError),
        ):
            await service.login("test@example.com", "WrongPassword1!", None)

    @pytest.mark.asyncio
    async def test_login_totp_required_when_enabled(self, mock_db):
        """With TOTP enabled and no code supplied, ``TOTPRequiredError`` is raised."""
        account = make_user(
            password_hash=hash_password("Password1!"),
            totp_enabled=True,
        )
        service = self._service_with_lookup(mock_db, account)

        with (
            patch.object(service, "_reset_failed_login", new_callable=AsyncMock),
            patch.object(service, "_log_audit", new_callable=AsyncMock),
            pytest.raises(TOTPRequiredError),
        ):
            await service.login("test@example.com", "Password1!", totp_code=None)
|