Files
2026-04-07 11:32:03 +02:00

313 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
arq job: send_pec — send a PEC message, retrying with increasing backoff.

Full flow:
  1. The backend API creates Message(state=queued) + SendJob(status=pending)
  2. The backend enqueues send_pec via arq
  3. send_pec loads the job from the DB and attempts the SMTP delivery
  4. Success → Message(state=sent), SendJob(status=sent), raw EML uploaded to
     MinIO, sync_mailbox enqueued with a 60s defer, watch_receipt enqueued
     with a 24h defer
  5. Transient failure → status=retrying, re-enqueue with backoff
  6. Permanent failure (max attempts reached) → status=failed,
     Message(state=failed), WS event

Retry backoff (at most 5 attempts in total):
  Attempt 1 fails → wait 1 min  → attempt 2
  Attempt 2 fails → wait 5 min  → attempt 3
  Attempt 3 fails → wait 15 min → attempt 4
  Attempt 4 fails → wait 1 hour → attempt 5
  Attempt 5 fails → FAILED (no retry)
"""
import io
import json
import logging
import uuid as uuid_module
from datetime import datetime, timedelta, timezone
from typing import Any
from sqlalchemy import select
from app.database import AsyncSessionLocal
from app.models import Attachment, Mailbox, Message, SendJob
from app.smtp.sender import SmtpSender
from app.storage.minio_client import download_attachment, upload_outbound_eml
logger = logging.getLogger(__name__)

# ─── Retry configuration ──────────────────────────────────────────────────────

# Total number of delivery attempts before a job is marked as failed.
MAX_ATTEMPTS = 5

# Seconds to wait after a failed attempt. Index i holds the delay that follows
# attempt i + 1; the last attempt has no entry because nothing follows it.
RETRY_DELAYS = [
    1 * 60,    # attempt 1 failed → wait 1 min
    5 * 60,    # attempt 2 failed → wait 5 min
    15 * 60,   # attempt 3 failed → wait 15 min
    60 * 60,   # attempt 4 failed → wait 1 hour
]
# ─── Job principale ───────────────────────────────────────────────────────────
async def _load_outbound_attachments(db: Any, msg: Any) -> "list[dict] | None":
    """Fetch the message's attachments from storage for SMTP delivery.

    Returns ``None`` when the message is not flagged as having attachments or
    has no ``Attachment`` rows; otherwise a list of dicts with the keys
    ``filename``, ``content`` and ``content_type``.
    """
    if not msg.has_attachments:
        return None

    att_result = await db.execute(
        select(Attachment).where(Attachment.message_id == msg.id)
    )
    att_records = list(att_result.scalars().all())
    if not att_records:
        return None

    attachments_data: list[dict] = []
    for att in att_records:
        try:
            content = await download_attachment(att.storage_path)
            attachments_data.append(
                {
                    "filename": att.filename,
                    "content": content,
                    "content_type": att.content_type or "application/octet-stream",
                }
            )
            logger.debug(
                f"[send_pec] Allegato caricato per invio: "
                f"{att.filename} ({len(content)} bytes)"
            )
        except Exception as att_err:
            # Deliberate best-effort: the message is still sent without this
            # attachment.
            # NOTE(review): for certified mail a silently missing attachment
            # may be worse than a failed send — confirm this is intended.
            logger.warning(
                f"[send_pec] Impossibile caricare allegato "
                f"'{att.filename}' ({att.storage_path}): {att_err}"
            )
    return attachments_data


async def send_pec(ctx: dict[str, Any], send_job_id: str) -> dict:
    """arq job: send one PEC message, scheduling a retry on failure.

    Args:
        ctx: arq context; ``ctx["redis"]`` (if present) is used to publish
            WebSocket events and to enqueue follow-up jobs.
        send_job_id: UUID (as a string) of the SendJob to process.

    Returns:
        A dict describing the outcome. ``status`` is one of ``"sent"``,
        ``"retrying"``, ``"failed"``, ``"skipped"`` or ``"error"``; the other
        keys depend on the status (attempt number, error text, ids, ...).

    Raises:
        Exception: if the database commit fails. When this happens after the
            SMTP send has succeeded, the error is logged and re-raised without
            scheduling a retry, so the message is not sent twice.
    """
    redis_client = ctx.get("redis")

    # A malformed id can never match a row: report it like a missing job
    # instead of crashing the worker task.
    try:
        job_uuid = uuid_module.UUID(send_job_id)
    except (ValueError, TypeError, AttributeError):
        logger.error(f"[send_pec] SendJob id non valido: {send_job_id!r}")
        return {"status": "error", "message": "SendJob ID non valido"}

    async with AsyncSessionLocal() as db:
        # ── Load data ────────────────────────────────────────────────────────
        job = await db.get(SendJob, job_uuid)
        if not job:
            logger.error(f"[send_pec] SendJob {send_job_id} non trovato")
            return {"status": "error", "message": "SendJob non trovato"}

        # Terminal states are never re-processed (guards against duplicate
        # deliveries of the same arq job).
        if job.status in ("sent", "failed"):
            logger.warning(
                f"[send_pec] SendJob {send_job_id} già in stato {job.status!r}, skip"
            )
            return {"status": "skipped", "current_status": job.status}

        msg = await db.get(Message, job.message_id)
        if not msg:
            logger.error(f"[send_pec] Messaggio {job.message_id} non trovato")
            job.status = "failed"
            job.last_error = "Messaggio associato non trovato"
            await db.commit()
            return {"status": "error", "message": "Messaggio non trovato"}

        mailbox = await db.get(Mailbox, job.mailbox_id)
        if not mailbox:
            logger.error(f"[send_pec] Casella {job.mailbox_id} non trovata")
            job.status = "failed"
            job.last_error = "Casella mittente non trovata"
            msg.state = "failed"
            await db.commit()
            return {"status": "error", "message": "Casella non trovata"}

        # Snapshot the values needed after commit(): depending on the session's
        # expire_on_commit setting, reading ORM attributes after a commit can
        # trigger a reload, so plain copies are used from here on.
        tenant_id = msg.tenant_id
        msg_id = str(msg.id)
        msg_mailbox_id = str(msg.mailbox_id)
        subject = msg.subject
        mailbox_id = str(mailbox.id)

        # ── Update counters ──────────────────────────────────────────────────
        job.status = "sending"
        job.attempt_count += 1
        current_attempt = job.attempt_count
        await db.flush()
        logger.info(
            f"[send_pec] Tentativo {current_attempt}/{MAX_ATTEMPTS} "
            f"per job {send_job_id} → {msg.to_addresses}"
        )

        # ── Load attachments from MinIO (if any) ─────────────────────────────
        attachments_data = await _load_outbound_attachments(db, msg)

        # ── Attempt the SMTP send ────────────────────────────────────────────
        # Only the send itself is inside the try: a failure in the bookkeeping
        # that follows a successful send must not be mistaken for an SMTP
        # failure, because that would schedule a retry and deliver twice.
        try:
            sender = SmtpSender(mailbox)
            message_id_header, raw_eml = await sender.send(
                to_addresses=list(msg.to_addresses or []),
                cc_addresses=list(msg.cc_addresses or []),
                subject=msg.subject or "",
                body_text=msg.body_text or "",
                body_html=msg.body_html,
                attachments=attachments_data,
            )
        except Exception as smtp_error:
            error_msg = str(smtp_error)
            logger.warning(
                f"[send_pec] Tentativo {current_attempt} fallito: {error_msg}"
            )

            # ── Retry / failure handling ─────────────────────────────────────
            if current_attempt >= MAX_ATTEMPTS:
                # All attempts used up → FAILED
                job.status = "failed"
                job.last_error = error_msg
                msg.state = "failed"
                await db.commit()

                if redis_client:
                    await _publish_ws_event(redis_client, tenant_id, {
                        "type": "message:send_failed",
                        "message_id": msg_id,
                        "mailbox_id": msg_mailbox_id,
                        "subject": subject,
                        "error": error_msg,
                        "attempts": current_attempt,
                    })
                logger.error(
                    f"[send_pec] ❌ Invio FALLITO definitivamente: "
                    f"job={send_job_id}, errore: {error_msg}"
                )
                return {
                    "status": "failed",
                    "send_job_id": send_job_id,
                    "error": error_msg,
                    "attempts": current_attempt,
                }

            # Retry with backoff. The index is clamped so that raising
            # MAX_ATTEMPTS without extending RETRY_DELAYS reuses the longest
            # delay instead of raising IndexError.
            delay_index = min(current_attempt - 1, len(RETRY_DELAYS) - 1)
            delay_seconds = RETRY_DELAYS[delay_index]
            next_retry = datetime.now(tz=timezone.utc) + timedelta(seconds=delay_seconds)
            job.status = "retrying"
            job.last_error = error_msg
            job.next_retry_at = next_retry
            msg.state = "queued"  # back in the queue
            await db.commit()

            # Re-enqueue with a defer. A failure here is only logged: the job
            # stays in "retrying" with next_retry_at set.
            if redis_client is None:
                logger.error(
                    f"[send_pec] Impossibile re-enqueue job {send_job_id}: "
                    f"client Redis non disponibile nel contesto arq"
                )
            else:
                try:
                    await redis_client.enqueue_job(
                        "send_pec",
                        send_job_id,
                        _defer_by=timedelta(seconds=delay_seconds),
                    )
                    logger.info(
                        f"[send_pec] Retry {current_attempt} schedulato "
                        f"in {delay_seconds}s per job {send_job_id}"
                    )
                except Exception as enqueue_err:
                    logger.error(
                        f"[send_pec] Impossibile re-enqueue job {send_job_id}: "
                        f"{enqueue_err}"
                    )
            return {
                "status": "retrying",
                "send_job_id": send_job_id,
                "attempt": current_attempt,
                "next_retry_at": next_retry.isoformat(),
                "delay_seconds": delay_seconds,
                "error": error_msg,
            }

        # ── Success: update the DB ───────────────────────────────────────────
        now = datetime.now(tz=timezone.utc)
        msg.message_id_header = message_id_header
        msg.state = "sent"
        msg.sent_at = now
        job.status = "sent"
        job.sent_at = now
        job.message_id = msg.id

        # Upload the raw EML to MinIO (non-critical: a failure is only logged).
        try:
            eml_path = await upload_outbound_eml(
                tenant_id=str(tenant_id),
                mailbox_id=msg_mailbox_id,
                message_id=msg_id,
                eml_bytes=raw_eml,
            )
            msg.raw_eml_path = eml_path
            logger.debug(f"[send_pec] EML salvato: {eml_path}")
        except Exception as minio_err:
            logger.warning(f"[send_pec] Upload MinIO fallito (non critico): {minio_err}")

        try:
            await db.commit()
        except Exception:
            # The PEC has already left: surface the problem, but never fall
            # back to the retry path, which would send it a second time.
            logger.exception(
                f"[send_pec] PEC inviata ma aggiornamento DB fallito: "
                f"job={send_job_id} message_id_header={message_id_header}"
            )
            raise

        # ── Publish the WS event ─────────────────────────────────────────────
        if redis_client:
            await _publish_ws_event(redis_client, tenant_id, {
                "type": "message:sent",
                "message_id": msg_id,
                "mailbox_id": msg_mailbox_id,
                "subject": subject,
                "message_id_header": message_id_header,
            })

        if redis_client is None:
            logger.warning(
                "[send_pec] Client Redis non disponibile: "
                "sync_mailbox e watch_receipt non schedulati"
            )
        else:
            # ── Enqueue sync_mailbox after 60s to pick up receipts quickly ──
            # The IMAP IDLE connection may have a 28-minute heartbeat; force a
            # sync soon instead of waiting for the next IDLE cycle.
            try:
                await redis_client.enqueue_job(
                    "sync_mailbox",
                    mailbox_id,
                    _defer_by=timedelta(seconds=60),
                )
                logger.info(
                    f"[send_pec] sync_mailbox schedulato per {mailbox_id} tra 60s"
                )
            except Exception as e:
                logger.warning(f"[send_pec] Errore enqueue sync_mailbox post-invio: {e}")

            # ── Enqueue watch_receipt after 24h ──────────────────────────────
            try:
                await redis_client.enqueue_job(
                    "watch_receipt",
                    msg_id,
                    _defer_by=timedelta(hours=24),
                )
                logger.info(
                    f"[send_pec] watch_receipt schedulato per {msg_id} "
                    f"tra 24h"
                )
            except Exception as e:
                logger.warning(f"[send_pec] Errore enqueue watch_receipt: {e}")

        logger.info(
            f"[send_pec] ✅ PEC inviata: job={send_job_id} "
            f"message_id_header={message_id_header}"
        )
        return {
            "status": "sent",
            "send_job_id": send_job_id,
            "message_id": msg_id,
            "message_id_header": message_id_header,
            "attempt": current_attempt,
        }
# ─── Helpers ──────────────────────────────────────────────────────────────────
async def _publish_ws_event(
    redis_client: Any,
    tenant_id: uuid_module.UUID,
    event: dict,
) -> None:
    """Publish a WebSocket event for a tenant over Redis pub/sub.

    The event is JSON-encoded (values that are not JSON-native are passed
    through ``str``) and published on the ``ws:tenant:<tenant_id>`` channel.
    Publishing is best-effort: any error is logged as a warning and never
    propagated to the caller.
    """
    try:
        topic = f"ws:tenant:{tenant_id}"
        payload = json.dumps(event, default=str)
        await redis_client.publish(topic, payload)
    except Exception as exc:
        logger.warning(f"[send_pec] Errore pubblicazione WS: {exc}")