""" Job arq: send_pec – invio PEC con retry esponenziale. Flusso completo: 1. API backend crea Message(state=queued) + SendJob(status=pending) 2. Backend enqueue send_pec via arq 3. send_pec legge il job dal DB, tenta l'invio SMTP 4. Successo → Message(state=sent), SendJob(status=sent), upload EML su MinIO, enqueue watch_receipt con defer 24h 5. Fallimento transitorio → status=retrying, re-enqueue con backoff 6. Fallimento definitivo (max_attempts raggiunto) → status=failed, Message(state=failed), evento WS Retry backoff (max 5 tentativi totali): Tentativo 1 fallisce → attendi 1 min → tentativo 2 Tentativo 2 fallisce → attendi 5 min → tentativo 3 Tentativo 3 fallisce → attendi 15 min → tentativo 4 Tentativo 4 fallisce → attendi 1 ora → tentativo 5 Tentativo 5 fallisce → FAILED (no retry) """ import io import json import logging import uuid as uuid_module from datetime import datetime, timedelta, timezone from email.utils import make_msgid from typing import Any from sqlalchemy import select from app.database import AsyncSessionLocal from app.models import Attachment, Mailbox, Message, SendJob from app.smtp.sender import SmtpSender from app.storage.minio_client import download_attachment, upload_outbound_eml logger = logging.getLogger(__name__) # ─── Configurazione retry ───────────────────────────────────────────────────── MAX_ATTEMPTS = 5 # Delay in secondi dopo il fallimento del tentativo N (0-based) RETRY_DELAYS = [ 60, # dopo tentativo 1 fallisce → 1 min 300, # dopo tentativo 2 fallisce → 5 min 900, # dopo tentativo 3 fallisce → 15 min 3600, # dopo tentativo 4 fallisce → 1 ora # tentativo 5 = ultimo → nessun retry ] # ─── Job principale ─────────────────────────────────────────────────────────── async def send_pec(ctx: dict[str, Any], send_job_id: str) -> dict: """ Job arq: invia una PEC. Args: ctx: contesto arq (contiene redis client arq per re-enqueue) send_job_id: UUID del SendJob da processare Returns: dict con esito: status, message_id, attempt """ redis_client = ctx.get("redis") async with AsyncSessionLocal() as db: # ── Carica dati ────────────────────────────────────────────────────── job = await db.get(SendJob, uuid_module.UUID(send_job_id)) if not job: logger.error(f"[send_pec] SendJob {send_job_id} non trovato") return {"status": "error", "message": "SendJob non trovato"} if job.status in ("sent", "failed"): logger.warning( f"[send_pec] SendJob {send_job_id} già in stato {job.status!r}, skip" ) return {"status": "skipped", "current_status": job.status} msg = await db.get(Message, job.message_id) if not msg: logger.error(f"[send_pec] Messaggio {job.message_id} non trovato") job.status = "failed" job.last_error = "Messaggio associato non trovato" await db.commit() return {"status": "error", "message": "Messaggio non trovato"} mailbox = await db.get(Mailbox, job.mailbox_id) if not mailbox: logger.error(f"[send_pec] Casella {job.mailbox_id} non trovata") job.status = "failed" job.last_error = "Casella mittente non trovata" msg.state = "failed" await db.commit() return {"status": "error", "message": "Casella non trovata"} # ── Aggiorna contatori ──────────────────────────────────────────────── job.status = "sending" job.attempt_count += 1 current_attempt = job.attempt_count await db.flush() logger.info( f"[send_pec] Tentativo {current_attempt}/{MAX_ATTEMPTS} " f"per job {send_job_id} → {msg.to_addresses}" ) # ── Carica allegati da MinIO (se presenti) ──────────────────────────── attachments_data: list[dict] | None = None if msg.has_attachments: att_result = await db.execute( select(Attachment).where(Attachment.message_id == msg.id) ) att_records = list(att_result.scalars().all()) if att_records: attachments_data = [] for att in att_records: try: content = await download_attachment(att.storage_path) attachments_data.append( { "filename": att.filename, "content": content, "content_type": att.content_type or "application/octet-stream", } ) logger.debug( f"[send_pec] Allegato caricato per invio: " f"{att.filename} ({len(content)} bytes)" ) except Exception as att_err: logger.warning( f"[send_pec] Impossibile caricare allegato " f"'{att.filename}' ({att.storage_path}): {att_err}" ) # Continua senza questo allegato # ── Pre-genera Message-ID e commita PRIMA dell'invio SMTP ───────────── # RACE CONDITION FIX: il Message-ID viene scritto nel DB e committato # PRIMA dell'handshake SMTP, in modo che, se il ciclo IMAP sync del # worker gira durante l'invio (finestra di secondi), trovi gia' il # record con message_id_header valorizzato e possa bindare correttamente # le ricevute di accettazione (da Aruba: entro 1-2s dalla ricezione). # Senza questo fix la ricevuta di accettazione veniva sempre processata # con message_id_header=NULL nel DB → binding fallisce → parent_message_id # non impostato sulla ricevuta → ricevuta compare in inbox come messaggio # ordinario e lo stato outbound non avanza ad "accepted". if not msg.message_id_header: # Primo tentativo: genera e persiste un nuovo Message-ID pre_generated_id = make_msgid(domain="pechub.local") msg.message_id_header = pre_generated_id await db.commit() logger.info( f"[send_pec] Message-ID pre-committato prima dell'SMTP: " f"{pre_generated_id}" ) else: # Retry successivo: riusa il Message-ID gia' persistito dal # tentativo precedente. In questo modo eventuali ricevute gia' # salvate (ma non bindato per la race) possono essere ricollegate. pre_generated_id = msg.message_id_header logger.debug( f"[send_pec] Retry - riutilizzo Message-ID esistente: " f"{pre_generated_id}" ) # ── Tenta invio SMTP ────────────────────────────────────────────────── try: sender = SmtpSender(mailbox) message_id_header, raw_eml = await sender.send( to_addresses=list(msg.to_addresses or []), cc_addresses=list(msg.cc_addresses or []), subject=msg.subject or "", body_text=msg.body_text or "", body_html=msg.body_html, attachments=attachments_data, preset_message_id=pre_generated_id, ) # ── Successo: aggiorna DB ───────────────────────────────────────── # msg.message_id_header e' gia' impostato e committato sopra. # message_id_header restituito dal sender deve essere identico a # pre_generated_id (stessa stringa che abbiamo passato). now = datetime.now(tz=timezone.utc) msg.state = "sent" msg.sent_at = now job.status = "sent" job.sent_at = now job.message_id = msg.id # Upload raw EML su MinIO try: eml_path = await upload_outbound_eml( tenant_id=str(msg.tenant_id), mailbox_id=str(msg.mailbox_id), message_id=str(msg.id), eml_bytes=raw_eml, ) msg.raw_eml_path = eml_path logger.debug(f"[send_pec] EML salvato: {eml_path}") except Exception as minio_err: logger.warning(f"[send_pec] Upload MinIO fallito (non critico): {minio_err}") await db.commit() # ── Pubblica evento WS ──────────────────────────────────────────── if redis_client: await _publish_ws_event(redis_client, msg.tenant_id, { "type": "message:sent", "message_id": str(msg.id), "mailbox_id": str(msg.mailbox_id), "subject": msg.subject, "message_id_header": message_id_header, }) # ── Enqueue sync_mailbox dopo 60s per rilevare ricevute rapidamente ── # La connessione IMAP IDLE potrebbe avere un heartbeat di 28 minuti; # forziamo un sync immediato per non attendere il prossimo ciclo IDLE. try: await redis_client.enqueue_job( "sync_mailbox", str(mailbox.id), _defer_by=timedelta(seconds=60), ) logger.info( f"[send_pec] sync_mailbox schedulato per {mailbox.id} tra 60s" ) except Exception as e: logger.warning(f"[send_pec] Errore enqueue sync_mailbox post-invio: {e}") # ── Enqueue watch_receipt dopo 24h ──────────────────────────────── try: await redis_client.enqueue_job( "watch_receipt", str(msg.id), _defer_by=timedelta(hours=24), ) logger.info( f"[send_pec] watch_receipt schedulato per {msg.id} " f"tra 24h" ) except Exception as e: logger.warning(f"[send_pec] Errore enqueue watch_receipt: {e}") logger.info( f"[send_pec] ✅ PEC inviata: job={send_job_id} " f"message_id_header={message_id_header}" ) return { "status": "sent", "send_job_id": send_job_id, "message_id": str(msg.id), "message_id_header": message_id_header, "attempt": current_attempt, } except Exception as smtp_error: error_msg = str(smtp_error) logger.warning( f"[send_pec] Tentativo {current_attempt} fallito: {error_msg}" ) # ── Gestione retry / failure ────────────────────────────────────── if current_attempt >= MAX_ATTEMPTS: # Esauriti tutti i tentativi → FAILED job.status = "failed" job.last_error = error_msg msg.state = "failed" await db.commit() # Pubblica evento WS: invio fallito if redis_client: await _publish_ws_event(redis_client, msg.tenant_id, { "type": "message:send_failed", "message_id": str(msg.id), "mailbox_id": str(msg.mailbox_id), "subject": msg.subject, "error": error_msg, "attempts": current_attempt, }) logger.error( f"[send_pec] ❌ Invio FALLITO definitivamente: " f"job={send_job_id}, errore: {error_msg}" ) return { "status": "failed", "send_job_id": send_job_id, "error": error_msg, "attempts": current_attempt, } else: # Retry con backoff delay_seconds = RETRY_DELAYS[current_attempt - 1] next_retry = datetime.now(tz=timezone.utc) + timedelta(seconds=delay_seconds) job.status = "retrying" job.last_error = error_msg job.next_retry_at = next_retry msg.state = "queued" # torna in coda await db.commit() # Re-enqueue con defer try: await redis_client.enqueue_job( "send_pec", send_job_id, _defer_by=timedelta(seconds=delay_seconds), ) logger.info( f"[send_pec] Retry {current_attempt} schedulato " f"in {delay_seconds}s per job {send_job_id}" ) except Exception as enqueue_err: logger.error( f"[send_pec] Impossibile re-enqueue job {send_job_id}: " f"{enqueue_err}" ) return { "status": "retrying", "send_job_id": send_job_id, "attempt": current_attempt, "next_retry_at": next_retry.isoformat(), "delay_seconds": delay_seconds, "error": error_msg, } # ─── Helpers ────────────────────────────────────────────────────────────────── async def _publish_ws_event( redis_client: Any, tenant_id: uuid_module.UUID, event: dict, ) -> None: """Pubblica un evento WebSocket per il tenant tramite Redis pub/sub.""" try: channel = f"ws:tenant:{tenant_id}" await redis_client.publish(channel, json.dumps(event, default=str)) except Exception as e: logger.warning(f"[send_pec] Errore pubblicazione WS: {e}")