Files
PecHub/worker/app/imap/connection.py
T
2026-03-19 11:41:10 +01:00

435 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
IMAPConnection gestione singola connessione IMAP con:
- IDLE con heartbeat 28 min (RFC 2177)
- Polling fallback ogni 60s se IDLE non supportato
- Backoff esponenziale su disconnessione
- Aggiornamento stato mailbox su N errori consecutivi
Architettura (ADR-003):
Un asyncio.Task per casella → overhead minimo, migliaia di caselle per host.
"""
import asyncio
import base64
import logging
from datetime import UTC, datetime
import aioimaplib
import redis.asyncio as aioredis
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.database import AsyncSessionLocal
from app.imap.reconnect import ExponentialBackoff
from app.imap.sync import sync_new_messages, sync_sent_messages
from app.models import Mailbox
logger = logging.getLogger(__name__)
settings = get_settings()
def _decrypt(enc: str) -> str:
"""Decifra un campo credenziale AES-256-GCM."""
import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = settings.encryption_key_bytes
aesgcm = AESGCM(key)
raw = base64.b64decode(enc.encode("ascii"))
nonce = raw[:12]
ciphertext_with_tag = raw[12:]
return aesgcm.decrypt(nonce, ciphertext_with_tag, None).decode("utf-8")
class IMAPConnection:
"""
Gestisce la connessione IMAP di una singola casella PEC.
Ciclo di vita:
1. Connessione IMAP (SSL o plain)
2. Login
3. SELECT INBOX
4. Sync iniziale (tutti i messaggi nuovi dall'ultimo UID noto)
5. IDLE loop (o polling se IDLE non disponibile)
6. Su EXISTS/EXPUNGE notify → fetch nuovi messaggi
7. Su errore → backoff → torna al punto 1
8. Su N errori consecutivi → imposta mailbox.status=error
"""
def __init__(
self,
mailbox_id: str,
redis_client: aioredis.Redis,
) -> None:
self.mailbox_id = mailbox_id
self.redis = redis_client
self._running = False
self._client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL | None = None
async def run(self) -> None:
"""
Loop principale della connessione IMAP.
Questo metodo non solleva mai eccezioni; gestisce internamente tutti gli errori.
"""
self._running = True
backoff = ExponentialBackoff(label=f"mailbox:{self.mailbox_id[:8]}")
while self._running:
try:
async with AsyncSessionLocal() as db:
# Carica mailbox dal DB
mailbox = await db.get(Mailbox, self.mailbox_id)
if not mailbox:
logger.error(
f"[{self.mailbox_id}] Mailbox non trovata in DB. Task terminato."
)
return
if mailbox.status in ("deleted", "paused"):
logger.info(
f"[{mailbox.email_address}] Status={mailbox.status}, task in pausa."
)
await asyncio.sleep(60)
continue
# Decifra credenziali
creds = self._decrypt_creds(mailbox)
# Connetti e sincronizza
await self._connect_and_run(mailbox, creds, db, backoff)
except asyncio.CancelledError:
logger.info(f"[{self.mailbox_id}] Task IMAP cancellato.")
self._running = False
return
except Exception as e:
logger.error(
f"[{self.mailbox_id}] Errore inatteso nel loop IMAP: {e}",
exc_info=True,
)
await backoff.wait(e)
def stop(self) -> None:
"""Segnala al loop di terminare al prossimo ciclo."""
self._running = False
# ─── Connessione e loop interno ──────────────────────────────────────────
async def _connect_and_run(
self,
mailbox: Mailbox,
creds: dict,
db: AsyncSession,
backoff: ExponentialBackoff,
) -> None:
"""
Tenta la connessione IMAP. Se riesce, avvia il loop IDLE/polling.
Se fallisce, incrementa il contatore errori e aggiorna lo stato mailbox.
"""
try:
client = await self._connect(creds)
self._client = client
except asyncio.TimeoutError:
err_msg = f"Timeout connessione IMAP ({settings.imap_connect_timeout_seconds}s)"
await self._record_error(mailbox, db, err_msg)
await backoff.wait(TimeoutError(err_msg))
return
except Exception as e:
err_msg = str(e)
await self._record_error(mailbox, db, err_msg)
await backoff.wait(e)
return
# Connessione riuscita
backoff.reset()
await self._reset_error_state(mailbox, db)
# Sync iniziale INBOX: porta il DB aggiornato fino all'ultimo UID disponibile
logger.info(f"[{mailbox.email_address}] Sync iniziale INBOX...")
try:
n = await sync_new_messages(self._client, mailbox, db, self.redis)
if n > 0:
logger.info(
f"[{mailbox.email_address}] Sync iniziale INBOX completata: {n} messaggi nuovi"
)
except Exception as e:
logger.error(
f"[{mailbox.email_address}] Errore sync iniziale INBOX: {e}", exc_info=True
)
# Sync iniziale Sent: sincronizza la posta inviata storica
logger.info(f"[{mailbox.email_address}] Sync iniziale Sent...")
try:
ns = await sync_sent_messages(self._client, mailbox, db, self.redis)
if ns > 0:
logger.info(
f"[{mailbox.email_address}] Sync iniziale Sent completata: {ns} messaggi nuovi"
)
except Exception as e:
logger.error(
f"[{mailbox.email_address}] Errore sync iniziale Sent: {e}", exc_info=True
)
# Avvia IDLE o polling
supports_idle = self._supports_idle(client)
if supports_idle:
logger.info(f"[{mailbox.email_address}] Avvio IDLE loop")
await self._idle_loop(mailbox, db)
else:
logger.info(
f"[{mailbox.email_address}] IDLE non supportato, avvio polling "
f"ogni {settings.imap_polling_interval_seconds}s"
)
await self._polling_loop(mailbox, db)
async def _connect(
self, creds: dict
) -> aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL:
"""Connette al server IMAP e fa login. Solleva eccezione su errore."""
host = creds["host"]
port = creds["port"]
user = creds["user"]
password = creds["password"]
use_ssl = creds["use_ssl"]
logger.info(f"Connessione IMAP {user}@{host}:{port} ssl={use_ssl}")
if use_ssl:
client = aioimaplib.IMAP4_SSL(
host=host,
port=port,
timeout=settings.imap_connect_timeout_seconds,
)
else:
client = aioimaplib.IMAP4(
host=host,
port=port,
timeout=settings.imap_connect_timeout_seconds,
)
await asyncio.wait_for(
client.wait_hello_from_server(),
timeout=settings.imap_connect_timeout_seconds,
)
status, _ = await client.login(user, password)
if status != "OK":
await client.logout()
raise ConnectionError(f"Login IMAP fallito: status={status}")
status, _ = await client.select("INBOX")
if status != "OK":
await client.logout()
raise ConnectionError(f"SELECT INBOX fallito: status={status}")
logger.info(f"IMAP connesso: {user}@{host}:{port}")
return client
def _supports_idle(
self, client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL
) -> bool:
"""Verifica se il server supporta IDLE."""
try:
caps = client.protocol.capabilities
return "IDLE" in caps
except Exception:
return False
# ─── IDLE loop ────────────────────────────────────────────────────────────
async def _idle_loop(self, mailbox: Mailbox, db: AsyncSession) -> None:
"""
Loop IMAP IDLE con heartbeat ogni 28 minuti (RFC 2177).
Quando il server segnala EXISTS (nuovi messaggi) → sync.
Ogni 28 minuti → DONE + re-IDLE per mantenere connessione viva.
"""
client = self._client
idle_timeout = settings.imap_idle_timeout_seconds # 28 min
while self._running:
try:
# Avvia IDLE
await client.idle_start(timeout=idle_timeout)
# Attendi server push con timeout (heartbeat)
try:
server_push = await asyncio.wait_for(
client.wait_server_push(),
timeout=float(idle_timeout),
)
except asyncio.TimeoutError:
# Heartbeat: nessun nuovo messaggio in 28 minuti → re-IDLE
server_push = []
# Termina IDLE
await client.idle_done()
# Controlla se ci sono nuovi messaggi (EXISTS)
has_new = any(
b"EXISTS" in (line if isinstance(line, bytes) else line.encode())
for line in server_push
if line
)
# Ricarica mailbox dal DB prima delle sync
await db.refresh(mailbox)
if has_new:
logger.debug(
f"[{mailbox.email_address}] EXISTS ricevuto, sync INBOX..."
)
n = await sync_new_messages(client, mailbox, db, self.redis)
if n > 0:
logger.info(
f"[{mailbox.email_address}] {n} nuovi messaggi INBOX sincronizzati"
)
# Sync Sent ad ogni ciclo IDLE (heartbeat ~28 min o su EXISTS)
try:
ns = await sync_sent_messages(client, mailbox, db, self.redis)
if ns > 0:
logger.info(
f"[{mailbox.email_address}] {ns} nuovi messaggi Sent sincronizzati"
)
except Exception as e:
logger.warning(
f"[{mailbox.email_address}] Errore sync Sent in IDLE loop: {e}"
)
# sync_sent_messages garantisce il ritorno in INBOX anche in caso di errore
except asyncio.CancelledError:
try:
await client.idle_done()
except Exception:
pass
return
except (ConnectionError, IOError, OSError) as e:
logger.warning(
f"[{mailbox.email_address}] Connessione IDLE persa: {e}"
)
raise # propaga al loop esterno per backoff
except Exception as e:
logger.error(
f"[{mailbox.email_address}] Errore IDLE loop: {e}", exc_info=True
)
raise
# ─── Polling loop ─────────────────────────────────────────────────────────
async def _polling_loop(self, mailbox: Mailbox, db: AsyncSession) -> None:
"""
Polling IMAP ogni N secondi quando IDLE non è supportato.
Esegue NOOP + SEARCH UID per verificare nuovi messaggi.
"""
client = self._client
interval = settings.imap_polling_interval_seconds
while self._running:
try:
await asyncio.sleep(interval)
if not self._running:
break
# NOOP per mantenere connessione viva
try:
await client.noop()
except Exception:
raise ConnectionError("Connessione IMAP persa durante NOOP")
# Ricarica mailbox e controlla nuovi UID INBOX
await db.refresh(mailbox)
n = await sync_new_messages(client, mailbox, db, self.redis)
if n > 0:
logger.info(
f"[{mailbox.email_address}] Polling INBOX: {n} nuovi messaggi"
)
# Sync Sent ad ogni ciclo di polling
try:
ns = await sync_sent_messages(client, mailbox, db, self.redis)
if ns > 0:
logger.info(
f"[{mailbox.email_address}] Polling Sent: {ns} nuovi messaggi"
)
except Exception as e:
logger.warning(
f"[{mailbox.email_address}] Errore sync Sent in polling loop: {e}"
)
except asyncio.CancelledError:
return
except (ConnectionError, IOError, OSError) as e:
logger.warning(
f"[{mailbox.email_address}] Connessione polling persa: {e}"
)
raise
except Exception as e:
logger.error(
f"[{mailbox.email_address}] Errore polling loop: {e}", exc_info=True
)
raise
# ─── Error management ─────────────────────────────────────────────────────
async def _record_error(
self, mailbox: Mailbox, db: AsyncSession, error_msg: str
) -> None:
"""
Incrementa sync_error_count. Se supera il limite → status=error.
Pubblica evento Redis di errore.
"""
import json
mailbox.sync_error_count += 1
mailbox.sync_error_msg = error_msg[:500]
if mailbox.sync_error_count >= settings.imap_max_error_count:
if mailbox.status != "error":
mailbox.status = "error"
logger.error(
f"[{mailbox.email_address}] Troppe anomalie "
f"({mailbox.sync_error_count}), status=error"
)
# Pubblica evento WebSocket di errore
try:
event = {
"type": "mailbox:sync_error",
"mailbox_id": str(mailbox.id),
"error": error_msg,
"status": "error",
}
channel = f"ws:tenant:{mailbox.tenant_id}"
await self.redis.publish(channel, json.dumps(event))
except Exception:
pass
await db.flush()
await db.commit()
async def _reset_error_state(
self, mailbox: Mailbox, db: AsyncSession
) -> None:
"""Resetta il contatore errori dopo una connessione riuscita."""
if mailbox.sync_error_count > 0 or mailbox.status == "error":
mailbox.sync_error_count = 0
mailbox.sync_error_msg = None
if mailbox.status == "error":
mailbox.status = "active"
await db.flush()
await db.commit()
# ─── Decrypt credentials ──────────────────────────────────────────────────
@staticmethod
def _decrypt_creds(mailbox: Mailbox) -> dict:
"""Decifra le credenziali IMAP dalla mailbox."""
return {
"host": _decrypt(mailbox.imap_host_enc),
"port": int(_decrypt(mailbox.imap_port_enc)),
"user": _decrypt(mailbox.imap_user_enc),
"password": _decrypt(mailbox.imap_pass_enc),
"use_ssl": mailbox.imap_use_ssl,
}