diff --git a/backend/app/schemas/routing_rule.py b/backend/app/schemas/routing_rule.py index 92ada5a..cac2956 100644 --- a/backend/app/schemas/routing_rule.py +++ b/backend/app/schemas/routing_rule.py @@ -20,6 +20,11 @@ CONDITION_FIELDS = Literal[ # Rischio e Riservatezza (N3): verifica il livello gia' impostato "risk_level", "confidentiality", + # Campi aggiuntivi del messaggio + "has_attachments", # "true" / "false" + "direction", # "inbound" / "outbound" + "protocol_type", # "pec_it" / "rem_eu" + "body_contains", # testo nel corpo del messaggio (usa operator contains/regex) ] # Operatori supportati CONDITION_OPERATORS = Literal[ @@ -37,6 +42,19 @@ ACTION_TYPES = Literal[ # Rischio e Riservatezza (N3): imposta il livello di rischio o riservatezza "set_risk_level", "set_confidentiality", + # Gestione messaggio + "archive", # archivia il messaggio + "mark_for_conservation", # marca per la conservazione digitale immediata + # Scadenzario (Feature 4): imposta una scadenza relativa + # action_value = numero di giorni (es. "30") oppure "+30d", "+4w", "+1y" + # oppure JSON {"days": 30, "note": "Testo promemoria"} + "set_deadline", + # Fascicolazione (Feature N5): aggiunge il messaggio a un fascicolo esistente + # action_value = UUID del fascicolo + "add_to_fascicolo", + # Notifiche multi-canale: invia tramite un canale configurato (email/telegram/whatsapp/webhook) + # action_value = UUID del NotificationChannel + "notify_channel", ] diff --git a/backend/app/services/routing_rule_service.py b/backend/app/services/routing_rule_service.py index fa07fa1..7c485c7 100644 --- a/backend/app/services/routing_rule_service.py +++ b/backend/app/services/routing_rule_service.py @@ -6,6 +6,7 @@ Il metodo evaluate_rules() viene chiamato dal worker dopo ogni messaggio inbound import re import uuid +from datetime import datetime, timedelta, timezone from sqlalchemy import delete, func, select from sqlalchemy.ext.asyncio import AsyncSession @@ -245,6 +246,16 @@ class RoutingRuleService: return message.risk_level or "" elif field == "confidentiality": return message.confidentiality or "" + # Campi aggiuntivi + elif field == "has_attachments": + return "true" if message.has_attachments else "false" + elif field == "direction": + return message.direction or "" + elif field == "protocol_type": + return message.protocol_type or "" + elif field == "body_contains": + # Restituisce il corpo del messaggio per confronto con contains/regex + return (message.body_text or "").lower() return "" def _evaluate_condition( @@ -279,29 +290,58 @@ class RoutingRuleService: try: if action.action_type == "apply_label" and action.action_value: await self._action_apply_label(message, uuid.UUID(action.action_value)) + elif action.action_type == "apply_taxonomy" and action.action_value: # Applica un nodo tassonomico: identico a apply_label - # Il nodo è una Label con parent_id valorizzato (Processo o Classificazione) await self._action_apply_label(message, uuid.UUID(action.action_value)) + + elif action.action_type == "assign_vbox" and action.action_value: + await self._action_assign_vbox(message, uuid.UUID(action.action_value)) + elif action.action_type == "mark_read": message.is_read = True + elif action.action_type == "mark_starred": message.is_starred = True + + elif action.action_type == "archive": + message.is_archived = True + message.archived_at = datetime.now(timezone.utc) + + elif action.action_type == "mark_for_conservation": + if not message.is_pending_conservation: + message.is_pending_conservation = True + message.pending_conservation_at = datetime.now(timezone.utc) + + elif action.action_type == "set_deadline" and action.action_value: + await self._action_set_deadline(message, action.action_value) + + elif action.action_type == "add_to_fascicolo" and action.action_value: + await self._action_add_to_fascicolo(message, uuid.UUID(action.action_value)) + elif action.action_type == "notify_webhook" and action.action_value: await self._action_notify_webhook(message, action.action_value) + + elif action.action_type == "notify_channel" and action.action_value: + await self._action_notify_channel(message, uuid.UUID(action.action_value)) + # Rischio e Riservatezza (N3) elif action.action_type == "set_risk_level" and action.action_value: valid_levels = {"low", "medium", "high", "critical"} if action.action_value in valid_levels: message.risk_level = action.action_value + elif action.action_type == "set_confidentiality" and action.action_value: valid_levels = {"public", "internal", "confidential", "secret"} if action.action_value in valid_levels: message.confidentiality = action.action_value + except Exception: # Le azioni non devono interrompere il flusso principale pass + # ─── Implementazioni azioni ─────────────────────────────────────────────── + async def _action_apply_label( self, message: Message, label_id: uuid.UUID ) -> None: @@ -325,6 +365,166 @@ class RoutingRuleService: if not existing.scalar_one_or_none(): self.db.add(MessageLabel(message_id=message.id, label_id=label_id)) + async def _action_assign_vbox( + self, message: Message, vbox_id: uuid.UUID + ) -> None: + """ + Assegna il messaggio alla Virtual Box trovando la Label associata. + + Le Virtual Box sono filtri read-only: per "assegnare" un messaggio a una VBox + si applica la Label il cui nome corrisponde al campo 'label' della VBox. + Se la VBox non ha un campo 'label' o non esiste una Label con quel nome, + l'azione viene saltata silenziosamente. + """ + from app.models.virtual_box import VirtualBox + + vbox_result = await self.db.execute( + select(VirtualBox).where( + VirtualBox.id == vbox_id, + VirtualBox.tenant_id == message.tenant_id, + VirtualBox.is_active == True, # noqa: E712 + ) + ) + vbox = vbox_result.scalar_one_or_none() + if not vbox or not vbox.label: + return + + # Cerca la Label con lo stesso nome del campo 'label' della VBox + label_result = await self.db.execute( + select(Label).where( + Label.name == vbox.label, + Label.tenant_id == message.tenant_id, + ) + ) + label = label_result.scalar_one_or_none() + if label: + await self._action_apply_label(message, label.id) + + async def _action_set_deadline( + self, message: Message, value: str + ) -> None: + """ + Imposta una scadenza relativa al messaggio. + + Formati accettati per action_value: + - Numero intero: "30" → +30 giorni + - Suffisso giorni: "+30d" → +30 giorni + - Suffisso settimane:"+4w" → +28 giorni + - Suffisso mesi: "+2m" → +60 giorni + - Suffisso anni: "+1y" → +365 giorni + - JSON: {"days": 30, "note": "Risposta entro 30 giorni"} + + Non sovrascrive una scadenza gia' impostata manualmente. + """ + import json as _json + + value = value.strip() + days: int | None = None + note: str | None = None + + # Prova JSON + try: + parsed = _json.loads(value) + if isinstance(parsed, dict): + days = int(parsed.get("days", 0)) or None + note = parsed.get("note") + except Exception: + pass + + # Prova formato stringa: "30", "+30d", "+4w", "+2m", "+1y" + if days is None: + m = re.match(r"^\+?(\d+)([dDwWmMyY]?)$", value) + if m: + n = int(m.group(1)) + unit = m.group(2).lower() if m.group(2) else "d" + if unit == "w": + days = n * 7 + elif unit == "m": + days = n * 30 + elif unit == "y": + days = n * 365 + else: + days = n + + if days and days > 0: + base = message.received_at or datetime.now(timezone.utc) + message.deadline_at = base + timedelta(days=days) + if note and not message.deadline_note: + message.deadline_note = note + + async def _action_add_to_fascicolo( + self, message: Message, fascicolo_id: uuid.UUID + ) -> None: + """ + Aggiunge il messaggio a un fascicolo esistente (se non gia' presente). + + Verifica che il fascicolo appartenga al tenant e non sia gia' archiviato. + """ + from app.models.fascicolo import Fascicolo, FascicoloMessage + + # Verifica che il fascicolo esista, appartenga al tenant e non sia archiviato + fascicolo_result = await self.db.execute( + select(Fascicolo).where( + Fascicolo.id == fascicolo_id, + Fascicolo.tenant_id == message.tenant_id, + Fascicolo.stato != "archiviato", + ) + ) + if not fascicolo_result.scalar_one_or_none(): + return + + # Verifica che il messaggio non sia gia' nel fascicolo + existing = await self.db.execute( + select(FascicoloMessage).where( + FascicoloMessage.fascicolo_id == fascicolo_id, + FascicoloMessage.message_id == message.id, + ) + ) + if not existing.scalar_one_or_none(): + self.db.add(FascicoloMessage( + fascicolo_id=fascicolo_id, + message_id=message.id, + )) + + async def _action_notify_channel( + self, message: Message, channel_id: uuid.UUID + ) -> None: + """ + Invia una notifica tramite un canale configurato (webhook/email/telegram/whatsapp). + + Crea un NotificationLog con status 'pending' che il worker processerà + tramite il meccanismo di retry esistente. + """ + from app.models.notification import NotificationChannel, NotificationLog + + channel_result = await self.db.execute( + select(NotificationChannel).where( + NotificationChannel.id == channel_id, + NotificationChannel.tenant_id == message.tenant_id, + NotificationChannel.is_active == True, # noqa: E712 + ) + ) + channel = channel_result.scalar_one_or_none() + if not channel: + return + + # Crea il log per l'invio asincrono da parte del worker + self.db.add(NotificationLog( + tenant_id=message.tenant_id, + channel_id=channel_id, + event_type="routing_rule_match", + event_payload={ + "message_id": str(message.id), + "subject": message.subject, + "from_address": message.from_address, + "pec_type": message.pec_type, + "mailbox_id": str(message.mailbox_id), + "direction": message.direction, + }, + status="pending", + max_attempts=3, + )) + async def _action_notify_webhook(self, message: Message, url: str) -> None: """Invia una notifica webhook per il messaggio.""" import aiohttp diff --git a/frontend/src/api/routing_rules.api.ts b/frontend/src/api/routing_rules.api.ts index a2556fd..7b9bfbb 100644 --- a/frontend/src/api/routing_rules.api.ts +++ b/frontend/src/api/routing_rules.api.ts @@ -11,6 +11,11 @@ export type ConditionField = /** Rischio e Riservatezza (N3): verifica il livello gia' impostato sul messaggio */ | 'risk_level' | 'confidentiality' + /** Campi aggiuntivi del messaggio */ + | 'has_attachments' // "true" / "false" + | 'direction' // "inbound" / "outbound" + | 'protocol_type' // "pec_it" / "rem_eu" + | 'body_contains' // testo nel corpo del messaggio export type ConditionOperator = 'contains' | 'equals' | 'starts_with' | 'ends_with' | 'regex' | 'not_contains' @@ -25,6 +30,15 @@ export type ActionType = /** Rischio e Riservatezza (N3): imposta il livello di rischio o riservatezza */ | 'set_risk_level' | 'set_confidentiality' + /** Gestione messaggio */ + | 'archive' + | 'mark_for_conservation' + /** Scadenzario: valore = giorni (es. "30") o formato "+30d", "+4w", "+1y" */ + | 'set_deadline' + /** Fascicolazione: valore = UUID del fascicolo */ + | 'add_to_fascicolo' + /** Notifiche multi-canale: valore = UUID del NotificationChannel */ + | 'notify_channel' export interface RoutingRuleCondition { id: string diff --git a/frontend/src/pages/RoutingRules/RoutingRulesPage.tsx b/frontend/src/pages/RoutingRules/RoutingRulesPage.tsx index 48e2da4..4b3e824 100644 --- a/frontend/src/pages/RoutingRules/RoutingRulesPage.tsx +++ b/frontend/src/pages/RoutingRules/RoutingRulesPage.tsx @@ -8,10 +8,12 @@ import { Label } from '@/components/ui/Label' import { Dialog, DialogContent, DialogHeader, DialogTitle, DialogFooter } from '@/components/ui/Dialog' import { routingRulesApi, type RoutingRuleResponse, type RoutingRuleCreate, type ConditionField, type ConditionOperator, type ActionType } from '@/api/routing_rules.api' import { labelsApi } from '@/api/labels.api' -import type { LabelResponse, LabelTreeResponse } from '@/types/api.types' +import { virtualBoxesApi } from '@/api/virtual_boxes.api' +import { fascicoliApi } from '@/api/fascicoli.api' +import { notificationsApi } from '@/api/notifications.api' +import type { LabelResponse } from '@/types/api.types' import { RISK_LEVEL_OPTIONS, CONFIDENTIALITY_OPTIONS } from '@/components/RiskBadge/RiskBadge' import { getErrorMessage } from '@/api/client' -import { formatDate } from '@/lib/utils' import { cn } from '@/lib/utils' const FIELD_LABELS: Record = { @@ -24,6 +26,11 @@ const FIELD_LABELS: Record = { // Rischio e Riservatezza (N3) risk_level: 'Livello di rischio', confidentiality: 'Riservatezza', + // Campi aggiuntivi + has_attachments: 'Ha allegati', + direction: 'Direzione', + protocol_type: 'Protocollo', + body_contains: 'Corpo del messaggio', } const OPERATOR_LABELS: Record = { @@ -40,13 +47,24 @@ const ACTION_LABELS: Record = { assign_vbox: 'Assegna Virtual Box', mark_read: 'Segna come letto', mark_starred: 'Aggiungi ai preferiti', - notify_webhook: 'Notifica webhook', + notify_webhook: 'Notifica webhook (URL diretto)', apply_taxonomy: 'Applica classificazione tassonomica', // Rischio e Riservatezza (N3) set_risk_level: 'Imposta livello di rischio', set_confidentiality: 'Imposta riservatezza', + // Gestione messaggio + archive: 'Archivia messaggio', + mark_for_conservation: 'Invia in conservazione digitale', + set_deadline: 'Imposta scadenza (giorni)', + // Fascicolazione + add_to_fascicolo: 'Aggiungi a fascicolo', + // Notifiche multi-canale + notify_channel: 'Notifica tramite canale configurato', } +// Azioni che NON richiedono un valore aggiuntivo +const ACTIONS_NO_VALUE: ActionType[] = ['mark_read', 'mark_starred', 'archive', 'mark_for_conservation'] + /** Costruisce il percorso completo di una label: "Ambito > Processo > Classificazione" */ function buildLabelPath(labelId: string, allLabels: LabelResponse[]): string { const map = new Map(allLabels.map((l) => [l.id, l])) @@ -74,6 +92,13 @@ interface Action { action_value: string } +const CHANNEL_TYPE_LABELS: Record = { + webhook: 'Webhook', + email: 'Email', + telegram: 'Telegram', + whatsapp: 'WhatsApp', +} + export function RoutingRulesPage() { const queryClient = useQueryClient() const [showForm, setShowForm] = useState(false) @@ -92,7 +117,8 @@ export function RoutingRulesPage() { { action_type: 'mark_read', action_value: '' } ]) - const { data, isLoading } = useQuery({ + // Dati di supporto per i selettori + const { data: rulesData, isLoading } = useQuery({ queryKey: ['routing-rules'], queryFn: () => routingRulesApi.list(), }) @@ -103,6 +129,27 @@ export function RoutingRulesPage() { }) const labels = labelsData ?? [] + const { data: vboxesData } = useQuery({ + queryKey: ['virtual-boxes-brief'], + queryFn: () => virtualBoxesApi.list({ active_only: true, page_size: 200 }), + enabled: showForm, + }) + const vboxes = vboxesData?.items ?? [] + + const { data: fascicoliData } = useQuery({ + queryKey: ['fascicoli-brief'], + queryFn: () => fascicoliApi.list({ stato: 'aperto' }), + enabled: showForm, + }) + const fascicoli = fascicoliData ?? [] + + const { data: channelsData } = useQuery({ + queryKey: ['notification-channels-brief'], + queryFn: () => notificationsApi.listChannels({ page_size: 200 }), + enabled: showForm, + }) + const channels = channelsData?.items ?? [] + const createMutation = useMutation({ mutationFn: (d: RoutingRuleCreate) => routingRulesApi.create(d), onSuccess: () => { @@ -168,7 +215,20 @@ export function RoutingRulesPage() { const handleSubmit = () => { if (!formName.trim()) return toast.error('Il nome e\' obbligatorio') - if (formConditions.some(c => !c.value.trim())) return toast.error('Tutte le condizioni devono avere un valore') + // Le azioni senza valore non richiedono validazione del valore + const conditionsWithValue = formConditions.filter(c => !ACTIONS_NO_VALUE.includes(c.field as ActionType)) + if (conditionsWithValue.some(c => !c.value.trim())) { + return toast.error('Tutte le condizioni devono avere un valore') + } + // Verifica azioni che richiedono valore + const actionsNeedingValue: ActionType[] = [ + 'apply_label', 'assign_vbox', 'notify_webhook', 'apply_taxonomy', + 'set_risk_level', 'set_confidentiality', 'set_deadline', + 'add_to_fascicolo', 'notify_channel', + ] + if (formActions.some(a => actionsNeedingValue.includes(a.action_type) && !a.action_value.trim())) { + return toast.error('Alcune azioni richiedono un valore') + } const payload: RoutingRuleCreate = { name: formName.trim(), @@ -186,7 +246,7 @@ export function RoutingRulesPage() { } } - const items = data?.items ?? [] + const items = rulesData?.items ?? [] const toggleExpand = (id: string) => { setExpandedRules(prev => { @@ -197,6 +257,260 @@ export function RoutingRulesPage() { }) } + /** Rendering del controllo valore per una condizione in base al field */ + const renderConditionValueInput = (cond: Condition, i: number) => { + const update = (val: string) => + setFormConditions(prev => prev.map((c, idx) => idx === i ? { ...c, value: val } : c)) + + if (cond.field === 'has_label') { + return ( + + ) + } + + if (cond.field === 'has_attachments') { + return ( + + ) + } + + if (cond.field === 'direction') { + return ( + + ) + } + + if (cond.field === 'protocol_type') { + return ( + + ) + } + + if (cond.field === 'risk_level') { + return ( + + ) + } + + if (cond.field === 'confidentiality') { + return ( + + ) + } + + // Default: testo libero + return ( + update(e.target.value)} + placeholder={cond.field === 'body_contains' ? 'Parola chiave nel testo...' : 'Valore...'} + /> + ) + } + + /** Rendering del controllo valore per un'azione in base al tipo */ + const renderActionValueInput = (action: Action, i: number) => { + const update = (val: string) => + setFormActions(prev => prev.map((a, idx) => idx === i ? { ...a, action_value: val } : a)) + + if (ACTIONS_NO_VALUE.includes(action.action_type)) { + return null + } + + if (action.action_type === 'apply_label') { + return ( + + ) + } + + if (action.action_type === 'apply_taxonomy') { + return ( + + ) + } + + if (action.action_type === 'assign_vbox') { + return ( + + ) + } + + if (action.action_type === 'add_to_fascicolo') { + return ( + + ) + } + + if (action.action_type === 'notify_channel') { + return ( + + ) + } + + if (action.action_type === 'notify_webhook') { + return ( + update(e.target.value)} + placeholder="https://..." + /> + ) + } + + if (action.action_type === 'set_risk_level') { + return ( + + ) + } + + if (action.action_type === 'set_confidentiality') { + return ( + + ) + } + + if (action.action_type === 'set_deadline') { + return ( +
+ update(e.target.value)} + placeholder="30" + /> + giorni dalla ricezione +
+ ) + } + + return null + } + return (
@@ -206,7 +520,7 @@ export function RoutingRulesPage() { Regole di smistamento

- Applica automaticamente etichette e azioni ai messaggi in arrivo + Applica automaticamente etichette, scadenze e azioni ai messaggi in arrivo

@@ -340,7 +658,7 @@ export function RoutingRulesPage() { @@ -351,27 +669,7 @@ export function RoutingRulesPage() { > {(Object.entries(OPERATOR_LABELS) as [ConditionOperator, string][]).map(([v, l]) => )} - {cond.field === 'has_label' ? ( - - ) : ( - setFormConditions(prev => prev.map((c, idx) => idx === i ? { ...c, value: e.target.value } : c))} - placeholder="Valore..." - /> - )} + {renderConditionValueInput(cond, i)} @@ -396,63 +694,7 @@ export function RoutingRulesPage() { > {(Object.entries(ACTION_LABELS) as [ActionType, string][]).map(([v, l]) => )} - {(action.action_type === 'apply_label') && ( - - )} - {(action.action_type === 'apply_taxonomy') && ( - - )} - {(action.action_type === 'notify_webhook') && ( - setFormActions(prev => prev.map((a, idx) => idx === i ? { ...a, action_value: e.target.value } : a))} - placeholder="https://..." - /> - )} - {/* Rischio e Riservatezza (N3) */} - {(action.action_type === 'set_risk_level') && ( - - )} - {(action.action_type === 'set_confidentiality') && ( - - )} + {renderActionValueInput(action, i)} diff --git a/worker/app/imap/sync.py b/worker/app/imap/sync.py index d4cf6a6..0425cc0 100644 --- a/worker/app/imap/sync.py +++ b/worker/app/imap/sync.py @@ -213,10 +213,13 @@ async def sync_new_messages( synced_count = 0 max_uid_synced = last_uid + # Raccoglie gli ID per i job da accodare DOPO il commit (evita race condition) + routing_ids: list[str] = [] + notification_log_ids: list[str] = [] for seq in seq_numbers: try: - uid, synced = await _fetch_and_save_message_by_seq( + uid, synced, routing_id, notif_ids = await _fetch_and_save_message_by_seq( imap_client=imap_client, seq=seq, last_uid=last_uid, @@ -230,6 +233,9 @@ async def sync_new_messages( if synced and uid and uid > max_uid_synced: synced_count += 1 max_uid_synced = uid + if routing_id: + routing_ids.append(routing_id) + notification_log_ids.extend(notif_ids) except Exception as e: logger.error( f"[{mailbox.email_address}] Errore fetch INBOX seq {seq}: {e}", @@ -248,6 +254,24 @@ async def sync_new_messages( await db.flush() await db.commit() + # Accoda i job di smistamento e notifiche DOPO il commit per evitare + # race condition: i job aprono nuove sessioni DB e devono trovare i record. + if redis_client: + for msg_id in routing_ids: + try: + await redis_client.enqueue_job("apply_routing_rules", msg_id) + except Exception as e: + logger.warning( + f"[{mailbox.email_address}] Impossibile enqueue apply_routing_rules: {e}" + ) + for log_id in notification_log_ids: + try: + await redis_client.enqueue_job("dispatch_notification", log_id) + except Exception as e: + logger.warning( + f"[{mailbox.email_address}] Impossibile enqueue dispatch_notification: {e}" + ) + return synced_count @@ -320,7 +344,8 @@ async def sync_sent_messages( for seq in seq_numbers: try: - uid, synced = await _fetch_and_save_message_by_seq( + # I messaggi outbound non generano routing rules ne' notifiche: ignoriamo i valori + uid, synced, _, _ = await _fetch_and_save_message_by_seq( imap_client=imap_client, seq=seq, last_uid=last_uid, @@ -339,37 +364,12 @@ async def sync_sent_messages( f"[{mailbox.email_address}] Errore fetch {sent_folder!r} seq {seq}: {e}", exc_info=True, ) - - # Aggiorna sent_last_sync_uid - if max_uid_synced > last_uid: - mailbox.sent_last_sync_uid = max_uid_synced - await db.flush() - await db.commit() - - logger.info( - f"[{mailbox.email_address}] Sync Sent completata: {synced_count} messaggi nuovi" - ) - - # Ri-seleziona INBOX per tornare allo stato normale - try: - await imap_client.select("INBOX") - except Exception as e: - logger.warning(f"[{mailbox.email_address}] Re-SELECT INBOX dopo Sent sync: {e}") - - return synced_count - - -async def _fetch_and_save_message_by_seq( - imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, - seq: str, - last_uid: int, - mailbox: Mailbox, db: AsyncSession, redis_client: aioredis.Redis, imap_folder: str = "INBOX", direction: str = "inbound", state: str = "received", -) -> tuple[int | None, bool]: +) -> tuple[int | None, bool, str | None, list[str]]: """ Fetcha un singolo messaggio per NUMERO DI SEQUENZA (non UID). @@ -379,19 +379,23 @@ async def _fetch_and_save_message_by_seq( state: 'received' per INBOX, 'sent' per Sent Returns: - (uid, saved): UID del messaggio e True se salvato, False altrimenti. + (uid, saved, routing_msg_id, notification_log_ids): + - uid: UID IMAP del messaggio + - saved: True se il messaggio e' stato salvato + - routing_msg_id: str(message.id) se routing rules vanno applicate + - notification_log_ids: lista di log_id per dispatch_notification """ try: status, fetch_data = await imap_client.fetch(seq, "(UID RFC822 RFC822.SIZE)") except Exception as e: logger.error(f"[{mailbox.email_address}] FETCH seq {seq} fallito: {e}") - return None, False + return None, False, None, [] if status != "OK" or not fetch_data: logger.warning( f"[{mailbox.email_address}] FETCH seq {seq} risposta vuota: {status}" ) - return None, False + return None, False, None, [] items_info = [(type(x).__name__, len(x) if isinstance(x, (bytes, str)) else str(x)) for x in fetch_data] logger.debug(f"[{mailbox.email_address}] fetch_data seq {seq}: {items_info}") @@ -421,16 +425,16 @@ async def _fetch_and_save_message_by_seq( size_bytes = int(size_match.group(1)) if uid is None or uid <= last_uid: - return uid, False + return uid, False, None, [] if not raw_eml: logger.warning(f"[{mailbox.email_address}] seq {seq} UID {uid}: body mancante") - return uid, False + return uid, False, None, [] if size_bytes is None: size_bytes = len(raw_eml) - return uid, await _save_message( + saved, routing_id, notif_ids = await _save_message( uid=uid, raw_eml=raw_eml, size_bytes=size_bytes, @@ -441,6 +445,7 @@ async def _fetch_and_save_message_by_seq( direction=direction, state=state, ) + return uid, saved, routing_id, notif_ids async def _fetch_and_save_message( @@ -487,7 +492,7 @@ async def _fetch_and_save_message( if not raw_eml: return False - return await _save_message( + saved, _, _ = await _save_message( uid=uid, raw_eml=raw_eml, size_bytes=size_bytes or len(raw_eml), @@ -498,6 +503,7 @@ async def _fetch_and_save_message( direction="inbound", state="received", ) + return saved # ─── Save message (Fase 3 – con parser completo, allegati, state machine) ──── @@ -512,10 +518,19 @@ async def _save_message( imap_folder: str = "INBOX", direction: str = "inbound", state: str = "received", -) -> bool: +) -> tuple[bool, str | None, list[str]]: """ Salva un messaggio EML in DB e su MinIO. + Returns: + Tuple (saved, routing_msg_id, notification_log_ids): + - saved: True se il messaggio e' stato salvato + - routing_msg_id: str(message.id) se routing rules devono essere applicate, None altrimenti + - notification_log_ids: lista di UUID stringa dei NotificationLog creati + + IMPORTANTE: il chiamante deve accodare i job apply_routing_rules e + dispatch_notification DOPO db.commit() per evitare race condition. + Args: imap_folder: cartella IMAP di provenienza ('INBOX', 'Sent', ecc.) direction: 'inbound' per posta in arrivo, 'outbound' per posta inviata @@ -542,7 +557,7 @@ async def _save_message( ) if existing.scalar_one_or_none(): logger.debug(f"[{mailbox.email_address}] UID {uid} in {imap_folder!r} già in DB, skip") - return False + return False, None, [] # ── Classificazione tipo messaggio da header (veloce, senza body) ──────── # La classificazione avviene PRIMA del parsing completo perche' il parser @@ -632,7 +647,7 @@ async def _save_message( f"message_id={parsed.message_id!r} con imap_uid={uid} " f"folder={imap_folder!r} (evitato duplicato outbound)" ) - return True + return True, None, [] # ── State machine: trova e aggiorna messaggio outbound ──────────────────── # Solo per messaggi inbound che sono ricevute PEC (non per posta inviata) @@ -740,27 +755,18 @@ async def _save_message( # sia il messaggio che gli allegati dalla sessione corrente. await index_message(message.id, db) - # ── Valutazione e accodamento notifiche (non bloccante) ─────────────────── - # Solo per messaggi inbound: le ricevute PEC e la posta in arrivo - # possono triggerare regole di notifica configurate dal tenant. - # I messaggi outbound (Sent) non generano notifiche automatiche. + # ── Valutazione notifiche (crea NotificationLog, NON accoda job) ────────── + # Solo per messaggi inbound. I job dispatch_notification vengono accodati + # dal chiamante DOPO db.commit() per evitare race condition: + # il job apre una nuova sessione DB e deve trovare il record gia' committato. + notification_log_ids: list[str] = [] if direction == "inbound": - await evaluate_and_enqueue_notifications( + notification_log_ids = await evaluate_and_enqueue_notifications( message=message, mailbox=mailbox, db=db, - redis_client=redis_client, ) - # ── Regole di smistamento automatico (Feature 2) ────────────────────────── - # Solo per messaggi inbound posta_certificata (non ricevute di sistema). - # Valido anche per messaggi REM (REMDispatch e' equivalente a posta_certificata). - if direction == "inbound" and _pec_type == "posta_certificata": - try: - await redis_client.enqueue_job("apply_routing_rules", str(message.id)) - except Exception as e: - logger.warning(f"[{mailbox.email_address}] Impossibile enqueue apply_routing_rules: {e}") - # ── Auto-save mittente nella rubrica (Feature 6) ────────────────────────── # Per messaggi inbound di tipo posta_certificata, salva automaticamente # il mittente nella rubrica pec_contacts del tenant (upsert idempotente). @@ -779,7 +785,15 @@ async def _save_message( except Exception as e: logger.debug(f"[{mailbox.email_address}] Auto-save contatto fallito (non critico): {e}") - return True + # ── Routing rules: segnala il messaggio al chiamante ────────────────────── + # Solo per messaggi inbound posta_certificata (non ricevute di sistema). + # Il job apply_routing_rules viene accodato dal chiamante DOPO db.commit() + # per evitare race condition (il job apre una nuova sessione DB). + routing_msg_id: str | None = None + if direction == "inbound" and _pec_type == "posta_certificata": + routing_msg_id = str(message.id) + + return True, routing_msg_id, notification_log_ids async def _apply_outbound_state_machine( diff --git a/worker/app/jobs/dispatch_notification.py b/worker/app/jobs/dispatch_notification.py index 5419fca..d409694 100644 --- a/worker/app/jobs/dispatch_notification.py +++ b/worker/app/jobs/dispatch_notification.py @@ -8,8 +8,8 @@ Flusso completo: - legge le NotificationRule attive del tenant con event_type="new_message" - applica i filtri opzionali (mailbox_id, direction, pec_type) - per ogni regola che matcha: crea NotificationLog(status=pending) - - enqueue del job arq dispatch_notification con defer 5s - (per dare tempo al DB di committare prima che il job legga) + - restituisce lista di log_id: il chiamante accoda i job DOPO db.commit() + (evita race condition: il job apre una nuova sessione DB) 4. dispatch_notification(ctx, notification_log_id): - legge NotificationLog + NotificationChannel dal DB - controlla circuit breaker @@ -234,38 +234,49 @@ async def evaluate_and_enqueue_notifications( message: Message, mailbox: Mailbox, db: Any, # AsyncSession – evito import circolare - redis_client: Any, # ArqRedis -) -> None: +) -> list[str]: """ - Valuta le regole di notifica per un messaggio appena salvato e accoda i job. + Valuta le regole di notifica per un messaggio appena salvato, + crea i NotificationLog e restituisce i loro ID. + + NON accoda i job arq: il chiamante deve farlo DOPO db.commit() + per evitare race condition (il job dispatch_notification apre una nuova + sessione DB e deve trovare il NotificationLog gia' committato). Chiamata da sync.py dopo _save_message e index_message. Non solleva eccezioni: gli errori vengono loggati ma non propagati per non interrompere il flusso di sincronizzazione IMAP. Args: - message: messaggio appena salvato nel DB (flush, non commit) - mailbox: casella di appartenenza - db: sessione DB (open, con flush del messaggio) - redis_client: ArqRedis per enqueue_job + message: messaggio appena salvato nel DB (flush, non commit) + mailbox: casella di appartenenza + db: sessione DB (open, con flush del messaggio) + + Returns: + Lista di notification_log_id (str) da accodare dopo il commit. """ try: - await _do_evaluate_and_enqueue(message, mailbox, db, redis_client) + return await _do_evaluate_and_enqueue(message, mailbox, db) except Exception as exc: logger.error( f"Errore evaluate_and_enqueue_notifications per messaggio " f"{message.id}: {exc}", exc_info=True, ) + return [] async def _do_evaluate_and_enqueue( message: Message, mailbox: Mailbox, db: Any, - redis_client: Any, -) -> None: - """Logica interna – puo' sollevare eccezioni.""" +) -> list[str]: + """ + Logica interna – puo' sollevare eccezioni. + + Crea i NotificationLog e restituisce i loro ID. + NON accoda job arq: il chiamante lo fa dopo db.commit(). + """ # Carica regole attive per questo tenant con event_type = "new_message" rules_result = await db.execute( @@ -280,9 +291,9 @@ async def _do_evaluate_and_enqueue( rules: list[NotificationRule] = list(rules_result.scalars().all()) if not rules: - return + return [] - enqueued_count = 0 + log_ids: list[str] = [] for rule in rules: channel = rule.channel @@ -336,31 +347,22 @@ async def _do_evaluate_and_enqueue( db.add(log) await db.flush() # ottieni log.id - # Enqueue arq job con defer 5s per attendere il commit DB - try: - await redis_client.enqueue_job( - "dispatch_notification", - str(log.id), - _defer_by=timedelta(seconds=5), - ) - enqueued_count += 1 - logger.info( - f"[notify] Enqueued dispatch_notification per regola " - f"{rule.name!r} -> canale {channel.name!r} " - f"(log_id={log.id})" - ) - except Exception as e: - logger.error( - f"[notify] Impossibile enqueue dispatch_notification " - f"per log {log.id}: {e}" - ) + # Raccoglie log_id: il job viene accodato dal chiamante DOPO db.commit() + log_ids.append(str(log.id)) + logger.info( + f"[notify] NotificationLog creato per regola " + f"{rule.name!r} -> canale {channel.name!r} " + f"(log_id={log.id})" + ) - if enqueued_count > 0: + if log_ids: logger.info( f"[notify] Messaggio {message.id}: " - f"{enqueued_count} notifiche accodate" + f"{len(log_ids)} notifiche pronte per dispatch" ) + return log_ids + # ─── Job arq principale ───────────────────────────────────────────────────────