This commit is contained in:
2026-06-04 16:38:24 +02:00
3 changed files with 91 additions and 42 deletions
+69 -40
View File
@@ -241,16 +241,27 @@ class IMAPConnection:
"""
Loop IMAP IDLE con heartbeat ogni 28 minuti (RFC 2177).
Quando il server segnala EXISTS (nuovi messaggi) → sync.
Ogni 28 minuti → DONE + re-IDLE per mantenere connessione viva.
Ad ogni ciclo (sia su EXISTS che su heartbeat) sincronizza sempre INBOX e Sent.
Usare sempre EXISTS come hint ma non come unico trigger: la connessione IDLE
puo' diventare stale dopo ore/giorni e le notifiche EXISTS possono essere perse.
Ogni ciclo usa una sessione DB fresca per evitare problemi con sessioni long-lived.
"""
client = self._client
idle_timeout = settings.imap_idle_timeout_seconds # 28 min
# Timeout massimo per stabilire IDLE (se la connessione e' stale, idle_start
# puo' bloccare indefinitamente aspettando il "+" dal server)
idle_start_timeout = 30.0
while self._running:
try:
# Avvia IDLE
await client.idle_start(timeout=idle_timeout)
# Avvia IDLE con timeout esplicito: se la connessione TCP e' stale,
# idle_start() blocca l'event loop aspettando "+" dal server.
# wait_for garantisce che dopo idle_start_timeout secondi venga
# sollevato asyncio.TimeoutError -> reconnect via loop esterno.
await asyncio.wait_for(
client.idle_start(timeout=idle_timeout),
timeout=idle_start_timeout,
)
# Attendi server push con timeout (heartbeat)
try:
@@ -259,45 +270,55 @@ class IMAPConnection:
timeout=float(idle_timeout),
)
except asyncio.TimeoutError:
# Heartbeat: nessun nuovo messaggio in 28 minuti → re-IDLE
# Heartbeat: nessuna notifica in 28 minuti → ri-IDLE
server_push = []
# Termina IDLE
# Nota: in aioimaplib >= 2.0.0 idle_done() e' sincrona (non coroutine)
client.idle_done()
# Controlla se ci sono nuovi messaggi (EXISTS)
# Controlla se ci sono nuovi messaggi (EXISTS) solo per logging
has_new = any(
b"EXISTS" in (line if isinstance(line, bytes) else line.encode())
for line in server_push
if line
)
# Ricarica mailbox dal DB prima delle sync
await db.refresh(mailbox)
if has_new:
logger.debug(
f"[{mailbox.email_address}] EXISTS ricevuto, sync INBOX..."
)
n = await sync_new_messages(client, mailbox, db, self.redis)
# Sessione DB fresca per ogni ciclo: evita session stale dopo ore/giorni
async with AsyncSessionLocal() as cycle_db:
fresh_mailbox = await cycle_db.get(Mailbox, self.mailbox_id)
if not fresh_mailbox:
logger.error(
f"[{self.mailbox_id}] Mailbox non trovata durante IDLE loop. Task terminato."
)
return
# Sync INBOX ad ogni ciclo (sia su EXISTS che su heartbeat)
# Garantisce che ricevute e messaggi non vengano mai persi
# anche se la connessione IDLE ha perso notifiche EXISTS
n = await sync_new_messages(client, fresh_mailbox, cycle_db, self.redis)
if n > 0:
logger.info(
f"[{mailbox.email_address}] {n} nuovi messaggi INBOX sincronizzati"
f"[{fresh_mailbox.email_address}] {n} nuovi messaggi INBOX sincronizzati"
)
# Sync Sent ad ogni ciclo IDLE (heartbeat ~28 min o su EXISTS)
try:
ns = await sync_sent_messages(client, mailbox, db, self.redis)
if ns > 0:
logger.info(
f"[{mailbox.email_address}] {ns} nuovi messaggi Sent sincronizzati"
# Sync Sent ad ogni ciclo
try:
ns = await sync_sent_messages(client, fresh_mailbox, cycle_db, self.redis)
if ns > 0:
logger.info(
f"[{fresh_mailbox.email_address}] {ns} nuovi messaggi Sent sincronizzati"
)
except Exception as e:
logger.warning(
f"[{fresh_mailbox.email_address}] Errore sync Sent in IDLE loop: {e}"
)
except Exception as e:
logger.warning(
f"[{mailbox.email_address}] Errore sync Sent in IDLE loop: {e}"
)
# sync_sent_messages garantisce il ritorno in INBOX anche in caso di errore
# sync_sent_messages garantisce il ritorno in INBOX anche in caso di errore
except asyncio.CancelledError:
try:
@@ -322,6 +343,7 @@ class IMAPConnection:
"""
Polling IMAP ogni N secondi quando IDLE non è supportato.
Esegue NOOP + SEARCH UID per verificare nuovi messaggi.
Usa una sessione DB fresca per ogni ciclo per evitare sessioni stale.
"""
client = self._client
interval = settings.imap_polling_interval_seconds
@@ -339,25 +361,32 @@ class IMAPConnection:
except Exception:
raise ConnectionError("Connessione IMAP persa durante NOOP")
# Ricarica mailbox e controlla nuovi UID INBOX
await db.refresh(mailbox)
n = await sync_new_messages(client, mailbox, db, self.redis)
if n > 0:
logger.info(
f"[{mailbox.email_address}] Polling INBOX: {n} nuovi messaggi"
)
# Sync Sent ad ogni ciclo di polling
try:
ns = await sync_sent_messages(client, mailbox, db, self.redis)
if ns > 0:
logger.info(
f"[{mailbox.email_address}] Polling Sent: {ns} nuovi messaggi"
# Sessione DB fresca per ogni ciclo
async with AsyncSessionLocal() as cycle_db:
fresh_mailbox = await cycle_db.get(Mailbox, self.mailbox_id)
if not fresh_mailbox:
logger.error(
f"[{self.mailbox_id}] Mailbox non trovata durante polling loop. Task terminato."
)
return
n = await sync_new_messages(client, fresh_mailbox, cycle_db, self.redis)
if n > 0:
logger.info(
f"[{fresh_mailbox.email_address}] Polling INBOX: {n} nuovi messaggi"
)
# Sync Sent ad ogni ciclo di polling
try:
ns = await sync_sent_messages(client, fresh_mailbox, cycle_db, self.redis)
if ns > 0:
logger.info(
f"[{fresh_mailbox.email_address}] Polling Sent: {ns} nuovi messaggi"
)
except Exception as e:
logger.warning(
f"[{fresh_mailbox.email_address}] Errore sync Sent in polling loop: {e}"
)
except Exception as e:
logger.warning(
f"[{mailbox.email_address}] Errore sync Sent in polling loop: {e}"
)
except asyncio.CancelledError:
return
+15
View File
@@ -182,6 +182,21 @@ async def send_pec(ctx: dict[str, Any], send_job_id: str) -> dict:
"message_id_header": message_id_header,
})
# ── Enqueue sync_mailbox dopo 60s per rilevare ricevute rapidamente ──
# La connessione IMAP IDLE potrebbe avere un heartbeat di 28 minuti;
# forziamo un sync immediato per non attendere il prossimo ciclo IDLE.
try:
await redis_client.enqueue_job(
"sync_mailbox",
str(mailbox.id),
_defer_by=timedelta(seconds=60),
)
logger.info(
f"[send_pec] sync_mailbox schedulato per {mailbox.id} tra 60s"
)
except Exception as e:
logger.warning(f"[send_pec] Errore enqueue sync_mailbox post-invio: {e}")
# ── Enqueue watch_receipt dopo 24h ────────────────────────────────
try:
await redis_client.enqueue_job(
+7 -2
View File
@@ -14,6 +14,7 @@ Il risultato della classificazione determina:
State machine messaggi outbound:
sent/queued → accepted (ricevuta di accettazione o presa in carico)
sent/queued → delivered (ricevuta di consegna arrivata prima dell'accettazione)
accepted → delivered (ricevuta di avvenuta consegna)
any valid → anomaly (non-accettazione, mancata consegna, errore, virus)
"""
@@ -146,9 +147,13 @@ _RECEIPT_TO_STATE: dict[str, str] = {
# Transizioni di stato valide per ciascuno stato corrente
# Solo le transizioni in avanti sono permesse (no retrocessioni)
# NOTA: "delivered" e' ammesso anche da "sent"/"queued" perche' i gestori PEC
# possono inviare la ricevuta di consegna prima di quella di accettazione
# (o in assenza della ricevuta di accettazione). Se arriva prima la consegna
# lo stato deve diventare "delivered" indipendentemente dall'ordine di arrivo.
VALID_OUTBOUND_TRANSITIONS: dict[str, set[str]] = {
"queued": {"accepted", "anomaly"},
"sent": {"accepted", "anomaly"},
"queued": {"accepted", "delivered", "anomaly"},
"sent": {"accepted", "delivered", "anomaly"},
"accepted": {"delivered", "anomaly"},
# delivered e anomaly sono terminali: nessuna transizione
}