mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
166 lines
5.1 KiB
Python
166 lines
5.1 KiB
Python
"""
|
||
Entrypoint worker arq – PEChub IMAP Sync Engine.
|
||
|
||
Avvio: python -m app.main
|
||
|
||
Cosa fa:
|
||
1. Connette a Redis tramite arq
|
||
2. on_startup → avvia MailboxPool (N task IMAP asincroni)
|
||
3. on_shutdown → ferma MailboxPool
|
||
4. Registra job arq (sync_mailbox, future: send_pec, archive_batch, ecc.)
|
||
5. Loop arq per processare job dalla coda Redis
|
||
|
||
L'event loop è condiviso tra arq e MailboxPool (asyncio task).
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import sys
|
||
from typing import Any
|
||
|
||
import redis.asyncio as aioredis
|
||
from arq import run_worker
|
||
from arq.connections import RedisSettings
|
||
|
||
from app.config import get_settings
|
||
from app.imap.pool import MailboxPool
|
||
from app.jobs.send_pec import send_pec
|
||
from app.jobs.sync_mailbox import sync_mailbox
|
||
from app.smtp.receipt_watcher import watch_receipt
|
||
from app.storage.minio_client import ensure_bucket_exists
|
||
|
||
settings = get_settings()
|
||
|
||
# Logging
|
||
logging.basicConfig(
|
||
level=getattr(logging, settings.log_level.upper(), logging.INFO),
|
||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
||
stream=sys.stdout,
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Pool globale (accessibile dalle callback)
|
||
_mailbox_pool: MailboxPool | None = None
|
||
|
||
|
||
# ─── Lifecycle callbacks arq ──────────────────────────────────────────────────
|
||
|
||
async def on_startup(ctx: dict[str, Any]) -> None:
|
||
"""
|
||
Inizializzazione worker all'avvio.
|
||
Avvia il MailboxPool con tutte le caselle attive.
|
||
|
||
NOTA: ctx["redis"] è già un ArqRedis (con enqueue_job) impostato da arq
|
||
prima di chiamare on_startup – NON sovrascrivere con aioredis standard.
|
||
"""
|
||
global _mailbox_pool
|
||
|
||
logger.info("🚀 PEChub Worker avviato")
|
||
logger.info(f" DB: {settings.database_url.split('@')[-1]}")
|
||
logger.info(f" Redis: {settings.redis_url}")
|
||
logger.info(f" MinIO: {settings.minio_endpoint}")
|
||
|
||
# Verifica/crea bucket MinIO
|
||
try:
|
||
await ensure_bucket_exists()
|
||
except Exception as e:
|
||
logger.warning(f"MinIO non disponibile al startup: {e}")
|
||
|
||
# Usa il client Redis ArqRedis già presente nel contesto (messo da arq).
|
||
# ArqRedis estende Redis, quindi funziona sia per MailboxPool sia per
|
||
# enqueue_job() nei job asincroni (send_pec, watch_receipt, ecc.).
|
||
redis_client = ctx["redis"]
|
||
|
||
# Avvia MailboxPool
|
||
_mailbox_pool = MailboxPool(redis_client=redis_client)
|
||
ctx["mailbox_pool"] = _mailbox_pool
|
||
await _mailbox_pool.start()
|
||
|
||
logger.info(
|
||
f"✅ Worker pronto: {_mailbox_pool.active_count} caselle IMAP attive"
|
||
)
|
||
|
||
|
||
async def on_shutdown(ctx: dict[str, Any]) -> None:
|
||
"""Cleanup all'arresto del worker."""
|
||
global _mailbox_pool
|
||
|
||
logger.info("🛑 PEChub Worker in arresto...")
|
||
|
||
pool = ctx.get("mailbox_pool") or _mailbox_pool
|
||
if pool:
|
||
await pool.stop()
|
||
|
||
# NON chiudere ctx["redis"]: è l'ArqRedis gestito da arq,
|
||
# che ne gestisce il ciclo di vita autonomamente.
|
||
|
||
logger.info("🛑 Worker fermato")
|
||
|
||
|
||
# ─── Worker health check ──────────────────────────────────────────────────────
|
||
|
||
async def health_check(ctx: dict[str, Any]) -> dict:
|
||
"""
|
||
Job speciale per health check del worker.
|
||
Può essere chiamato da monitoring esterno.
|
||
"""
|
||
pool: MailboxPool | None = ctx.get("mailbox_pool")
|
||
return {
|
||
"status": "ok",
|
||
"active_imap_connections": pool.active_count if pool else 0,
|
||
}
|
||
|
||
|
||
# ─── WorkerSettings arq ───────────────────────────────────────────────────────
|
||
|
||
def _parse_redis_settings() -> RedisSettings:
|
||
"""Parsa REDIS_URL in RedisSettings arq."""
|
||
url = settings.redis_url
|
||
# Supporta redis://host:port/db e redis://user:pass@host:port/db
|
||
import urllib.parse
|
||
parsed = urllib.parse.urlparse(url)
|
||
|
||
host = parsed.hostname or "localhost"
|
||
port = parsed.port or 6379
|
||
db = int(parsed.path.lstrip("/") or "0")
|
||
password = parsed.password or None
|
||
|
||
return RedisSettings(host=host, port=port, database=db, password=password)
|
||
|
||
|
||
class WorkerSettings:
|
||
"""Configurazione del worker arq."""
|
||
|
||
# Funzioni/job registrati
|
||
functions = [sync_mailbox, send_pec, watch_receipt, health_check]
|
||
|
||
# Callbacks lifecycle
|
||
on_startup = on_startup
|
||
on_shutdown = on_shutdown
|
||
|
||
# Redis
|
||
redis_settings = _parse_redis_settings()
|
||
|
||
# Concorrenza
|
||
max_jobs = 20
|
||
|
||
# Timeout per ogni job (secondi)
|
||
# send_pec può richiedere più tempo su SMTP lenti
|
||
job_timeout = 120
|
||
|
||
# Retry automatico in caso di errore
|
||
max_tries = 3
|
||
|
||
# Polling interval (arq controlla la coda ogni N ms)
|
||
poll_delay = 0.5
|
||
|
||
# Keep job results per N secondi
|
||
keep_result = 3600
|
||
|
||
|
||
# ─── Entrypoint ───────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
|
||
logger.info("Avvio PEChub Worker (arq)...")
|
||
run_worker(WorkerSettings)
|