""" Job arq: dispatch_notification – invio effettivo delle notifiche multi-canale. Flusso completo: 1. sync.py salva un nuovo messaggio inbound 2. sync.py chiama evaluate_and_enqueue_notifications() (questa funzione) 3. evaluate_and_enqueue_notifications(): - legge le NotificationRule attive del tenant con event_type="new_message" - applica i filtri opzionali (mailbox_id, direction, pec_type) - per ogni regola che matcha: crea NotificationLog(status=pending) - enqueue del job arq dispatch_notification con defer 5s (per dare tempo al DB di committare prima che il job legga) 4. dispatch_notification(ctx, notification_log_id): - legge NotificationLog + NotificationChannel dal DB - controlla circuit breaker - decifra config_enc con AES-256-GCM - chiama il sender appropriato (telegram/webhook/email/whatsapp) - aggiorna status: sent / failed - in caso di fallimento: re-enqueue con backoff esponenziale - aggiorna circuit breaker dopo fallimenti consecutivi Circuit breaker: - 5+ fallimenti consecutivi → apre per 1 ora - reset automatico al primo successo Retry backoff (max_attempts default=3): Tentativo 1 fallisce → attendi 5 min → tentativo 2 Tentativo 2 fallisce → attendi 30 min → tentativo 3 Tentativo 3 fallisce → FAILED definitivo Cifratura config_enc: AES-256-GCM – stesso schema di notification_service.py nel backend. Backward compatible: fallback a base64 grezzo per configurazioni pre-fix. """ import base64 import json import logging import os import uuid from datetime import UTC, datetime, timedelta from typing import Any from cryptography.hazmat.primitives.ciphers.aead import AESGCM from sqlalchemy import select from sqlalchemy.orm import selectinload from app.config import get_settings from app.database import AsyncSessionLocal from app.models import ( Mailbox, Message, NotificationChannel, NotificationLog, NotificationRule, ) logger = logging.getLogger(__name__) settings = get_settings() # ─── Backoff retry (secondi) per tentativo N fallito (0-based) ─────────────── _RETRY_DELAYS = [ 5 * 60, # dopo tentativo 1: 5 minuti 30 * 60, # dopo tentativo 2: 30 minuti 120 * 60, # dopo tentativo 3: 2 ore (non raggiunto: max_attempts=3) ] # ─── Circuit breaker ────────────────────────────────────────────────────────── _CIRCUIT_FAILURE_THRESHOLD = 5 # fallimenti consecutivi per aprire il circuito _CIRCUIT_OPEN_DURATION = timedelta(hours=1) # ─── Cifratura AES-256-GCM ──────────────────────────────────────────────────── def _decrypt_config(enc: str) -> dict: """ Decifra config_enc AES-256-GCM. Backward compatible: se non e' GCM valido, prova fallback base64 grezzo. """ key = settings.encryption_key_bytes try: raw = base64.b64decode(enc.encode("ascii")) if len(raw) > 28: # 12 nonce + 16 tag minimo nonce = raw[:12] ciphertext = raw[12:] aesgcm = AESGCM(key) plaintext = aesgcm.decrypt(nonce, ciphertext, None) return json.loads(plaintext.decode("utf-8")) except Exception: pass # Fallback: base64 grezzo (configurazioni pre-fix sicurezza) try: raw = base64.b64decode(enc.encode("ascii")) return json.loads(raw.decode("utf-8")) except Exception: return {} # ─── Valutazione filtri regola ──────────────────────────────────────────────── def _matches_filter( filter_data: dict | None, message: Message, mailbox: Mailbox, ) -> bool: """ Valuta se un messaggio soddisfa i filtri opzionali di una regola. Filtri supportati: mailbox_id: str | list[str] – UUID casella direction: str – "inbound" | "outbound" pec_type: str | list[str] – tipo messaggio PEC """ if not filter_data: return True # Filtro per mailbox if "mailbox_id" in filter_data: allowed = filter_data["mailbox_id"] if isinstance(allowed, list): if str(message.mailbox_id) not in [str(a) for a in allowed]: return False else: if str(message.mailbox_id) != str(allowed): return False # Filtro per direzione if "direction" in filter_data: if message.direction != filter_data["direction"]: return False # Filtro per tipo PEC if "pec_type" in filter_data: allowed = filter_data["pec_type"] if isinstance(allowed, list): if message.pec_type not in allowed: return False else: if message.pec_type != allowed: return False return True # ─── Costruzione testo notifica ─────────────────────────────────────────────── def _build_notification_text( event_type: str, payload: dict, channel_type: str, channel_name: str = "", ) -> str: """ Costruisce il testo della notifica in base all'evento e al tipo canale. """ if event_type == "new_message": subject = payload.get("subject") or "(senza oggetto)" from_addr = payload.get("from_address") or "mittente sconosciuto" mailbox_email = payload.get("mailbox_email") or payload.get("mailbox_id", "") pec_type = payload.get("pec_type", "posta_certificata") received_at = payload.get("received_at", "") # Traduzione tipo PEC pec_type_labels = { "posta_certificata": "PEC", "accettazione": "Ricevuta di Accettazione", "non_accettazione": "Ricevuta di Non Accettazione", "presa_in_carico": "Ricevuta di Presa in Carico", "avvenuta_consegna": "Ricevuta di Avvenuta Consegna", "mancata_consegna": "Ricevuta di Mancata Consegna", "errore_consegna": "Ricevuta di Errore Consegna", "preavviso_mancata_consegna": "Preavviso Mancata Consegna", "rilevazione_virus": "Rilevazione Virus", "unknown": "Messaggio", } tipo_label = pec_type_labels.get(pec_type, pec_type) if channel_type == "telegram": return ( f"Nuovo messaggio PEC\n\n" f"Tipo: {tipo_label}\n" f"Da: {from_addr}\n" f"Casella: {mailbox_email}\n" f"Oggetto: {subject}" ) else: return ( f"Nuovo messaggio PEC\n" f"Tipo: {tipo_label}\n" f"Da: {from_addr}\n" f"Casella: {mailbox_email}\n" f"Oggetto: {subject}" ) elif event_type == "state_changed": message_id = payload.get("message_id", "") old_state = payload.get("old_state", "") new_state = payload.get("new_state", "") subject = payload.get("subject", "") return ( f"Cambio stato PEC\n" f"Oggetto: {subject}\n" f"Stato: {old_state} -> {new_state}" ) else: return f"Evento PEChub: {event_type}\n{json.dumps(payload, ensure_ascii=False, default=str)}" def _build_notification_payload(event_type: str, payload: dict) -> dict: """Costruisce il payload JSON per il webhook.""" return { "event": event_type, "timestamp": datetime.now(UTC).isoformat(), "data": payload, } def _build_email_subject(event_type: str, payload: dict) -> str: """Costruisce l'oggetto dell'email di notifica.""" if event_type == "new_message": subject = payload.get("subject") or "(senza oggetto)" return f"[PEChub] Nuovo messaggio PEC: {subject}" elif event_type == "state_changed": return f"[PEChub] Cambio stato PEC: {payload.get('new_state', '')}" return f"[PEChub] Evento: {event_type}" # ─── Evaluate & Enqueue ─────────────────────────────────────────────────────── async def evaluate_and_enqueue_notifications( message: Message, mailbox: Mailbox, db: Any, # AsyncSession – evito import circolare redis_client: Any, # ArqRedis ) -> None: """ Valuta le regole di notifica per un messaggio appena salvato e accoda i job. Chiamata da sync.py dopo _save_message e index_message. Non solleva eccezioni: gli errori vengono loggati ma non propagati per non interrompere il flusso di sincronizzazione IMAP. Args: message: messaggio appena salvato nel DB (flush, non commit) mailbox: casella di appartenenza db: sessione DB (open, con flush del messaggio) redis_client: ArqRedis per enqueue_job """ try: await _do_evaluate_and_enqueue(message, mailbox, db, redis_client) except Exception as exc: logger.error( f"Errore evaluate_and_enqueue_notifications per messaggio " f"{message.id}: {exc}", exc_info=True, ) async def _do_evaluate_and_enqueue( message: Message, mailbox: Mailbox, db: Any, redis_client: Any, ) -> None: """Logica interna – puo' sollevare eccezioni.""" # Carica regole attive per questo tenant con event_type = "new_message" rules_result = await db.execute( select(NotificationRule) .options(selectinload(NotificationRule.channel)) .where( NotificationRule.tenant_id == message.tenant_id, NotificationRule.is_active == True, # noqa: E712 NotificationRule.event_type == "new_message", ) ) rules: list[NotificationRule] = list(rules_result.scalars().all()) if not rules: return enqueued_count = 0 for rule in rules: channel = rule.channel if not channel or not channel.is_active: continue # Controlla circuit breaker if ( channel.circuit_open_until and channel.circuit_open_until > datetime.now(UTC) ): logger.debug( f"[notify] Canale {channel.name!r} circuit aperto fino a " f"{channel.circuit_open_until}, skip regola {rule.name!r}" ) continue # Applica filtri if not _matches_filter(rule.filter, message, mailbox): continue # Costruisce il payload dell'evento event_payload = { "message_id": str(message.id), "mailbox_id": str(mailbox.id), "mailbox_email": mailbox.email_address, "subject": message.subject or "", "from_address": message.from_address or "", "to_addresses": list(message.to_addresses or []), "pec_type": message.pec_type, "direction": message.direction, "received_at": ( message.received_at.isoformat() if message.received_at else None ), } # Crea NotificationLog log = NotificationLog( id=uuid.uuid4(), tenant_id=message.tenant_id, channel_id=rule.channel_id, rule_id=rule.id, event_type="new_message", event_payload=event_payload, status="pending", attempt_count=0, max_attempts=3, ) db.add(log) await db.flush() # ottieni log.id # Enqueue arq job con defer 5s per attendere il commit DB try: await redis_client.enqueue_job( "dispatch_notification", str(log.id), _defer_by=timedelta(seconds=5), ) enqueued_count += 1 logger.info( f"[notify] Enqueued dispatch_notification per regola " f"{rule.name!r} -> canale {channel.name!r} " f"(log_id={log.id})" ) except Exception as e: logger.error( f"[notify] Impossibile enqueue dispatch_notification " f"per log {log.id}: {e}" ) if enqueued_count > 0: logger.info( f"[notify] Messaggio {message.id}: " f"{enqueued_count} notifiche accodate" ) # ─── Job arq principale ─────────────────────────────────────────────────────── async def dispatch_notification( ctx: dict[str, Any], notification_log_id: str, ) -> dict: """ Job arq: legge un NotificationLog e invia la notifica al canale configurato. Args: ctx: contesto arq (ctx["redis"] = ArqRedis) notification_log_id: UUID del NotificationLog da processare Returns: dict con status e dettagli """ redis_client = ctx.get("redis") async with AsyncSessionLocal() as db: log_uuid = uuid.UUID(notification_log_id) # ── Carica log + canale ─────────────────────────────────────────────── log_result = await db.execute( select(NotificationLog) .options(selectinload(NotificationLog.channel)) .where(NotificationLog.id == log_uuid) ) log: NotificationLog | None = log_result.scalar_one_or_none() if not log: logger.warning( f"[dispatch_notification] NotificationLog {notification_log_id} non trovato" ) return {"status": "not_found", "log_id": notification_log_id} if log.status in ("sent", "failed", "skipped"): logger.debug( f"[dispatch_notification] Log {notification_log_id} " f"gia' in stato {log.status!r}, skip" ) return {"status": "already_processed", "current_status": log.status} channel = log.channel if not channel or not channel.is_active: log.status = "skipped" await db.commit() return {"status": "skipped", "reason": "channel_inactive"} # ── Circuit breaker ─────────────────────────────────────────────────── if ( channel.circuit_open_until and channel.circuit_open_until > datetime.now(UTC) ): log.status = "skipped" await db.commit() logger.info( f"[dispatch_notification] Canale {channel.name!r} circuit aperto, " f"log {notification_log_id} marcato skipped" ) return {"status": "skipped", "reason": "circuit_open"} # ── Incrementa contatore tentativi ──────────────────────────────────── log.attempt_count += 1 current_attempt = log.attempt_count # ── Decifra config sensibile ────────────────────────────────────────── secret: dict = {} if channel.config_enc: try: secret = _decrypt_config(channel.config_enc) except Exception as e: log.status = "failed" log.last_error = f"Errore decifratura config: {e}" await db.commit() logger.error( f"[dispatch_notification] Errore decifratura canale " f"{channel.name!r}: {e}" ) return {"status": "failed", "error": str(e)} config = channel.config or {} payload = log.event_payload or {} channel_type = channel.channel_type # ── Testo notifica ──────────────────────────────────────────────────── notif_text = _build_notification_text( event_type=log.event_type, payload=payload, channel_type=channel_type, channel_name=channel.name, ) # ── Dispatch al sender ──────────────────────────────────────────────── success = False error_msg: str | None = None http_status: int | None = None try: if channel_type == "telegram": bot_token = secret.get("bot_token") chat_id = str(config.get("chat_id", "")) if not bot_token or not chat_id: raise ValueError("bot_token o chat_id non configurati") from app.notifications.telegram import send_message result = await send_message( bot_token=bot_token, chat_id=chat_id, text=notif_text, parse_mode="HTML", ) http_status = 200 success = True logger.info( f"[dispatch_notification] Telegram inviato: " f"message_id={result.get('message_id')} " f"canale={channel.name!r}" ) elif channel_type == "webhook": url = config.get("url") if not url: raise ValueError("URL webhook non configurato") webhook_secret_val = secret.get("webhook_secret") from app.notifications.webhook import send_webhook result = await send_webhook( url=url, payload=_build_notification_payload(log.event_type, payload), event_type=log.event_type, webhook_secret=webhook_secret_val, ) http_status = result.get("http_status") success = True logger.info( f"[dispatch_notification] Webhook inviato: " f"HTTP {http_status} delivery={result.get('delivery_id')} " f"canale={channel.name!r}" ) elif channel_type == "email": smtp_host = config.get("smtp_host") smtp_port = int(config.get("smtp_port", 465)) from_email = config.get("from_email") to_email = config.get("to_email") smtp_user = config.get("smtp_user") or from_email use_tls = config.get("smtp_use_tls", True) use_starttls = config.get("smtp_use_starttls", False) from_name = config.get("from_name", "PEChub Notifiche") smtp_password = secret.get("smtp_password", "") if not smtp_host or not from_email or not to_email: raise ValueError("Configurazione SMTP incompleta") from app.notifications.email_smtp import send_email_notification subject = _build_email_subject(log.event_type, payload) await send_email_notification( smtp_host=smtp_host, smtp_port=smtp_port, smtp_user=smtp_user, smtp_password=smtp_password, from_email=from_email, to_email=to_email, subject=subject, body_text=notif_text, body_html=None, from_name=from_name, use_tls=use_tls, use_starttls=use_starttls, ) http_status = 200 success = True logger.info( f"[dispatch_notification] Email inviata a {to_email} " f"canale={channel.name!r}" ) elif channel_type == "whatsapp": phone_number_id = config.get("phone_number_id") to_phone = config.get("to_phone") access_token = secret.get("access_token") if not phone_number_id or not to_phone or not access_token: raise ValueError("Configurazione WhatsApp incompleta") from app.notifications.whatsapp import send_whatsapp_message result = await send_whatsapp_message( phone_number_id=phone_number_id, to_phone=to_phone, text=notif_text, access_token=access_token, ) http_status = result.get("http_status") success = True logger.info( f"[dispatch_notification] WhatsApp inviato: " f"message_id={result.get('message_id')} " f"canale={channel.name!r}" ) else: raise ValueError(f"Tipo canale non supportato: {channel_type!r}") except Exception as exc: error_msg = str(exc) logger.warning( f"[dispatch_notification] Tentativo {current_attempt} fallito " f"canale={channel.name!r} tipo={channel_type!r}: {error_msg}" ) # ── Aggiorna stato log e canale ─────────────────────────────────────── if success: log.status = "sent" log.sent_at = datetime.now(UTC) log.http_status = http_status # Reset circuit breaker channel.consecutive_failures = 0 channel.circuit_open_until = None await db.commit() logger.info( f"[dispatch_notification] Notifica {notification_log_id} INVIATA " f"canale={channel.name!r} tipo={channel_type!r}" ) return { "status": "sent", "log_id": notification_log_id, "channel": channel.name, "channel_type": channel_type, "attempt": current_attempt, } else: # ── Retry o failure definitivo ──────────────────────────────────── log.last_error = error_msg log.http_status = http_status if current_attempt < log.max_attempts: # Calcola delay backoff delay_idx = min(current_attempt - 1, len(_RETRY_DELAYS) - 1) delay_seconds = _RETRY_DELAYS[delay_idx] next_retry = datetime.now(UTC) + timedelta(seconds=delay_seconds) log.next_retry_at = next_retry # Mantieni status "pending" per il retry await db.commit() # Re-enqueue con backoff if redis_client: try: await redis_client.enqueue_job( "dispatch_notification", notification_log_id, _defer_by=timedelta(seconds=delay_seconds), ) logger.info( f"[dispatch_notification] Retry tentativo " f"{current_attempt + 1} schedulato in {delay_seconds}s " f"per log {notification_log_id}" ) except Exception as enqueue_err: logger.error( f"[dispatch_notification] Impossibile re-enqueue " f"log {notification_log_id}: {enqueue_err}" ) return { "status": "retry", "log_id": notification_log_id, "attempt": current_attempt, "next_retry_in_seconds": delay_seconds, "error": error_msg, } else: # Tutti i tentativi esauriti → FAILED log.status = "failed" # Aggiorna circuit breaker canale channel.consecutive_failures += 1 if channel.consecutive_failures >= _CIRCUIT_FAILURE_THRESHOLD: channel.circuit_open_until = datetime.now(UTC) + _CIRCUIT_OPEN_DURATION logger.warning( f"[dispatch_notification] Circuit breaker aperto per " f"canale {channel.name!r} " f"({channel.consecutive_failures} fallimenti consecutivi)" ) await db.commit() logger.error( f"[dispatch_notification] Notifica {notification_log_id} FALLITA " f"definitivamente dopo {current_attempt} tentativi: {error_msg}" ) return { "status": "failed", "log_id": notification_log_id, "channel": channel.name, "attempts": current_attempt, "error": error_msg, }