# Mirror of https://github.com/idrainformatica/PecFlow.git
# Synced 2026-06-16 12:45:42 +02:00 (667 lines, 25 KiB, Python)
"""
arq job: dispatch_notification – actual delivery of multi-channel notifications.

Full flow:
  1. sync.py saves a new inbound message
  2. sync.py calls evaluate_and_enqueue_notifications() (defined in this module)
  3. evaluate_and_enqueue_notifications():
     - reads the tenant's active NotificationRule rows with event_type="new_message"
     - applies the optional filters (mailbox_id, direction, pec_type)
     - for every matching rule: creates a NotificationLog(status=pending)
     - enqueues the arq job dispatch_notification with a 5s defer
       (to give the DB time to commit before the job reads)
  4. dispatch_notification(ctx, notification_log_id):
     - reads NotificationLog + NotificationChannel from the DB
     - checks the circuit breaker
     - decrypts config_enc with AES-256-GCM
     - calls the appropriate sender (telegram/webhook/email/whatsapp)
     - updates status: sent / failed
     - on failure: re-enqueues with exponential backoff
     - updates the circuit breaker after consecutive failures

Circuit breaker:
  - 5+ consecutive failures → opens for 1 hour
  - automatic reset on the first success

Retry backoff (max_attempts default=3):
  Attempt 1 fails → wait 5 min → attempt 2
  Attempt 2 fails → wait 30 min → attempt 3
  Attempt 3 fails → definitively FAILED

config_enc encryption:
  AES-256-GCM – same scheme as notification_service.py in the backend.
  Backward compatible: falls back to raw base64 for pre-fix configurations.
"""

import base64
import html
import json
import logging
import os
import uuid
from datetime import UTC, datetime, timedelta
from typing import Any

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from sqlalchemy import select
from sqlalchemy.orm import selectinload

