diff --git a/backend/app/notifications/email_smtp.py b/backend/app/notifications/email_smtp.py new file mode 100644 index 0000000..de73c43 --- /dev/null +++ b/backend/app/notifications/email_smtp.py @@ -0,0 +1,161 @@ +""" +Email SMTP sender – invio notifiche via SMTP con TLS/SSL o STARTTLS. + +Config non sensibile (config): + { + "smtp_host": "smtp.example.com", + "smtp_port": 465, + "smtp_use_tls": true, # SSL/TLS diretto (porta 465) + "smtp_use_starttls": false, # STARTTLS (porta 587) – alternativo a use_tls + "from_email": "noreply@example.com", + "from_name": "PEChub Notifiche", + "to_email": "destinatario@example.com" + } + +Config sensibile (config_enc → config_secret): + { "smtp_password": "..." } + +Dipendenza: aiosmtplib (gia' in backend/pyproject.toml) +""" + +from datetime import datetime +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +import aiosmtplib + +DEFAULT_TIMEOUT = 15.0 + + +class EmailSMTPError(Exception): + """Errore durante l'invio di un'email di notifica.""" + + def __init__(self, message: str, smtp_code: int | None = None): + super().__init__(message) + self.smtp_code = smtp_code + + +async def send_email_notification( + smtp_host: str, + smtp_port: int, + smtp_user: str, + smtp_password: str, + from_email: str, + to_email: str, + subject: str, + body_text: str, + body_html: str | None = None, + from_name: str = "PEChub Notifiche", + use_tls: bool = True, + use_starttls: bool = False, + timeout: float = DEFAULT_TIMEOUT, +) -> None: + """ + Invia un'email di notifica via SMTP. + + Args: + smtp_host: host SMTP + smtp_port: porta SMTP + smtp_user: username autenticazione + smtp_password: password autenticazione + from_email: indirizzo mittente + to_email: indirizzo destinatario + subject: oggetto email + body_text: testo plain + body_html: testo HTML (opzionale) + from_name: nome visualizzato mittente + use_tls: usa SSL/TLS diretto (porta 465) + use_starttls: usa STARTTLS (porta 587) – alternativo a use_tls + timeout: timeout connessione in secondi + + Raises: + EmailSMTPError: in caso di errori di autenticazione, connessione o invio + """ + msg = MIMEMultipart("alternative") + msg["Subject"] = subject + msg["From"] = f"{from_name} <{from_email}>" if from_name else from_email + msg["To"] = to_email + msg["Date"] = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000") + msg["X-Mailer"] = "PEChub/1.0" + + msg.attach(MIMEText(body_text, "plain", "utf-8")) + if body_html: + msg.attach(MIMEText(body_html, "html", "utf-8")) + + try: + await aiosmtplib.send( + msg, + hostname=smtp_host, + port=smtp_port, + username=smtp_user, + password=smtp_password, + use_tls=use_tls, + start_tls=use_starttls, + timeout=timeout, + ) + except aiosmtplib.SMTPAuthenticationError as exc: + raise EmailSMTPError( + f"Autenticazione SMTP fallita per {smtp_user}@{smtp_host}: {exc}", + smtp_code=535, + ) from exc + except aiosmtplib.SMTPConnectError as exc: + raise EmailSMTPError( + f"Connessione SMTP fallita a {smtp_host}:{smtp_port}: {exc}" + ) from exc + except aiosmtplib.SMTPServerDisconnected as exc: + raise EmailSMTPError( + f"Server SMTP {smtp_host} ha chiuso la connessione: {exc}" + ) from exc + except aiosmtplib.SMTPException as exc: + raise EmailSMTPError(f"Errore SMTP: {exc}") from exc + except Exception as exc: + raise EmailSMTPError(f"Errore invio email: {exc}") from exc + + +async def send_test_email( + smtp_host: str, + smtp_port: int, + smtp_user: str, + smtp_password: str, + from_email: str, + to_email: str, + channel_name: str = "PEChub", + from_name: str = "PEChub Notifiche", + use_tls: bool = True, + use_starttls: bool = False, +) -> None: + """ + Invia un'email di test per verificare la configurazione del canale. + + Raises: + EmailSMTPError: se la connessione o l'autenticazione falliscono + """ + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + subject = f"[PEChub] Test canale email: {channel_name}" + body_text = ( + f"PEChub – Test canale Email\n\n" + f"Il canale '{channel_name}' e' configurato correttamente.\n\n" + f"Data/ora: {ts}\n" + f"Destinatario: {to_email}" + ) + body_html = ( + f"
Il canale {channel_name} e' configurato correttamente.
" + f"Data/ora: {ts}
"
+ f"Destinatario: {to_email}
Inviato da PEChub Notification Engine
" + ) + await send_email_notification( + smtp_host=smtp_host, + smtp_port=smtp_port, + smtp_user=smtp_user, + smtp_password=smtp_password, + from_email=from_email, + to_email=to_email, + subject=subject, + body_text=body_text, + body_html=body_html, + from_name=from_name, + use_tls=use_tls, + use_starttls=use_starttls, + ) diff --git a/backend/app/notifications/webhook.py b/backend/app/notifications/webhook.py new file mode 100644 index 0000000..c9e4db8 --- /dev/null +++ b/backend/app/notifications/webhook.py @@ -0,0 +1,125 @@ +""" +Webhook sender – POST HTTP con firma HMAC-SHA256. + +Config non sensibile (config): + { "url": "https://...", "content_type": "application/json" } + +Config sensibile (config_enc → config_secret): + { "webhook_secret": "..." } # opzionale – usato per firma HMAC + +Header inviati: + Content-Type: application/json + X-PEChub-Event: {event_type} + X-Hub-Signature-256: sha256={hex} (solo se webhook_secret configurato) + X-Delivery: {uuid} + User-Agent: PEChub-Webhook/1.0 +""" + +import hashlib +import hmac +import json +import uuid as uuid_mod +from datetime import datetime + +import httpx + +DEFAULT_TIMEOUT = 10.0 + + +class WebhookError(Exception): + """Errore durante l'invio di una notifica webhook.""" + + def __init__(self, message: str, http_status: int | None = None): + super().__init__(message) + self.http_status = http_status + + +async def send_webhook( + url: str, + payload: dict, + event_type: str = "new_message", + webhook_secret: str | None = None, + timeout: float = DEFAULT_TIMEOUT, +) -> dict: + """ + Invia un payload JSON a un webhook URL. + + Args: + url: URL destinatario del webhook + payload: dict serializzato come JSON nel body + event_type: valore dell'header X-PEChub-Event + webhook_secret: segreto per firma HMAC-SHA256 (opzionale) + timeout: timeout HTTP in secondi + + Returns: + dict con http_status, response_text, delivery_id + + Raises: + WebhookError: in caso di timeout, errore di rete o HTTP >= 400 + """ + body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8") + delivery_id = str(uuid_mod.uuid4()) + + headers = { + "Content-Type": "application/json", + "X-PEChub-Event": event_type, + "X-Delivery": delivery_id, + "User-Agent": "PEChub-Webhook/1.0", + } + + if webhook_secret: + sig = hmac.new( + webhook_secret.encode("utf-8"), + body, + hashlib.sha256, + ).hexdigest() + headers["X-Hub-Signature-256"] = f"sha256={sig}" + + async with httpx.AsyncClient(timeout=timeout) as client: + try: + response = await client.post(url, content=body, headers=headers) + except httpx.TimeoutException as exc: + raise WebhookError( + f"Timeout webhook dopo {timeout}s" + ) from exc + except httpx.RequestError as exc: + raise WebhookError(f"Errore di rete webhook: {exc}") from exc + + if response.status_code >= 400: + raise WebhookError( + f"Webhook ha risposto con HTTP {response.status_code}: " + f"{response.text[:200]}", + http_status=response.status_code, + ) + + return { + "http_status": response.status_code, + "response_text": response.text[:500], + "delivery_id": delivery_id, + } + + +async def send_test_webhook( + url: str, + webhook_secret: str | None = None, + channel_name: str = "PEChub", +) -> dict: + """Invia un payload di test al webhook per verificare la configurazione.""" + payload = { + "event": "test", + "channel": channel_name, + "timestamp": datetime.now().isoformat(), + "message": "Notifica di test da PEChub", + "data": { + "subject": "[TEST] PEC di prova", + "from_address": "test@pec.example.it", + "pec_type": "posta_certificata", + "direction": "inbound", + }, + } + return await send_webhook( + url=url, + payload=payload, + event_type="test", + webhook_secret=webhook_secret, + ) diff --git a/backend/app/notifications/whatsapp.py b/backend/app/notifications/whatsapp.py new file mode 100644 index 0000000..2be0e0e --- /dev/null +++ b/backend/app/notifications/whatsapp.py @@ -0,0 +1,125 @@ +""" +WhatsApp sender – Meta Cloud API v18. + +Config non sensibile (config): + { + "phone_number_id": "123456789", # ID numero mittente Meta Business + "to_phone": "+393331234567" # numero destinatario con prefisso + } + +Config sensibile (config_enc → config_secret): + { "access_token": "EAABs..." } # Meta Graph API token + +API endpoint: + POST https://graph.facebook.com/v18.0/{phone_number_id}/messages + +Nota: richiede un account Meta Business verificato con WhatsApp Business API. +""" + +import httpx + +META_GRAPH_API_URL = "https://graph.facebook.com/v18.0" +DEFAULT_TIMEOUT = 10.0 + + +class WhatsAppError(Exception): + """Errore durante l'invio di un messaggio WhatsApp.""" + + def __init__(self, message: str, http_status: int | None = None, api_code: int | None = None): + super().__init__(message) + self.http_status = http_status + self.api_code = api_code + + +async def send_whatsapp_message( + phone_number_id: str, + to_phone: str, + text: str, + access_token: str, + timeout: float = DEFAULT_TIMEOUT, +) -> dict: + """ + Invia un messaggio di testo WhatsApp via Meta Cloud API. + + Args: + phone_number_id: ID del numero WhatsApp Business mittente + to_phone: numero destinatario (formato E.164, es. +393331234567) + text: testo del messaggio + access_token: Meta Graph API Bearer token + timeout: timeout HTTP in secondi + + Returns: + dict con message_id dalla risposta API + + Raises: + WhatsAppError: in caso di errore HTTP o risposta API non-ok + """ + url = f"{META_GRAPH_API_URL}/{phone_number_id}/messages" + payload = { + "messaging_product": "whatsapp", + "to": to_phone.replace(" ", "").replace("-", ""), + "type": "text", + "text": {"body": text}, + } + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + async with httpx.AsyncClient(timeout=timeout) as client: + try: + response = await client.post(url, json=payload, headers=headers) + except httpx.TimeoutException as exc: + raise WhatsAppError( + f"Timeout WhatsApp API dopo {timeout}s" + ) from exc + except httpx.RequestError as exc: + raise WhatsAppError(f"Errore di rete WhatsApp: {exc}") from exc + + if response.status_code == 401: + raise WhatsAppError( + "Token Meta non valido o scaduto", + http_status=401, + ) + + if response.status_code >= 400: + try: + err_data = response.json() + err_msg = err_data.get("error", {}).get("message", response.text[:200]) + err_code = err_data.get("error", {}).get("code") + except Exception: + err_msg = response.text[:200] + err_code = None + raise WhatsAppError( + f"Meta API errore HTTP {response.status_code}: {err_msg}", + http_status=response.status_code, + api_code=err_code, + ) + + data = response.json() + messages = data.get("messages", []) + message_id = messages[0].get("id") if messages else None + return {"message_id": message_id, "http_status": response.status_code} + + +async def send_test_whatsapp( + phone_number_id: str, + to_phone: str, + access_token: str, + channel_name: str = "PEChub", +) -> dict: + """Invia un messaggio WhatsApp di test per verificare la configurazione.""" + from datetime import datetime + + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + text = ( + f"*PEChub – Test canale WhatsApp*\n\n" + f"Il canale _{channel_name}_ e' configurato correttamente.\n\n" + f"Data/ora: {ts}" + ) + return await send_whatsapp_message( + phone_number_id=phone_number_id, + to_phone=to_phone, + text=text, + access_token=access_token, + ) diff --git a/backend/app/services/notification_service.py b/backend/app/services/notification_service.py index 699f397..1875d3b 100644 --- a/backend/app/services/notification_service.py +++ b/backend/app/services/notification_service.py @@ -1,20 +1,24 @@ """ -Servizio Notifiche Multi-canale – CRUD canali, regole, log. +Servizio Notifiche Multi-canale – CRUD canali, regole, log + dispatch. -Nota: la cifratura AES-256-GCM di config_enc avviene qui usando -la NOTIFICATION_SECRET_KEY dalla config. Per semplicità in questo -stub usiamo Fernet (libreria cryptography), facilmente sostituibile -con una implementazione GCM dedicata. +Cifratura: AES-256-GCM via libreria cryptography. + Formato config_enc: base64( nonce(12) || ciphertext+tag ) + Chiave: ENCRYPTION_KEY (hex 64 char = 32 byte) dalla config. + +Backward compatibility: se il valore non decrittografa come GCM, viene +tentato il fallback a base64 grezzo (configurazioni precedenti al fix). """ import base64 import json +import logging +import os import uuid from datetime import datetime, timezone +from cryptography.hazmat.primitives.ciphers.aead import AESGCM from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload from app.config import get_settings from app.core.exceptions import NotFoundError @@ -27,20 +31,53 @@ from app.schemas.notification import ( NotificationRuleUpdate, ) +logger = logging.getLogger(__name__) settings = get_settings() -def _encrypt(data: dict) -> str: - """Cifra un dict JSON → base64. Usa la SECRET_KEY come seed.""" - # In produzione: usa AES-256-GCM. Qui: semplice base64 con marker. - raw = json.dumps(data).encode() - return base64.b64encode(raw).decode() +# ─── Cifratura AES-256-GCM ──────────────────────────────────────────────────── + +def _encrypt(data: dict, key: bytes | None = None) -> str: + """ + Cifra un dict JSON con AES-256-GCM. + + Formato output: base64( nonce(12 byte) || ciphertext+tag(16 byte) ) + """ + if key is None: + key = settings.encryption_key_bytes + nonce = os.urandom(12) + aesgcm = AESGCM(key) + plaintext = json.dumps(data, ensure_ascii=False).encode("utf-8") + ciphertext = aesgcm.encrypt(nonce, plaintext, None) # include tag + return base64.b64encode(nonce + ciphertext).decode("ascii") -def _decrypt(enc: str) -> dict: - """Decifra il valore restituito da _encrypt.""" - raw = base64.b64decode(enc.encode()) - return json.loads(raw.decode()) +def _decrypt(enc: str, key: bytes | None = None) -> dict: + """ + Decifra il valore prodotto da _encrypt. + + Backward compatible: se il dato non e' GCM valido, prova il + vecchio base64 grezzo (usato prima del fix di sicurezza). + """ + if key is None: + key = settings.encryption_key_bytes + try: + raw = base64.b64decode(enc.encode("ascii")) + if len(raw) > 28: # 12 nonce + 16 tag minimo + nonce = raw[:12] + ciphertext = raw[12:] + aesgcm = AESGCM(key) + plaintext = aesgcm.decrypt(nonce, ciphertext, None) + return json.loads(plaintext.decode("utf-8")) + except Exception: + pass + + # Fallback: base64 grezzo (configurazioni create prima del fix GCM) + try: + raw = base64.b64decode(enc.encode("ascii")) + return json.loads(raw.decode("utf-8")) + except Exception: + return {} class NotificationService: @@ -113,6 +150,7 @@ class NotificationService: if data.config is not None: channel.config = data.config if data.config_secret is not None: + # Re-cifra sempre con AES-256-GCM (aggiorna anche i vecchi base64) channel.config_enc = _encrypt(data.config_secret) await self.db.flush() @@ -128,18 +166,16 @@ class NotificationService: self, channel_id: uuid.UUID, tenant_id: uuid.UUID ) -> ChannelTestResult: """ - Invia un messaggio di test al canale configurato. + Invia un messaggio di test reale al canale configurato. - Questa implementazione stub restituisce sempre successo se il canale - è attivo e configurato. Una implementazione completa fa una chiamata - reale al canale (HTTP/SMTP/Telegram/WhatsApp). + Esegue invio effettivo per Telegram, Webhook, Email SMTP e WhatsApp. """ channel = await self.get_channel(channel_id, tenant_id) if not channel.is_active: return ChannelTestResult( success=False, - message="Il canale è disabilitato", + message="Il canale e' disabilitato", ) if channel.circuit_open_until and channel.circuit_open_until > datetime.now(timezone.utc): @@ -148,21 +184,23 @@ class NotificationService: message=f"Circuit breaker aperto fino a {channel.circuit_open_until.isoformat()}", ) - # Validazione configurazione minima per tipo canale config = channel.config or {} + secret = {} + if channel.config_enc: + try: + secret = _decrypt(channel.config_enc) + except Exception as e: + return ChannelTestResult( + success=False, + message=f"Errore decifratura configurazione sensibile: {e}", + ) + channel_type = channel.channel_type - if channel_type == "webhook": - if not config.get("url"): - return ChannelTestResult(success=False, message="URL webhook non configurato") - elif channel_type == "email": - if not config.get("to_email"): - return ChannelTestResult(success=False, message="Email destinatario non configurata") - elif channel_type == "telegram": + # ── Telegram ────────────────────────────────────────────────────────── + if channel_type == "telegram": if not config.get("chat_id"): return ChannelTestResult(success=False, message="Chat ID Telegram non configurato") - # Invio reale via Bot API - secret = _decrypt(channel.config_enc) if channel.config_enc else {} bot_token = secret.get("bot_token") if not bot_token: return ChannelTestResult(success=False, message="Bot token Telegram non configurato") @@ -176,28 +214,128 @@ class NotificationService: msg_id = result.get("message_id") return ChannelTestResult( success=True, - message=f"Messaggio Telegram inviato con successo (message_id={msg_id}).", + message=f"Messaggio Telegram inviato (message_id={msg_id}).", http_status=200, ) - except TelegramError as exc: - return ChannelTestResult( - success=False, - message=f"Errore Telegram: {exc}", - http_status=exc.http_status, - ) except Exception as exc: return ChannelTestResult( success=False, - message=f"Errore imprevisto durante il test Telegram: {exc}", + message=f"Errore Telegram: {exc}", ) - elif channel_type == "whatsapp": - if not config.get("phone_number"): - return ChannelTestResult(success=False, message="Numero WhatsApp non configurato") + # ── Webhook ─────────────────────────────────────────────────────────── + elif channel_type == "webhook": + url = config.get("url") + if not url: + return ChannelTestResult(success=False, message="URL webhook non configurato") + webhook_secret = secret.get("webhook_secret") + try: + from app.notifications.webhook import WebhookError, send_test_webhook + result = await send_test_webhook( + url=url, + webhook_secret=webhook_secret, + channel_name=channel.name, + ) + return ChannelTestResult( + success=True, + message=( + f"Webhook raggiunto con successo " + f"(HTTP {result['http_status']}, delivery={result['delivery_id']})." + ), + http_status=result["http_status"], + ) + except Exception as exc: + http_status = getattr(exc, "http_status", None) + return ChannelTestResult( + success=False, + message=f"Errore webhook: {exc}", + http_status=http_status, + ) + + # ── Email SMTP ──────────────────────────────────────────────────────── + elif channel_type == "email": + smtp_host = config.get("smtp_host") + smtp_port = config.get("smtp_port", 465) + from_email = config.get("from_email") + to_email = config.get("to_email") + smtp_user = config.get("smtp_user") or from_email + use_tls = config.get("smtp_use_tls", True) + use_starttls = config.get("smtp_use_starttls", False) + from_name = config.get("from_name", "PEChub Notifiche") + smtp_password = secret.get("smtp_password", "") + + if not smtp_host: + return ChannelTestResult(success=False, message="Host SMTP non configurato") + if not from_email: + return ChannelTestResult(success=False, message="Email mittente non configurata") + if not to_email: + return ChannelTestResult(success=False, message="Email destinatario non configurata") + if not smtp_password: + return ChannelTestResult(success=False, message="Password SMTP non configurata") + + try: + from app.notifications.email_smtp import EmailSMTPError, send_test_email + await send_test_email( + smtp_host=smtp_host, + smtp_port=int(smtp_port), + smtp_user=smtp_user, + smtp_password=smtp_password, + from_email=from_email, + to_email=to_email, + channel_name=channel.name, + from_name=from_name, + use_tls=use_tls, + use_starttls=use_starttls, + ) + return ChannelTestResult( + success=True, + message=f"Email di test inviata con successo a {to_email}.", + http_status=200, + ) + except Exception as exc: + return ChannelTestResult( + success=False, + message=f"Errore email: {exc}", + ) + + # ── WhatsApp ────────────────────────────────────────────────────────── + elif channel_type == "whatsapp": + phone_number_id = config.get("phone_number_id") + to_phone = config.get("to_phone") + access_token = secret.get("access_token") + + if not phone_number_id: + return ChannelTestResult(success=False, message="phone_number_id non configurato") + if not to_phone: + return ChannelTestResult(success=False, message="Numero WhatsApp destinatario non configurato") + if not access_token: + return ChannelTestResult(success=False, message="Access token Meta non configurato") + + try: + from app.notifications.whatsapp import WhatsAppError, send_test_whatsapp + result = await send_test_whatsapp( + phone_number_id=phone_number_id, + to_phone=to_phone, + access_token=access_token, + channel_name=channel.name, + ) + return ChannelTestResult( + success=True, + message=f"Messaggio WhatsApp inviato (message_id={result.get('message_id')}).", + http_status=200, + ) + except Exception as exc: + http_status = getattr(exc, "http_status", None) + return ChannelTestResult( + success=False, + message=f"Errore WhatsApp: {exc}", + http_status=http_status, + ) + + # ── Tipo sconosciuto ────────────────────────────────────────────────── return ChannelTestResult( - success=True, - message=f"Canale {channel_type} configurato correttamente. Test simulato con successo.", - http_status=200, + success=False, + message=f"Tipo canale '{channel_type}' non supportato", ) # ─── Rules ─────────────────────────────────────────────────────────────── diff --git a/worker/app/imap/sync.py b/worker/app/imap/sync.py index 6cba6a9..2ff0f2d 100644 --- a/worker/app/imap/sync.py +++ b/worker/app/imap/sync.py @@ -35,6 +35,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings +from app.jobs.dispatch_notification import evaluate_and_enqueue_notifications from app.jobs.index_message import index_message from app.models import Attachment, Mailbox, Message from app.parsers.eml_parser import parse_eml @@ -646,6 +647,18 @@ async def _save_message( # sia il messaggio che gli allegati dalla sessione corrente. await index_message(message.id, db) + # ── Valutazione e accodamento notifiche (non bloccante) ─────────────────── + # Solo per messaggi inbound: le ricevute PEC e la posta in arrivo + # possono triggerare regole di notifica configurate dal tenant. + # I messaggi outbound (Sent) non generano notifiche automatiche. + if direction == "inbound": + await evaluate_and_enqueue_notifications( + message=message, + mailbox=mailbox, + db=db, + redis_client=redis_client, + ) + return True diff --git a/worker/app/jobs/dispatch_notification.py b/worker/app/jobs/dispatch_notification.py new file mode 100644 index 0000000..5419fca --- /dev/null +++ b/worker/app/jobs/dispatch_notification.py @@ -0,0 +1,666 @@ +""" +Job arq: dispatch_notification – invio effettivo delle notifiche multi-canale. + +Flusso completo: + 1. sync.py salva un nuovo messaggio inbound + 2. sync.py chiama evaluate_and_enqueue_notifications() (questa funzione) + 3. evaluate_and_enqueue_notifications(): + - legge le NotificationRule attive del tenant con event_type="new_message" + - applica i filtri opzionali (mailbox_id, direction, pec_type) + - per ogni regola che matcha: crea NotificationLog(status=pending) + - enqueue del job arq dispatch_notification con defer 5s + (per dare tempo al DB di committare prima che il job legga) + 4. dispatch_notification(ctx, notification_log_id): + - legge NotificationLog + NotificationChannel dal DB + - controlla circuit breaker + - decifra config_enc con AES-256-GCM + - chiama il sender appropriato (telegram/webhook/email/whatsapp) + - aggiorna status: sent / failed + - in caso di fallimento: re-enqueue con backoff esponenziale + - aggiorna circuit breaker dopo fallimenti consecutivi + +Circuit breaker: + - 5+ fallimenti consecutivi → apre per 1 ora + - reset automatico al primo successo + +Retry backoff (max_attempts default=3): + Tentativo 1 fallisce → attendi 5 min → tentativo 2 + Tentativo 2 fallisce → attendi 30 min → tentativo 3 + Tentativo 3 fallisce → FAILED definitivo + +Cifratura config_enc: + AES-256-GCM – stesso schema di notification_service.py nel backend. + Backward compatible: fallback a base64 grezzo per configurazioni pre-fix. +""" + +import base64 +import json +import logging +import os +import uuid +from datetime import UTC, datetime, timedelta +from typing import Any + +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from sqlalchemy import select +from sqlalchemy.orm import selectinload + +from app.config import get_settings +from app.database import AsyncSessionLocal +from app.models import ( + Mailbox, + Message, + NotificationChannel, + NotificationLog, + NotificationRule, +) + +logger = logging.getLogger(__name__) +settings = get_settings() + +# ─── Backoff retry (secondi) per tentativo N fallito (0-based) ─────────────── +_RETRY_DELAYS = [ + 5 * 60, # dopo tentativo 1: 5 minuti + 30 * 60, # dopo tentativo 2: 30 minuti + 120 * 60, # dopo tentativo 3: 2 ore (non raggiunto: max_attempts=3) +] + +# ─── Circuit breaker ────────────────────────────────────────────────────────── +_CIRCUIT_FAILURE_THRESHOLD = 5 # fallimenti consecutivi per aprire il circuito +_CIRCUIT_OPEN_DURATION = timedelta(hours=1) + + +# ─── Cifratura AES-256-GCM ──────────────────────────────────────────────────── + +def _decrypt_config(enc: str) -> dict: + """ + Decifra config_enc AES-256-GCM. + + Backward compatible: se non e' GCM valido, prova fallback base64 grezzo. + """ + key = settings.encryption_key_bytes + try: + raw = base64.b64decode(enc.encode("ascii")) + if len(raw) > 28: # 12 nonce + 16 tag minimo + nonce = raw[:12] + ciphertext = raw[12:] + aesgcm = AESGCM(key) + plaintext = aesgcm.decrypt(nonce, ciphertext, None) + return json.loads(plaintext.decode("utf-8")) + except Exception: + pass + + # Fallback: base64 grezzo (configurazioni pre-fix sicurezza) + try: + raw = base64.b64decode(enc.encode("ascii")) + return json.loads(raw.decode("utf-8")) + except Exception: + return {} + + +# ─── Valutazione filtri regola ──────────────────────────────────────────────── + +def _matches_filter( + filter_data: dict | None, + message: Message, + mailbox: Mailbox, +) -> bool: + """ + Valuta se un messaggio soddisfa i filtri opzionali di una regola. + + Filtri supportati: + mailbox_id: str | list[str] – UUID casella + direction: str – "inbound" | "outbound" + pec_type: str | list[str] – tipo messaggio PEC + """ + if not filter_data: + return True + + # Filtro per mailbox + if "mailbox_id" in filter_data: + allowed = filter_data["mailbox_id"] + if isinstance(allowed, list): + if str(message.mailbox_id) not in [str(a) for a in allowed]: + return False + else: + if str(message.mailbox_id) != str(allowed): + return False + + # Filtro per direzione + if "direction" in filter_data: + if message.direction != filter_data["direction"]: + return False + + # Filtro per tipo PEC + if "pec_type" in filter_data: + allowed = filter_data["pec_type"] + if isinstance(allowed, list): + if message.pec_type not in allowed: + return False + else: + if message.pec_type != allowed: + return False + + return True + + +# ─── Costruzione testo notifica ─────────────────────────────────────────────── + +def _build_notification_text( + event_type: str, + payload: dict, + channel_type: str, + channel_name: str = "", +) -> str: + """ + Costruisce il testo della notifica in base all'evento e al tipo canale. + """ + if event_type == "new_message": + subject = payload.get("subject") or "(senza oggetto)" + from_addr = payload.get("from_address") or "mittente sconosciuto" + mailbox_email = payload.get("mailbox_email") or payload.get("mailbox_id", "") + pec_type = payload.get("pec_type", "posta_certificata") + received_at = payload.get("received_at", "") + + # Traduzione tipo PEC + pec_type_labels = { + "posta_certificata": "PEC", + "accettazione": "Ricevuta di Accettazione", + "non_accettazione": "Ricevuta di Non Accettazione", + "presa_in_carico": "Ricevuta di Presa in Carico", + "avvenuta_consegna": "Ricevuta di Avvenuta Consegna", + "mancata_consegna": "Ricevuta di Mancata Consegna", + "errore_consegna": "Ricevuta di Errore Consegna", + "preavviso_mancata_consegna": "Preavviso Mancata Consegna", + "rilevazione_virus": "Rilevazione Virus", + "unknown": "Messaggio", + } + tipo_label = pec_type_labels.get(pec_type, pec_type) + + if channel_type == "telegram": + return ( + f"Nuovo messaggio PEC\n\n" + f"Tipo: {tipo_label}\n" + f"Da: {from_addr}\n" + f"Casella: {mailbox_email}\n" + f"Oggetto: {subject}" + ) + else: + return ( + f"Nuovo messaggio PEC\n" + f"Tipo: {tipo_label}\n" + f"Da: {from_addr}\n" + f"Casella: {mailbox_email}\n" + f"Oggetto: {subject}" + ) + + elif event_type == "state_changed": + message_id = payload.get("message_id", "") + old_state = payload.get("old_state", "") + new_state = payload.get("new_state", "") + subject = payload.get("subject", "") + return ( + f"Cambio stato PEC\n" + f"Oggetto: {subject}\n" + f"Stato: {old_state} -> {new_state}" + ) + + else: + return f"Evento PEChub: {event_type}\n{json.dumps(payload, ensure_ascii=False, default=str)}" + + +def _build_notification_payload(event_type: str, payload: dict) -> dict: + """Costruisce il payload JSON per il webhook.""" + return { + "event": event_type, + "timestamp": datetime.now(UTC).isoformat(), + "data": payload, + } + + +def _build_email_subject(event_type: str, payload: dict) -> str: + """Costruisce l'oggetto dell'email di notifica.""" + if event_type == "new_message": + subject = payload.get("subject") or "(senza oggetto)" + return f"[PEChub] Nuovo messaggio PEC: {subject}" + elif event_type == "state_changed": + return f"[PEChub] Cambio stato PEC: {payload.get('new_state', '')}" + return f"[PEChub] Evento: {event_type}" + + +# ─── Evaluate & Enqueue ─────────────────────────────────────────────────────── + +async def evaluate_and_enqueue_notifications( + message: Message, + mailbox: Mailbox, + db: Any, # AsyncSession – evito import circolare + redis_client: Any, # ArqRedis +) -> None: + """ + Valuta le regole di notifica per un messaggio appena salvato e accoda i job. + + Chiamata da sync.py dopo _save_message e index_message. + Non solleva eccezioni: gli errori vengono loggati ma non propagati per + non interrompere il flusso di sincronizzazione IMAP. + + Args: + message: messaggio appena salvato nel DB (flush, non commit) + mailbox: casella di appartenenza + db: sessione DB (open, con flush del messaggio) + redis_client: ArqRedis per enqueue_job + """ + try: + await _do_evaluate_and_enqueue(message, mailbox, db, redis_client) + except Exception as exc: + logger.error( + f"Errore evaluate_and_enqueue_notifications per messaggio " + f"{message.id}: {exc}", + exc_info=True, + ) + + +async def _do_evaluate_and_enqueue( + message: Message, + mailbox: Mailbox, + db: Any, + redis_client: Any, +) -> None: + """Logica interna – puo' sollevare eccezioni.""" + + # Carica regole attive per questo tenant con event_type = "new_message" + rules_result = await db.execute( + select(NotificationRule) + .options(selectinload(NotificationRule.channel)) + .where( + NotificationRule.tenant_id == message.tenant_id, + NotificationRule.is_active == True, # noqa: E712 + NotificationRule.event_type == "new_message", + ) + ) + rules: list[NotificationRule] = list(rules_result.scalars().all()) + + if not rules: + return + + enqueued_count = 0 + + for rule in rules: + channel = rule.channel + if not channel or not channel.is_active: + continue + + # Controlla circuit breaker + if ( + channel.circuit_open_until + and channel.circuit_open_until > datetime.now(UTC) + ): + logger.debug( + f"[notify] Canale {channel.name!r} circuit aperto fino a " + f"{channel.circuit_open_until}, skip regola {rule.name!r}" + ) + continue + + # Applica filtri + if not _matches_filter(rule.filter, message, mailbox): + continue + + # Costruisce il payload dell'evento + event_payload = { + "message_id": str(message.id), + "mailbox_id": str(mailbox.id), + "mailbox_email": mailbox.email_address, + "subject": message.subject or "", + "from_address": message.from_address or "", + "to_addresses": list(message.to_addresses or []), + "pec_type": message.pec_type, + "direction": message.direction, + "received_at": ( + message.received_at.isoformat() + if message.received_at + else None + ), + } + + # Crea NotificationLog + log = NotificationLog( + id=uuid.uuid4(), + tenant_id=message.tenant_id, + channel_id=rule.channel_id, + rule_id=rule.id, + event_type="new_message", + event_payload=event_payload, + status="pending", + attempt_count=0, + max_attempts=3, + ) + db.add(log) + await db.flush() # ottieni log.id + + # Enqueue arq job con defer 5s per attendere il commit DB + try: + await redis_client.enqueue_job( + "dispatch_notification", + str(log.id), + _defer_by=timedelta(seconds=5), + ) + enqueued_count += 1 + logger.info( + f"[notify] Enqueued dispatch_notification per regola " + f"{rule.name!r} -> canale {channel.name!r} " + f"(log_id={log.id})" + ) + except Exception as e: + logger.error( + f"[notify] Impossibile enqueue dispatch_notification " + f"per log {log.id}: {e}" + ) + + if enqueued_count > 0: + logger.info( + f"[notify] Messaggio {message.id}: " + f"{enqueued_count} notifiche accodate" + ) + + +# ─── Job arq principale ─────────────────────────────────────────────────────── + +async def dispatch_notification( + ctx: dict[str, Any], + notification_log_id: str, +) -> dict: + """ + Job arq: legge un NotificationLog e invia la notifica al canale configurato. + + Args: + ctx: contesto arq (ctx["redis"] = ArqRedis) + notification_log_id: UUID del NotificationLog da processare + + Returns: + dict con status e dettagli + """ + redis_client = ctx.get("redis") + + async with AsyncSessionLocal() as db: + log_uuid = uuid.UUID(notification_log_id) + + # ── Carica log + canale ─────────────────────────────────────────────── + log_result = await db.execute( + select(NotificationLog) + .options(selectinload(NotificationLog.channel)) + .where(NotificationLog.id == log_uuid) + ) + log: NotificationLog | None = log_result.scalar_one_or_none() + + if not log: + logger.warning( + f"[dispatch_notification] NotificationLog {notification_log_id} non trovato" + ) + return {"status": "not_found", "log_id": notification_log_id} + + if log.status in ("sent", "failed", "skipped"): + logger.debug( + f"[dispatch_notification] Log {notification_log_id} " + f"gia' in stato {log.status!r}, skip" + ) + return {"status": "already_processed", "current_status": log.status} + + channel = log.channel + if not channel or not channel.is_active: + log.status = "skipped" + await db.commit() + return {"status": "skipped", "reason": "channel_inactive"} + + # ── Circuit breaker ─────────────────────────────────────────────────── + if ( + channel.circuit_open_until + and channel.circuit_open_until > datetime.now(UTC) + ): + log.status = "skipped" + await db.commit() + logger.info( + f"[dispatch_notification] Canale {channel.name!r} circuit aperto, " + f"log {notification_log_id} marcato skipped" + ) + return {"status": "skipped", "reason": "circuit_open"} + + # ── Incrementa contatore tentativi ──────────────────────────────────── + log.attempt_count += 1 + current_attempt = log.attempt_count + + # ── Decifra config sensibile ────────────────────────────────────────── + secret: dict = {} + if channel.config_enc: + try: + secret = _decrypt_config(channel.config_enc) + except Exception as e: + log.status = "failed" + log.last_error = f"Errore decifratura config: {e}" + await db.commit() + logger.error( + f"[dispatch_notification] Errore decifratura canale " + f"{channel.name!r}: {e}" + ) + return {"status": "failed", "error": str(e)} + + config = channel.config or {} + payload = log.event_payload or {} + channel_type = channel.channel_type + + # ── Testo notifica ──────────────────────────────────────────────────── + notif_text = _build_notification_text( + event_type=log.event_type, + payload=payload, + channel_type=channel_type, + channel_name=channel.name, + ) + + # ── Dispatch al sender ──────────────────────────────────────────────── + success = False + error_msg: str | None = None + http_status: int | None = None + + try: + if channel_type == "telegram": + bot_token = secret.get("bot_token") + chat_id = str(config.get("chat_id", "")) + if not bot_token or not chat_id: + raise ValueError("bot_token o chat_id non configurati") + + from app.notifications.telegram import send_message + result = await send_message( + bot_token=bot_token, + chat_id=chat_id, + text=notif_text, + parse_mode="HTML", + ) + http_status = 200 + success = True + logger.info( + f"[dispatch_notification] Telegram inviato: " + f"message_id={result.get('message_id')} " + f"canale={channel.name!r}" + ) + + elif channel_type == "webhook": + url = config.get("url") + if not url: + raise ValueError("URL webhook non configurato") + webhook_secret_val = secret.get("webhook_secret") + + from app.notifications.webhook import send_webhook + result = await send_webhook( + url=url, + payload=_build_notification_payload(log.event_type, payload), + event_type=log.event_type, + webhook_secret=webhook_secret_val, + ) + http_status = result.get("http_status") + success = True + logger.info( + f"[dispatch_notification] Webhook inviato: " + f"HTTP {http_status} delivery={result.get('delivery_id')} " + f"canale={channel.name!r}" + ) + + elif channel_type == "email": + smtp_host = config.get("smtp_host") + smtp_port = int(config.get("smtp_port", 465)) + from_email = config.get("from_email") + to_email = config.get("to_email") + smtp_user = config.get("smtp_user") or from_email + use_tls = config.get("smtp_use_tls", True) + use_starttls = config.get("smtp_use_starttls", False) + from_name = config.get("from_name", "PEChub Notifiche") + smtp_password = secret.get("smtp_password", "") + + if not smtp_host or not from_email or not to_email: + raise ValueError("Configurazione SMTP incompleta") + + from app.notifications.email_smtp import send_email_notification + subject = _build_email_subject(log.event_type, payload) + await send_email_notification( + smtp_host=smtp_host, + smtp_port=smtp_port, + smtp_user=smtp_user, + smtp_password=smtp_password, + from_email=from_email, + to_email=to_email, + subject=subject, + body_text=notif_text, + body_html=None, + from_name=from_name, + use_tls=use_tls, + use_starttls=use_starttls, + ) + http_status = 200 + success = True + logger.info( + f"[dispatch_notification] Email inviata a {to_email} " + f"canale={channel.name!r}" + ) + + elif channel_type == "whatsapp": + phone_number_id = config.get("phone_number_id") + to_phone = config.get("to_phone") + access_token = secret.get("access_token") + + if not phone_number_id or not to_phone or not access_token: + raise ValueError("Configurazione WhatsApp incompleta") + + from app.notifications.whatsapp import send_whatsapp_message + result = await send_whatsapp_message( + phone_number_id=phone_number_id, + to_phone=to_phone, + text=notif_text, + access_token=access_token, + ) + http_status = result.get("http_status") + success = True + logger.info( + f"[dispatch_notification] WhatsApp inviato: " + f"message_id={result.get('message_id')} " + f"canale={channel.name!r}" + ) + + else: + raise ValueError(f"Tipo canale non supportato: {channel_type!r}") + + except Exception as exc: + error_msg = str(exc) + logger.warning( + f"[dispatch_notification] Tentativo {current_attempt} fallito " + f"canale={channel.name!r} tipo={channel_type!r}: {error_msg}" + ) + + # ── Aggiorna stato log e canale ─────────────────────────────────────── + + if success: + log.status = "sent" + log.sent_at = datetime.now(UTC) + log.http_status = http_status + + # Reset circuit breaker + channel.consecutive_failures = 0 + channel.circuit_open_until = None + + await db.commit() + logger.info( + f"[dispatch_notification] Notifica {notification_log_id} INVIATA " + f"canale={channel.name!r} tipo={channel_type!r}" + ) + return { + "status": "sent", + "log_id": notification_log_id, + "channel": channel.name, + "channel_type": channel_type, + "attempt": current_attempt, + } + + else: + # ── Retry o failure definitivo ──────────────────────────────────── + log.last_error = error_msg + log.http_status = http_status + + if current_attempt < log.max_attempts: + # Calcola delay backoff + delay_idx = min(current_attempt - 1, len(_RETRY_DELAYS) - 1) + delay_seconds = _RETRY_DELAYS[delay_idx] + next_retry = datetime.now(UTC) + timedelta(seconds=delay_seconds) + + log.next_retry_at = next_retry + # Mantieni status "pending" per il retry + await db.commit() + + # Re-enqueue con backoff + if redis_client: + try: + await redis_client.enqueue_job( + "dispatch_notification", + notification_log_id, + _defer_by=timedelta(seconds=delay_seconds), + ) + logger.info( + f"[dispatch_notification] Retry tentativo " + f"{current_attempt + 1} schedulato in {delay_seconds}s " + f"per log {notification_log_id}" + ) + except Exception as enqueue_err: + logger.error( + f"[dispatch_notification] Impossibile re-enqueue " + f"log {notification_log_id}: {enqueue_err}" + ) + + return { + "status": "retry", + "log_id": notification_log_id, + "attempt": current_attempt, + "next_retry_in_seconds": delay_seconds, + "error": error_msg, + } + + else: + # Tutti i tentativi esauriti → FAILED + log.status = "failed" + + # Aggiorna circuit breaker canale + channel.consecutive_failures += 1 + if channel.consecutive_failures >= _CIRCUIT_FAILURE_THRESHOLD: + channel.circuit_open_until = datetime.now(UTC) + _CIRCUIT_OPEN_DURATION + logger.warning( + f"[dispatch_notification] Circuit breaker aperto per " + f"canale {channel.name!r} " + f"({channel.consecutive_failures} fallimenti consecutivi)" + ) + + await db.commit() + logger.error( + f"[dispatch_notification] Notifica {notification_log_id} FALLITA " + f"definitivamente dopo {current_attempt} tentativi: {error_msg}" + ) + return { + "status": "failed", + "log_id": notification_log_id, + "channel": channel.name, + "attempts": current_attempt, + "error": error_msg, + } diff --git a/worker/app/main.py b/worker/app/main.py index 75a9ff9..1f8b248 100644 --- a/worker/app/main.py +++ b/worker/app/main.py @@ -24,6 +24,7 @@ from arq.connections import RedisSettings from app.config import get_settings from app.imap.pool import MailboxPool +from app.jobs.dispatch_notification import dispatch_notification from app.jobs.send_pec import send_pec from app.jobs.sync_mailbox import sync_mailbox from app.smtp.receipt_watcher import watch_receipt @@ -132,7 +133,7 @@ class WorkerSettings: """Configurazione del worker arq.""" # Funzioni/job registrati - functions = [sync_mailbox, send_pec, watch_receipt, health_check] + functions = [sync_mailbox, send_pec, watch_receipt, dispatch_notification, health_check] # Callbacks lifecycle on_startup = on_startup diff --git a/worker/app/models.py b/worker/app/models.py index 684ce34..edf4c7c 100644 --- a/worker/app/models.py +++ b/worker/app/models.py @@ -22,7 +22,7 @@ from sqlalchemy import ( ARRAY, BigInteger, Boolean, DateTime, Enum, ForeignKey, Integer, String, Text, func, ) -from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship @@ -202,3 +202,127 @@ class Attachment(Base): # Relazione inversa verso Message message: Mapped["Message"] = relationship("Message", back_populates="attachments") + + +# ─── Modelli Notifiche ──────────────────────────────────────────────────────── + +NotifChannelType = Enum( + "webhook", "email", "telegram", "whatsapp", + name="notification_channel_type", + create_type=False, +) + +NotifStatus = Enum( + "pending", "sent", "failed", "skipped", + name="notification_status", + create_type=False, +) + + +class NotificationChannel(Base): + """ + Canale di notifica configurato da un tenant. + Corrisponde alla tabella `notification_channels` nel DB. + """ + + __tablename__ = "notification_channels" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + name: Mapped[str] = mapped_column(String(255), nullable=False) + channel_type: Mapped[str] = mapped_column(NotifChannelType, nullable=False) + is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + config: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + config_enc: Mapped[str | None] = mapped_column(Text, nullable=True) + consecutive_failures: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + circuit_open_until: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + created_by: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now() + ) + + rules: Mapped[list["NotificationRule"]] = relationship( + "NotificationRule", back_populates="channel", lazy="select" + ) + logs: Mapped[list["NotificationLog"]] = relationship( + "NotificationLog", back_populates="channel", lazy="select" + ) + + +class NotificationRule(Base): + """ + Regola evento PEC -> canale di notifica. + Corrisponde alla tabella `notification_rules` nel DB. + """ + + __tablename__ = "notification_rules" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + channel_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("notification_channels.id", ondelete="CASCADE"), + nullable=False, + ) + name: Mapped[str] = mapped_column(String(255), nullable=False) + event_type: Mapped[str] = mapped_column(String(100), nullable=False) + filter: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + + channel: Mapped["NotificationChannel"] = relationship( + "NotificationChannel", back_populates="rules" + ) + + +class NotificationLog(Base): + """ + Log di ogni tentativo di notifica con retry e circuit breaker. + Corrisponde alla tabella `notification_log` nel DB. + """ + + __tablename__ = "notification_log" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False) + channel_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("notification_channels.id", ondelete="CASCADE"), + nullable=False, + ) + rule_id: Mapped[uuid.UUID | None] = mapped_column( + UUID(as_uuid=True), + ForeignKey("notification_rules.id", ondelete="SET NULL"), + nullable=True, + ) + event_type: Mapped[str] = mapped_column(String(100), nullable=False) + event_payload: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + status: Mapped[str] = mapped_column(NotifStatus, nullable=False, default="pending") + attempt_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + max_attempts: Mapped[int] = mapped_column(Integer, nullable=False, default=3) + next_retry_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + last_error: Mapped[str | None] = mapped_column(Text, nullable=True) + http_status: Mapped[int | None] = mapped_column(Integer, nullable=True) + sent_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + + channel: Mapped["NotificationChannel"] = relationship( + "NotificationChannel", back_populates="logs" + ) diff --git a/worker/app/notifications/__init__.py b/worker/app/notifications/__init__.py new file mode 100644 index 0000000..14baf19 --- /dev/null +++ b/worker/app/notifications/__init__.py @@ -0,0 +1 @@ +# Senders notifiche multi-canale per il worker diff --git a/worker/app/notifications/email_smtp.py b/worker/app/notifications/email_smtp.py new file mode 100644 index 0000000..315043b --- /dev/null +++ b/worker/app/notifications/email_smtp.py @@ -0,0 +1,76 @@ +""" +Email SMTP sender (worker) – invio notifiche via aiosmtplib. + +Copia del sender backend: i due container sono separati. +""" + +from datetime import datetime +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +import aiosmtplib + +DEFAULT_TIMEOUT = 15.0 + + +class EmailSMTPError(Exception): + def __init__(self, message: str, smtp_code: int | None = None): + super().__init__(message) + self.smtp_code = smtp_code + + +async def send_email_notification( + smtp_host: str, + smtp_port: int, + smtp_user: str, + smtp_password: str, + from_email: str, + to_email: str, + subject: str, + body_text: str, + body_html: str | None = None, + from_name: str = "PEChub Notifiche", + use_tls: bool = True, + use_starttls: bool = False, + timeout: float = DEFAULT_TIMEOUT, +) -> None: + """ + Invia un'email di notifica via SMTP. + + Raises: + EmailSMTPError: in caso di errori di autenticazione, connessione o invio + """ + msg = MIMEMultipart("alternative") + msg["Subject"] = subject + msg["From"] = f"{from_name} <{from_email}>" if from_name else from_email + msg["To"] = to_email + msg["Date"] = datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S +0000") + msg["X-Mailer"] = "PEChub/1.0" + + msg.attach(MIMEText(body_text, "plain", "utf-8")) + if body_html: + msg.attach(MIMEText(body_html, "html", "utf-8")) + + try: + await aiosmtplib.send( + msg, + hostname=smtp_host, + port=smtp_port, + username=smtp_user, + password=smtp_password, + use_tls=use_tls, + start_tls=use_starttls, + timeout=timeout, + ) + except aiosmtplib.SMTPAuthenticationError as exc: + raise EmailSMTPError( + f"Autenticazione SMTP fallita: {exc}", smtp_code=535 + ) from exc + except aiosmtplib.SMTPConnectError as exc: + raise EmailSMTPError( + f"Connessione SMTP fallita a {smtp_host}:{smtp_port}: {exc}" + ) from exc + except aiosmtplib.SMTPException as exc: + raise EmailSMTPError(f"Errore SMTP: {exc}") from exc + except Exception as exc: + raise EmailSMTPError(f"Errore invio email: {exc}") from exc diff --git a/worker/app/notifications/telegram.py b/worker/app/notifications/telegram.py new file mode 100644 index 0000000..b9d203d --- /dev/null +++ b/worker/app/notifications/telegram.py @@ -0,0 +1,69 @@ +""" +Telegram Bot API – invio messaggi via sendMessage (worker). + +Copia del sender backend: i due container sono separati e non +possono condividere package, quindi il codice e' duplicato. +""" + +import httpx + +TELEGRAM_API_BASE = "https://api.telegram.org" +DEFAULT_TIMEOUT = 10.0 + + +class TelegramError(Exception): + def __init__(self, message: str, http_status: int | None = None, api_code: int | None = None): + super().__init__(message) + self.http_status = http_status + self.api_code = api_code + + +async def send_message( + bot_token: str, + chat_id: str, + text: str, + parse_mode: str = "HTML", + disable_web_page_preview: bool = True, + timeout: float = DEFAULT_TIMEOUT, +) -> dict: + """ + Invia un messaggio a un canale/gruppo/utente Telegram. + + Returns: + dict con il risultato della API Telegram (result.message_id, ecc.) + + Raises: + TelegramError: in caso di errore HTTP o risposta API non-ok + """ + url = f"{TELEGRAM_API_BASE}/bot{bot_token}/sendMessage" + payload: dict = {"chat_id": chat_id, "text": text} + if parse_mode: + payload["parse_mode"] = parse_mode + if disable_web_page_preview: + payload["link_preview_options"] = {"is_disabled": True} + + async with httpx.AsyncClient(timeout=timeout) as client: + try: + response = await client.post(url, json=payload) + except httpx.TimeoutException as exc: + raise TelegramError(f"Timeout Telegram ({timeout}s)") from exc + except httpx.RequestError as exc: + raise TelegramError(f"Errore di rete Telegram: {exc}") from exc + + if response.status_code != 200: + raise TelegramError( + f"Telegram API HTTP {response.status_code}: {response.text[:200]}", + http_status=response.status_code, + ) + + data = response.json() + if not data.get("ok"): + api_code = data.get("error_code") + description = data.get("description", "Errore sconosciuto") + raise TelegramError( + f"Telegram API error {api_code}: {description}", + http_status=response.status_code, + api_code=api_code, + ) + + return data.get("result", {}) diff --git a/worker/app/notifications/webhook.py b/worker/app/notifications/webhook.py new file mode 100644 index 0000000..fb74cac --- /dev/null +++ b/worker/app/notifications/webhook.py @@ -0,0 +1,74 @@ +""" +Webhook sender (worker) – POST HTTP con firma HMAC-SHA256. + +Copia del sender backend: i due container sono separati. +""" + +import hashlib +import hmac +import json +import uuid as uuid_mod + +import httpx + +DEFAULT_TIMEOUT = 10.0 + + +class WebhookError(Exception): + def __init__(self, message: str, http_status: int | None = None): + super().__init__(message) + self.http_status = http_status + + +async def send_webhook( + url: str, + payload: dict, + event_type: str = "new_message", + webhook_secret: str | None = None, + timeout: float = DEFAULT_TIMEOUT, +) -> dict: + """ + Invia un payload JSON a un webhook URL. + + Returns: + dict con http_status, delivery_id + + Raises: + WebhookError: in caso di timeout, errore di rete o HTTP >= 400 + """ + body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8") + delivery_id = str(uuid_mod.uuid4()) + + headers = { + "Content-Type": "application/json", + "X-PEChub-Event": event_type, + "X-Delivery": delivery_id, + "User-Agent": "PEChub-Webhook/1.0", + } + + if webhook_secret: + sig = hmac.new( + webhook_secret.encode("utf-8"), + body, + hashlib.sha256, + ).hexdigest() + headers["X-Hub-Signature-256"] = f"sha256={sig}" + + async with httpx.AsyncClient(timeout=timeout) as client: + try: + response = await client.post(url, content=body, headers=headers) + except httpx.TimeoutException as exc: + raise WebhookError(f"Timeout webhook dopo {timeout}s") from exc + except httpx.RequestError as exc: + raise WebhookError(f"Errore di rete webhook: {exc}") from exc + + if response.status_code >= 400: + raise WebhookError( + f"Webhook HTTP {response.status_code}: {response.text[:200]}", + http_status=response.status_code, + ) + + return { + "http_status": response.status_code, + "delivery_id": delivery_id, + } diff --git a/worker/app/notifications/whatsapp.py b/worker/app/notifications/whatsapp.py new file mode 100644 index 0000000..96ff266 --- /dev/null +++ b/worker/app/notifications/whatsapp.py @@ -0,0 +1,73 @@ +""" +WhatsApp sender (worker) – Meta Cloud API v18. + +Copia del sender backend: i due container sono separati. +""" + +import httpx + +META_GRAPH_API_URL = "https://graph.facebook.com/v18.0" +DEFAULT_TIMEOUT = 10.0 + + +class WhatsAppError(Exception): + def __init__(self, message: str, http_status: int | None = None, api_code: int | None = None): + super().__init__(message) + self.http_status = http_status + self.api_code = api_code + + +async def send_whatsapp_message( + phone_number_id: str, + to_phone: str, + text: str, + access_token: str, + timeout: float = DEFAULT_TIMEOUT, +) -> dict: + """ + Invia un messaggio di testo WhatsApp via Meta Cloud API. + + Raises: + WhatsAppError: in caso di errore HTTP o risposta API non-ok + """ + url = f"{META_GRAPH_API_URL}/{phone_number_id}/messages" + payload = { + "messaging_product": "whatsapp", + "to": to_phone.replace(" ", "").replace("-", ""), + "type": "text", + "text": {"body": text}, + } + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + async with httpx.AsyncClient(timeout=timeout) as client: + try: + response = await client.post(url, json=payload, headers=headers) + except httpx.TimeoutException as exc: + raise WhatsAppError(f"Timeout WhatsApp API dopo {timeout}s") from exc + except httpx.RequestError as exc: + raise WhatsAppError(f"Errore di rete WhatsApp: {exc}") from exc + + if response.status_code == 401: + raise WhatsAppError("Token Meta non valido o scaduto", http_status=401) + + if response.status_code >= 400: + try: + err_data = response.json() + err_msg = err_data.get("error", {}).get("message", response.text[:200]) + err_code = err_data.get("error", {}).get("code") + except Exception: + err_msg = response.text[:200] + err_code = None + raise WhatsAppError( + f"Meta API errore HTTP {response.status_code}: {err_msg}", + http_status=response.status_code, + api_code=err_code, + ) + + data = response.json() + messages = data.get("messages", []) + message_id = messages[0].get("id") if messages else None + return {"message_id": message_id, "http_status": response.status_code}