Files
PecHub/worker/app/imap/pool.py
T
2026-03-18 17:30:13 +01:00

276 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MailboxPool orchestratore async per N caselle IMAP in parallelo.
All'avvio del worker:
1. Carica tutte le mailbox con status='active' dal DB
2. Avvia un asyncio.Task per ogni casella (IMAPConnection.run)
3. Monitora i task: se uno muore, lo riavvia dopo un breve delay
4. Osserva eventi Redis per caselle aggiunte/rimosse/aggiornate a runtime
ADR-003: Un task async per casella overhead < 10MB per casella.
"""
import asyncio
import json
import logging
import uuid
from typing import Any
import redis.asyncio as aioredis
from sqlalchemy import select
from app.config import get_settings
from app.database import AsyncSessionLocal
from app.imap.connection import IMAPConnection
from app.models import Mailbox
logger = logging.getLogger(__name__)
settings = get_settings()
# Canale Redis per eventi di gestione caselle
MAILBOX_EVENTS_CHANNEL = "mailbox:events"
class MailboxPool:
"""
Gestisce il pool di task IMAP asincroni.
Un task per casella, monitorato e riavviato automaticamente in caso di crash.
"""
def __init__(self, redis_client: aioredis.Redis) -> None:
self.redis = redis_client
# mailbox_id (str) → asyncio.Task
self._tasks: dict[str, asyncio.Task] = {}
# mailbox_id (str) → IMAPConnection
self._connections: dict[str, IMAPConnection] = {}
self._running = False
self._monitor_task: asyncio.Task | None = None
self._events_task: asyncio.Task | None = None
# ─── Lifecycle ────────────────────────────────────────────────────────────
async def start(self) -> None:
"""
Carica le mailbox attive dal DB e avvia i task IMAP.
Da chiamare nell'on_startup del worker arq.
"""
self._running = True
logger.info("MailboxPool: avvio in corso...")
mailbox_ids = await self._load_active_mailbox_ids()
logger.info(f"MailboxPool: {len(mailbox_ids)} caselle attive trovate")
for mid in mailbox_ids:
await self._start_task(mid)
# Task di monitoraggio: riavvia task morti
self._monitor_task = asyncio.create_task(
self._monitor_loop(), name="mailbox-pool-monitor"
)
# Task listener Redis: gestisce caselle aggiunte/rimosse a runtime
self._events_task = asyncio.create_task(
self._events_listener(), name="mailbox-pool-events"
)
logger.info(
f"MailboxPool avviato: {len(self._tasks)} task IMAP in esecuzione"
)
async def stop(self) -> None:
"""
Ferma tutti i task IMAP.
Da chiamare nell'on_shutdown del worker arq.
"""
logger.info("MailboxPool: arresto in corso...")
self._running = False
# Cancella task di sistema
for task in [self._monitor_task, self._events_task]:
if task and not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Ferma tutte le connessioni IMAP
for conn in self._connections.values():
conn.stop()
# Cancella i task
if self._tasks:
await asyncio.gather(
*[t for t in self._tasks.values() if not t.done()],
return_exceptions=True,
)
self._tasks.clear()
self._connections.clear()
logger.info("MailboxPool: tutti i task fermati")
# ─── Gestione task individuali ────────────────────────────────────────────
async def add_mailbox(self, mailbox_id: str) -> None:
"""Aggiunge e avvia una nuova casella al pool a runtime."""
if mailbox_id in self._tasks and not self._tasks[mailbox_id].done():
logger.debug(f"MailboxPool: {mailbox_id[:8]} già nel pool")
return
await self._start_task(mailbox_id)
logger.info(f"MailboxPool: casella aggiunta {mailbox_id[:8]}")
async def remove_mailbox(self, mailbox_id: str) -> None:
"""Ferma e rimuove una casella dal pool a runtime."""
if mailbox_id in self._connections:
self._connections[mailbox_id].stop()
if mailbox_id in self._tasks:
task = self._tasks[mailbox_id]
if not task.done():
task.cancel()
try:
await asyncio.wait_for(task, timeout=5.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
del self._tasks[mailbox_id]
if mailbox_id in self._connections:
del self._connections[mailbox_id]
logger.info(f"MailboxPool: casella rimossa {mailbox_id[:8]}")
@property
def active_count(self) -> int:
"""Numero di task IMAP attivi."""
return sum(1 for t in self._tasks.values() if not t.done())
# ─── Loop di monitoraggio ─────────────────────────────────────────────────
async def _monitor_loop(self) -> None:
"""
Ogni 30 secondi verifica i task morti e li riavvia.
Rimuove anche task di caselle che non esistono più nel DB.
"""
while self._running:
try:
await asyncio.sleep(30)
dead_ids = [
mid for mid, task in self._tasks.items()
if task.done() and not task.cancelled()
]
if dead_ids:
logger.info(
f"MailboxPool monitor: {len(dead_ids)} task morti, riavvio..."
)
active_ids = await self._load_active_mailbox_ids()
active_set = {str(mid) for mid in active_ids}
for mid in dead_ids:
if mid in active_set:
logger.info(
f"MailboxPool: riavvio task {mid[:8]}..."
)
await self._start_task(mid)
else:
# Casella non più attiva, rimuovi dal pool
logger.info(
f"MailboxPool: casella {mid[:8]} non più attiva, rimossa"
)
self._tasks.pop(mid, None)
self._connections.pop(mid, None)
# Aggiungi caselle nuove attive
active_ids = await self._load_active_mailbox_ids()
for mid in active_ids:
mid_str = str(mid)
if mid_str not in self._tasks or self._tasks[mid_str].done():
logger.info(
f"MailboxPool: rilevata nuova casella attiva {mid_str[:8]}"
)
await self._start_task(mid_str)
except asyncio.CancelledError:
return
except Exception as e:
logger.error(f"MailboxPool monitor errore: {e}", exc_info=True)
# ─── Listener eventi Redis ─────────────────────────────────────────────────
async def _events_listener(self) -> None:
"""
Ascolta eventi Redis per aggiungere/rimuovere caselle a runtime.
Formato evento: {"action": "add"|"remove"|"refresh", "mailbox_id": "<uuid>"}
Pubblicare con: PUBLISH mailbox:events '{"action":"add","mailbox_id":"<uuid>"}'
"""
pubsub = self.redis.pubsub()
await pubsub.subscribe(MAILBOX_EVENTS_CHANNEL)
logger.debug(f"MailboxPool: in ascolto su Redis {MAILBOX_EVENTS_CHANNEL}")
try:
async for message in pubsub.listen():
if not self._running:
break
if message["type"] != "message":
continue
try:
data = json.loads(message["data"])
action = data.get("action")
mid = data.get("mailbox_id")
if not action or not mid:
continue
if action == "add":
await self.add_mailbox(mid)
elif action == "remove":
await self.remove_mailbox(mid)
elif action == "refresh":
# Rimuovi e riavvia (utile dopo cambio credenziali)
await self.remove_mailbox(mid)
await asyncio.sleep(1)
await self.add_mailbox(mid)
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"MailboxPool: evento Redis malformato: {e}")
except asyncio.CancelledError:
await pubsub.unsubscribe(MAILBOX_EVENTS_CHANNEL)
return
except Exception as e:
logger.error(f"MailboxPool events listener errore: {e}", exc_info=True)
# ─── Private ─────────────────────────────────────────────────────────────
async def _start_task(self, mailbox_id: str) -> None:
"""Crea e avvia un nuovo task IMAPConnection per la casella."""
conn = IMAPConnection(
mailbox_id=mailbox_id,
redis_client=self.redis,
)
self._connections[mailbox_id] = conn
task = asyncio.create_task(
conn.run(),
name=f"imap-{mailbox_id[:8]}",
)
self._tasks[mailbox_id] = task
async def _load_active_mailbox_ids(self) -> list[str]:
"""Carica dal DB gli UUID di tutte le caselle con status=active."""
try:
async with AsyncSessionLocal() as db:
result = await db.execute(
select(Mailbox.id).where(Mailbox.status == "active")
)
return [str(row[0]) for row in result.all()]
except Exception as e:
logger.error(f"MailboxPool: errore caricamento caselle: {e}")
return []