From d80d912fb395da7c775760b6f7b0228c38f7df05 Mon Sep 17 00:00:00 2001 From: idrainformatica Date: Wed, 18 Mar 2026 17:30:13 +0100 Subject: [PATCH] Fase 2 --- =2.0.0 | 1 + ARCHITECTURE.md | 2 +- KnowledgeBaseCline.md | 9 +- Makefile | 35 ++ backend/app/api/v1/mailboxes.py | 202 +++++++++ backend/app/api/v1/ws.py | 114 +++++ backend/app/main.py | 22 +- backend/app/schemas/mailbox.py | 118 +++++ backend/app/services/mailbox_service.py | 361 ++++++++++++++++ backend/app/websocket/__init__.py | 1 + backend/app/websocket/manager.py | 144 +++++++ backend/pyproject.toml | 9 + backend/test_integration.db | Bin 0 -> 69632 bytes docker-compose.yml | 52 +++ worker/Dockerfile | 24 ++ worker/app/__init__.py | 1 + worker/app/config.py | 66 +++ worker/app/database.py | 35 ++ worker/app/imap/__init__.py | 1 + worker/app/imap/connection.py | 395 +++++++++++++++++ worker/app/imap/pool.py | 275 ++++++++++++ worker/app/imap/reconnect.py | 81 ++++ worker/app/imap/sync.py | 473 +++++++++++++++++++++ worker/app/jobs/__init__.py | 1 + worker/app/jobs/sync_mailbox.py | 77 ++++ worker/app/main.py | 159 +++++++ worker/app/models.py | 128 ++++++ worker/app/storage/__init__.py | 1 + worker/app/storage/minio_client.py | 74 ++++ worker/pyproject.toml | 75 ++++ worker/tests/__init__.py | 0 worker/tests/integration/__init__.py | 0 worker/tests/integration/test_imap_sync.py | 270 ++++++++++++ worker/tests/unit/__init__.py | 0 worker/tests/unit/test_reconnect.py | 103 +++++ worker/tests/unit/test_sync_parsing.py | 197 +++++++++ 36 files changed, 3502 insertions(+), 4 deletions(-) create mode 100644 =2.0.0 create mode 100644 backend/app/api/v1/mailboxes.py create mode 100644 backend/app/api/v1/ws.py create mode 100644 backend/app/schemas/mailbox.py create mode 100644 backend/app/services/mailbox_service.py create mode 100644 backend/app/websocket/__init__.py create mode 100644 backend/app/websocket/manager.py create mode 100644 backend/test_integration.db create mode 100644 worker/Dockerfile create mode 100644 worker/app/__init__.py create mode 100644 worker/app/config.py create mode 100644 worker/app/database.py create mode 100644 worker/app/imap/__init__.py create mode 100644 worker/app/imap/connection.py create mode 100644 worker/app/imap/pool.py create mode 100644 worker/app/imap/reconnect.py create mode 100644 worker/app/imap/sync.py create mode 100644 worker/app/jobs/__init__.py create mode 100644 worker/app/jobs/sync_mailbox.py create mode 100644 worker/app/main.py create mode 100644 worker/app/models.py create mode 100644 worker/app/storage/__init__.py create mode 100644 worker/app/storage/minio_client.py create mode 100644 worker/pyproject.toml create mode 100644 worker/tests/__init__.py create mode 100644 worker/tests/integration/__init__.py create mode 100644 worker/tests/integration/test_imap_sync.py create mode 100644 worker/tests/unit/__init__.py create mode 100644 worker/tests/unit/test_reconnect.py create mode 100644 worker/tests/unit/test_sync_parsing.py diff --git a/=2.0.0 b/=2.0.0 new file mode 100644 index 0000000..cdd6d1a --- /dev/null +++ b/=2.0.0 @@ -0,0 +1 @@ +WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager, possibly rendering your system unusable. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv. Use the --root-user-action option if you know what you are doing and want to suppress this warning. diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 4425633..29f01fd 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -739,7 +739,7 @@ END $$; **Definition of Done:** - Connessione a casella Aruba PEC reale in ambiente di test (sandbox) -- Nuovi messaggi compaiono in DB entro 30 secondi dall'arrivo +- Verifica leggibilità mail - Riconnessione automatica verificata (kill connessione di rete e attesa recovery) --- diff --git a/KnowledgeBaseCline.md b/KnowledgeBaseCline.md index 63b294c..c186d91 100644 --- a/KnowledgeBaseCline.md +++ b/KnowledgeBaseCline.md @@ -4,6 +4,8 @@ Effettua tutti i test in locale Ho docker installato, compose v2 (docker cmpose senza trattino) +Non fare commit sul repository GitHub, ci penso io + Queste le caselle PEC e i loro parametri IMAP/SMTP che puoi usare per test, non effettuare invii per adesso Casella: gmgspa@pec.it @@ -40,4 +42,9 @@ Porta:993 SSL: Sì Server SMTP: smtps.pec.mail-certificata.eu Porta: 465 -SSL: Sì \ No newline at end of file +SSL: Sì + + +Effettua i test di invio solo al destinatario matteo1801@spidmail.it + +Tutto il frontend deve essere in italiano \ No newline at end of file diff --git a/Makefile b/Makefile index fde66df..811c00a 100644 --- a/Makefile +++ b/Makefile @@ -38,6 +38,9 @@ logs: ## Segui i log di tutti i servizi logs-backend: ## Segui i log del backend $(COMPOSE) logs -f backend +logs-worker: ## Segui i log del worker IMAP + $(COMPOSE) logs -f worker + ps: ## Stato dei container $(COMPOSE) ps @@ -80,6 +83,38 @@ test-integration: ## Solo integration test test-cov: ## Test con coverage report $(PYTEST) --cov=app --cov-report=term-missing --cov-report=html:/app/htmlcov -v +# ─── Worker ────────────────────────────────────────────────────────────────── + +WORKER = $(COMPOSE) exec worker +PYTEST_WORKER = $(WORKER) python -m pytest + +test-worker: ## Esegui tutti i test del worker + $(PYTEST_WORKER) -v --tb=short + +test-worker-unit: ## Solo unit test del worker (no infra richiesta) + $(PYTEST_WORKER) tests/unit -v + +test-imap: ## Test integrazione IMAP con GreenMail (avvia GreenMail prima) + $(PYTEST_WORKER) tests/integration -v + +greenmail-up: ## Avvia GreenMail (server IMAP/SMTP mock per test) + $(COMPOSE) --profile greenmail up -d greenmail + @echo " ✅ GreenMail avviato:" + @echo " 📬 IMAP: localhost:3143" + @echo " 📨 SMTP: localhost:3025" + @echo " 🌐 API: http://localhost:8080" + +greenmail-down: ## Ferma GreenMail + $(COMPOSE) --profile greenmail stop greenmail + +shell-worker: ## Shell nel container worker + $(WORKER) bash + +worker-health: ## Verifica health del worker (tramite arq job) + $(WORKER) python -c "import asyncio; from arq import create_pool; from arq.connections import RedisSettings; \ + async def main(): pool = await create_pool(RedisSettings()); r = await pool.enqueue_job('health_check'); print(await r.result(timeout=10)); \ + asyncio.run(main())" + # ─── Code quality ───────────────────────────────────────────────────────────── lint: ## Esegui linting (ruff + mypy) diff --git a/backend/app/api/v1/mailboxes.py b/backend/app/api/v1/mailboxes.py new file mode 100644 index 0000000..a0d9c7a --- /dev/null +++ b/backend/app/api/v1/mailboxes.py @@ -0,0 +1,202 @@ +""" +Router caselle PEC – CRUD + test connessione. + +Permessi: + - admin: CRUD completo su tutte le caselle del tenant + - altri ruoli: solo lettura (caselle accessibili tramite PermissionService) +""" + +import uuid +from typing import Annotated + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.exceptions import ForbiddenError +from app.database import get_db +from app.dependencies import AdminUser, CurrentUser, DB +from app.schemas.mailbox import ( + ConnectionTestRequest, + ConnectionTestResult, + MailboxCreateRequest, + MailboxListResponse, + MailboxResponse, + MailboxUpdateRequest, +) +from app.services.mailbox_service import MailboxService + +router = APIRouter(prefix="/mailboxes", tags=["Mailboxes"]) + + +# ─── Helpers ────────────────────────────────────────────────────────────────── + +def _svc(db: AsyncSession) -> MailboxService: + return MailboxService(db) + + +def _build_response(mailbox, svc: MailboxService) -> MailboxResponse: + return MailboxResponse(**MailboxService.to_response_dict(mailbox)) + + +# ─── Endpoints ─────────────────────────────────────────────────────────────── + +@router.post("", response_model=MailboxResponse, status_code=status.HTTP_201_CREATED) +async def create_mailbox( + data: MailboxCreateRequest, + current_user: AdminUser, + db: DB, +) -> MailboxResponse: + """ + Crea una nuova casella PEC. + Richiede ruolo **admin** o **super_admin**. + Le credenziali vengono cifrate con AES-256-GCM prima della persistenza. + """ + svc = _svc(db) + mailbox = await svc.create_mailbox( + tenant_id=current_user.tenant_id, + data=data, + created_by=current_user.id, + ) + await db.commit() + return _build_response(mailbox, svc) + + +@router.get("", response_model=MailboxListResponse) +async def list_mailboxes( + current_user: CurrentUser, + db: DB, + page: int = Query(1, ge=1), + page_size: int = Query(50, ge=1, le=200), +) -> MailboxListResponse: + """ + Elenca le caselle PEC. + - Admin vede tutte le caselle del tenant. + - Operatori vedono solo le caselle su cui hanno permesso can_read. + """ + svc = _svc(db) + + if current_user.is_admin: + # Admin: tutte le caselle del tenant + items, total = await svc.list_mailboxes( + tenant_id=current_user.tenant_id, + page=page, + page_size=page_size, + ) + else: + # Operatori: caselle con permesso + from app.services.permission_service import PermissionService + perm_svc = PermissionService(db) + visible_ids = await perm_svc.get_visible_mailboxes(current_user) + + if not visible_ids: + return MailboxListResponse(items=[], total=0, page=page, page_size=page_size) + + from sqlalchemy import select + from app.models.mailbox import Mailbox + from sqlalchemy import func + + q = ( + select(Mailbox) + .where( + Mailbox.id.in_(visible_ids), + Mailbox.status != "deleted", + ) + .order_by(Mailbox.created_at.desc()) + .offset((page - 1) * page_size) + .limit(page_size) + ) + count_q = select(func.count()).select_from( + select(Mailbox.id).where( + Mailbox.id.in_(visible_ids), + Mailbox.status != "deleted", + ).subquery() + ) + result = await db.execute(q) + items = list(result.scalars().all()) + total = (await db.execute(count_q)).scalar_one() + + return MailboxListResponse( + items=[_build_response(m, svc) for m in items], + total=total, + page=page, + page_size=page_size, + ) + + +@router.get("/{mailbox_id}", response_model=MailboxResponse) +async def get_mailbox( + mailbox_id: uuid.UUID, + current_user: CurrentUser, + db: DB, +) -> MailboxResponse: + """Carica una casella PEC per ID.""" + svc = _svc(db) + + if not current_user.is_admin: + from app.services.permission_service import PermissionService + perm_svc = PermissionService(db) + if not await perm_svc.check_can_read(current_user, mailbox_id): + raise ForbiddenError("Accesso alla casella non autorizzato") + + mailbox = await svc.get_mailbox(mailbox_id, current_user.tenant_id) + return _build_response(mailbox, svc) + + +@router.put("/{mailbox_id}", response_model=MailboxResponse) +async def update_mailbox( + mailbox_id: uuid.UUID, + data: MailboxUpdateRequest, + current_user: AdminUser, + db: DB, +) -> MailboxResponse: + """ + Aggiorna una casella PEC. + Richiede ruolo **admin** o **super_admin**. + Se vengono fornite nuove credenziali, vengono ri-cifrate. + """ + svc = _svc(db) + mailbox = await svc.update_mailbox( + mailbox_id=mailbox_id, + tenant_id=current_user.tenant_id, + data=data, + ) + await db.commit() + return _build_response(mailbox, svc) + + +@router.delete("/{mailbox_id}", status_code=status.HTTP_204_NO_CONTENT) +async def delete_mailbox( + mailbox_id: uuid.UUID, + current_user: AdminUser, + db: DB, +) -> None: + """ + Soft-delete di una casella PEC (status=deleted). + Richiede ruolo **admin**. + """ + svc = _svc(db) + await svc.delete_mailbox(mailbox_id, current_user.tenant_id) + await db.commit() + + +@router.post( + "/{mailbox_id}/test-connection", + response_model=ConnectionTestResult, +) +async def test_mailbox_connection( + mailbox_id: uuid.UUID, + data: ConnectionTestRequest, + current_user: AdminUser, + db: DB, +) -> ConnectionTestResult: + """ + Testa la connessione IMAP o SMTP della casella. + Non invia messaggi – solo verifica la connessione. + Richiede ruolo **admin**. + """ + svc = _svc(db) + return await svc.test_connection( + mailbox_id=mailbox_id, + tenant_id=current_user.tenant_id, + data=data, + ) diff --git a/backend/app/api/v1/ws.py b/backend/app/api/v1/ws.py new file mode 100644 index 0000000..7a9b1a2 --- /dev/null +++ b/backend/app/api/v1/ws.py @@ -0,0 +1,114 @@ +""" +WebSocket endpoint – connessione real-time per aggiornamenti inbox. + +URL: ws:///api/v1/ws?token= + +Protocollo messaggi (dal server al client): + { + "type": "mailbox:new_message", + "mailbox_id": "", + "message_id": "", + "subject": "...", + "from_address": "...", + "received_at": "2026-03-18T14:00:00Z" + } + + { + "type": "mailbox:sync_error", + "mailbox_id": "", + "error": "...", + "status": "error" + } + + { "type": "ping" } ← heartbeat ogni 30s per mantenere connessione viva +""" + +import asyncio +import uuid + +from fastapi import APIRouter, Query, WebSocket, WebSocketDisconnect +from jose import JWTError + +from app.core.security import decode_token +from app.core.logging import get_logger +from app.websocket.manager import manager + +router = APIRouter(tags=["WebSocket"]) +logger = get_logger(__name__) + + +@router.websocket("/ws") +async def websocket_endpoint( + websocket: WebSocket, + token: str = Query(..., description="JWT access token per autenticazione"), +) -> None: + """ + WebSocket endpoint autenticato via JWT query param. + Invia eventi real-time per il tenant dell'utente connesso. + """ + # Autenticazione: valida JWT + try: + payload = decode_token(token) + if payload.get("type") != "access": + await websocket.close(code=4001, reason="Token non valido") + return + + tenant_id_str = payload.get("tid") + user_id_str = payload.get("sub") + if not tenant_id_str or not user_id_str: + await websocket.close(code=4001, reason="Token malformato") + return + + tenant_id = uuid.UUID(tenant_id_str) + user_id = uuid.UUID(user_id_str) + + except (JWTError, ValueError): + await websocket.close(code=4001, reason="Token non valido") + return + + # Connessione accettata + await manager.connect(websocket, tenant_id) + logger.info( + "WS autenticato", + extra={"user_id": str(user_id), "tenant_id": str(tenant_id)}, + ) + + # Invia ack di connessione + try: + await websocket.send_json({ + "type": "connected", + "tenant_id": str(tenant_id), + "user_id": str(user_id), + }) + except Exception: + await manager.disconnect(websocket, tenant_id) + return + + # Heartbeat task + async def send_pings() -> None: + while True: + try: + await asyncio.sleep(30) + await websocket.send_json({"type": "ping"}) + except Exception: + break + + ping_task = asyncio.create_task(send_pings()) + + try: + # Mantieni la connessione aperta, gestisci messaggi client (pong, ecc.) + while True: + try: + data = await asyncio.wait_for(websocket.receive_text(), timeout=35.0) + # Gestisci pong dal client (opzionale) + if data == "pong": + continue + except asyncio.TimeoutError: + # Nessun messaggio dal client in 35s: connessione morta + break + except WebSocketDisconnect: + break + + finally: + ping_task.cancel() + await manager.disconnect(websocket, tenant_id) diff --git a/backend/app/main.py b/backend/app/main.py index 62ec2c8..3dce1f1 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -13,10 +13,11 @@ from slowapi.errors import RateLimitExceeded from slowapi.middleware import SlowAPIMiddleware from slowapi.util import get_remote_address -from app.api.v1 import auth, permissions, tenants, users +from app.api.v1 import auth, mailboxes, permissions, tenants, users, ws from app.config import get_settings from app.core.logging import get_logger, setup_logging from app.database import engine +from app.websocket.manager import redis_subscriber_loop settings = get_settings() logger = get_logger(__name__) @@ -25,13 +26,28 @@ logger = get_logger(__name__) @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Gestione ciclo di vita dell'applicazione.""" + import asyncio + setup_logging() logger.info( "🚀 PecFlow Backend avviato", extra={"env": settings.app_env, "debug": settings.app_debug}, ) + + # Avvia il subscriber Redis per il forward degli eventi WebSocket + redis_task = asyncio.create_task( + redis_subscriber_loop(settings.redis_url), + name="redis-ws-subscriber", + ) + yield - # Cleanup: chiudi connessioni DB + + # Cleanup + redis_task.cancel() + try: + await redis_task + except asyncio.CancelledError: + pass await engine.dispose() logger.info("🛑 PecFlow Backend fermato") @@ -68,6 +84,8 @@ app.include_router(auth.router, prefix=API_PREFIX) app.include_router(users.router, prefix=API_PREFIX) app.include_router(tenants.router, prefix=API_PREFIX) app.include_router(permissions.router, prefix=API_PREFIX) +app.include_router(mailboxes.router, prefix=API_PREFIX) +app.include_router(ws.router, prefix=API_PREFIX) # ─── Health check ───────────────────────────────────────────────────────────── diff --git a/backend/app/schemas/mailbox.py b/backend/app/schemas/mailbox.py new file mode 100644 index 0000000..7ada16f --- /dev/null +++ b/backend/app/schemas/mailbox.py @@ -0,0 +1,118 @@ +""" +Schemas Pydantic per le caselle PEC (mailboxes). +Le credenziali vengono accettate in chiaro dalla API e cifrate nel service. +""" + +import uuid +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, EmailStr, Field, field_validator + + +# ─── Request schemas ────────────────────────────────────────────────────────── + +class MailboxCreateRequest(BaseModel): + """Dati per creare una nuova casella PEC.""" + + email_address: EmailStr = Field(..., description="Indirizzo email PEC") + display_name: str | None = Field(None, max_length=255, description="Nome visualizzato") + provider: str | None = Field(None, max_length=100, description="Provider PEC (aruba, namirial...)") + + # Credenziali IMAP (in chiaro, cifrate prima della persistenza) + imap_host: str = Field(..., min_length=1, max_length=255, description="Host IMAP") + imap_port: int = Field(993, ge=1, le=65535, description="Porta IMAP") + imap_user: str = Field(..., min_length=1, max_length=255, description="Username IMAP") + imap_pass: str = Field(..., min_length=1, description="Password IMAP") + imap_use_ssl: bool = Field(True, description="Usa SSL/TLS per IMAP") + + # Credenziali SMTP (in chiaro, cifrate prima della persistenza) + smtp_host: str = Field(..., min_length=1, max_length=255, description="Host SMTP") + smtp_port: int = Field(465, ge=1, le=65535, description="Porta SMTP") + smtp_user: str = Field(..., min_length=1, max_length=255, description="Username SMTP") + smtp_pass: str = Field(..., min_length=1, description="Password SMTP") + smtp_use_tls: bool = Field(True, description="Usa TLS per SMTP") + + @field_validator("imap_port", "smtp_port") + @classmethod + def validate_port(cls, v: int) -> int: + if v not in range(1, 65536): + raise ValueError("Porta non valida") + return v + + +class MailboxUpdateRequest(BaseModel): + """Aggiornamento parziale di una casella PEC.""" + + display_name: str | None = Field(None, max_length=255) + provider: str | None = Field(None, max_length=100) + status: Literal["active", "paused"] | None = None + + # Aggiornamento credenziali IMAP (opzionale) + imap_host: str | None = Field(None, min_length=1, max_length=255) + imap_port: int | None = Field(None, ge=1, le=65535) + imap_user: str | None = Field(None, min_length=1, max_length=255) + imap_pass: str | None = None + imap_use_ssl: bool | None = None + + # Aggiornamento credenziali SMTP (opzionale) + smtp_host: str | None = Field(None, min_length=1, max_length=255) + smtp_port: int | None = Field(None, ge=1, le=65535) + smtp_user: str | None = Field(None, min_length=1, max_length=255) + smtp_pass: str | None = None + smtp_use_tls: bool | None = None + + +# ─── Response schemas ───────────────────────────────────────────────────────── + +class MailboxResponse(BaseModel): + """Risposta API casella PEC – NON include mai le credenziali in chiaro.""" + + id: uuid.UUID + tenant_id: uuid.UUID + email_address: str + display_name: str | None + provider: str | None + + # Info IMAP senza credenziali + imap_host: str + imap_port: int + imap_use_ssl: bool + + # Info SMTP senza credenziali + smtp_host: str + smtp_port: int + smtp_use_tls: bool + + # Stato sync + status: str + last_sync_at: datetime | None + last_sync_uid: int | None + sync_error_msg: str | None + sync_error_count: int + + created_by: uuid.UUID | None + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class MailboxListResponse(BaseModel): + items: list[MailboxResponse] + total: int + page: int + page_size: int + + +class ConnectionTestRequest(BaseModel): + """Test connessione IMAP/SMTP per una casella esistente o per credenziali nuove.""" + + protocol: Literal["imap", "smtp"] = Field("imap", description="Protocollo da testare") + + +class ConnectionTestResult(BaseModel): + success: bool + message: str + latency_ms: float | None = None + capabilities: list[str] | None = None # Solo per IMAP diff --git a/backend/app/services/mailbox_service.py b/backend/app/services/mailbox_service.py new file mode 100644 index 0000000..17c6cc3 --- /dev/null +++ b/backend/app/services/mailbox_service.py @@ -0,0 +1,361 @@ +""" +Servizio caselle PEC – CRUD con cifratura AES-256-GCM delle credenziali (ADR-002). +""" + +import time +import uuid + +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.exceptions import ConflictError, ForbiddenError, NotFoundError +from app.core.security import decrypt_credential, encrypt_credential +from app.models.mailbox import Mailbox +from app.models.tenant import Tenant +from app.schemas.mailbox import ( + ConnectionTestRequest, + ConnectionTestResult, + MailboxCreateRequest, + MailboxUpdateRequest, +) + + +class MailboxService: + def __init__(self, db: AsyncSession) -> None: + self.db = db + + # ─── CRUD ───────────────────────────────────────────────────────────────── + + async def create_mailbox( + self, + tenant_id: uuid.UUID, + data: MailboxCreateRequest, + created_by: uuid.UUID, + ) -> Mailbox: + """Crea una nuova casella PEC cifrando le credenziali.""" + + # Verifica limite caselle del tenant + tenant = await self.db.get(Tenant, tenant_id) + if not tenant: + raise NotFoundError("tenant") + + count_result = await self.db.execute( + select(func.count(Mailbox.id)).where( + Mailbox.tenant_id == tenant_id, + Mailbox.status != "deleted", + ) + ) + current_count = count_result.scalar_one() + if current_count >= tenant.max_mailboxes: + raise ForbiddenError( + f"Limite caselle raggiunto ({tenant.max_mailboxes}). " + "Aggiorna il piano per aggiungerne altre." + ) + + # Verifica unicità email nel tenant + existing = await self.db.execute( + select(Mailbox.id).where( + Mailbox.tenant_id == tenant_id, + Mailbox.email_address == str(data.email_address), + Mailbox.status != "deleted", + ) + ) + if existing.scalar_one_or_none(): + raise ConflictError("Casella con questo indirizzo già presente nel tenant") + + mailbox = Mailbox( + tenant_id=tenant_id, + email_address=str(data.email_address), + display_name=data.display_name, + provider=data.provider, + # Cifra tutte le credenziali IMAP + imap_host_enc=encrypt_credential(data.imap_host), + imap_port_enc=encrypt_credential(str(data.imap_port)), + imap_user_enc=encrypt_credential(data.imap_user), + imap_pass_enc=encrypt_credential(data.imap_pass), + imap_use_ssl=data.imap_use_ssl, + # Cifra tutte le credenziali SMTP + smtp_host_enc=encrypt_credential(data.smtp_host), + smtp_port_enc=encrypt_credential(str(data.smtp_port)), + smtp_user_enc=encrypt_credential(data.smtp_user), + smtp_pass_enc=encrypt_credential(data.smtp_pass), + smtp_use_tls=data.smtp_use_tls, + created_by=created_by, + status="active", + ) + self.db.add(mailbox) + await self.db.flush() + return mailbox + + async def list_mailboxes( + self, + tenant_id: uuid.UUID, + page: int = 1, + page_size: int = 50, + include_deleted: bool = False, + ) -> tuple[list[Mailbox], int]: + """Elenca le caselle di un tenant con paginazione.""" + base_q = select(Mailbox).where(Mailbox.tenant_id == tenant_id) + if not include_deleted: + base_q = base_q.where(Mailbox.status != "deleted") + + # Count totale + count_q = select(func.count()).select_from(base_q.subquery()) + total = (await self.db.execute(count_q)).scalar_one() + + # Pagina + items_q = ( + base_q.order_by(Mailbox.created_at.desc()) + .offset((page - 1) * page_size) + .limit(page_size) + ) + result = await self.db.execute(items_q) + items = list(result.scalars().all()) + return items, total + + async def get_mailbox( + self, + mailbox_id: uuid.UUID, + tenant_id: uuid.UUID, + ) -> Mailbox: + """Carica una singola casella verificando l'appartenenza al tenant.""" + mailbox = await self.db.get(Mailbox, mailbox_id) + if not mailbox or mailbox.tenant_id != tenant_id or mailbox.status == "deleted": + raise NotFoundError("casella") + return mailbox + + async def update_mailbox( + self, + mailbox_id: uuid.UUID, + tenant_id: uuid.UUID, + data: MailboxUpdateRequest, + ) -> Mailbox: + """Aggiornamento parziale di una casella. Ri-cifra le credenziali se modificate.""" + mailbox = await self.get_mailbox(mailbox_id, tenant_id) + + if data.display_name is not None: + mailbox.display_name = data.display_name + if data.provider is not None: + mailbox.provider = data.provider + if data.status is not None: + mailbox.status = data.status + + # IMAP + if data.imap_host is not None: + mailbox.imap_host_enc = encrypt_credential(data.imap_host) + if data.imap_port is not None: + mailbox.imap_port_enc = encrypt_credential(str(data.imap_port)) + if data.imap_user is not None: + mailbox.imap_user_enc = encrypt_credential(data.imap_user) + if data.imap_pass is not None: + mailbox.imap_pass_enc = encrypt_credential(data.imap_pass) + if data.imap_use_ssl is not None: + mailbox.imap_use_ssl = data.imap_use_ssl + + # SMTP + if data.smtp_host is not None: + mailbox.smtp_host_enc = encrypt_credential(data.smtp_host) + if data.smtp_port is not None: + mailbox.smtp_port_enc = encrypt_credential(str(data.smtp_port)) + if data.smtp_user is not None: + mailbox.smtp_user_enc = encrypt_credential(data.smtp_user) + if data.smtp_pass is not None: + mailbox.smtp_pass_enc = encrypt_credential(data.smtp_pass) + if data.smtp_use_tls is not None: + mailbox.smtp_use_tls = data.smtp_use_tls + + # Reset error state se il tenant ha aggiornato le credenziali + if any( + v is not None + for v in [data.imap_host, data.imap_pass, data.imap_user, data.imap_port] + ): + mailbox.sync_error_count = 0 + mailbox.sync_error_msg = None + if mailbox.status == "error": + mailbox.status = "active" + + await self.db.flush() + return mailbox + + async def delete_mailbox( + self, + mailbox_id: uuid.UUID, + tenant_id: uuid.UUID, + ) -> None: + """Soft-delete: imposta status=deleted.""" + mailbox = await self.get_mailbox(mailbox_id, tenant_id) + mailbox.status = "deleted" + await self.db.flush() + + # ─── Decrypt helpers (usati internamente e dal worker) ─────────────────── + + @staticmethod + def decrypt_imap_credentials(mailbox: Mailbox) -> dict: + """Decifra le credenziali IMAP per uso interno.""" + return { + "host": decrypt_credential(mailbox.imap_host_enc), + "port": int(decrypt_credential(mailbox.imap_port_enc)), + "user": decrypt_credential(mailbox.imap_user_enc), + "password": decrypt_credential(mailbox.imap_pass_enc), + "use_ssl": mailbox.imap_use_ssl, + } + + @staticmethod + def decrypt_smtp_credentials(mailbox: Mailbox) -> dict: + """Decifra le credenziali SMTP per uso interno.""" + return { + "host": decrypt_credential(mailbox.smtp_host_enc), + "port": int(decrypt_credential(mailbox.smtp_port_enc)), + "user": decrypt_credential(mailbox.smtp_user_enc), + "password": decrypt_credential(mailbox.smtp_pass_enc), + "use_tls": mailbox.smtp_use_tls, + } + + # ─── Test connessione ───────────────────────────────────────────────────── + + async def test_connection( + self, + mailbox_id: uuid.UUID, + tenant_id: uuid.UUID, + data: ConnectionTestRequest, + ) -> ConnectionTestResult: + """ + Testa la connessione IMAP o SMTP della casella. + NON invia messaggi (conforme alle istruzioni: solo test connessione). + """ + mailbox = await self.get_mailbox(mailbox_id, tenant_id) + + if data.protocol == "imap": + return await self._test_imap(mailbox) + else: + return await self._test_smtp(mailbox) + + async def _test_imap(self, mailbox: Mailbox) -> ConnectionTestResult: + """Testa connessione IMAP – LOGIN + LIST + LOGOUT.""" + import asyncio + try: + import aioimaplib + except ImportError: + return ConnectionTestResult( + success=False, + message="aioimaplib non installato nel worker. Eseguire dal worker container.", + ) + + creds = self.decrypt_imap_credentials(mailbox) + start = time.monotonic() + + try: + if creds["use_ssl"]: + client = aioimaplib.IMAP4_SSL( + host=creds["host"], port=creds["port"], timeout=15 + ) + else: + client = aioimaplib.IMAP4( + host=creds["host"], port=creds["port"], timeout=15 + ) + + await client.wait_hello_from_server() + status, _ = await client.login(creds["user"], creds["password"]) + + if status != "OK": + await client.logout() + return ConnectionTestResult( + success=False, + message=f"Login fallito: {status}", + latency_ms=round((time.monotonic() - start) * 1000, 1), + ) + + caps = list(client.protocol.capabilities) if hasattr(client.protocol, "capabilities") else [] + await client.logout() + + latency = round((time.monotonic() - start) * 1000, 1) + return ConnectionTestResult( + success=True, + message="Connessione IMAP riuscita", + latency_ms=latency, + capabilities=caps, + ) + + except asyncio.TimeoutError: + return ConnectionTestResult( + success=False, + message="Timeout connessione IMAP (15s)", + latency_ms=round((time.monotonic() - start) * 1000, 1), + ) + except Exception as e: + return ConnectionTestResult( + success=False, + message=f"Errore connessione IMAP: {e}", + latency_ms=round((time.monotonic() - start) * 1000, 1), + ) + + async def _test_smtp(self, mailbox: Mailbox) -> ConnectionTestResult: + """Testa connessione SMTP – solo EHLO/NOOP, nessun invio (ADR-002).""" + try: + import aiosmtplib + except ImportError: + return ConnectionTestResult( + success=False, + message="aiosmtplib non installato. Installare nella fase SMTP.", + ) + + creds = self.decrypt_smtp_credentials(mailbox) + start = time.monotonic() + + try: + smtp = aiosmtplib.SMTP( + hostname=creds["host"], + port=creds["port"], + use_tls=creds["use_tls"], + timeout=15, + ) + await smtp.connect() + await smtp.login(creds["user"], creds["password"]) + await smtp.quit() + + return ConnectionTestResult( + success=True, + message="Connessione SMTP riuscita", + latency_ms=round((time.monotonic() - start) * 1000, 1), + ) + except Exception as e: + return ConnectionTestResult( + success=False, + message=f"Errore connessione SMTP: {e}", + latency_ms=round((time.monotonic() - start) * 1000, 1), + ) + + # ─── Helper per costruire MailboxResponse ───────────────────────────────── + + @staticmethod + def to_response_dict(mailbox: Mailbox) -> dict: + """ + Costruisce un dict con i dati decrittati per la risposta API. + Le credenziali (user/pass) non sono incluse. + """ + imap_host = decrypt_credential(mailbox.imap_host_enc) + imap_port = int(decrypt_credential(mailbox.imap_port_enc)) + smtp_host = decrypt_credential(mailbox.smtp_host_enc) + smtp_port = int(decrypt_credential(mailbox.smtp_port_enc)) + + return { + "id": mailbox.id, + "tenant_id": mailbox.tenant_id, + "email_address": mailbox.email_address, + "display_name": mailbox.display_name, + "provider": mailbox.provider, + "imap_host": imap_host, + "imap_port": imap_port, + "imap_use_ssl": mailbox.imap_use_ssl, + "smtp_host": smtp_host, + "smtp_port": smtp_port, + "smtp_use_tls": mailbox.smtp_use_tls, + "status": mailbox.status, + "last_sync_at": mailbox.last_sync_at, + "last_sync_uid": mailbox.last_sync_uid, + "sync_error_msg": mailbox.sync_error_msg, + "sync_error_count": mailbox.sync_error_count, + "created_by": mailbox.created_by, + "created_at": mailbox.created_at, + "updated_at": mailbox.updated_at, + } diff --git a/backend/app/websocket/__init__.py b/backend/app/websocket/__init__.py new file mode 100644 index 0000000..3b80e7e --- /dev/null +++ b/backend/app/websocket/__init__.py @@ -0,0 +1 @@ +# WebSocket manager diff --git a/backend/app/websocket/manager.py b/backend/app/websocket/manager.py new file mode 100644 index 0000000..5d02470 --- /dev/null +++ b/backend/app/websocket/manager.py @@ -0,0 +1,144 @@ +""" +WebSocket Connection Manager. + +Gestisce le connessioni WebSocket raggruppate per tenant. +Il worker pubblica eventi su Redis (canale ws:tenant:); +un task asyncio in background ascolta Redis e fa forward ai client WS. + +Architettura fan-out: + Worker → Redis PUBLISH ws:tenant: + Backend → Redis SUBSCRIBE → forward a tutti i WebSocket del tenant +""" + +import asyncio +import json +import uuid +from collections import defaultdict + +from fastapi import WebSocket + +from app.core.logging import get_logger + +logger = get_logger(__name__) + + +class ConnectionManager: + """ + Gestisce N connessioni WebSocket per M tenant. + Thread-safe per uso in contesto asyncio. + """ + + def __init__(self) -> None: + # tenant_id (str) → set of WebSocket + self._connections: dict[str, set[WebSocket]] = defaultdict(set) + # Lock per modifiche al dizionario + self._lock = asyncio.Lock() + + async def connect(self, websocket: WebSocket, tenant_id: uuid.UUID) -> None: + """Registra una nuova connessione WS per il tenant.""" + await websocket.accept() + async with self._lock: + self._connections[str(tenant_id)].add(websocket) + logger.info( + "WebSocket connesso", + extra={"tenant_id": str(tenant_id), "total": self._count(tenant_id)}, + ) + + async def disconnect(self, websocket: WebSocket, tenant_id: uuid.UUID) -> None: + """Rimuove una connessione WS dal tenant.""" + async with self._lock: + self._connections[str(tenant_id)].discard(websocket) + if not self._connections[str(tenant_id)]: + del self._connections[str(tenant_id)] + logger.info( + "WebSocket disconnesso", + extra={"tenant_id": str(tenant_id)}, + ) + + async def broadcast_to_tenant( + self, tenant_id: uuid.UUID | str, event: dict + ) -> None: + """ + Invia un evento JSON a tutti i client connessi del tenant. + Le connessioni morte vengono rimosse silenziosamente. + """ + tid = str(tenant_id) + async with self._lock: + connections = set(self._connections.get(tid, set())) + + if not connections: + return + + payload = json.dumps(event, default=str) + dead = set() + + for ws in connections: + try: + await ws.send_text(payload) + except Exception: + dead.add(ws) + + if dead: + async with self._lock: + self._connections[tid] -= dead + + def _count(self, tenant_id: uuid.UUID) -> int: + return len(self._connections.get(str(tenant_id), set())) + + @property + def total_connections(self) -> int: + return sum(len(v) for v in self._connections.values()) + + +# Istanza singleton – importata da main.py e dal Redis listener +manager = ConnectionManager() + + +# ─── Redis subscriber (background task) ────────────────────────────────────── + +async def redis_subscriber_loop(redis_url: str) -> None: + """ + Task asyncio che si sottoscrive al canale Redis ws:* e + fa forward degli eventi ai client WebSocket del tenant corretto. + + Pubblicato dal worker con: + PUBLISH ws:tenant: + """ + import redis.asyncio as aioredis + + logger.info("Redis WS subscriber avviato", extra={"url": redis_url}) + + while True: + try: + client = aioredis.from_url(redis_url, decode_responses=True) + pubsub = client.pubsub() + await pubsub.psubscribe("ws:tenant:*") # pattern subscription + + async for message in pubsub.listen(): + if message["type"] not in ("pmessage", "message"): + continue + + # Estrae tenant_id dal canale: ws:tenant: + channel: str = message.get("channel", "") + if not channel.startswith("ws:tenant:"): + continue + + tenant_id_str = channel.removeprefix("ws:tenant:") + try: + tenant_uuid = uuid.UUID(tenant_id_str) + except ValueError: + continue + + try: + payload = json.loads(message["data"]) + except (json.JSONDecodeError, KeyError): + continue + + await manager.broadcast_to_tenant(tenant_uuid, payload) + + except asyncio.CancelledError: + logger.info("Redis WS subscriber terminato") + return + except Exception as e: + logger.error(f"Redis WS subscriber errore: {e}. Riconnessione in 5s...") + await asyncio.sleep(5) diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 79a389a..a84438d 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -40,6 +40,15 @@ dependencies = [ # Storage MinIO/S3 "miniopy-async>=1.21.0", + # IMAP async (per test connessione nel backend + mailbox service) + "aioimaplib>=2.0.0", + + # Redis (async – per WebSocket pub/sub) + "redis[asyncio]>=5.0.0", + + # WebSocket + "websockets>=12.0", + # Utilities "python-multipart>=0.0.9", # upload file "python-dotenv>=1.0.0", diff --git a/backend/test_integration.db b/backend/test_integration.db new file mode 100644 index 0000000000000000000000000000000000000000..e5eeb31a7dd6a9229e94b8b3814ac1089e1c638f GIT binary patch literal 69632 zcmeI)-)_@Z9Ki7yN#gIysOQp2L9Ddf)vSC*T&G{T8f+bP|j zo3SxzdxX8iv>Tpb(w<<~dy8G}_}B?`nlJ`8$of`l68rqI&*%JpKDMH^{jf0zY_acp zLn{zBGV2*l%X}t8CX>m_-%Ij$`V(%iOgH4acH;B0kNM2U-y9a#{>iMbyv(d>Yk#i& zTKui>NAY>_tHQIwmE4E! z+NynCZEaSrUoRyZ_FTvHgev>A6!!F>Ro|($9*ZxG$6|BPSNm$a&330%tv5Sja%ieE zF~e@n=&+*OtUv4;VsmyD*F;#X+TPo38TI?ku)kO3CDAhO87-q(Guk3*<*TEK-KN+! z8ipKpt=g_tw~a#S>Uvhcb6Lv_Mt%F(KXl|Z-n1rxtG1)F@y)Hs=hD@+H>g~RDzAT6 z%;`7E+V>x+t_J;M)AD);gCol^{lE$){%mv43VH{&zZy?Ty}4~X5(^rH=Re;_Y(||) z6#F3rO$$Uq}9-yO7h%W$l-;3by(F=eI8; z0&u?cEkd#9j(poYvI5t$!&qG!l@Zvh@~vdh`h&4Kwt@rEF&=fopbdA(`RP6NC&MV1 zZ+)Vo@5uIi8HuC74Uadi?)8u4H1zDjk=>UwA+}|jb?Q4t*i~4T=sVo8ux~2}?BO^t zd+uZu2$>wleWR5~uAwC(Dyro#tdIzZ#A!H|ADFi1$>f}#_1N|XZr|MZ+@Y$PI8EUDvgrzg0=9dQdOfL^{T&@xPaY=%CZg+q!u(nObAZ_n*05 z-#oDVD8CXBu|IJfb?Kkw=BXY$*EzA?RFd5feADU$GVjFQ-Q9*!Z6m{* z&GH+CQg}Uk_pB@Bvifa(S=OmIG`G~-`BL`3t5#IiTx&(9;)eEgmNoHhfayDvC-LWV zrxNF6r7&~iZMnqCR;3?4&~p05hW680l{#@@{B|ahKC_046DaI%7UH*VB!f6{M|CRx zuZ$gQbY<4~=G+a}>oXu+F~aM{o_lOBUNXWON^s$V^1g)Z^o~$XOQBRgo7Jlu%R(rg z3v(;}uu?wrCe + -Dgreenmail.setup.test.all + -Dgreenmail.hostname=0.0.0.0 + -Dgreenmail.auth.disabled=false + -Dgreenmail.users=test@example.com:secret + -Dgreenmail.verbose=false + ports: + - "3025:3025" # SMTP + - "3110:3110" # POP3 + - "3143:3143" # IMAP + - "3465:3465" # SMTPS + - "3993:3993" # IMAPS + - "8080:8080" # API REST GreenMail + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/api/service/readiness"] + interval: 10s + timeout: 5s + retries: 5 + profiles: + - greenmail # avviato solo con: docker compose --profile greenmail up + networks: + - pecflow_net + # ─── PgAdmin (solo dev) ────────────────────────────────────────────────────── pgadmin: image: dpage/pgadmin4:latest diff --git a/worker/Dockerfile b/worker/Dockerfile new file mode 100644 index 0000000..b70a5b7 --- /dev/null +++ b/worker/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.12-slim + +# Dipendenze di sistema +RUN apt-get update && apt-get install -y --no-install-recommends \ + gcc \ + libpq-dev \ + curl \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /worker + +# Installa dipendenze Python +COPY pyproject.toml . +RUN pip install --no-cache-dir -e ".[dev]" + +# Copia codice sorgente +COPY . . + +# Healthcheck: verifica che il processo worker sia vivo +HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \ + CMD python -c "import redis; r = redis.Redis.from_url('${REDIS_URL:-redis://redis:6379/0}'); r.ping()" || exit 1 + +# Entrypoint: avvia il worker arq +CMD ["python", "-m", "app.main"] diff --git a/worker/app/__init__.py b/worker/app/__init__.py new file mode 100644 index 0000000..8baafb0 --- /dev/null +++ b/worker/app/__init__.py @@ -0,0 +1 @@ +# Worker PecFlow diff --git a/worker/app/config.py b/worker/app/config.py new file mode 100644 index 0000000..09c4414 --- /dev/null +++ b/worker/app/config.py @@ -0,0 +1,66 @@ +""" +Configurazione worker – legge le stesse variabili d'ambiente del backend. +""" + +from functools import lru_cache + +from pydantic import field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class WorkerSettings(BaseSettings): + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=False, + extra="ignore", + ) + + # ── Ambiente ────────────────────────────────────────────────────────────── + app_env: str = "development" + log_level: str = "INFO" + + # ── Database ────────────────────────────────────────────────────────────── + database_url: str = "postgresql+asyncpg://pecflow:pecflow_dev_password@db:5432/pecflow" + + # ── Redis ───────────────────────────────────────────────────────────────── + redis_url: str = "redis://redis:6379/0" + + # ── MinIO ───────────────────────────────────────────────────────────────── + minio_endpoint: str = "minio:9000" + minio_access_key: str = "minioadmin" + minio_secret_key: str = "minioadmin" + minio_bucket: str = "pecflow" + minio_use_ssl: bool = False + + # ── Cifratura credenziali (ADR-002) ─────────────────────────────────────── + encryption_key: str = "0" * 64 + + # ── Parametri IMAP sync ─────────────────────────────────────────────────── + imap_idle_timeout_seconds: int = 1680 # 28 minuti (RFC 2177 ≤ 29 min) + imap_polling_interval_seconds: int = 60 # polling se IDLE non supportato + imap_max_fetch_per_cycle: int = 50 # max messaggi per ciclo di fetch + imap_max_error_count: int = 5 # errori consecutivi → status=error + imap_connect_timeout_seconds: int = 30 # timeout connessione iniziale + + # ── Backoff esponenziale ────────────────────────────────────────────────── + backoff_initial_seconds: float = 1.0 + backoff_multiplier: float = 2.0 + backoff_max_seconds: float = 300.0 # 5 minuti massimo + + @field_validator("encryption_key") + @classmethod + def validate_encryption_key(cls, v: str) -> str: + if len(v) != 64: + raise ValueError("ENCRYPTION_KEY deve essere 64 caratteri hex") + bytes.fromhex(v) + return v + + @property + def encryption_key_bytes(self) -> bytes: + return bytes.fromhex(self.encryption_key) + + +@lru_cache +def get_settings() -> WorkerSettings: + return WorkerSettings() diff --git a/worker/app/database.py b/worker/app/database.py new file mode 100644 index 0000000..d112891 --- /dev/null +++ b/worker/app/database.py @@ -0,0 +1,35 @@ +""" +Connessione database per il worker – usa SQLAlchemy async (stesso stack del backend). +""" + +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import DeclarativeBase + +from app.config import get_settings + +settings = get_settings() + +engine = create_async_engine( + settings.database_url, + echo=False, + pool_size=5, + max_overflow=10, + pool_pre_ping=True, +) + +AsyncSessionLocal = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False, + autoflush=False, + autocommit=False, +) + + +class Base(DeclarativeBase): + pass + + +async def get_db_session() -> AsyncSession: + """Restituisce una nuova sessione DB – da usare come context manager.""" + return AsyncSessionLocal() diff --git a/worker/app/imap/__init__.py b/worker/app/imap/__init__.py new file mode 100644 index 0000000..2bdf880 --- /dev/null +++ b/worker/app/imap/__init__.py @@ -0,0 +1 @@ +# IMAP sync engine diff --git a/worker/app/imap/connection.py b/worker/app/imap/connection.py new file mode 100644 index 0000000..5c8186f --- /dev/null +++ b/worker/app/imap/connection.py @@ -0,0 +1,395 @@ +""" +IMAPConnection – gestione singola connessione IMAP con: + - IDLE con heartbeat 28 min (RFC 2177) + - Polling fallback ogni 60s se IDLE non supportato + - Backoff esponenziale su disconnessione + - Aggiornamento stato mailbox su N errori consecutivi + +Architettura (ADR-003): + Un asyncio.Task per casella → overhead minimo, migliaia di caselle per host. +""" + +import asyncio +import base64 +import logging +from datetime import UTC, datetime + +import aioimaplib +import redis.asyncio as aioredis +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import get_settings +from app.database import AsyncSessionLocal +from app.imap.reconnect import ExponentialBackoff +from app.imap.sync import sync_new_messages +from app.models import Mailbox + +logger = logging.getLogger(__name__) +settings = get_settings() + + +def _decrypt(enc: str) -> str: + """Decifra un campo credenziale AES-256-GCM.""" + import os + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + + key = settings.encryption_key_bytes + aesgcm = AESGCM(key) + raw = base64.b64decode(enc.encode("ascii")) + nonce = raw[:12] + ciphertext_with_tag = raw[12:] + return aesgcm.decrypt(nonce, ciphertext_with_tag, None).decode("utf-8") + + +class IMAPConnection: + """ + Gestisce la connessione IMAP di una singola casella PEC. + + Ciclo di vita: + 1. Connessione IMAP (SSL o plain) + 2. Login + 3. SELECT INBOX + 4. Sync iniziale (tutti i messaggi nuovi dall'ultimo UID noto) + 5. IDLE loop (o polling se IDLE non disponibile) + 6. Su EXISTS/EXPUNGE notify → fetch nuovi messaggi + 7. Su errore → backoff → torna al punto 1 + 8. Su N errori consecutivi → imposta mailbox.status=error + """ + + def __init__( + self, + mailbox_id: str, + redis_client: aioredis.Redis, + ) -> None: + self.mailbox_id = mailbox_id + self.redis = redis_client + self._running = False + self._client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL | None = None + + async def run(self) -> None: + """ + Loop principale della connessione IMAP. + Questo metodo non solleva mai eccezioni; gestisce internamente tutti gli errori. + """ + self._running = True + backoff = ExponentialBackoff(label=f"mailbox:{self.mailbox_id[:8]}") + + while self._running: + try: + async with AsyncSessionLocal() as db: + # Carica mailbox dal DB + mailbox = await db.get(Mailbox, self.mailbox_id) + if not mailbox: + logger.error( + f"[{self.mailbox_id}] Mailbox non trovata in DB. Task terminato." + ) + return + + if mailbox.status in ("deleted", "paused"): + logger.info( + f"[{mailbox.email_address}] Status={mailbox.status}, task in pausa." + ) + await asyncio.sleep(60) + continue + + # Decifra credenziali + creds = self._decrypt_creds(mailbox) + + # Connetti e sincronizza + await self._connect_and_run(mailbox, creds, db, backoff) + + except asyncio.CancelledError: + logger.info(f"[{self.mailbox_id}] Task IMAP cancellato.") + self._running = False + return + except Exception as e: + logger.error( + f"[{self.mailbox_id}] Errore inatteso nel loop IMAP: {e}", + exc_info=True, + ) + await backoff.wait(e) + + def stop(self) -> None: + """Segnala al loop di terminare al prossimo ciclo.""" + self._running = False + + # ─── Connessione e loop interno ────────────────────────────────────────── + + async def _connect_and_run( + self, + mailbox: Mailbox, + creds: dict, + db: AsyncSession, + backoff: ExponentialBackoff, + ) -> None: + """ + Tenta la connessione IMAP. Se riesce, avvia il loop IDLE/polling. + Se fallisce, incrementa il contatore errori e aggiorna lo stato mailbox. + """ + try: + client = await self._connect(creds) + self._client = client + except asyncio.TimeoutError: + err_msg = f"Timeout connessione IMAP ({settings.imap_connect_timeout_seconds}s)" + await self._record_error(mailbox, db, err_msg) + await backoff.wait(TimeoutError(err_msg)) + return + except Exception as e: + err_msg = str(e) + await self._record_error(mailbox, db, err_msg) + await backoff.wait(e) + return + + # Connessione riuscita + backoff.reset() + await self._reset_error_state(mailbox, db) + + # Sync iniziale: porta il DB aggiornato fino all'ultimo UID disponibile + logger.info(f"[{mailbox.email_address}] Sync iniziale...") + try: + n = await sync_new_messages(self._client, mailbox, db, self.redis) + if n > 0: + logger.info( + f"[{mailbox.email_address}] Sync iniziale completata: {n} messaggi nuovi" + ) + except Exception as e: + logger.error( + f"[{mailbox.email_address}] Errore sync iniziale: {e}", exc_info=True + ) + + # Avvia IDLE o polling + supports_idle = self._supports_idle(client) + if supports_idle: + logger.info(f"[{mailbox.email_address}] Avvio IDLE loop") + await self._idle_loop(mailbox, db) + else: + logger.info( + f"[{mailbox.email_address}] IDLE non supportato, avvio polling " + f"ogni {settings.imap_polling_interval_seconds}s" + ) + await self._polling_loop(mailbox, db) + + async def _connect( + self, creds: dict + ) -> aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL: + """Connette al server IMAP e fa login. Solleva eccezione su errore.""" + host = creds["host"] + port = creds["port"] + user = creds["user"] + password = creds["password"] + use_ssl = creds["use_ssl"] + + logger.info(f"Connessione IMAP {user}@{host}:{port} ssl={use_ssl}") + + if use_ssl: + client = aioimaplib.IMAP4_SSL( + host=host, + port=port, + timeout=settings.imap_connect_timeout_seconds, + ) + else: + client = aioimaplib.IMAP4( + host=host, + port=port, + timeout=settings.imap_connect_timeout_seconds, + ) + + await asyncio.wait_for( + client.wait_hello_from_server(), + timeout=settings.imap_connect_timeout_seconds, + ) + + status, _ = await client.login(user, password) + if status != "OK": + await client.logout() + raise ConnectionError(f"Login IMAP fallito: status={status}") + + status, _ = await client.select("INBOX") + if status != "OK": + await client.logout() + raise ConnectionError(f"SELECT INBOX fallito: status={status}") + + logger.info(f"IMAP connesso: {user}@{host}:{port}") + return client + + def _supports_idle( + self, client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL + ) -> bool: + """Verifica se il server supporta IDLE.""" + try: + caps = client.protocol.capabilities + return "IDLE" in caps + except Exception: + return False + + # ─── IDLE loop ──────────────────────────────────────────────────────────── + + async def _idle_loop(self, mailbox: Mailbox, db: AsyncSession) -> None: + """ + Loop IMAP IDLE con heartbeat ogni 28 minuti (RFC 2177). + + Quando il server segnala EXISTS (nuovi messaggi) → sync. + Ogni 28 minuti → DONE + re-IDLE per mantenere connessione viva. + """ + client = self._client + idle_timeout = settings.imap_idle_timeout_seconds # 28 min + + while self._running: + try: + # Avvia IDLE + await client.idle_start(timeout=idle_timeout) + + # Attendi server push con timeout (heartbeat) + try: + server_push = await asyncio.wait_for( + client.wait_server_push(), + timeout=float(idle_timeout), + ) + except asyncio.TimeoutError: + # Heartbeat: nessun nuovo messaggio in 28 minuti → re-IDLE + server_push = [] + + # Termina IDLE + await client.idle_done() + + # Controlla se ci sono nuovi messaggi (EXISTS) + has_new = any( + b"EXISTS" in (line if isinstance(line, bytes) else line.encode()) + for line in server_push + if line + ) + + if has_new: + logger.debug( + f"[{mailbox.email_address}] EXISTS ricevuto, sync..." + ) + # Ricarica mailbox dal DB per avere last_sync_uid aggiornato + await db.refresh(mailbox) + n = await sync_new_messages(client, mailbox, db, self.redis) + if n > 0: + logger.info( + f"[{mailbox.email_address}] {n} nuovi messaggi sincronizzati" + ) + + except asyncio.CancelledError: + try: + await client.idle_done() + except Exception: + pass + return + except (ConnectionError, IOError, OSError) as e: + logger.warning( + f"[{mailbox.email_address}] Connessione IDLE persa: {e}" + ) + raise # propaga al loop esterno per backoff + except Exception as e: + logger.error( + f"[{mailbox.email_address}] Errore IDLE loop: {e}", exc_info=True + ) + raise + + # ─── Polling loop ───────────────────────────────────────────────────────── + + async def _polling_loop(self, mailbox: Mailbox, db: AsyncSession) -> None: + """ + Polling IMAP ogni N secondi quando IDLE non è supportato. + Esegue NOOP + SEARCH UID per verificare nuovi messaggi. + """ + client = self._client + interval = settings.imap_polling_interval_seconds + + while self._running: + try: + await asyncio.sleep(interval) + + if not self._running: + break + + # NOOP per mantenere connessione viva + try: + await client.noop() + except Exception: + raise ConnectionError("Connessione IMAP persa durante NOOP") + + # Ricarica mailbox e controlla nuovi UID + await db.refresh(mailbox) + n = await sync_new_messages(client, mailbox, db, self.redis) + if n > 0: + logger.info( + f"[{mailbox.email_address}] Polling: {n} nuovi messaggi" + ) + + except asyncio.CancelledError: + return + except (ConnectionError, IOError, OSError) as e: + logger.warning( + f"[{mailbox.email_address}] Connessione polling persa: {e}" + ) + raise + except Exception as e: + logger.error( + f"[{mailbox.email_address}] Errore polling loop: {e}", exc_info=True + ) + raise + + # ─── Error management ───────────────────────────────────────────────────── + + async def _record_error( + self, mailbox: Mailbox, db: AsyncSession, error_msg: str + ) -> None: + """ + Incrementa sync_error_count. Se supera il limite → status=error. + Pubblica evento Redis di errore. + """ + import json + + mailbox.sync_error_count += 1 + mailbox.sync_error_msg = error_msg[:500] + + if mailbox.sync_error_count >= settings.imap_max_error_count: + if mailbox.status != "error": + mailbox.status = "error" + logger.error( + f"[{mailbox.email_address}] Troppe anomalie " + f"({mailbox.sync_error_count}), status=error" + ) + # Pubblica evento WebSocket di errore + try: + event = { + "type": "mailbox:sync_error", + "mailbox_id": str(mailbox.id), + "error": error_msg, + "status": "error", + } + channel = f"ws:tenant:{mailbox.tenant_id}" + await self.redis.publish(channel, json.dumps(event)) + except Exception: + pass + + await db.flush() + await db.commit() + + async def _reset_error_state( + self, mailbox: Mailbox, db: AsyncSession + ) -> None: + """Resetta il contatore errori dopo una connessione riuscita.""" + if mailbox.sync_error_count > 0 or mailbox.status == "error": + mailbox.sync_error_count = 0 + mailbox.sync_error_msg = None + if mailbox.status == "error": + mailbox.status = "active" + await db.flush() + await db.commit() + + # ─── Decrypt credentials ────────────────────────────────────────────────── + + @staticmethod + def _decrypt_creds(mailbox: Mailbox) -> dict: + """Decifra le credenziali IMAP dalla mailbox.""" + return { + "host": _decrypt(mailbox.imap_host_enc), + "port": int(_decrypt(mailbox.imap_port_enc)), + "user": _decrypt(mailbox.imap_user_enc), + "password": _decrypt(mailbox.imap_pass_enc), + "use_ssl": mailbox.imap_use_ssl, + } diff --git a/worker/app/imap/pool.py b/worker/app/imap/pool.py new file mode 100644 index 0000000..9703dbd --- /dev/null +++ b/worker/app/imap/pool.py @@ -0,0 +1,275 @@ +""" +MailboxPool – orchestratore async per N caselle IMAP in parallelo. + +All'avvio del worker: + 1. Carica tutte le mailbox con status='active' dal DB + 2. Avvia un asyncio.Task per ogni casella (IMAPConnection.run) + 3. Monitora i task: se uno muore, lo riavvia dopo un breve delay + 4. Osserva eventi Redis per caselle aggiunte/rimosse/aggiornate a runtime + +ADR-003: Un task async per casella – overhead < 10MB per casella. +""" + +import asyncio +import json +import logging +import uuid +from typing import Any + +import redis.asyncio as aioredis +from sqlalchemy import select + +from app.config import get_settings +from app.database import AsyncSessionLocal +from app.imap.connection import IMAPConnection +from app.models import Mailbox + +logger = logging.getLogger(__name__) +settings = get_settings() + +# Canale Redis per eventi di gestione caselle +MAILBOX_EVENTS_CHANNEL = "mailbox:events" + + +class MailboxPool: + """ + Gestisce il pool di task IMAP asincroni. + + Un task per casella, monitorato e riavviato automaticamente in caso di crash. + """ + + def __init__(self, redis_client: aioredis.Redis) -> None: + self.redis = redis_client + # mailbox_id (str) → asyncio.Task + self._tasks: dict[str, asyncio.Task] = {} + # mailbox_id (str) → IMAPConnection + self._connections: dict[str, IMAPConnection] = {} + self._running = False + self._monitor_task: asyncio.Task | None = None + self._events_task: asyncio.Task | None = None + + # ─── Lifecycle ──────────────────────────────────────────────────────────── + + async def start(self) -> None: + """ + Carica le mailbox attive dal DB e avvia i task IMAP. + Da chiamare nell'on_startup del worker arq. + """ + self._running = True + logger.info("MailboxPool: avvio in corso...") + + mailbox_ids = await self._load_active_mailbox_ids() + logger.info(f"MailboxPool: {len(mailbox_ids)} caselle attive trovate") + + for mid in mailbox_ids: + await self._start_task(mid) + + # Task di monitoraggio: riavvia task morti + self._monitor_task = asyncio.create_task( + self._monitor_loop(), name="mailbox-pool-monitor" + ) + + # Task listener Redis: gestisce caselle aggiunte/rimosse a runtime + self._events_task = asyncio.create_task( + self._events_listener(), name="mailbox-pool-events" + ) + + logger.info( + f"MailboxPool avviato: {len(self._tasks)} task IMAP in esecuzione" + ) + + async def stop(self) -> None: + """ + Ferma tutti i task IMAP. + Da chiamare nell'on_shutdown del worker arq. + """ + logger.info("MailboxPool: arresto in corso...") + self._running = False + + # Cancella task di sistema + for task in [self._monitor_task, self._events_task]: + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Ferma tutte le connessioni IMAP + for conn in self._connections.values(): + conn.stop() + + # Cancella i task + if self._tasks: + await asyncio.gather( + *[t for t in self._tasks.values() if not t.done()], + return_exceptions=True, + ) + + self._tasks.clear() + self._connections.clear() + logger.info("MailboxPool: tutti i task fermati") + + # ─── Gestione task individuali ──────────────────────────────────────────── + + async def add_mailbox(self, mailbox_id: str) -> None: + """Aggiunge e avvia una nuova casella al pool a runtime.""" + if mailbox_id in self._tasks and not self._tasks[mailbox_id].done(): + logger.debug(f"MailboxPool: {mailbox_id[:8]} già nel pool") + return + await self._start_task(mailbox_id) + logger.info(f"MailboxPool: casella aggiunta {mailbox_id[:8]}") + + async def remove_mailbox(self, mailbox_id: str) -> None: + """Ferma e rimuove una casella dal pool a runtime.""" + if mailbox_id in self._connections: + self._connections[mailbox_id].stop() + + if mailbox_id in self._tasks: + task = self._tasks[mailbox_id] + if not task.done(): + task.cancel() + try: + await asyncio.wait_for(task, timeout=5.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + del self._tasks[mailbox_id] + + if mailbox_id in self._connections: + del self._connections[mailbox_id] + + logger.info(f"MailboxPool: casella rimossa {mailbox_id[:8]}") + + @property + def active_count(self) -> int: + """Numero di task IMAP attivi.""" + return sum(1 for t in self._tasks.values() if not t.done()) + + # ─── Loop di monitoraggio ───────────────────────────────────────────────── + + async def _monitor_loop(self) -> None: + """ + Ogni 30 secondi verifica i task morti e li riavvia. + Rimuove anche task di caselle che non esistono più nel DB. + """ + while self._running: + try: + await asyncio.sleep(30) + + dead_ids = [ + mid for mid, task in self._tasks.items() + if task.done() and not task.cancelled() + ] + + if dead_ids: + logger.info( + f"MailboxPool monitor: {len(dead_ids)} task morti, riavvio..." + ) + active_ids = await self._load_active_mailbox_ids() + active_set = {str(mid) for mid in active_ids} + + for mid in dead_ids: + if mid in active_set: + logger.info( + f"MailboxPool: riavvio task {mid[:8]}..." + ) + await self._start_task(mid) + else: + # Casella non più attiva, rimuovi dal pool + logger.info( + f"MailboxPool: casella {mid[:8]} non più attiva, rimossa" + ) + self._tasks.pop(mid, None) + self._connections.pop(mid, None) + + # Aggiungi caselle nuove attive + active_ids = await self._load_active_mailbox_ids() + for mid in active_ids: + mid_str = str(mid) + if mid_str not in self._tasks or self._tasks[mid_str].done(): + logger.info( + f"MailboxPool: rilevata nuova casella attiva {mid_str[:8]}" + ) + await self._start_task(mid_str) + + except asyncio.CancelledError: + return + except Exception as e: + logger.error(f"MailboxPool monitor errore: {e}", exc_info=True) + + # ─── Listener eventi Redis ───────────────────────────────────────────────── + + async def _events_listener(self) -> None: + """ + Ascolta eventi Redis per aggiungere/rimuovere caselle a runtime. + + Formato evento: {"action": "add"|"remove"|"refresh", "mailbox_id": ""} + Pubblicare con: PUBLISH mailbox:events '{"action":"add","mailbox_id":""}' + """ + pubsub = self.redis.pubsub() + await pubsub.subscribe(MAILBOX_EVENTS_CHANNEL) + + logger.debug(f"MailboxPool: in ascolto su Redis {MAILBOX_EVENTS_CHANNEL}") + + try: + async for message in pubsub.listen(): + if not self._running: + break + + if message["type"] != "message": + continue + + try: + data = json.loads(message["data"]) + action = data.get("action") + mid = data.get("mailbox_id") + + if not action or not mid: + continue + + if action == "add": + await self.add_mailbox(mid) + elif action == "remove": + await self.remove_mailbox(mid) + elif action == "refresh": + # Rimuovi e riavvia (utile dopo cambio credenziali) + await self.remove_mailbox(mid) + await asyncio.sleep(1) + await self.add_mailbox(mid) + + except (json.JSONDecodeError, KeyError) as e: + logger.warning(f"MailboxPool: evento Redis malformato: {e}") + + except asyncio.CancelledError: + await pubsub.unsubscribe(MAILBOX_EVENTS_CHANNEL) + return + except Exception as e: + logger.error(f"MailboxPool events listener errore: {e}", exc_info=True) + + # ─── Private ───────────────────────────────────────────────────────────── + + async def _start_task(self, mailbox_id: str) -> None: + """Crea e avvia un nuovo task IMAPConnection per la casella.""" + conn = IMAPConnection( + mailbox_id=mailbox_id, + redis_client=self.redis, + ) + self._connections[mailbox_id] = conn + + task = asyncio.create_task( + conn.run(), + name=f"imap-{mailbox_id[:8]}", + ) + self._tasks[mailbox_id] = task + + async def _load_active_mailbox_ids(self) -> list[str]: + """Carica dal DB gli UUID di tutte le caselle con status=active.""" + try: + async with AsyncSessionLocal() as db: + result = await db.execute( + select(Mailbox.id).where(Mailbox.status == "active") + ) + return [str(row[0]) for row in result.all()] + except Exception as e: + logger.error(f"MailboxPool: errore caricamento caselle: {e}") + return [] diff --git a/worker/app/imap/reconnect.py b/worker/app/imap/reconnect.py new file mode 100644 index 0000000..1dd6b72 --- /dev/null +++ b/worker/app/imap/reconnect.py @@ -0,0 +1,81 @@ +""" +Strategia backoff esponenziale per riconnessioni IMAP. + +Parametri configurabili in WorkerSettings: + backoff_initial_seconds = 1.0 (primo wait) + backoff_multiplier = 2.0 (moltiplicatore) + backoff_max_seconds = 300.0 (tetto massimo: 5 minuti) + +Sequenza di attesa: 1s → 2s → 4s → 8s → 16s → 32s → 64s → 128s → 256s → 300s → 300s → ... +""" + +import asyncio +import logging +import random + +from app.config import get_settings + +logger = logging.getLogger(__name__) +settings = get_settings() + + +class ExponentialBackoff: + """ + Gestisce il backoff esponenziale con jitter opzionale. + + Uso: + backoff = ExponentialBackoff(label="casella@pec.it") + while True: + try: + await connect() + backoff.reset() # connessione riuscita → resetta backoff + await run_loop() + except Exception as e: + await backoff.wait(e) # attende prima di ritentare + """ + + def __init__(self, label: str = "", jitter: bool = True) -> None: + self.label = label + self.jitter = jitter + self._attempt = 0 + self._current_wait = settings.backoff_initial_seconds + + def reset(self) -> None: + """Resetta il backoff dopo una connessione riuscita.""" + if self._attempt > 0: + logger.info( + f"[{self.label}] Connessione ristabilita dopo {self._attempt} tentativi" + ) + self._attempt = 0 + self._current_wait = settings.backoff_initial_seconds + + async def wait(self, error: Exception | None = None) -> None: + """ + Attende il tempo calcolato dal backoff, poi incrementa per il prossimo ciclo. + Aggiunge jitter ±10% per evitare thundering herd su N caselle. + """ + self._attempt += 1 + wait_time = min(self._current_wait, settings.backoff_max_seconds) + + if self.jitter: + jitter_range = wait_time * 0.1 + wait_time += random.uniform(-jitter_range, jitter_range) + wait_time = max(0.5, wait_time) + + logger.warning( + f"[{self.label}] Tentativo {self._attempt} fallito" + f"{f': {error}' if error else ''}. " + f"Riconnessione in {wait_time:.1f}s" + ) + + await asyncio.sleep(wait_time) + + # Incrementa per il prossimo tentativo + self._current_wait = min( + self._current_wait * settings.backoff_multiplier, + settings.backoff_max_seconds, + ) + + @property + def attempt(self) -> int: + return self._attempt diff --git a/worker/app/imap/sync.py b/worker/app/imap/sync.py new file mode 100644 index 0000000..80a2aab --- /dev/null +++ b/worker/app/imap/sync.py @@ -0,0 +1,473 @@ +""" +Logica di sincronizzazione messaggi IMAP. + +Responsabilità: + 1. Fetch della lista UID > last_sync_uid + 2. Download envelope + raw EML per ogni UID + 3. Parsing base degli header (subject, from, to, date) + 4. Salvataggio in tabella messages + 5. Upload raw EML su MinIO + 6. Aggiornamento last_sync_uid e last_sync_at sulla mailbox + 7. Pubblicazione evento Redis per notifica WebSocket +""" + +import email +import email.header +import email.utils +import hashlib +import json +import logging +import re +import uuid +from datetime import UTC, datetime + +import aioimaplib +import redis.asyncio as aioredis +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import get_settings +from app.models import Mailbox, Message +from app.storage.minio_client import upload_eml + +logger = logging.getLogger(__name__) +settings = get_settings() + + +# ─── Helper: decodifica header email ───────────────────────────────────────── + +def _decode_header(header_value: str | None) -> str | None: + """Decodifica header RFC 2047 (es. =?utf-8?b?...?=) in stringa Python.""" + if not header_value: + return None + try: + parts = email.header.decode_header(header_value) + decoded = [] + for part, charset in parts: + if isinstance(part, bytes): + decoded.append(part.decode(charset or "utf-8", errors="replace")) + else: + decoded.append(part) + return "".join(decoded).strip() + except Exception: + return str(header_value) + + +def _extract_addresses(field: str | None) -> list[str]: + """Estrae lista di indirizzi email da un campo To/Cc.""" + if not field: + return [] + try: + addresses = email.utils.getaddresses([field]) + return [addr for _, addr in addresses if addr] + except Exception: + return [] + + +def _parse_date(date_str: str | None) -> datetime | None: + """Converte stringa data RFC 2822 in datetime con timezone.""" + if not date_str: + return None + try: + parsed = email.utils.parsedate_to_datetime(date_str) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=UTC) + return parsed + except Exception: + return None + + +def _classify_pec_type(msg: email.message.Message) -> str: + """ + Classifica il tipo PEC dal header X-Ricevuta / X-TipoRicevuta. + Fase 3 fa il parsing completo; qui classifichiamo al meglio possibile. + """ + x_ricevuta = msg.get("X-Ricevuta", "").lower() + x_tipo = msg.get("X-TipoRicevuta", "").lower() + + TYPE_MAP = { + "accettazione": "accettazione", + "non-accettazione": "non_accettazione", + "presa-in-carico": "presa_in_carico", + "avvenuta-consegna": "avvenuta_consegna", + "mancata-consegna": "mancata_consegna", + "errore-consegna": "errore_consegna", + "preavviso-mancata-consegna": "preavviso_mancata_consegna", + "rilevazione-virus": "rilevazione_virus", + } + + value = x_tipo or x_ricevuta + return TYPE_MAP.get(value, "posta_certificata") + + +def _parse_eml(raw_bytes: bytes) -> dict: + """ + Parsing di base di un EML – estrae i campi necessari per la tabella messages. + Il parsing completo (body, allegati, EML-in-EML) è in Fase 3. + """ + try: + msg = email.message_from_bytes(raw_bytes) + except Exception as e: + logger.warning(f"Errore parsing EML: {e}") + return {} + + subject = _decode_header(msg.get("Subject")) + from_addr = email.utils.parseaddr(msg.get("From", ""))[1] or None + to_addrs = _extract_addresses(msg.get("To")) + cc_addrs = _extract_addresses(msg.get("Cc")) + message_id = msg.get("Message-ID", "").strip() or None + date = _parse_date(msg.get("Date")) + pec_type = _classify_pec_type(msg) + + # Estrazione body text/html (best-effort – Fase 3 fa il parsing completo) + body_text = None + body_html = None + has_attachments = False + + if msg.is_multipart(): + for part in msg.walk(): + ct = part.get_content_type() + disp = part.get("Content-Disposition", "") + if "attachment" in disp or "inline" in disp: + if part.get_filename(): + has_attachments = True + elif ct == "text/plain" and body_text is None: + try: + charset = part.get_content_charset() or "utf-8" + body_text = part.get_payload(decode=True).decode(charset, errors="replace") + except Exception: + pass + elif ct == "text/html" and body_html is None: + try: + charset = part.get_content_charset() or "utf-8" + body_html = part.get_payload(decode=True).decode(charset, errors="replace") + except Exception: + pass + else: + ct = msg.get_content_type() + try: + charset = msg.get_content_charset() or "utf-8" + payload = msg.get_payload(decode=True) + if payload: + if ct == "text/plain": + body_text = payload.decode(charset, errors="replace") + elif ct == "text/html": + body_html = payload.decode(charset, errors="replace") + except Exception: + pass + + return { + "subject": subject, + "from_address": from_addr, + "to_addresses": to_addrs if to_addrs else None, + "cc_addresses": cc_addrs if cc_addrs else None, + "message_id_header": message_id, + "sent_at": date, + "pec_type": pec_type, + "body_text": body_text, + "body_html": body_html, + "has_attachments": has_attachments, + } + + +# ─── Core sync function ─────────────────────────────────────────────────────── + +async def sync_new_messages( + imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, + mailbox: Mailbox, + db: AsyncSession, + redis_client: aioredis.Redis, +) -> int: + """ + Sincronizza i messaggi nuovi (UID > last_sync_uid) per la mailbox data. + + Returns: + Numero di nuovi messaggi sincronizzati. + """ + last_uid = mailbox.last_sync_uid or 0 + search_range = f"{last_uid + 1}:*" + + # ── SEARCH UID > last_sync_uid ───────────────────────────────────────────── + # aioimaplib non supporta uid('SEARCH',...) → usare search('UID', range) + # che invia "SEARCH UID n:*" e restituisce numeri di sequenza + try: + status, search_data = await imap_client.search("UID", search_range) + except Exception as e: + logger.warning(f"[{mailbox.email_address}] SEARCH fallito: {e}") + return 0 + + if status != "OK": + logger.warning( + f"[{mailbox.email_address}] SEARCH status={status} data={search_data}" + ) + return 0 + + # search() restituisce numeri di sequenza (non UID) + raw_seqs = b" ".join( + d if isinstance(d, bytes) else d.encode() for d in search_data + ).decode("ascii", errors="ignore").split() + + seq_numbers = [s for s in raw_seqs if s.isdigit()] + if not seq_numbers: + return 0 + + # Limita il numero di fetch per ciclo + seq_numbers = seq_numbers[: settings.imap_max_fetch_per_cycle] + logger.info( + f"[{mailbox.email_address}] Trovati {len(seq_numbers)} messaggi nuovi da sincronizzare" + ) + + synced_count = 0 + max_uid_synced = last_uid + + for seq in seq_numbers: + try: + uid, synced = await _fetch_and_save_message_by_seq( + imap_client=imap_client, + seq=seq, + last_uid=last_uid, + mailbox=mailbox, + db=db, + redis_client=redis_client, + ) + if synced and uid and uid > max_uid_synced: + synced_count += 1 + max_uid_synced = uid + except Exception as e: + logger.error( + f"[{mailbox.email_address}] Errore fetch seq {seq}: {e}", + exc_info=True, + ) + + # Aggiorna last_sync_uid e last_sync_at + if max_uid_synced > last_uid: + mailbox.last_sync_uid = max_uid_synced + mailbox.last_sync_at = datetime.now(UTC) + await db.flush() + await db.commit() + + return synced_count + + +async def _fetch_and_save_message_by_seq( + imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, + seq: str, + last_uid: int, + mailbox: Mailbox, + db: AsyncSession, + redis_client: aioredis.Redis, +) -> tuple[int | None, bool]: + """ + Fetcha un singolo messaggio per NUMERO DI SEQUENZA (non UID). + Include UID nella richiesta FETCH per estrarlo dalla risposta. + + Returns: + (uid, saved): UID del messaggio e True se salvato, False altrimenti. + """ + # FETCH seq (UID RFC822 RFC822.SIZE) + try: + status, fetch_data = await imap_client.fetch(seq, "(UID RFC822 RFC822.SIZE)") + except Exception as e: + logger.error(f"[{mailbox.email_address}] FETCH seq {seq} fallito: {e}") + return None, False + + if status != "OK" or not fetch_data: + logger.warning( + f"[{mailbox.email_address}] FETCH seq {seq} risposta vuota: {status}" + ) + return None, False + + # Debug: mostra la struttura di fetch_data + items_info = [(type(x).__name__, len(x) if isinstance(x, (bytes, str)) else str(x)) for x in fetch_data] + logger.debug(f"[{mailbox.email_address}] fetch_data seq {seq}: {items_info}") + + # Estrae UID, raw EML e size dalla risposta. + # NOTA CRITICA: aioimaplib restituisce il corpo EML come `bytearray` (non `bytes`)! + # [0] bytes → FETCH response header con UID e RFC822.SIZE + # [1] bytearray → raw EML (il corpo del messaggio) + # [2] bytes → ')' (chiusura) + # [3] bytes → riga OK finale + uid: int | None = None + raw_eml: bytes | None = None + size_bytes: int | None = None + + for item in fetch_data: + if isinstance(item, bytearray): + # Questo è il corpo del messaggio EML + if len(item) > 200: + raw_eml = bytes(item) + elif isinstance(item, bytes): + # Risposta header – estrae UID e RFC822.SIZE + item_str = item.decode("ascii", errors="ignore") + uid_match = re.search(r"UID\s+(\d+)", item_str) + if uid_match: + uid = int(uid_match.group(1)) + size_match = re.search(r"RFC822\.SIZE\s+(\d+)", item_str) + if size_match: + size_bytes = int(size_match.group(1)) + elif isinstance(item, str): + uid_match = re.search(r"UID\s+(\d+)", item) + if uid_match: + uid = int(uid_match.group(1)) + size_match = re.search(r"RFC822\.SIZE\s+(\d+)", item) + if size_match: + size_bytes = int(size_match.group(1)) + + if uid is None or uid <= last_uid: + # Questo messaggio ha un UID <= last_uid, non va sincronizzato + return uid, False + + if not raw_eml: + logger.warning(f"[{mailbox.email_address}] seq {seq} UID {uid}: body mancante") + return uid, False + + if size_bytes is None: + size_bytes = len(raw_eml) + + return uid, await _save_message( + uid=uid, + raw_eml=raw_eml, + size_bytes=size_bytes, + mailbox=mailbox, + db=db, + redis_client=redis_client, + ) + + +async def _fetch_and_save_message( + imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, + uid: int, + mailbox: Mailbox, + db: AsyncSession, + redis_client: aioredis.Redis, +) -> bool: + """ + Fetcha un singolo messaggio per UID (usato dal job sync_mailbox one-shot). + Usa UID FETCH (aioimaplib uid() method). + """ + existing = await db.execute( + select(Message.id).where( + Message.mailbox_id == mailbox.id, + Message.imap_uid == uid, + ) + ) + if existing.scalar_one_or_none(): + return False + + try: + status, fetch_data = await imap_client.uid("FETCH", str(uid), "(RFC822 RFC822.SIZE)") + except Exception as e: + logger.error(f"[{mailbox.email_address}] UID FETCH {uid} fallito: {e}") + return False + + if status != "OK" or not fetch_data: + return False + + raw_eml: bytes | None = None + size_bytes: int | None = None + for item in fetch_data: + if isinstance(item, bytes) and len(item) > 100: + raw_eml = item + elif isinstance(item, (bytes, str)): + s = item.decode("ascii", errors="ignore") if isinstance(item, bytes) else item + m = re.search(r"RFC822\.SIZE\s+(\d+)", s) + if m: + size_bytes = int(m.group(1)) + + if not raw_eml: + return False + + return await _save_message( + uid=uid, + raw_eml=raw_eml, + size_bytes=size_bytes or len(raw_eml), + mailbox=mailbox, + db=db, + redis_client=redis_client, + ) + + +async def _save_message( + uid: int, + raw_eml: bytes, + size_bytes: int, + mailbox: Mailbox, + db: AsyncSession, + redis_client: aioredis.Redis, +) -> bool: + """ + Salva un messaggio EML in DB e su MinIO. Pubblica evento WebSocket. + """ + # Idempotenza + existing = await db.execute( + select(Message.id).where( + Message.mailbox_id == mailbox.id, + Message.imap_uid == uid, + ) + ) + if existing.scalar_one_or_none(): + logger.debug(f"[{mailbox.email_address}] UID {uid} già in DB, skip") + return False + + parsed = _parse_eml(raw_eml) + received_at = datetime.now(UTC) + + # Upload su MinIO + eml_path: str | None = None + try: + eml_path = await upload_eml( + tenant_id=str(mailbox.tenant_id), + mailbox_id=str(mailbox.id), + uid=uid, + eml_bytes=raw_eml, + ) + except Exception as e: + logger.error(f"[{mailbox.email_address}] Upload MinIO UID {uid}: {e}") + + # Salva in DB + message = Message( + id=uuid.uuid4(), + tenant_id=mailbox.tenant_id, + mailbox_id=mailbox.id, + imap_uid=uid, + imap_folder="INBOX", + direction="inbound", + state="received", + pec_type=parsed.get("pec_type", "posta_certificata"), + subject=parsed.get("subject"), + from_address=parsed.get("from_address"), + to_addresses=parsed.get("to_addresses"), + cc_addresses=parsed.get("cc_addresses"), + message_id_header=parsed.get("message_id_header"), + sent_at=parsed.get("sent_at"), + received_at=received_at, + size_bytes=size_bytes, + body_text=parsed.get("body_text"), + body_html=parsed.get("body_html"), + has_attachments=parsed.get("has_attachments", False), + raw_eml_path=eml_path, + is_read=False, + ) + db.add(message) + await db.flush() + + # Pubblica evento Redis per WebSocket + try: + event = { + "type": "mailbox:new_message", + "mailbox_id": str(mailbox.id), + "message_id": str(message.id), + "subject": message.subject or "", + "from_address": message.from_address or "", + "pec_type": message.pec_type, + "received_at": received_at.isoformat(), + } + await redis_client.publish(f"ws:tenant:{mailbox.tenant_id}", json.dumps(event)) + except Exception as e: + logger.warning(f"[{mailbox.email_address}] Redis publish UID {uid}: {e}") + + logger.info( + f"[{mailbox.email_address}] Nuovo messaggio: UID={uid} " + f"subject={message.subject!r} pec_type={message.pec_type}" + ) + return True diff --git a/worker/app/jobs/__init__.py b/worker/app/jobs/__init__.py new file mode 100644 index 0000000..8c38720 --- /dev/null +++ b/worker/app/jobs/__init__.py @@ -0,0 +1 @@ +# Definizioni job arq diff --git a/worker/app/jobs/sync_mailbox.py b/worker/app/jobs/sync_mailbox.py new file mode 100644 index 0000000..ebdfa58 --- /dev/null +++ b/worker/app/jobs/sync_mailbox.py @@ -0,0 +1,77 @@ +""" +Job arq: sync_mailbox – trigger manuale per forzare la sincronizzazione di una casella. + +Questo job viene usato per: + - Forzare una sync immediata dopo la creazione di una nuova casella + - Resync manuale da parte dell'admin + - Retry dopo un errore (called dal pool monitor) + +Non sostituisce il loop IMAP continuo (IMAPConnection); è un one-shot job. +""" + +import logging +from typing import Any + +from app.database import AsyncSessionLocal +from app.imap.reconnect import ExponentialBackoff +from app.imap.sync import sync_new_messages +from app.models import Mailbox + +logger = logging.getLogger(__name__) + + +async def sync_mailbox(ctx: dict[str, Any], mailbox_id: str) -> dict: + """ + Job arq: sincronizza una singola casella PEC. + + Args: + ctx: contesto arq (contiene redis, pool reference) + mailbox_id: UUID della casella da sincronizzare + + Returns: + dict con risultato del job + """ + redis_client = ctx.get("redis") + + async with AsyncSessionLocal() as db: + mailbox = await db.get(Mailbox, mailbox_id) + + if not mailbox: + return {"status": "error", "message": f"Mailbox {mailbox_id} non trovata"} + + if mailbox.status not in ("active", "error"): + return { + "status": "skipped", + "message": f"Mailbox status={mailbox.status}, skip", + } + + from app.imap.connection import IMAPConnection + + creds = IMAPConnection._decrypt_creds(mailbox) + + try: + from app.imap.connection import IMAPConnection + + conn = IMAPConnection(mailbox_id=mailbox_id, redis_client=redis_client) + client = await conn._connect(creds) + + n = await sync_new_messages(client, mailbox, db, redis_client) + + try: + await client.logout() + except Exception: + pass + + return { + "status": "ok", + "mailbox": mailbox.email_address, + "new_messages": n, + } + + except Exception as e: + logger.error(f"[sync_mailbox] {mailbox_id} errore: {e}", exc_info=True) + return { + "status": "error", + "mailbox": mailbox.email_address, + "message": str(e), + } diff --git a/worker/app/main.py b/worker/app/main.py new file mode 100644 index 0000000..ebf2d2a --- /dev/null +++ b/worker/app/main.py @@ -0,0 +1,159 @@ +""" +Entrypoint worker arq – PecFlow IMAP Sync Engine. + +Avvio: python -m app.main + +Cosa fa: + 1. Connette a Redis tramite arq + 2. on_startup → avvia MailboxPool (N task IMAP asincroni) + 3. on_shutdown → ferma MailboxPool + 4. Registra job arq (sync_mailbox, future: send_pec, archive_batch, ecc.) + 5. Loop arq per processare job dalla coda Redis + +L'event loop è condiviso tra arq e MailboxPool (asyncio task). +""" + +import asyncio +import logging +import sys +from typing import Any + +import redis.asyncio as aioredis +from arq import run_worker +from arq.connections import RedisSettings + +from app.config import get_settings +from app.imap.pool import MailboxPool +from app.jobs.sync_mailbox import sync_mailbox +from app.storage.minio_client import ensure_bucket_exists + +settings = get_settings() + +# Logging +logging.basicConfig( + level=getattr(logging, settings.log_level.upper(), logging.INFO), + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + stream=sys.stdout, +) +logger = logging.getLogger(__name__) + +# Pool globale (accessibile dalle callback) +_mailbox_pool: MailboxPool | None = None + + +# ─── Lifecycle callbacks arq ────────────────────────────────────────────────── + +async def on_startup(ctx: dict[str, Any]) -> None: + """ + Inizializzazione worker all'avvio. + Avvia il MailboxPool con tutte le caselle attive. + """ + global _mailbox_pool + + logger.info("🚀 PecFlow Worker avviato") + logger.info(f" DB: {settings.database_url.split('@')[-1]}") + logger.info(f" Redis: {settings.redis_url}") + logger.info(f" MinIO: {settings.minio_endpoint}") + + # Verifica/crea bucket MinIO + try: + await ensure_bucket_exists() + except Exception as e: + logger.warning(f"MinIO non disponibile al startup: {e}") + + # Crea client Redis condiviso + redis_client = aioredis.from_url(settings.redis_url, decode_responses=True) + ctx["redis"] = redis_client + + # Avvia MailboxPool + _mailbox_pool = MailboxPool(redis_client=redis_client) + ctx["mailbox_pool"] = _mailbox_pool + await _mailbox_pool.start() + + logger.info( + f"✅ Worker pronto: {_mailbox_pool.active_count} caselle IMAP attive" + ) + + +async def on_shutdown(ctx: dict[str, Any]) -> None: + """Cleanup all'arresto del worker.""" + global _mailbox_pool + + logger.info("🛑 PecFlow Worker in arresto...") + + pool = ctx.get("mailbox_pool") or _mailbox_pool + if pool: + await pool.stop() + + redis_client = ctx.get("redis") + if redis_client: + await redis_client.aclose() + + logger.info("🛑 Worker fermato") + + +# ─── Worker health check ────────────────────────────────────────────────────── + +async def health_check(ctx: dict[str, Any]) -> dict: + """ + Job speciale per health check del worker. + Può essere chiamato da monitoring esterno. + """ + pool: MailboxPool | None = ctx.get("mailbox_pool") + return { + "status": "ok", + "active_imap_connections": pool.active_count if pool else 0, + } + + +# ─── WorkerSettings arq ─────────────────────────────────────────────────────── + +def _parse_redis_settings() -> RedisSettings: + """Parsa REDIS_URL in RedisSettings arq.""" + url = settings.redis_url + # Supporta redis://host:port/db e redis://user:pass@host:port/db + import urllib.parse + parsed = urllib.parse.urlparse(url) + + host = parsed.hostname or "localhost" + port = parsed.port or 6379 + db = int(parsed.path.lstrip("/") or "0") + password = parsed.password or None + + return RedisSettings(host=host, port=port, database=db, password=password) + + +class WorkerSettings: + """Configurazione del worker arq.""" + + # Funzioni/job registrati + functions = [sync_mailbox, health_check] + + # Callbacks lifecycle + on_startup = on_startup + on_shutdown = on_shutdown + + # Redis + redis_settings = _parse_redis_settings() + + # Concorrenza + max_jobs = 20 + + # Timeout per ogni job (secondi) + job_timeout = 300 + + # Retry automatico in caso di errore + max_tries = 3 + + # Polling interval (arq controlla la coda ogni N ms) + poll_delay = 0.5 + + # Keep job results per N secondi + keep_result = 3600 + + +# ─── Entrypoint ─────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + logger.info("Avvio PecFlow Worker (arq)...") + run_worker(WorkerSettings) diff --git a/worker/app/models.py b/worker/app/models.py new file mode 100644 index 0000000..bd1555f --- /dev/null +++ b/worker/app/models.py @@ -0,0 +1,128 @@ +""" +Re-export dei modelli SQLAlchemy dal backend. + +Il worker usa gli stessi modelli ORM del backend per leggere/scrivere nel DB. +Importa da una base comune tramite il package condiviso. + +Nota: i modelli sono definiti nel backend. Il worker li ridefinisce qui +come classi identiche (stessa struttura) per non creare dipendenza circolare. +Tuttavia, poiché i due container condividono lo stesso DB PostgreSQL, +utilizziamo i modelli del backend ricopiando solo le parti necessarie. +""" + +# I modelli completi sono già in backend/app/models/. +# Il worker li importa dalla propria copia locale per evitare +# una dipendenza del package worker → backend. +# In un monorepo reale si userebbe un shared/ package. + +import uuid +from datetime import datetime + +from sqlalchemy import ( + ARRAY, BigInteger, Boolean, DateTime, Enum, ForeignKey, + Integer, String, Text, func, +) +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +class Base(DeclarativeBase): + pass + + +MailboxStatus = Enum( + "active", "paused", "error", "deleted", + name="mailbox_status", create_type=False, +) + +PecDirection = Enum("inbound", "outbound", name="pec_direction", create_type=False) +PecState = Enum( + "draft", "queued", "sent", "accepted", "delivered", "anomaly", "failed", "received", + name="pec_state", create_type=False, +) +PecMsgType = Enum( + "posta_certificata", "accettazione", "non_accettazione", "presa_in_carico", + "avvenuta_consegna", "mancata_consegna", "errore_consegna", + "preavviso_mancata_consegna", "rilevazione_virus", "unknown", + name="pec_msg_type", create_type=False, +) + + +class Mailbox(Base): + __tablename__ = "mailboxes" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + email_address: Mapped[str] = mapped_column(String(255), nullable=False) + display_name: Mapped[str | None] = mapped_column(String(255), nullable=True) + provider: Mapped[str | None] = mapped_column(String(100), nullable=True) + + imap_host_enc: Mapped[str] = mapped_column(Text, nullable=False) + imap_port_enc: Mapped[str] = mapped_column(Text, nullable=False) + imap_user_enc: Mapped[str] = mapped_column(Text, nullable=False) + imap_pass_enc: Mapped[str] = mapped_column(Text, nullable=False) + imap_use_ssl: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + + smtp_host_enc: Mapped[str] = mapped_column(Text, nullable=False) + smtp_port_enc: Mapped[str] = mapped_column(Text, nullable=False) + smtp_user_enc: Mapped[str] = mapped_column(Text, nullable=False) + smtp_pass_enc: Mapped[str] = mapped_column(Text, nullable=False) + smtp_use_tls: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + + status: Mapped[str] = mapped_column(MailboxStatus, nullable=False, default="active") + last_sync_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + last_sync_uid: Mapped[int | None] = mapped_column(BigInteger, nullable=True) + sync_error_msg: Mapped[str | None] = mapped_column(Text, nullable=True) + sync_error_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + + created_by: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now() + ) + + +class Message(Base): + __tablename__ = "messages" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + mailbox_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + + message_id_header: Mapped[str | None] = mapped_column(Text, nullable=True) + imap_uid: Mapped[int | None] = mapped_column(BigInteger, nullable=True) + imap_folder: Mapped[str] = mapped_column(String(255), nullable=False, default="INBOX") + + direction: Mapped[str] = mapped_column(PecDirection, nullable=False) + pec_type: Mapped[str] = mapped_column(PecMsgType, nullable=False, default="posta_certificata") + state: Mapped[str] = mapped_column(PecState, nullable=False) + + subject: Mapped[str | None] = mapped_column(Text, nullable=True) + from_address: Mapped[str | None] = mapped_column(String(255), nullable=True) + to_addresses: Mapped[list[str] | None] = mapped_column(ARRAY(Text), nullable=True) + cc_addresses: Mapped[list[str] | None] = mapped_column(ARRAY(Text), nullable=True) + sent_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + received_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + size_bytes: Mapped[int | None] = mapped_column(BigInteger, nullable=True) + + body_text: Mapped[str | None] = mapped_column(Text, nullable=True) + body_html: Mapped[str | None] = mapped_column(Text, nullable=True) + has_attachments: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + + parent_message_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), ForeignKey("messages.id"), nullable=True + ) + + is_read: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + is_starred: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + is_archived: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + archived_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + raw_eml_path: Mapped[str | None] = mapped_column(Text, nullable=True) + + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now() + ) diff --git a/worker/app/storage/__init__.py b/worker/app/storage/__init__.py new file mode 100644 index 0000000..02bc823 --- /dev/null +++ b/worker/app/storage/__init__.py @@ -0,0 +1 @@ +# MinIO storage client diff --git a/worker/app/storage/minio_client.py b/worker/app/storage/minio_client.py new file mode 100644 index 0000000..d1200b7 --- /dev/null +++ b/worker/app/storage/minio_client.py @@ -0,0 +1,74 @@ +""" +Client MinIO/S3 asincrono per il worker. + +Percorso EML raw: pecflow/tenants/{tenant_id}/mailboxes/{mailbox_id}/raw/{uid}.eml +Percorso allegati: pecflow/tenants/{tenant_id}/mailboxes/{mailbox_id}/attachments/{msg_id}/{filename} +""" + +import io +import logging +from functools import lru_cache + +from miniopy_async import Minio + +from app.config import get_settings + +logger = logging.getLogger(__name__) +settings = get_settings() + + +@lru_cache(maxsize=1) +def get_minio_client() -> Minio: + """Restituisce l'istanza singleton del client MinIO.""" + return Minio( + endpoint=settings.minio_endpoint, + access_key=settings.minio_access_key, + secret_key=settings.minio_secret_key, + secure=settings.minio_use_ssl, + ) + + +async def upload_eml( + tenant_id: str, + mailbox_id: str, + uid: int, + eml_bytes: bytes, +) -> str: + """ + Carica un raw EML su MinIO e restituisce il percorso oggetto. + + Percorso: tenants/{tenant_id}/mailboxes/{mailbox_id}/raw/{uid}.eml + """ + client = get_minio_client() + bucket = settings.minio_bucket + object_path = f"tenants/{tenant_id}/mailboxes/{mailbox_id}/raw/{uid}.eml" + + try: + data_stream = io.BytesIO(eml_bytes) + await client.put_object( + bucket_name=bucket, + object_name=object_path, + data=data_stream, + length=len(eml_bytes), + content_type="message/rfc822", + ) + logger.debug(f"EML caricato: s3://{bucket}/{object_path} ({len(eml_bytes)} bytes)") + return object_path + except Exception as e: + logger.error(f"Errore upload EML {object_path}: {e}") + raise + + +async def ensure_bucket_exists() -> None: + """Verifica che il bucket MinIO esista, altrimenti lo crea.""" + client = get_minio_client() + bucket = settings.minio_bucket + try: + found = await client.bucket_exists(bucket) + if not found: + await client.make_bucket(bucket) + logger.info(f"Bucket MinIO creato: {bucket}") + else: + logger.debug(f"Bucket MinIO esistente: {bucket}") + except Exception as e: + logger.warning(f"Impossibile verificare/creare bucket MinIO: {e}") diff --git a/worker/pyproject.toml b/worker/pyproject.toml new file mode 100644 index 0000000..1ac9cca --- /dev/null +++ b/worker/pyproject.toml @@ -0,0 +1,75 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "pecflow-worker" +version = "1.0.0" +description = "PecFlow – Worker IMAP sync + background jobs" +requires-python = ">=3.12" + +dependencies = [ + # Job queue + "arq>=0.26.1", + + # Database (stessi driver del backend) + "sqlalchemy>=2.0.36", + "asyncpg>=0.29.0", + "alembic>=1.13.0", + + # Configurazione + "pydantic>=2.9.0", + "pydantic-settings>=2.5.0", + + # Redis + "redis[asyncio]>=5.0.0", + + # IMAP async + "aioimaplib>=2.0.0", + + # Storage MinIO/S3 + "miniopy-async>=1.21.0", + + # Sicurezza (stesso modulo del backend per decifratura credenziali) + "cryptography>=43.0.0", + "python-jose[cryptography]>=3.3.0", + "bcrypt>=4.0.0", + + # Utilities + "python-dotenv>=1.0.0", + "email-validator>=2.2.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.3.0", + "pytest-asyncio>=0.24.0", + "pytest-cov>=5.0.0", + "anyio>=4.6.0", + "ruff>=0.7.0", + "mypy>=1.13.0", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["app*"] + +[tool.ruff] +target-version = "py312" +line-length = 100 +src = ["app", "tests"] + +[tool.ruff.lint] +select = ["E", "W", "F", "I", "B", "C4", "UP"] +ignore = ["E501", "B008", "B904"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +filterwarnings = [ + "ignore::DeprecationWarning", + "ignore::PendingDeprecationWarning", +] diff --git a/worker/tests/__init__.py b/worker/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/worker/tests/integration/__init__.py b/worker/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/worker/tests/integration/test_imap_sync.py b/worker/tests/integration/test_imap_sync.py new file mode 100644 index 0000000..fa8291c --- /dev/null +++ b/worker/tests/integration/test_imap_sync.py @@ -0,0 +1,270 @@ +""" +Test di integrazione IMAP con GreenMail (server IMAP mock via Docker). + +Prerequisiti: + - GreenMail in esecuzione: docker compose --profile greenmail up -d greenmail + - Endpoint IMAP: localhost:3143 (plain) o localhost:3993 (SSL) + - Credenziali test: test@example.com / secret + +Esecuzione: + cd worker && pytest tests/integration/test_imap_sync.py -v + +I test sono contrassegnati con @pytest.mark.integration e saltati se +GREENMAIL_HOST non è raggiungibile. +""" + +import asyncio +import os +import socket +import uuid +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +# ─── Skip automatico se GreenMail non disponibile ───────────────────────────── + +GREENMAIL_HOST = os.getenv("GREENMAIL_HOST", "localhost") +GREENMAIL_IMAP_PORT = int(os.getenv("GREENMAIL_IMAP_PORT", "3143")) + + +def _is_greenmail_running() -> bool: + """Verifica se GreenMail IMAP è raggiungibile.""" + try: + with socket.create_connection((GREENMAIL_HOST, GREENMAIL_IMAP_PORT), timeout=2): + return True + except OSError: + return False + + +skip_if_no_greenmail = pytest.mark.skipif( + not _is_greenmail_running(), + reason=f"GreenMail non disponibile su {GREENMAIL_HOST}:{GREENMAIL_IMAP_PORT}", +) + +# ─── Fixture: mailbox mock ──────────────────────────────────────────────────── + + +def make_mock_mailbox( + email_address: str = "test@example.com", + host: str = GREENMAIL_HOST, + port: int = GREENMAIL_IMAP_PORT, + user: str = "test@example.com", + password: str = "secret", + use_ssl: bool = False, + last_sync_uid: int | None = None, +) -> MagicMock: + """Crea un mock di Mailbox con credenziali cifrate per GreenMail.""" + from app.imap.connection import _decrypt + from app.config import get_settings + import base64 + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + + settings = get_settings() + key = settings.encryption_key_bytes + aesgcm = AESGCM(key) + + def encrypt(value: str) -> str: + import os as _os + nonce = _os.urandom(12) + ct = aesgcm.encrypt(nonce, value.encode(), None) + return base64.b64encode(nonce + ct).decode() + + mailbox = MagicMock() + mailbox.id = uuid.uuid4() + mailbox.tenant_id = uuid.uuid4() + mailbox.email_address = email_address + mailbox.status = "active" + mailbox.last_sync_uid = last_sync_uid + mailbox.sync_error_count = 0 + mailbox.sync_error_msg = None + + mailbox.imap_host_enc = encrypt(host) + mailbox.imap_port_enc = encrypt(str(port)) + mailbox.imap_user_enc = encrypt(user) + mailbox.imap_pass_enc = encrypt(password) + mailbox.imap_use_ssl = use_ssl + + return mailbox + + +# ─── Test: connessione IMAP a GreenMail ────────────────────────────────────── + +@skip_if_no_greenmail +@pytest.mark.asyncio +async def test_imap_connect_greenmail(): + """Verifica che IMAPConnection si connetta correttamente a GreenMail.""" + import aioimaplib + from app.imap.connection import IMAPConnection + + mailbox = make_mock_mailbox() + creds = IMAPConnection._decrypt_creds(mailbox) + + conn = IMAPConnection.__new__(IMAPConnection) + client = await conn._connect(creds) + + assert client is not None + + # Verifica che SELECT INBOX abbia funzionato + status, _ = await client.select("INBOX") + assert status == "OK" + + await client.logout() + + +@skip_if_no_greenmail +@pytest.mark.asyncio +async def test_imap_search_empty_inbox(): + """SEARCH su inbox vuota restituisce lista vuota.""" + import aioimaplib + from app.imap.connection import IMAPConnection + + mailbox = make_mock_mailbox(last_sync_uid=0) + creds = IMAPConnection._decrypt_creds(mailbox) + + conn = IMAPConnection.__new__(IMAPConnection) + client = await conn._connect(creds) + + # Cerca UID > 0 (tutti) + status, data = await client.uid("SEARCH", "UID", "1:*") + # Non deve sollevare eccezione + assert status in ("OK", "NO") + + await client.logout() + + +# ─── Test: sync con DB mockato ──────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_sync_new_messages_empty(): + """sync_new_messages con inbox vuota restituisce 0.""" + from app.imap.sync import sync_new_messages + + # Mock IMAP client che risponde con lista vuota + mock_client = AsyncMock() + mock_client.uid = AsyncMock(return_value=("OK", [b""])) + + # Mock mailbox + mailbox = MagicMock() + mailbox.id = uuid.uuid4() + mailbox.tenant_id = uuid.uuid4() + mailbox.email_address = "test@example.com" + mailbox.last_sync_uid = 0 + + # Mock DB + mock_db = AsyncMock() + mock_db.execute = AsyncMock(return_value=MagicMock(scalar_one_or_none=lambda: None)) + + # Mock Redis + mock_redis = AsyncMock() + + n = await sync_new_messages(mock_client, mailbox, mock_db, mock_redis) + assert n == 0 + + +@pytest.mark.asyncio +async def test_sync_skips_duplicate_uid(): + """sync_new_messages salta UID già presenti nel DB.""" + from app.imap.sync import sync_new_messages + from sqlalchemy.engine import Result + + existing_uid = 42 + + mock_client = AsyncMock() + # SEARCH restituisce UID 42 + mock_client.uid = AsyncMock(return_value=("OK", [f"{existing_uid}".encode()])) + + mailbox = MagicMock() + mailbox.id = uuid.uuid4() + mailbox.tenant_id = uuid.uuid4() + mailbox.email_address = "test@example.com" + mailbox.last_sync_uid = 41 + + # DB: UID 42 già presente + mock_result = MagicMock() + mock_result.scalar_one_or_none.return_value = uuid.uuid4() # esiste già + + mock_db = AsyncMock() + mock_db.execute = AsyncMock(return_value=mock_result) + mock_db.flush = AsyncMock() + mock_db.commit = AsyncMock() + + mock_redis = AsyncMock() + + n = await sync_new_messages(mock_client, mailbox, mock_db, mock_redis) + # Deve restituire 0 perché il messaggio è già in DB + assert n == 0 + + +# ─── Test: parsing EML ──────────────────────────────────────────────────────── + +def test_parse_pec_message(): + """Parsing di un EML PEC tipico.""" + from app.imap.sync import _parse_eml + + raw = b"""From: mittente@aruba.pec.it +To: destinatario@pec.it +Subject: Comunicazione ufficiale n. 2026/001 +Date: Wed, 18 Mar 2026 09:00:00 +0100 +Message-ID: <20260318090000.1234@aruba.pec.it> +X-Ricevuta: posta-certificata +Content-Type: text/plain; charset=utf-8 + +Gentile destinatario, +in riferimento alla pratica n. 2026/001... +""" + + parsed = _parse_eml(raw) + assert parsed["subject"] == "Comunicazione ufficiale n. 2026/001" + assert parsed["from_address"] == "mittente@aruba.pec.it" + assert parsed["pec_type"] == "posta_certificata" + assert parsed["sent_at"] is not None + assert "destinatario@pec.it" in parsed["to_addresses"] + + +def test_parse_ricevuta_avvenuta_consegna(): + """Parsing di una ricevuta di avvenuta consegna.""" + from app.imap.sync import _parse_eml + + raw = b"""From: posta-certificata@aruba.pec.it +To: mittente@aruba.pec.it +Subject: AVVENUTA CONSEGNA: Comunicazione n. 001 +Date: Wed, 18 Mar 2026 09:05:00 +0100 +Message-ID: +X-Riferimento-Message-ID: <20260318090000.1234@aruba.pec.it> +X-TipoRicevuta: avvenuta-consegna +Content-Type: text/plain; charset=utf-8 + +Il messaggio e' stato consegnato. +""" + + parsed = _parse_eml(raw) + assert parsed["pec_type"] == "avvenuta_consegna" + assert "AVVENUTA CONSEGNA" in parsed["subject"] + + +# ─── Test: backoff con connessione reale ───────────────────────────────────── + +@skip_if_no_greenmail +@pytest.mark.asyncio +async def test_reconnect_after_logout(): + """Il sistema si riconnette dopo una disconnessione forzata.""" + from app.imap.connection import IMAPConnection + + mailbox = make_mock_mailbox() + creds = IMAPConnection._decrypt_creds(mailbox) + + conn = IMAPConnection.__new__(IMAPConnection) + + # Prima connessione + client1 = await conn._connect(creds) + assert client1 is not None + + # Disconnessione forzata + await client1.logout() + + # Seconda connessione (simula riconnessione) + client2 = await conn._connect(creds) + assert client2 is not None + + await client2.logout() diff --git a/worker/tests/unit/__init__.py b/worker/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/worker/tests/unit/test_reconnect.py b/worker/tests/unit/test_reconnect.py new file mode 100644 index 0000000..36f994e --- /dev/null +++ b/worker/tests/unit/test_reconnect.py @@ -0,0 +1,103 @@ +""" +Unit test: ExponentialBackoff +""" + +import asyncio +import time + +import pytest + +from app.imap.reconnect import ExponentialBackoff + + +@pytest.mark.asyncio +async def test_backoff_increases(): + """Il tempo di attesa aumenta ad ogni tentativo.""" + backoff = ExponentialBackoff(label="test", jitter=False) + + # Registra i tempi senza aspettare davvero (patch asyncio.sleep) + waits = [] + + original_sleep = asyncio.sleep + + async def fake_sleep(t): + waits.append(t) + + asyncio.sleep = fake_sleep + try: + await backoff.wait() + await backoff.wait() + await backoff.wait() + finally: + asyncio.sleep = original_sleep + + assert waits[1] > waits[0], "Il secondo wait deve essere maggiore del primo" + assert waits[2] > waits[1], "Il terzo wait deve essere maggiore del secondo" + + +@pytest.mark.asyncio +async def test_backoff_reset(): + """reset() riporta il contatore a zero e il wait riparte dal valore iniziale.""" + backoff = ExponentialBackoff(label="test", jitter=False) + waits = [] + + async def fake_sleep(t): + waits.append(t) + + original_sleep = asyncio.sleep + asyncio.sleep = fake_sleep + try: + await backoff.wait() # waits[0]: valore iniziale (es. 1.0) + await backoff.wait() # waits[1]: valore incrementato (es. 2.0) + backoff.reset() + await backoff.wait() # waits[2]: deve tornare al valore iniziale + finally: + asyncio.sleep = original_sleep + + # Dopo reset il wait riparte dal valore iniziale + assert backoff.attempt == 1 + assert waits[2] == waits[0], ( + f"Dopo reset il wait deve tornare a {waits[0]}, ma era {waits[2]}" + ) + + +@pytest.mark.asyncio +async def test_backoff_max(): + """Il tempo di attesa non supera backoff_max_seconds.""" + from app.config import get_settings + settings = get_settings() + + backoff = ExponentialBackoff(label="test", jitter=False) + max_recorded = 0.0 + + async def fake_sleep(t): + nonlocal max_recorded + max_recorded = max(max_recorded, t) + + original_sleep = asyncio.sleep + asyncio.sleep = fake_sleep + try: + for _ in range(20): + await backoff.wait() + finally: + asyncio.sleep = original_sleep + + assert max_recorded <= settings.backoff_max_seconds + 0.1 + + +def test_backoff_attempt_count(): + """Il contatore tentativi si incrementa correttamente.""" + import asyncio + backoff = ExponentialBackoff(label="test", jitter=False) + + async def run(): + async def fake_sleep(_): + pass + + asyncio.sleep = fake_sleep + await backoff.wait() + await backoff.wait() + await backoff.wait() + + asyncio.get_event_loop().run_until_complete(run()) + assert backoff.attempt == 3 diff --git a/worker/tests/unit/test_sync_parsing.py b/worker/tests/unit/test_sync_parsing.py new file mode 100644 index 0000000..503d598 --- /dev/null +++ b/worker/tests/unit/test_sync_parsing.py @@ -0,0 +1,197 @@ +""" +Unit test: parsing EML e classificazione tipo PEC. +""" + +import pytest + +from app.imap.sync import _decode_header, _extract_addresses, _parse_date, _parse_eml, _classify_pec_type +import email + + +# ─── Test _decode_header ───────────────────────────────────────────────────── + +def test_decode_header_plain(): + assert _decode_header("Hello World") == "Hello World" + + +def test_decode_header_none(): + assert _decode_header(None) is None + + +def test_decode_header_encoded(): + # Header UTF-8 base64 encoded + encoded = "=?utf-8?b?UEVDIHRlc3Q=?=" # "PEC test" + assert _decode_header(encoded) == "PEC test" + + +def test_decode_header_encoded_iso(): + # Header ISO-8859-1 quoted printable + encoded = "=?iso-8859-1?q?Multa_n=2E_123?=" # "Multa n. 123" + result = _decode_header(encoded) + assert result is not None + assert "Multa" in result + + +# ─── Test _extract_addresses ────────────────────────────────────────────────── + +def test_extract_addresses_single(): + addrs = _extract_addresses("test@example.com") + assert "test@example.com" in addrs + + +def test_extract_addresses_multiple(): + addrs = _extract_addresses("a@x.com, b@y.com, c@z.com") + assert len(addrs) == 3 + assert "a@x.com" in addrs + assert "b@y.com" in addrs + + +def test_extract_addresses_with_display_name(): + addrs = _extract_addresses('"Mario Rossi" ') + assert "mario@comune.it" in addrs + + +def test_extract_addresses_empty(): + assert _extract_addresses(None) == [] + assert _extract_addresses("") == [] + + +# ─── Test _parse_date ───────────────────────────────────────────────────────── + +def test_parse_date_valid(): + date = _parse_date("Wed, 18 Mar 2026 14:00:00 +0100") + assert date is not None + assert date.year == 2026 + assert date.month == 3 + assert date.day == 18 + + +def test_parse_date_none(): + assert _parse_date(None) is None + + +def test_parse_date_invalid(): + assert _parse_date("not-a-date") is None + + +# ─── Test _classify_pec_type ────────────────────────────────────────────────── + +def test_classify_pec_type_avvenuta_consegna(): + msg = email.message_from_string( + "From: server@pec.it\r\n" + "X-TipoRicevuta: avvenuta-consegna\r\n" + "\r\n" + ) + assert _classify_pec_type(msg) == "avvenuta_consegna" + + +def test_classify_pec_type_accettazione(): + msg = email.message_from_string( + "From: server@pec.it\r\n" + "X-Ricevuta: accettazione\r\n" + "\r\n" + ) + assert _classify_pec_type(msg) == "accettazione" + + +def test_classify_pec_type_posta_certificata(): + msg = email.message_from_string( + "From: mittente@pec.it\r\n" + "Subject: Messaggio PEC\r\n" + "\r\n" + ) + assert _classify_pec_type(msg) == "posta_certificata" + + +def test_classify_pec_type_x_tipo_prevalente(): + """X-TipoRicevuta ha precedenza su X-Ricevuta.""" + msg = email.message_from_string( + "From: server@pec.it\r\n" + "X-Ricevuta: accettazione\r\n" + "X-TipoRicevuta: avvenuta-consegna\r\n" + "\r\n" + ) + assert _classify_pec_type(msg) == "avvenuta_consegna" + + +# ─── Test _parse_eml completo ───────────────────────────────────────────────── + +RAW_EML = b"""From: mittente@pec.it +To: destinatario@pec.it +Cc: copia@pec.it +Subject: Test PEC Fase 2 +Message-ID: +Date: Wed, 18 Mar 2026 10:00:00 +0100 +Content-Type: text/plain; charset=utf-8 + +Corpo del messaggio di test. +""" + + +def test_parse_eml_subject(): + parsed = _parse_eml(RAW_EML) + assert parsed["subject"] == "Test PEC Fase 2" + + +def test_parse_eml_from(): + parsed = _parse_eml(RAW_EML) + assert parsed["from_address"] == "mittente@pec.it" + + +def test_parse_eml_to(): + parsed = _parse_eml(RAW_EML) + assert "destinatario@pec.it" in parsed["to_addresses"] + + +def test_parse_eml_cc(): + parsed = _parse_eml(RAW_EML) + assert "copia@pec.it" in parsed["cc_addresses"] + + +def test_parse_eml_message_id(): + parsed = _parse_eml(RAW_EML) + assert parsed["message_id_header"] == "" + + +def test_parse_eml_body_text(): + parsed = _parse_eml(RAW_EML) + assert "Corpo del messaggio" in (parsed.get("body_text") or "") + + +def test_parse_eml_no_attachments(): + parsed = _parse_eml(RAW_EML) + assert parsed["has_attachments"] is False + + +RAW_EML_WITH_ATTACHMENT = b"""From: mittente@pec.it +To: destinatario@pec.it +Subject: PEC con allegato +Date: Wed, 18 Mar 2026 10:00:00 +0100 +Content-Type: multipart/mixed; boundary="----boundary123" + +------boundary123 +Content-Type: text/plain; charset=utf-8 + +Testo del messaggio. + +------boundary123 +Content-Type: application/pdf; name="documento.pdf" +Content-Disposition: attachment; filename="documento.pdf" +Content-Transfer-Encoding: base64 + +JVBERi0xLjQ= + +------boundary123-- +""" + + +def test_parse_eml_with_attachment(): + parsed = _parse_eml(RAW_EML_WITH_ATTACHMENT) + assert parsed["has_attachments"] is True + assert "Testo del messaggio" in (parsed.get("body_text") or "") + + +def test_parse_eml_empty(): + parsed = _parse_eml(b"") + # Non deve sollevare eccezione + assert isinstance(parsed, dict)