Files
2026-03-19 16:58:23 +01:00

352 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Test di integrazione API invio PEC (POST /send e GET /send/jobs).
Usa SQLite in-memory + mock dell'arq pool per evitare dipendenze esterne.
"""
import uuid
from unittest.mock import AsyncMock, patch
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import encrypt_credential
# ─── Fixtures ─────────────────────────────────────────────────────────────────
@pytest_asyncio.fixture
async def active_mailbox(db_session: AsyncSession, demo_tenant):
    """Persist and return an active PEC mailbox owned by the test tenant.

    Every IMAP/SMTP host, port, user and password value is passed through
    ``encrypt_credential`` before being handed to the model. The values are
    placeholders, not real credentials.
    """
    # Imported inside the fixture body, exactly where the original places it.
    from app.models.mailbox import Mailbox

    pec_address = "test@pec.example.it"

    # Encrypted connection settings, IMAP first and SMTP second.
    encrypted_settings = {
        "imap_host_enc": encrypt_credential("imap.example.it"),
        "imap_port_enc": encrypt_credential("993"),
        "imap_user_enc": encrypt_credential(pec_address),
        "imap_pass_enc": encrypt_credential("secret"),
        "smtp_host_enc": encrypt_credential("smtp.example.it"),
        "smtp_port_enc": encrypt_credential("465"),
        "smtp_user_enc": encrypt_credential(pec_address),
        "smtp_pass_enc": encrypt_credential("secret"),
    }

    record = Mailbox(
        tenant_id=demo_tenant.id,
        email_address=pec_address,
        display_name="Test PEC",
        provider="test",
        imap_use_ssl=True,
        smtp_use_tls=True,
        status="active",
        **encrypted_settings,
    )

    db_session.add(record)
    await db_session.commit()
    # Reload the instance after the commit so callers can read its
    # persisted attributes (the tests use ``active_mailbox.id``).
    await db_session.refresh(record)
    return record
@pytest_asyncio.fixture
async def auth_headers(admin_token: str) -> dict:
    """Build the ``Authorization`` header carrying the admin bearer token."""
    bearer_value = f"Bearer {admin_token}"
    return {"Authorization": bearer_value}
# ─── Test POST /send ──────────────────────────────────────────────────────────
class TestCreateSendJob:
    """Tests for the POST /api/v1/send endpoint."""

    # Endpoint under test and the patch target used to swap in a mocked
    # arq pool, so no test needs a real queue backend.
    SEND_URL = "/api/v1/send"
    ARQ_POOL_GETTER = "app.services.send_service._get_arq_pool"

    @pytest.mark.asyncio
    async def test_send_requires_authentication(self, client: AsyncClient, active_mailbox):
        """A request without a token is rejected with 401."""
        payload = {
            "mailbox_id": str(active_mailbox.id),
            "to_addresses": ["dest@pec.it"],
            "subject": "Test",
            "body_text": "corpo",
        }

        response = await client.post(self.SEND_URL, json=payload)

        assert response.status_code == 401

    @pytest.mark.asyncio
    async def test_send_missing_to_addresses(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """An empty recipient list fails validation with 422."""
        payload = {
            "mailbox_id": str(active_mailbox.id),
            "to_addresses": [],
            "subject": "Test",
            "body_text": "corpo",
        }
        pool_stub = AsyncMock()
        pool_stub.enqueue_job = AsyncMock()

        with patch(self.ARQ_POOL_GETTER, return_value=pool_stub):
            response = await client.post(
                self.SEND_URL, json=payload, headers=auth_headers
            )

        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_send_missing_subject(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """A whitespace-only subject fails validation with 422."""
        payload = {
            "mailbox_id": str(active_mailbox.id),
            "to_addresses": ["dest@pec.it"],
            "subject": " ",
            "body_text": "corpo",
        }

        with patch(self.ARQ_POOL_GETTER, return_value=AsyncMock()):
            response = await client.post(
                self.SEND_URL, json=payload, headers=auth_headers
            )

        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_send_mailbox_not_found(self, client: AsyncClient, auth_headers: dict):
        """A mailbox id that does not exist yields 404."""
        # A freshly generated UUID stands in for a mailbox that was never created.
        payload = {
            "mailbox_id": str(uuid.uuid4()),
            "to_addresses": ["dest@pec.it"],
            "subject": "Test",
            "body_text": "corpo",
        }

        with patch(self.ARQ_POOL_GETTER, return_value=AsyncMock()):
            response = await client.post(
                self.SEND_URL, json=payload, headers=auth_headers
            )

        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_send_success_creates_job(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """A valid request returns 201 with the job payload and enqueues a task."""
        payload = {
            "mailbox_id": str(active_mailbox.id),
            "to_addresses": ["matteo1801@spidmail.it"],
            "subject": "Test PEChub Fase 4",
            "body_text": "Messaggio di test inviato da PEChub.",
        }
        pool_stub = AsyncMock()
        pool_stub.enqueue_job = AsyncMock(return_value=None)

        with patch(self.ARQ_POOL_GETTER, return_value=pool_stub):
            response = await client.post(
                self.SEND_URL, json=payload, headers=auth_headers
            )

        assert response.status_code == 201
        job = response.json()
        assert job["status"] == "pending"
        assert job["mailbox_id"] == str(active_mailbox.id)
        assert job["attempt_count"] == 0
        assert job["max_attempts"] == 5
        assert "id" in job

        # The pool must have been asked exactly once to enqueue the
        # "send_pec" task (first positional argument of the call).
        pool_stub.enqueue_job.assert_called_once()
        enqueue_call = pool_stub.enqueue_job.call_args
        assert enqueue_call.args[0] == "send_pec"

    @pytest.mark.asyncio
    async def test_send_with_cc(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """A request that includes ``cc_addresses`` is accepted with 201."""
        payload = {
            "mailbox_id": str(active_mailbox.id),
            "to_addresses": ["dest@pec.it"],
            "cc_addresses": ["cc@pec.it"],
            "subject": "Test con Cc",
            "body_text": "corpo",
        }
        pool_stub = AsyncMock()
        pool_stub.enqueue_job = AsyncMock(return_value=None)

        with patch(self.ARQ_POOL_GETTER, return_value=pool_stub):
            response = await client.post(
                self.SEND_URL, json=payload, headers=auth_headers
            )

        assert response.status_code == 201

    @pytest.mark.asyncio
    async def test_send_arq_failure_still_returns_201(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """An enqueue failure does not fail the request.

        ``enqueue_job`` raises ``ConnectionError`` (simulating Redis being
        down); the API is still expected to answer 201 with the job left in
        the ``pending`` state.
        """
        payload = {
            "mailbox_id": str(active_mailbox.id),
            "to_addresses": ["dest@pec.it"],
            "subject": "Test Redis down",
            "body_text": "corpo",
        }
        pool_stub = AsyncMock()
        pool_stub.enqueue_job = AsyncMock(
            side_effect=ConnectionError("Redis non disponibile")
        )

        with patch(self.ARQ_POOL_GETTER, return_value=pool_stub):
            response = await client.post(
                self.SEND_URL, json=payload, headers=auth_headers
            )

        # The job must be created even though the queue is unreachable.
        assert response.status_code == 201
        assert response.json()["status"] == "pending"
# ─── Test GET /send/jobs ──────────────────────────────────────────────────────
class TestListSendJobs:
    """Tests for the GET /api/v1/send/jobs endpoint."""

    @pytest.mark.asyncio
    async def test_list_requires_authentication(self, client: AsyncClient):
        """A request without a token is rejected with 401."""
        response = await client.get("/api/v1/send/jobs")
        assert response.status_code == 401

    @pytest.mark.asyncio
    async def test_list_returns_empty_for_new_tenant(
        self, client: AsyncClient, auth_headers: dict
    ):
        """An authenticated listing returns 200 with ``items`` and ``total`` keys.

        Only the shape of the response is checked; emptiness is not asserted.
        NOTE(review): presumably because database state may be shared between
        tests. Confirm against the fixture scopes before tightening this.
        """
        response = await client.get("/api/v1/send/jobs", headers=auth_headers)
        assert response.status_code == 200
        data = response.json()
        assert "items" in data
        assert "total" in data

    @pytest.mark.asyncio
    async def test_list_after_send(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """After one successful send, the listing reports at least one job."""
        mock_arq = AsyncMock()
        mock_arq.enqueue_job = AsyncMock(return_value=None)

        # Create a job with the arq pool mocked out.
        with patch("app.services.send_service._get_arq_pool", return_value=mock_arq):
            create_resp = await client.post(
                "/api/v1/send",
                json={
                    "mailbox_id": str(active_mailbox.id),
                    "to_addresses": ["dest@pec.it"],
                    "subject": "Job per lista",
                    "body_text": "corpo",
                },
                headers=auth_headers,
            )
        # Fail here, with a clear message, if the precondition is not met;
        # otherwise a broken POST would only show up as a misleading count
        # mismatch (or be masked by jobs created elsewhere).
        assert create_resp.status_code == 201

        # List the jobs.
        response = await client.get("/api/v1/send/jobs", headers=auth_headers)
        assert response.status_code == 200
        data = response.json()
        assert data["total"] >= 1
# ─── Test GET /send/jobs/{id} ─────────────────────────────────────────────────
class TestGetSendJob:
    """Tests for the GET /api/v1/send/jobs/{job_id} endpoint."""

    @pytest.mark.asyncio
    async def test_get_nonexistent_job(self, client: AsyncClient, auth_headers: dict):
        """Requesting an unknown job id yields 404."""
        # A freshly generated UUID stands in for a job that was never created.
        unknown_job_url = f"/api/v1/send/jobs/{uuid.uuid4()}"

        response = await client.get(unknown_job_url, headers=auth_headers)

        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_get_existing_job(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """A job that was just created can be fetched back by its id."""
        payload = {
            "mailbox_id": str(active_mailbox.id),
            "to_addresses": ["dest@pec.it"],
            "subject": "Job da recuperare",
            "body_text": "corpo",
        }
        pool_stub = AsyncMock()
        pool_stub.enqueue_job = AsyncMock(return_value=None)

        # Step 1: create the job with the arq pool mocked out.
        with patch("app.services.send_service._get_arq_pool", return_value=pool_stub):
            created = await client.post(
                "/api/v1/send", json=payload, headers=auth_headers
            )
        assert created.status_code == 201
        job_id = created.json()["id"]

        # Step 2: fetch it back and compare the identifier.
        fetched = await client.get(
            f"/api/v1/send/jobs/{job_id}", headers=auth_headers
        )
        assert fetched.status_code == 200
        assert fetched.json()["id"] == job_id
# ─── Test DELETE /send/jobs/{id} ─────────────────────────────────────────────
class TestCancelSendJob:
    """Tests for the DELETE /api/v1/send/jobs/{job_id} endpoint."""

    @pytest.mark.asyncio
    async def test_cancel_pending_job(
        self, client: AsyncClient, auth_headers: dict, active_mailbox
    ):
        """Cancelling a pending job returns 204 and marks the job as failed.

        After the DELETE, the job is fetched again and is expected to have
        status ``failed`` with a ``last_error`` containing "Annullato".
        """
        mock_arq = AsyncMock()
        mock_arq.enqueue_job = AsyncMock(return_value=None)

        # Create the job with the arq pool mocked out.
        with patch("app.services.send_service._get_arq_pool", return_value=mock_arq):
            create_resp = await client.post(
                "/api/v1/send",
                json={
                    "mailbox_id": str(active_mailbox.id),
                    "to_addresses": ["dest@pec.it"],
                    "subject": "Job da annullare",
                    "body_text": "corpo",
                },
                headers=auth_headers,
            )
        # Check the precondition explicitly, so a failed creation is reported
        # as such rather than as a KeyError on "id".
        assert create_resp.status_code == 201
        job_id = create_resp.json()["id"]

        # Cancel the job.
        response = await client.delete(
            f"/api/v1/send/jobs/{job_id}", headers=auth_headers
        )
        assert response.status_code == 204

        # Verify the resulting state, parsing the body only once.
        get_resp = await client.get(
            f"/api/v1/send/jobs/{job_id}", headers=auth_headers
        )
        assert get_resp.status_code == 200
        job = get_resp.json()
        assert job["status"] == "failed"
        # ``or ""`` turns a null last_error into a plain assertion failure
        # instead of a TypeError from the ``in`` operator.
        assert "Annullato" in (job["last_error"] or "")