mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 20:55:41 +02:00
201 lines
6.7 KiB
Python
201 lines
6.7 KiB
Python
"""
|
||
Fixtures per test di integrazione – DB in-memory SQLite + app FastAPI.
|
||
|
||
Per i test di integrazione si usa SQLite async invece di PostgreSQL per
|
||
semplicità e velocità. In CI si può aggiungere un servizio PostgreSQL reale.
|
||
"""
|
||
|
||
import os
|
||
import uuid
|
||
from typing import AsyncGenerator
|
||
|
||
import pytest
|
||
import pytest_asyncio
|
||
from httpx import ASGITransport, AsyncClient
|
||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||
|
||
# Override variabili d'ambiente prima di importare l'app
|
||
os.environ["ENCRYPTION_KEY"] = "b" * 64
|
||
os.environ["SECRET_KEY"] = "integration-test-secret-key-only-for-tests"
|
||
os.environ["DATABASE_URL"] = "sqlite+aiosqlite:///./test_integration.db"
|
||
os.environ["DATABASE_URL_SYNC"] = "sqlite:///./test_integration.db"
|
||
os.environ["APP_ENV"] = "development"
|
||
os.environ["APP_DEBUG"] = "false"
|
||
|
||
# ─── Compatibilità SQLite: tipi PostgreSQL non supportati ───────────────────
|
||
# SQLite non conosce INET, JSONB, ARRAY – mappiamo a tipi base compatibili.
|
||
# Deve essere eseguito PRIMA di importare i modelli ORM.
|
||
from sqlalchemy.dialects.sqlite import base as _sqlite_base
|
||
|
||
|
||
def _visit_inet(self, type_, **kw): # noqa: ARG001
|
||
return "VARCHAR(45)"
|
||
|
||
def _visit_jsonb(self, type_, **kw): # noqa: ARG001
|
||
return "JSON"
|
||
|
||
def _visit_array(self, type_, **kw): # noqa: ARG001
|
||
# SQLite non ha array nativi; usiamo TEXT (serializzato come JSON)
|
||
return "TEXT"
|
||
|
||
def _visit_tsvector(self, type_, **kw): # noqa: ARG001
|
||
return "TEXT"
|
||
|
||
def _visit_tsquery(self, type_, **kw): # noqa: ARG001
|
||
return "TEXT"
|
||
|
||
|
||
_sqlite_base.SQLiteTypeCompiler.visit_INET = _visit_inet
|
||
_sqlite_base.SQLiteTypeCompiler.visit_JSONB = _visit_jsonb
|
||
_sqlite_base.SQLiteTypeCompiler.visit_ARRAY = _visit_array
|
||
_sqlite_base.SQLiteTypeCompiler.visit_TSVECTOR = _visit_tsvector
|
||
_sqlite_base.SQLiteTypeCompiler.visit_TSQUERY = _visit_tsquery
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
from app.database import Base
|
||
from app.main import app
|
||
|
||
# Engine SQLite per test
|
||
TEST_DATABASE_URL = "sqlite+aiosqlite:///./test_integration.db"
|
||
|
||
test_engine = create_async_engine(
|
||
TEST_DATABASE_URL,
|
||
echo=False,
|
||
connect_args={"check_same_thread": False},
|
||
)
|
||
|
||
# ─── Compatibilità SQLite: ARRAY come JSON in DML ────────────────────────────
|
||
# SQLite non può fare binding di Python list come parametri SQL;
|
||
# li convertiamo in JSON string prima dell'esecuzione.
|
||
import json as _json
|
||
from sqlalchemy import event as _sa_event
|
||
|
||
|
||
@_sa_event.listens_for(test_engine.sync_engine, "before_cursor_execute", retval=True)
|
||
def _sqlite_list_to_json(conn, cursor, statement, parameters, context, executemany):
|
||
"""Converte le liste Python in stringhe JSON per la compatibilità con SQLite."""
|
||
def _convert(params):
|
||
if isinstance(params, (list, tuple)):
|
||
return type(params)(_json.dumps(v) if isinstance(v, list) else v for v in params)
|
||
return params
|
||
|
||
if executemany:
|
||
return statement, [_convert(p) for p in parameters]
|
||
return statement, _convert(parameters)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
|
||
TestAsyncSessionLocal = async_sessionmaker(
|
||
bind=test_engine,
|
||
class_=AsyncSession,
|
||
expire_on_commit=False,
|
||
autoflush=False,
|
||
)
|
||
|
||
|
||
@pytest_asyncio.fixture(scope="session", autouse=True)
|
||
async def setup_database():
|
||
"""Crea tutte le tabelle nel DB di test."""
|
||
# Import modelli per registrarli nel metadata
|
||
import app.models # noqa: F401
|
||
|
||
async with test_engine.begin() as conn:
|
||
# SQLite non supporta tutti i tipi PostgreSQL, usiamo tabelle semplici
|
||
await conn.run_sync(Base.metadata.drop_all)
|
||
await conn.run_sync(Base.metadata.create_all)
|
||
yield
|
||
async with test_engine.begin() as conn:
|
||
await conn.run_sync(Base.metadata.drop_all)
|
||
await test_engine.dispose()
|
||
# Pulisci file DB
|
||
import os
|
||
if os.path.exists("test_integration.db"):
|
||
os.remove("test_integration.db")
|
||
|
||
|
||
@pytest_asyncio.fixture
|
||
async def db_session() -> AsyncGenerator[AsyncSession, None]:
|
||
"""Session DB isolata per ogni test (rollback automatico)."""
|
||
async with TestAsyncSessionLocal() as session:
|
||
yield session
|
||
await session.rollback()
|
||
|
||
|
||
@pytest_asyncio.fixture
|
||
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
|
||
"""HTTP client per test API con override del DB."""
|
||
from app.database import get_db
|
||
|
||
async def override_get_db():
|
||
yield db_session
|
||
|
||
app.dependency_overrides[get_db] = override_get_db
|
||
|
||
async with AsyncClient(
|
||
transport=ASGITransport(app=app),
|
||
base_url="http://testserver",
|
||
) as c:
|
||
yield c
|
||
|
||
app.dependency_overrides.clear()
|
||
|
||
|
||
@pytest_asyncio.fixture
|
||
async def demo_tenant(db_session: AsyncSession):
|
||
"""
|
||
Crea un tenant di test con ID e slug univoci per ogni test.
|
||
|
||
Usiamo UUID randomici per evitare conflitti UNIQUE quando i test
|
||
vengono eseguiti nello stesso processo e il commit non viene rollbackato.
|
||
"""
|
||
from app.models.tenant import Tenant
|
||
|
||
tenant_id = uuid.uuid4()
|
||
tenant = Tenant(
|
||
id=tenant_id,
|
||
slug=f"test-{tenant_id.hex[:12]}",
|
||
name="Test Tenant",
|
||
plan="pro",
|
||
max_mailboxes=10,
|
||
max_users=10,
|
||
)
|
||
db_session.add(tenant)
|
||
await db_session.commit()
|
||
await db_session.refresh(tenant)
|
||
return tenant
|
||
|
||
|
||
@pytest_asyncio.fixture
|
||
async def admin_user(db_session: AsyncSession, demo_tenant):
|
||
"""Crea un utente admin nel tenant di test."""
|
||
from app.core.security import hash_password
|
||
from app.models.user import User
|
||
|
||
user = User(
|
||
tenant_id=demo_tenant.id,
|
||
email="admin@test.com",
|
||
password_hash=hash_password("AdminPass1!"),
|
||
full_name="Test Admin",
|
||
role="admin",
|
||
is_active=True,
|
||
)
|
||
db_session.add(user)
|
||
await db_session.commit()
|
||
await db_session.refresh(user)
|
||
return user
|
||
|
||
|
||
@pytest_asyncio.fixture
|
||
async def admin_token(client: AsyncClient, admin_user, db_session: AsyncSession) -> str:
|
||
"""
|
||
Token JWT per l'utente admin.
|
||
Nota: il login usa il DB sovrapposto dalla fixture, quindi
|
||
il seed_user deve già esistere.
|
||
"""
|
||
from app.core.security import create_access_token
|
||
token = create_access_token(
|
||
subject=admin_user.id,
|
||
tenant_id=admin_user.tenant_id,
|
||
role=admin_user.role,
|
||
)
|
||
return token
|