mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
436 lines
16 KiB
Python
436 lines
16 KiB
Python
"""
|
||
IMAPConnection – gestione singola connessione IMAP con:
|
||
- IDLE con heartbeat 28 min (RFC 2177)
|
||
- Polling fallback ogni 60s se IDLE non supportato
|
||
- Backoff esponenziale su disconnessione
|
||
- Aggiornamento stato mailbox su N errori consecutivi
|
||
|
||
Architettura (ADR-003):
|
||
Un asyncio.Task per casella → overhead minimo, migliaia di caselle per host.
|
||
"""
|
||
|
||
import asyncio
|
||
import base64
|
||
import logging
|
||
from datetime import UTC, datetime
|
||
|
||
import aioimaplib
|
||
import redis.asyncio as aioredis
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.config import get_settings
|
||
from app.database import AsyncSessionLocal
|
||
from app.imap.reconnect import ExponentialBackoff
|
||
from app.imap.sync import sync_new_messages, sync_sent_messages
|
||
from app.models import Mailbox
|
||
|
||
logger = logging.getLogger(__name__)
|
||
settings = get_settings()
|
||
|
||
|
||
def _decrypt(enc: str) -> str:
|
||
"""Decifra un campo credenziale AES-256-GCM."""
|
||
import os
|
||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||
|
||
key = settings.encryption_key_bytes
|
||
aesgcm = AESGCM(key)
|
||
raw = base64.b64decode(enc.encode("ascii"))
|
||
nonce = raw[:12]
|
||
ciphertext_with_tag = raw[12:]
|
||
return aesgcm.decrypt(nonce, ciphertext_with_tag, None).decode("utf-8")
|
||
|
||
|
||
class IMAPConnection:
|
||
"""
|
||
Gestisce la connessione IMAP di una singola casella PEC.
|
||
|
||
Ciclo di vita:
|
||
1. Connessione IMAP (SSL o plain)
|
||
2. Login
|
||
3. SELECT INBOX
|
||
4. Sync iniziale (tutti i messaggi nuovi dall'ultimo UID noto)
|
||
5. IDLE loop (o polling se IDLE non disponibile)
|
||
6. Su EXISTS/EXPUNGE notify → fetch nuovi messaggi
|
||
7. Su errore → backoff → torna al punto 1
|
||
8. Su N errori consecutivi → imposta mailbox.status=error
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
mailbox_id: str,
|
||
redis_client: aioredis.Redis,
|
||
) -> None:
|
||
self.mailbox_id = mailbox_id
|
||
self.redis = redis_client
|
||
self._running = False
|
||
self._client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL | None = None
|
||
|
||
async def run(self) -> None:
|
||
"""
|
||
Loop principale della connessione IMAP.
|
||
Questo metodo non solleva mai eccezioni; gestisce internamente tutti gli errori.
|
||
"""
|
||
self._running = True
|
||
backoff = ExponentialBackoff(label=f"mailbox:{self.mailbox_id[:8]}")
|
||
|
||
while self._running:
|
||
try:
|
||
async with AsyncSessionLocal() as db:
|
||
# Carica mailbox dal DB
|
||
mailbox = await db.get(Mailbox, self.mailbox_id)
|
||
if not mailbox:
|
||
logger.error(
|
||
f"[{self.mailbox_id}] Mailbox non trovata in DB. Task terminato."
|
||
)
|
||
return
|
||
|
||
if mailbox.status in ("deleted", "paused"):
|
||
logger.info(
|
||
f"[{mailbox.email_address}] Status={mailbox.status}, task in pausa."
|
||
)
|
||
await asyncio.sleep(60)
|
||
continue
|
||
|
||
# Decifra credenziali
|
||
creds = self._decrypt_creds(mailbox)
|
||
|
||
# Connetti e sincronizza
|
||
await self._connect_and_run(mailbox, creds, db, backoff)
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info(f"[{self.mailbox_id}] Task IMAP cancellato.")
|
||
self._running = False
|
||
return
|
||
except Exception as e:
|
||
logger.error(
|
||
f"[{self.mailbox_id}] Errore inatteso nel loop IMAP: {e}",
|
||
exc_info=True,
|
||
)
|
||
await backoff.wait(e)
|
||
|
||
def stop(self) -> None:
|
||
"""Segnala al loop di terminare al prossimo ciclo."""
|
||
self._running = False
|
||
|
||
# ─── Connessione e loop interno ──────────────────────────────────────────
|
||
|
||
async def _connect_and_run(
|
||
self,
|
||
mailbox: Mailbox,
|
||
creds: dict,
|
||
db: AsyncSession,
|
||
backoff: ExponentialBackoff,
|
||
) -> None:
|
||
"""
|
||
Tenta la connessione IMAP. Se riesce, avvia il loop IDLE/polling.
|
||
Se fallisce, incrementa il contatore errori e aggiorna lo stato mailbox.
|
||
"""
|
||
try:
|
||
client = await self._connect(creds)
|
||
self._client = client
|
||
except asyncio.TimeoutError:
|
||
err_msg = f"Timeout connessione IMAP ({settings.imap_connect_timeout_seconds}s)"
|
||
await self._record_error(mailbox, db, err_msg)
|
||
await backoff.wait(TimeoutError(err_msg))
|
||
return
|
||
except Exception as e:
|
||
err_msg = str(e)
|
||
await self._record_error(mailbox, db, err_msg)
|
||
await backoff.wait(e)
|
||
return
|
||
|
||
# Connessione riuscita
|
||
backoff.reset()
|
||
await self._reset_error_state(mailbox, db)
|
||
|
||
# Sync iniziale INBOX: porta il DB aggiornato fino all'ultimo UID disponibile
|
||
logger.info(f"[{mailbox.email_address}] Sync iniziale INBOX...")
|
||
try:
|
||
n = await sync_new_messages(self._client, mailbox, db, self.redis)
|
||
if n > 0:
|
||
logger.info(
|
||
f"[{mailbox.email_address}] Sync iniziale INBOX completata: {n} messaggi nuovi"
|
||
)
|
||
except Exception as e:
|
||
logger.error(
|
||
f"[{mailbox.email_address}] Errore sync iniziale INBOX: {e}", exc_info=True
|
||
)
|
||
|
||
# Sync iniziale Sent: sincronizza la posta inviata storica
|
||
logger.info(f"[{mailbox.email_address}] Sync iniziale Sent...")
|
||
try:
|
||
ns = await sync_sent_messages(self._client, mailbox, db, self.redis)
|
||
if ns > 0:
|
||
logger.info(
|
||
f"[{mailbox.email_address}] Sync iniziale Sent completata: {ns} messaggi nuovi"
|
||
)
|
||
except Exception as e:
|
||
logger.error(
|
||
f"[{mailbox.email_address}] Errore sync iniziale Sent: {e}", exc_info=True
|
||
)
|
||
|
||
# Avvia IDLE o polling
|
||
supports_idle = self._supports_idle(client)
|
||
if supports_idle:
|
||
logger.info(f"[{mailbox.email_address}] Avvio IDLE loop")
|
||
await self._idle_loop(mailbox, db)
|
||
else:
|
||
logger.info(
|
||
f"[{mailbox.email_address}] IDLE non supportato, avvio polling "
|
||
f"ogni {settings.imap_polling_interval_seconds}s"
|
||
)
|
||
await self._polling_loop(mailbox, db)
|
||
|
||
async def _connect(
|
||
self, creds: dict
|
||
) -> aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL:
|
||
"""Connette al server IMAP e fa login. Solleva eccezione su errore."""
|
||
host = creds["host"]
|
||
port = creds["port"]
|
||
user = creds["user"]
|
||
password = creds["password"]
|
||
use_ssl = creds["use_ssl"]
|
||
|
||
logger.info(f"Connessione IMAP {user}@{host}:{port} ssl={use_ssl}")
|
||
|
||
if use_ssl:
|
||
client = aioimaplib.IMAP4_SSL(
|
||
host=host,
|
||
port=port,
|
||
timeout=settings.imap_connect_timeout_seconds,
|
||
)
|
||
else:
|
||
client = aioimaplib.IMAP4(
|
||
host=host,
|
||
port=port,
|
||
timeout=settings.imap_connect_timeout_seconds,
|
||
)
|
||
|
||
await asyncio.wait_for(
|
||
client.wait_hello_from_server(),
|
||
timeout=settings.imap_connect_timeout_seconds,
|
||
)
|
||
|
||
status, _ = await client.login(user, password)
|
||
if status != "OK":
|
||
await client.logout()
|
||
raise ConnectionError(f"Login IMAP fallito: status={status}")
|
||
|
||
status, _ = await client.select("INBOX")
|
||
if status != "OK":
|
||
await client.logout()
|
||
raise ConnectionError(f"SELECT INBOX fallito: status={status}")
|
||
|
||
logger.info(f"IMAP connesso: {user}@{host}:{port}")
|
||
return client
|
||
|
||
def _supports_idle(
|
||
self, client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL
|
||
) -> bool:
|
||
"""Verifica se il server supporta IDLE."""
|
||
try:
|
||
caps = client.protocol.capabilities
|
||
return "IDLE" in caps
|
||
except Exception:
|
||
return False
|
||
|
||
# ─── IDLE loop ────────────────────────────────────────────────────────────
|
||
|
||
async def _idle_loop(self, mailbox: Mailbox, db: AsyncSession) -> None:
|
||
"""
|
||
Loop IMAP IDLE con heartbeat ogni 28 minuti (RFC 2177).
|
||
|
||
Quando il server segnala EXISTS (nuovi messaggi) → sync.
|
||
Ogni 28 minuti → DONE + re-IDLE per mantenere connessione viva.
|
||
"""
|
||
client = self._client
|
||
idle_timeout = settings.imap_idle_timeout_seconds # 28 min
|
||
|
||
while self._running:
|
||
try:
|
||
# Avvia IDLE
|
||
await client.idle_start(timeout=idle_timeout)
|
||
|
||
# Attendi server push con timeout (heartbeat)
|
||
try:
|
||
server_push = await asyncio.wait_for(
|
||
client.wait_server_push(),
|
||
timeout=float(idle_timeout),
|
||
)
|
||
except asyncio.TimeoutError:
|
||
# Heartbeat: nessun nuovo messaggio in 28 minuti → re-IDLE
|
||
server_push = []
|
||
|
||
# Termina IDLE
|
||
# Nota: in aioimaplib >= 2.0.0 idle_done() e' sincrona (non coroutine)
|
||
client.idle_done()
|
||
|
||
# Controlla se ci sono nuovi messaggi (EXISTS)
|
||
has_new = any(
|
||
b"EXISTS" in (line if isinstance(line, bytes) else line.encode())
|
||
for line in server_push
|
||
if line
|
||
)
|
||
|
||
# Ricarica mailbox dal DB prima delle sync
|
||
await db.refresh(mailbox)
|
||
|
||
if has_new:
|
||
logger.debug(
|
||
f"[{mailbox.email_address}] EXISTS ricevuto, sync INBOX..."
|
||
)
|
||
n = await sync_new_messages(client, mailbox, db, self.redis)
|
||
if n > 0:
|
||
logger.info(
|
||
f"[{mailbox.email_address}] {n} nuovi messaggi INBOX sincronizzati"
|
||
)
|
||
|
||
# Sync Sent ad ogni ciclo IDLE (heartbeat ~28 min o su EXISTS)
|
||
try:
|
||
ns = await sync_sent_messages(client, mailbox, db, self.redis)
|
||
if ns > 0:
|
||
logger.info(
|
||
f"[{mailbox.email_address}] {ns} nuovi messaggi Sent sincronizzati"
|
||
)
|
||
except Exception as e:
|
||
logger.warning(
|
||
f"[{mailbox.email_address}] Errore sync Sent in IDLE loop: {e}"
|
||
)
|
||
# sync_sent_messages garantisce il ritorno in INBOX anche in caso di errore
|
||
|
||
except asyncio.CancelledError:
|
||
try:
|
||
client.idle_done()
|
||
except Exception:
|
||
pass
|
||
return
|
||
except (ConnectionError, IOError, OSError) as e:
|
||
logger.warning(
|
||
f"[{mailbox.email_address}] Connessione IDLE persa: {e}"
|
||
)
|
||
raise # propaga al loop esterno per backoff
|
||
except Exception as e:
|
||
logger.error(
|
||
f"[{mailbox.email_address}] Errore IDLE loop: {e}", exc_info=True
|
||
)
|
||
raise
|
||
|
||
# ─── Polling loop ─────────────────────────────────────────────────────────
|
||
|
||
async def _polling_loop(self, mailbox: Mailbox, db: AsyncSession) -> None:
|
||
"""
|
||
Polling IMAP ogni N secondi quando IDLE non è supportato.
|
||
Esegue NOOP + SEARCH UID per verificare nuovi messaggi.
|
||
"""
|
||
client = self._client
|
||
interval = settings.imap_polling_interval_seconds
|
||
|
||
while self._running:
|
||
try:
|
||
await asyncio.sleep(interval)
|
||
|
||
if not self._running:
|
||
break
|
||
|
||
# NOOP per mantenere connessione viva
|
||
try:
|
||
await client.noop()
|
||
except Exception:
|
||
raise ConnectionError("Connessione IMAP persa durante NOOP")
|
||
|
||
# Ricarica mailbox e controlla nuovi UID INBOX
|
||
await db.refresh(mailbox)
|
||
n = await sync_new_messages(client, mailbox, db, self.redis)
|
||
if n > 0:
|
||
logger.info(
|
||
f"[{mailbox.email_address}] Polling INBOX: {n} nuovi messaggi"
|
||
)
|
||
|
||
# Sync Sent ad ogni ciclo di polling
|
||
try:
|
||
ns = await sync_sent_messages(client, mailbox, db, self.redis)
|
||
if ns > 0:
|
||
logger.info(
|
||
f"[{mailbox.email_address}] Polling Sent: {ns} nuovi messaggi"
|
||
)
|
||
except Exception as e:
|
||
logger.warning(
|
||
f"[{mailbox.email_address}] Errore sync Sent in polling loop: {e}"
|
||
)
|
||
|
||
except asyncio.CancelledError:
|
||
return
|
||
except (ConnectionError, IOError, OSError) as e:
|
||
logger.warning(
|
||
f"[{mailbox.email_address}] Connessione polling persa: {e}"
|
||
)
|
||
raise
|
||
except Exception as e:
|
||
logger.error(
|
||
f"[{mailbox.email_address}] Errore polling loop: {e}", exc_info=True
|
||
)
|
||
raise
|
||
|
||
# ─── Error management ─────────────────────────────────────────────────────
|
||
|
||
async def _record_error(
|
||
self, mailbox: Mailbox, db: AsyncSession, error_msg: str
|
||
) -> None:
|
||
"""
|
||
Incrementa sync_error_count. Se supera il limite → status=error.
|
||
Pubblica evento Redis di errore.
|
||
"""
|
||
import json
|
||
|
||
mailbox.sync_error_count += 1
|
||
mailbox.sync_error_msg = error_msg[:500]
|
||
|
||
if mailbox.sync_error_count >= settings.imap_max_error_count:
|
||
if mailbox.status != "error":
|
||
mailbox.status = "error"
|
||
logger.error(
|
||
f"[{mailbox.email_address}] Troppe anomalie "
|
||
f"({mailbox.sync_error_count}), status=error"
|
||
)
|
||
# Pubblica evento WebSocket di errore
|
||
try:
|
||
event = {
|
||
"type": "mailbox:sync_error",
|
||
"mailbox_id": str(mailbox.id),
|
||
"error": error_msg,
|
||
"status": "error",
|
||
}
|
||
channel = f"ws:tenant:{mailbox.tenant_id}"
|
||
await self.redis.publish(channel, json.dumps(event))
|
||
except Exception:
|
||
pass
|
||
|
||
await db.flush()
|
||
await db.commit()
|
||
|
||
async def _reset_error_state(
|
||
self, mailbox: Mailbox, db: AsyncSession
|
||
) -> None:
|
||
"""Resetta il contatore errori dopo una connessione riuscita."""
|
||
if mailbox.sync_error_count > 0 or mailbox.status == "error":
|
||
mailbox.sync_error_count = 0
|
||
mailbox.sync_error_msg = None
|
||
if mailbox.status == "error":
|
||
mailbox.status = "active"
|
||
await db.flush()
|
||
await db.commit()
|
||
|
||
# ─── Decrypt credentials ──────────────────────────────────────────────────
|
||
|
||
@staticmethod
|
||
def _decrypt_creds(mailbox: Mailbox) -> dict:
|
||
"""Decifra le credenziali IMAP dalla mailbox."""
|
||
return {
|
||
"host": _decrypt(mailbox.imap_host_enc),
|
||
"port": int(_decrypt(mailbox.imap_port_enc)),
|
||
"user": _decrypt(mailbox.imap_user_enc),
|
||
"password": _decrypt(mailbox.imap_pass_enc),
|
||
"use_ssl": mailbox.imap_use_ssl,
|
||
}
|