from app.config import get_settings
from app.database import AsyncSessionLocal
from app.models import (
    Mailbox,
    Message,
    NotificationChannel,
    NotificationLog,
    NotificationRule,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, resolved once when the module is imported.
settings = get_settings()

# ─── Retry backoff (seconds) after failed attempt N (0-based index) ──────────
_RETRY_DELAYS = [
    5 * 60,  # after attempt 1: 5 minutes
    30 * 60,  # after attempt 2: 30 minutes
    120 * 60,  # after attempt 3: 2 hours (not reached with max_attempts=3)
]

# ─── Circuit breaker ──────────────────────────────────────────────────────────
_CIRCUIT_FAILURE_THRESHOLD = 5  # consecutive failures that open the circuit
_CIRCUIT_OPEN_DURATION = timedelta(hours=1)  # how long the circuit stays open


# ─── AES-GCM decryption of channel secrets ───────────────────────────────────

def _decrypt_config(enc: str) -> dict:
    """
    Decode a channel's ``config_enc`` value into a dict of secrets.

    ``enc`` is base64 text. The decoded bytes are first treated as an AES-GCM
    blob laid out as ``nonce (12 bytes) || ciphertext+tag`` and decrypted with
    ``settings.encryption_key_bytes``. If that does not yield JSON, the decoded
    bytes are parsed directly as UTF-8 JSON (legacy, unencrypted
    configurations).

    Args:
        enc: base64-encoded configuration blob.

    Returns:
        The decoded configuration, or ``{}`` when the value cannot be decoded
        by either scheme or does not contain a JSON object. This function is
        best-effort and does not raise for undecodable input.
    """
    key = settings.encryption_key_bytes

    # Decode once; both the GCM path and the legacy path work on these bytes.
    try:
        raw = base64.b64decode(enc.encode("ascii"))
    except Exception as exc:
        logger.warning(
            "[notify] config_enc non decodificabile (base64): %s",
            type(exc).__name__,
        )
        return {}

    config: Any = None
    decoded = False

    if len(raw) > 28:  # 12-byte nonce + 16-byte tag is the minimum GCM size
        try:
            plaintext = AESGCM(key).decrypt(raw[:12], raw[12:], None)
            config = json.loads(plaintext.decode("utf-8"))
            decoded = True
        except Exception as exc:
            # Expected for legacy values: fall through to the raw-JSON path.
            # Only the exception type is logged so no secret material leaks.
            logger.debug(
                "[notify] config_enc non e' GCM valido (%s), provo fallback",
                type(exc).__name__,
            )

    if not decoded:
        # Fallback: raw base64 JSON (configurations stored before encryption).
        try:
            config = json.loads(raw.decode("utf-8"))
        except Exception as exc:
            logger.warning(
                "[notify] config_enc non decodificabile (%s)",
                type(exc).__name__,
            )
            return {}

    if not isinstance(config, dict):
        # Callers use ``.get`` on the result; anything else is unusable.
        logger.warning("[notify] config_enc non contiene un oggetto JSON")
        return {}
    return config


# ─── Rule filter evaluation ──────────────────────────────────────────────────

def _matches_filter(
    filter_data: dict | None,
    message: Message,
    mailbox: Mailbox,
) -> bool:
    """
    Tell whether a message satisfies the optional filters of a rule.

    Supported filter keys (each accepts a single value or a list of values):
        mailbox_id: compared against ``str(message.mailbox_id)``
        direction:  compared against ``message.direction``
        pec_type:   compared against ``message.pec_type``

    All filters that are present must match. Unknown keys are ignored.

    Args:
        filter_data: the rule's filter mapping; ``None`` or empty matches all.
        message: the message being evaluated.
        mailbox: the owning mailbox (currently unused; kept for the interface).

    Returns:
        True when every present filter accepts the message.
    """
    if not filter_data:
        return True

    def _accepts(expected: Any, actual: Any) -> bool:
        # A filter value is either one accepted value or a list of them.
        if isinstance(expected, list):
            return actual in expected
        return actual == expected

    # Mailbox filter: both sides are stringified so an id object and its
    # textual form compare equal.
    if "mailbox_id" in filter_data:
        expected = filter_data["mailbox_id"]
        if isinstance(expected, list):
            expected = [str(item) for item in expected]
        else:
            expected = str(expected)
        if not _accepts(expected, str(message.mailbox_id)):
            return False

    # Direction filter: a list is accepted like the other filters.
    if "direction" in filter_data:
        if not _accepts(filter_data["direction"], message.direction):
            return False

    # PEC type filter.
    if "pec_type" in filter_data:
        if not _accepts(filter_data["pec_type"], message.pec_type):
            return False

    return True


# ─── Notification text construction ──────────────────────────────────────────

# Human-readable (Italian) labels for the PEC message types.
_PEC_TYPE_LABELS = {
    "posta_certificata": "PEC",
    "accettazione": "Ricevuta di Accettazione",
    "non_accettazione": "Ricevuta di Non Accettazione",
    "presa_in_carico": "Ricevuta di Presa in Carico",
    "avvenuta_consegna": "Ricevuta di Avvenuta Consegna",
    "mancata_consegna": "Ricevuta di Mancata Consegna",
    "errore_consegna": "Ricevuta di Errore Consegna",
    "preavviso_mancata_consegna": "Preavviso Mancata Consegna",
    "rilevazione_virus": "Rilevazione Virus",
    "unknown": "Messaggio",
}


def _build_notification_text(
    event_type: str,
    payload: dict,
    channel_type: str,
    channel_name: str = "",
) -> str:
    """
    Build the notification text for an event and a channel type.

    For ``channel_type == "telegram"`` the text is HTML: the job sends it with
    ``parse_mode="HTML"``, so every payload-derived value is escaped
    (``<``, ``>``, ``&``). Message subjects and ``Name <address>`` senders
    would otherwise be parsed as markup. Other channels get plain text.

    Args:
        event_type: ``"new_message"``, ``"state_changed"`` or any other name
            (rendered generically with the JSON payload).
        payload: event payload stored on the notification log.
        channel_type: type of the destination channel.
        channel_name: currently unused; kept for interface compatibility.

    Returns:
        The text to deliver.
    """
    is_telegram = channel_type == "telegram"

    def _safe(value: Any) -> str:
        # Escape only for Telegram HTML; quotes need no escaping outside attributes.
        text = str(value)
        return html.escape(text, quote=False) if is_telegram else text

    if event_type == "new_message":
        subject = payload.get("subject") or "(senza oggetto)"
        from_addr = payload.get("from_address") or "mittente sconosciuto"
        mailbox_email = payload.get("mailbox_email") or payload.get("mailbox_id", "")
        # ``or`` (not a .get default) so an explicit None does not render as "None".
        pec_type = payload.get("pec_type") or "posta_certificata"
        tipo_label = _PEC_TYPE_LABELS.get(pec_type, pec_type)

        if is_telegram:
            return (
                "<b>Nuovo messaggio PEC</b>\n\n"
                f"<b>Tipo:</b> {_safe(tipo_label)}\n"
                f"<b>Da:</b> {_safe(from_addr)}\n"
                f"<b>Casella:</b> {_safe(mailbox_email)}\n"
                f"<b>Oggetto:</b> {_safe(subject)}"
            )
        return (
            "Nuovo messaggio PEC\n"
            f"Tipo: {tipo_label}\n"
            f"Da: {from_addr}\n"
            f"Casella: {mailbox_email}\n"
            f"Oggetto: {subject}"
        )

    if event_type == "state_changed":
        old_state = payload.get("old_state", "")
        new_state = payload.get("new_state", "")
        subject = payload.get("subject", "")
        text = (
            "Cambio stato PEC\n"
            f"Oggetto: {subject}\n"
            f"Stato: {old_state} -> {new_state}"
        )
    else:
        text = (
            f"Evento PEChub: {event_type}\n"
            f"{json.dumps(payload, ensure_ascii=False, default=str)}"
        )

    # These branches contain no markup of their own, so the whole text can be
    # escaped at once for Telegram (this also covers the literal "->").
    return _safe(text)


def _build_notification_payload(event_type: str, payload: dict) -> dict:
|
||
"""Costruisce il payload JSON per il webhook."""
|
||
return {
|
||
"event": event_type,
|
||
"timestamp": datetime.now(UTC).isoformat(),
|
||
"data": payload,
|
||
}
|
||
|
||
|
||
def _build_email_subject(event_type: str, payload: dict) -> str:
    """
    Build the subject line of the notification email.

    The payload subject originates from a received message, so any line
    breaks in the result are folded into single spaces: a mail header must
    stay on one line.

    Args:
        event_type: ``"new_message"``, ``"state_changed"`` or any other name.
        payload: event payload stored on the notification log.

    Returns:
        A single-line subject prefixed with ``[PEChub]``.
    """
    if event_type == "new_message":
        subject = payload.get("subject") or "(senza oggetto)"
        text = f"[PEChub] Nuovo messaggio PEC: {subject}"
    elif event_type == "state_changed":
        text = f"[PEChub] Cambio stato PEC: {payload.get('new_state', '')}"
    else:
        text = f"[PEChub] Evento: {event_type}"

    # splitlines() leaves a string without line breaks untouched.
    return " ".join(text.splitlines())


# ─── Evaluate & Enqueue ───────────────────────────────────────────────────────

async def evaluate_and_enqueue_notifications(
    message: Message,
    mailbox: Mailbox,
    db: Any,  # AsyncSession – typed as Any to avoid a circular import
    redis_client: Any,  # ArqRedis
) -> None:
    """
    Evaluate the notification rules for a freshly saved message and enqueue jobs.

    Per the module documentation this is called by sync.py after the message
    has been saved and indexed. It never raises: errors are logged and
    swallowed so that the IMAP synchronisation flow is not interrupted.

    Args:
        message: message just saved to the DB (flushed, not committed).
        mailbox: mailbox the message belongs to.
        db: open DB session in which the message was flushed.
        redis_client: ArqRedis client used for ``enqueue_job``.
    """
    # Read the id inside the try and reuse it in the handler: touching ORM
    # attributes after a DB error may itself raise, which would break the
    # "never raises" contract.
    message_id: Any = None
    try:
        message_id = message.id
        await _do_evaluate_and_enqueue(message, mailbox, db, redis_client)
    except Exception as exc:
        logger.error(
            f"Errore evaluate_and_enqueue_notifications per messaggio "
            f"{message_id}: {exc}",
            exc_info=True,
        )


async def _do_evaluate_and_enqueue(
    message: Message,
    mailbox: Mailbox,
    db: Any,
    redis_client: Any,
) -> None:
    """
    Internal logic behind ``evaluate_and_enqueue_notifications`` – may raise.

    Loads the tenant's active ``new_message`` rules. For every rule whose
    channel is active, whose circuit is not open and whose filter matches, it
    adds a pending ``NotificationLog`` and enqueues a ``dispatch_notification``
    job deferred by 5 seconds.

    Args:
        message: message just saved to the DB.
        mailbox: mailbox the message belongs to.
        db: open DB session; logs are added and flushed, not committed here.
        redis_client: ArqRedis client used for ``enqueue_job``.
    """
    # Load this tenant's active rules with event_type = "new_message".
    rules_result = await db.execute(
        select(NotificationRule)
        .options(selectinload(NotificationRule.channel))
        .where(
            NotificationRule.tenant_id == message.tenant_id,
            NotificationRule.is_active == True,  # noqa: E712
            NotificationRule.event_type == "new_message",
        )
    )
    rules: list[NotificationRule] = list(rules_result.scalars().all())

    if not rules:
        return

    enqueued_count = 0

    for rule in rules:
        channel = rule.channel
        if not channel or not channel.is_active:
            continue

        # Skip channels whose circuit breaker is currently open.
        if (
            channel.circuit_open_until
            and channel.circuit_open_until > datetime.now(UTC)
        ):
            logger.debug(
                f"[notify] Canale {channel.name!r} circuit aperto fino a "
                f"{channel.circuit_open_until}, skip regola {rule.name!r}"
            )
            continue

        # Apply the rule's optional filters.
        if not _matches_filter(rule.filter, message, mailbox):
            continue

        # Event payload stored on the log and later rendered by the job.
        event_payload = {
            "message_id": str(message.id),
            "mailbox_id": str(mailbox.id),
            "mailbox_email": mailbox.email_address,
            "subject": message.subject or "",
            "from_address": message.from_address or "",
            "to_addresses": list(message.to_addresses or []),
            "pec_type": message.pec_type,
            "direction": message.direction,
            "received_at": (
                message.received_at.isoformat()
                if message.received_at
                else None
            ),
        }

        # Create the NotificationLog; its id is assigned client-side.
        log = NotificationLog(
            id=uuid.uuid4(),
            tenant_id=message.tenant_id,
            channel_id=rule.channel_id,
            rule_id=rule.id,
            event_type="new_message",
            event_payload=event_payload,
            status="pending",
            attempt_count=0,
            max_attempts=3,
        )
        db.add(log)
        # Send the INSERT now so DB errors surface before the job is enqueued.
        await db.flush()

        # Enqueue the arq job, deferred 5s to let the caller's commit land.
        try:
            await redis_client.enqueue_job(
                "dispatch_notification",
                str(log.id),
                _defer_by=timedelta(seconds=5),
            )
            enqueued_count += 1
            logger.info(
                f"[notify] Enqueued dispatch_notification per regola "
                f"{rule.name!r} -> canale {channel.name!r} "
                f"(log_id={log.id})"
            )
        except Exception as e:
            # NOTE(review): the log row stays "pending" with no job behind it.
            # Confirm that something else picks up orphaned pending logs.
            logger.error(
                f"[notify] Impossibile enqueue dispatch_notification "
                f"per log {log.id}: {e}",
                exc_info=True,
            )

    if enqueued_count > 0:
        logger.info(
            f"[notify] Messaggio {message.id}: "
            f"{enqueued_count} notifiche accodate"
        )


# ─── Main arq job ─────────────────────────────────────────────────────────────

async def dispatch_notification(
    ctx: dict[str, Any],
    notification_log_id: str,
) -> dict:
    """
    arq job: read a NotificationLog and deliver it to its configured channel.

    Steps: load the log and its channel; skip if the log is already final, the
    channel is inactive or its circuit is open; increment the attempt counter;
    decrypt the channel secrets; build the text; call the sender for the
    channel type. On success the log becomes ``sent`` and the channel's
    circuit breaker is reset. On failure the job is re-enqueued with the
    delay from ``_RETRY_DELAYS`` until ``max_attempts`` is reached, after
    which the log becomes ``failed`` and the channel's consecutive-failure
    counter is incremented (opening the circuit at the threshold).

    Args:
        ctx: arq context (``ctx["redis"]`` is the ArqRedis client).
        notification_log_id: UUID (as text) of the NotificationLog to process.

    Returns:
        A dict with a ``status`` key (``sent``, ``retry``, ``failed``,
        ``skipped``, ``already_processed``, ``not_found`` or ``invalid_id``)
        plus details.
    """
    redis_client = ctx.get("redis")

    # A malformed id can never match a row: report it instead of crashing the job.
    try:
        log_uuid = uuid.UUID(notification_log_id)
    except (ValueError, TypeError, AttributeError):
        logger.error(
            f"[dispatch_notification] log_id non valido: {notification_log_id!r}"
        )
        return {"status": "invalid_id", "log_id": notification_log_id}

    async with AsyncSessionLocal() as db:
        # ── Load log + channel ────────────────────────────────────────────────
        log_result = await db.execute(
            select(NotificationLog)
            .options(selectinload(NotificationLog.channel))
            .where(NotificationLog.id == log_uuid)
        )
        log: NotificationLog | None = log_result.scalar_one_or_none()

        if not log:
            logger.warning(
                f"[dispatch_notification] NotificationLog {notification_log_id} non trovato"
            )
            return {"status": "not_found", "log_id": notification_log_id}

        # Final states are never re-processed (guards against duplicate jobs).
        if log.status in ("sent", "failed", "skipped"):
            logger.debug(
                f"[dispatch_notification] Log {notification_log_id} "
                f"gia' in stato {log.status!r}, skip"
            )
            return {"status": "already_processed", "current_status": log.status}

        channel = log.channel
        if not channel or not channel.is_active:
            log.status = "skipped"
            await db.commit()
            return {"status": "skipped", "reason": "channel_inactive"}

        # ── Circuit breaker ───────────────────────────────────────────────────
        if (
            channel.circuit_open_until
            and channel.circuit_open_until > datetime.now(UTC)
        ):
            log.status = "skipped"
            await db.commit()
            logger.info(
                f"[dispatch_notification] Canale {channel.name!r} circuit aperto, "
                f"log {notification_log_id} marcato skipped"
            )
            return {"status": "skipped", "reason": "circuit_open"}

        # ── Increment the attempt counter (None-safe) ─────────────────────────
        log.attempt_count = (log.attempt_count or 0) + 1
        current_attempt = log.attempt_count

        # ── Decrypt the sensitive config ──────────────────────────────────────
        secret: dict = {}
        if channel.config_enc:
            try:
                secret = _decrypt_config(channel.config_enc)
            except Exception as e:
                log.status = "failed"
                log.last_error = f"Errore decifratura config: {e}"
                await db.commit()
                logger.error(
                    f"[dispatch_notification] Errore decifratura canale "
                    f"{channel.name!r}: {e}"
                )
                return {"status": "failed", "error": str(e)}

        config = channel.config or {}
        payload = log.event_payload or {}
        channel_type = channel.channel_type

        # ── Notification text ─────────────────────────────────────────────────
        notif_text = _build_notification_text(
            event_type=log.event_type,
            payload=payload,
            channel_type=channel_type,
            channel_name=channel.name,
        )

        # ── Dispatch to the sender ────────────────────────────────────────────
        success = False
        error_msg: str | None = None
        http_status: int | None = None

        try:
            if channel_type == "telegram":
                bot_token = secret.get("bot_token")
                chat_id = str(config.get("chat_id", ""))
                if not bot_token or not chat_id:
                    raise ValueError("bot_token o chat_id non configurati")

                from app.notifications.telegram import send_message

                result = await send_message(
                    bot_token=bot_token,
                    chat_id=chat_id,
                    text=notif_text,
                    parse_mode="HTML",
                )
                http_status = 200
                success = True
                logger.info(
                    f"[dispatch_notification] Telegram inviato: "
                    f"message_id={result.get('message_id')} "
                    f"canale={channel.name!r}"
                )

            elif channel_type == "webhook":
                url = config.get("url")
                if not url:
                    raise ValueError("URL webhook non configurato")
                webhook_secret_val = secret.get("webhook_secret")

                from app.notifications.webhook import send_webhook

                result = await send_webhook(
                    url=url,
                    payload=_build_notification_payload(log.event_type, payload),
                    event_type=log.event_type,
                    webhook_secret=webhook_secret_val,
                )
                http_status = result.get("http_status")
                success = True
                logger.info(
                    f"[dispatch_notification] Webhook inviato: "
                    f"HTTP {http_status} delivery={result.get('delivery_id')} "
                    f"canale={channel.name!r}"
                )

            elif channel_type == "email":
                smtp_host = config.get("smtp_host")
                smtp_port = int(config.get("smtp_port", 465))
                from_email = config.get("from_email")
                to_email = config.get("to_email")
                smtp_user = config.get("smtp_user") or from_email
                use_tls = config.get("smtp_use_tls", True)
                use_starttls = config.get("smtp_use_starttls", False)
                from_name = config.get("from_name", "PEChub Notifiche")
                smtp_password = secret.get("smtp_password", "")

                if not smtp_host or not from_email or not to_email:
                    raise ValueError("Configurazione SMTP incompleta")

                from app.notifications.email_smtp import send_email_notification

                subject = _build_email_subject(log.event_type, payload)
                await send_email_notification(
                    smtp_host=smtp_host,
                    smtp_port=smtp_port,
                    smtp_user=smtp_user,
                    smtp_password=smtp_password,
                    from_email=from_email,
                    to_email=to_email,
                    subject=subject,
                    body_text=notif_text,
                    body_html=None,
                    from_name=from_name,
                    use_tls=use_tls,
                    use_starttls=use_starttls,
                )
                http_status = 200
                success = True
                logger.info(
                    f"[dispatch_notification] Email inviata a {to_email} "
                    f"canale={channel.name!r}"
                )

            elif channel_type == "whatsapp":
                phone_number_id = config.get("phone_number_id")
                to_phone = config.get("to_phone")
                access_token = secret.get("access_token")

                if not phone_number_id or not to_phone or not access_token:
                    raise ValueError("Configurazione WhatsApp incompleta")

                from app.notifications.whatsapp import send_whatsapp_message

                result = await send_whatsapp_message(
                    phone_number_id=phone_number_id,
                    to_phone=to_phone,
                    text=notif_text,
                    access_token=access_token,
                )
                http_status = result.get("http_status")
                success = True
                logger.info(
                    f"[dispatch_notification] WhatsApp inviato: "
                    f"message_id={result.get('message_id')} "
                    f"canale={channel.name!r}"
                )

            else:
                raise ValueError(f"Tipo canale non supportato: {channel_type!r}")

        except Exception as exc:
            # Any sender or configuration error counts as a failed attempt.
            error_msg = str(exc)
            logger.warning(
                f"[dispatch_notification] Tentativo {current_attempt} fallito "
                f"canale={channel.name!r} tipo={channel_type!r}: {error_msg}"
            )

        # ── Update log and channel state ──────────────────────────────────────
        if success:
            log.status = "sent"
            log.sent_at = datetime.now(UTC)
            log.http_status = http_status

            # Reset the circuit breaker on the first success.
            channel.consecutive_failures = 0
            channel.circuit_open_until = None

            await db.commit()
            logger.info(
                f"[dispatch_notification] Notifica {notification_log_id} INVIATA "
                f"canale={channel.name!r} tipo={channel_type!r}"
            )
            return {
                "status": "sent",
                "log_id": notification_log_id,
                "channel": channel.name,
                "channel_type": channel_type,
                "attempt": current_attempt,
            }

        # ── Retry or definitive failure ───────────────────────────────────────
        log.last_error = error_msg
        log.http_status = http_status

        if current_attempt < log.max_attempts:
            # Backoff delay for this attempt, clamped to the last table entry.
            delay_idx = min(current_attempt - 1, len(_RETRY_DELAYS) - 1)
            delay_seconds = _RETRY_DELAYS[delay_idx]
            next_retry = datetime.now(UTC) + timedelta(seconds=delay_seconds)

            log.next_retry_at = next_retry
            # Status stays "pending" so the retried job processes the log again.
            await db.commit()

            # Re-enqueue with backoff.
            if redis_client:
                try:
                    await redis_client.enqueue_job(
                        "dispatch_notification",
                        notification_log_id,
                        _defer_by=timedelta(seconds=delay_seconds),
                    )
                    logger.info(
                        f"[dispatch_notification] Retry tentativo "
                        f"{current_attempt + 1} schedulato in {delay_seconds}s "
                        f"per log {notification_log_id}"
                    )
                except Exception as enqueue_err:
                    logger.error(
                        f"[dispatch_notification] Impossibile re-enqueue "
                        f"log {notification_log_id}: {enqueue_err}"
                    )
            else:
                # Without a client no retry job exists; make that visible.
                logger.error(
                    f"[dispatch_notification] Client Redis assente: retry non "
                    f"schedulato per log {notification_log_id}"
                )

            return {
                "status": "retry",
                "log_id": notification_log_id,
                "attempt": current_attempt,
                "next_retry_in_seconds": delay_seconds,
                "error": error_msg,
            }

        # All attempts exhausted → FAILED.
        log.status = "failed"

        # Update the channel's circuit breaker (None-safe counter).
        channel.consecutive_failures = (channel.consecutive_failures or 0) + 1
        if channel.consecutive_failures >= _CIRCUIT_FAILURE_THRESHOLD:
            channel.circuit_open_until = datetime.now(UTC) + _CIRCUIT_OPEN_DURATION
            logger.warning(
                f"[dispatch_notification] Circuit breaker aperto per "
                f"canale {channel.name!r} "
                f"({channel.consecutive_failures} fallimenti consecutivi)"
            )

        await db.commit()
        logger.error(
            f"[dispatch_notification] Notifica {notification_log_id} FALLITA "
            f"definitivamente dopo {current_attempt} tentativi: {error_msg}"
        )
        return {
            "status": "failed",
            "log_id": notification_log_id,
            "channel": channel.name,
            "attempts": current_attempt,
            "error": error_msg,
        }