From d8d06162a8bbcea481b5c137bd64a195b54e8c57 Mon Sep 17 00:00:00 2001 From: idrainformatica Date: Tue, 7 Apr 2026 11:32:03 +0200 Subject: [PATCH] Fix parsing --- worker/app/imap/connection.py | 109 +++++++++++++++++++------------ worker/app/jobs/send_pec.py | 15 +++++ worker/app/parsers/pec_parser.py | 9 ++- 3 files changed, 91 insertions(+), 42 deletions(-) diff --git a/worker/app/imap/connection.py b/worker/app/imap/connection.py index b167d70..e6377c8 100644 --- a/worker/app/imap/connection.py +++ b/worker/app/imap/connection.py @@ -241,16 +241,27 @@ class IMAPConnection: """ Loop IMAP IDLE con heartbeat ogni 28 minuti (RFC 2177). - Quando il server segnala EXISTS (nuovi messaggi) → sync. - Ogni 28 minuti → DONE + re-IDLE per mantenere connessione viva. + Ad ogni ciclo (sia su EXISTS che su heartbeat) sincronizza sempre INBOX e Sent. + Usare sempre EXISTS come hint ma non come unico trigger: la connessione IDLE + puo' diventare stale dopo ore/giorni e le notifiche EXISTS possono essere perse. + Ogni ciclo usa una sessione DB fresca per evitare problemi con sessioni long-lived. """ client = self._client idle_timeout = settings.imap_idle_timeout_seconds # 28 min + # Timeout massimo per stabilire IDLE (se la connessione e' stale, idle_start + # puo' bloccare indefinitamente aspettando il "+" dal server) + idle_start_timeout = 30.0 while self._running: try: - # Avvia IDLE - await client.idle_start(timeout=idle_timeout) + # Avvia IDLE con timeout esplicito: se la connessione TCP e' stale, + # idle_start() blocca l'event loop aspettando "+" dal server. + # wait_for garantisce che dopo idle_start_timeout secondi venga + # sollevato asyncio.TimeoutError -> reconnect via loop esterno. + await asyncio.wait_for( + client.idle_start(timeout=idle_timeout), + timeout=idle_start_timeout, + ) # Attendi server push con timeout (heartbeat) try: @@ -259,45 +270,55 @@ class IMAPConnection: timeout=float(idle_timeout), ) except asyncio.TimeoutError: - # Heartbeat: nessun nuovo messaggio in 28 minuti → re-IDLE + # Heartbeat: nessuna notifica in 28 minuti → ri-IDLE server_push = [] # Termina IDLE # Nota: in aioimaplib >= 2.0.0 idle_done() e' sincrona (non coroutine) client.idle_done() - # Controlla se ci sono nuovi messaggi (EXISTS) + # Controlla se ci sono nuovi messaggi (EXISTS) – solo per logging has_new = any( b"EXISTS" in (line if isinstance(line, bytes) else line.encode()) for line in server_push if line ) - # Ricarica mailbox dal DB prima delle sync - await db.refresh(mailbox) - if has_new: logger.debug( f"[{mailbox.email_address}] EXISTS ricevuto, sync INBOX..." ) - n = await sync_new_messages(client, mailbox, db, self.redis) + + # Sessione DB fresca per ogni ciclo: evita session stale dopo ore/giorni + async with AsyncSessionLocal() as cycle_db: + fresh_mailbox = await cycle_db.get(Mailbox, self.mailbox_id) + if not fresh_mailbox: + logger.error( + f"[{self.mailbox_id}] Mailbox non trovata durante IDLE loop. Task terminato." + ) + return + + # Sync INBOX ad ogni ciclo (sia su EXISTS che su heartbeat) + # Garantisce che ricevute e messaggi non vengano mai persi + # anche se la connessione IDLE ha perso notifiche EXISTS + n = await sync_new_messages(client, fresh_mailbox, cycle_db, self.redis) if n > 0: logger.info( - f"[{mailbox.email_address}] {n} nuovi messaggi INBOX sincronizzati" + f"[{fresh_mailbox.email_address}] {n} nuovi messaggi INBOX sincronizzati" ) - # Sync Sent ad ogni ciclo IDLE (heartbeat ~28 min o su EXISTS) - try: - ns = await sync_sent_messages(client, mailbox, db, self.redis) - if ns > 0: - logger.info( - f"[{mailbox.email_address}] {ns} nuovi messaggi Sent sincronizzati" + # Sync Sent ad ogni ciclo + try: + ns = await sync_sent_messages(client, fresh_mailbox, cycle_db, self.redis) + if ns > 0: + logger.info( + f"[{fresh_mailbox.email_address}] {ns} nuovi messaggi Sent sincronizzati" + ) + except Exception as e: + logger.warning( + f"[{fresh_mailbox.email_address}] Errore sync Sent in IDLE loop: {e}" ) - except Exception as e: - logger.warning( - f"[{mailbox.email_address}] Errore sync Sent in IDLE loop: {e}" - ) - # sync_sent_messages garantisce il ritorno in INBOX anche in caso di errore + # sync_sent_messages garantisce il ritorno in INBOX anche in caso di errore except asyncio.CancelledError: try: @@ -322,6 +343,7 @@ class IMAPConnection: """ Polling IMAP ogni N secondi quando IDLE non è supportato. Esegue NOOP + SEARCH UID per verificare nuovi messaggi. + Usa una sessione DB fresca per ogni ciclo per evitare sessioni stale. """ client = self._client interval = settings.imap_polling_interval_seconds @@ -339,25 +361,32 @@ class IMAPConnection: except Exception: raise ConnectionError("Connessione IMAP persa durante NOOP") - # Ricarica mailbox e controlla nuovi UID INBOX - await db.refresh(mailbox) - n = await sync_new_messages(client, mailbox, db, self.redis) - if n > 0: - logger.info( - f"[{mailbox.email_address}] Polling INBOX: {n} nuovi messaggi" - ) - - # Sync Sent ad ogni ciclo di polling - try: - ns = await sync_sent_messages(client, mailbox, db, self.redis) - if ns > 0: - logger.info( - f"[{mailbox.email_address}] Polling Sent: {ns} nuovi messaggi" + # Sessione DB fresca per ogni ciclo + async with AsyncSessionLocal() as cycle_db: + fresh_mailbox = await cycle_db.get(Mailbox, self.mailbox_id) + if not fresh_mailbox: + logger.error( + f"[{self.mailbox_id}] Mailbox non trovata durante polling loop. Task terminato." + ) + return + + n = await sync_new_messages(client, fresh_mailbox, cycle_db, self.redis) + if n > 0: + logger.info( + f"[{fresh_mailbox.email_address}] Polling INBOX: {n} nuovi messaggi" + ) + + # Sync Sent ad ogni ciclo di polling + try: + ns = await sync_sent_messages(client, fresh_mailbox, cycle_db, self.redis) + if ns > 0: + logger.info( + f"[{fresh_mailbox.email_address}] Polling Sent: {ns} nuovi messaggi" + ) + except Exception as e: + logger.warning( + f"[{fresh_mailbox.email_address}] Errore sync Sent in polling loop: {e}" ) - except Exception as e: - logger.warning( - f"[{mailbox.email_address}] Errore sync Sent in polling loop: {e}" - ) except asyncio.CancelledError: return diff --git a/worker/app/jobs/send_pec.py b/worker/app/jobs/send_pec.py index d010587..476cf74 100644 --- a/worker/app/jobs/send_pec.py +++ b/worker/app/jobs/send_pec.py @@ -182,6 +182,21 @@ async def send_pec(ctx: dict[str, Any], send_job_id: str) -> dict: "message_id_header": message_id_header, }) + # ── Enqueue sync_mailbox dopo 60s per rilevare ricevute rapidamente ── + # La connessione IMAP IDLE potrebbe avere un heartbeat di 28 minuti; + # forziamo un sync immediato per non attendere il prossimo ciclo IDLE. + try: + await redis_client.enqueue_job( + "sync_mailbox", + str(mailbox.id), + _defer_by=timedelta(seconds=60), + ) + logger.info( + f"[send_pec] sync_mailbox schedulato per {mailbox.id} tra 60s" + ) + except Exception as e: + logger.warning(f"[send_pec] Errore enqueue sync_mailbox post-invio: {e}") + # ── Enqueue watch_receipt dopo 24h ──────────────────────────────── try: await redis_client.enqueue_job( diff --git a/worker/app/parsers/pec_parser.py b/worker/app/parsers/pec_parser.py index efb577d..a405aa5 100644 --- a/worker/app/parsers/pec_parser.py +++ b/worker/app/parsers/pec_parser.py @@ -14,6 +14,7 @@ Il risultato della classificazione determina: State machine messaggi outbound: sent/queued → accepted (ricevuta di accettazione o presa in carico) + sent/queued → delivered (ricevuta di consegna arrivata prima dell'accettazione) accepted → delivered (ricevuta di avvenuta consegna) any valid → anomaly (non-accettazione, mancata consegna, errore, virus) """ @@ -146,9 +147,13 @@ _RECEIPT_TO_STATE: dict[str, str] = { # Transizioni di stato valide per ciascuno stato corrente # Solo le transizioni in avanti sono permesse (no retrocessioni) +# NOTA: "delivered" e' ammesso anche da "sent"/"queued" perche' i gestori PEC +# possono inviare la ricevuta di consegna prima di quella di accettazione +# (o in assenza della ricevuta di accettazione). Se arriva prima la consegna +# lo stato deve diventare "delivered" indipendentemente dall'ordine di arrivo. VALID_OUTBOUND_TRANSITIONS: dict[str, set[str]] = { - "queued": {"accepted", "anomaly"}, - "sent": {"accepted", "anomaly"}, + "queued": {"accepted", "delivered", "anomaly"}, + "sent": {"accepted", "delivered", "anomaly"}, "accepted": {"delivered", "anomaly"}, # delivered e anomaly sono terminali: nessuna transizione }