Fix routing rule
This commit is contained in:
+68
-54
@@ -213,10 +213,13 @@ async def sync_new_messages(
|
||||
|
||||
synced_count = 0
|
||||
max_uid_synced = last_uid
|
||||
# Raccoglie gli ID per i job da accodare DOPO il commit (evita race condition)
|
||||
routing_ids: list[str] = []
|
||||
notification_log_ids: list[str] = []
|
||||
|
||||
for seq in seq_numbers:
|
||||
try:
|
||||
uid, synced = await _fetch_and_save_message_by_seq(
|
||||
uid, synced, routing_id, notif_ids = await _fetch_and_save_message_by_seq(
|
||||
imap_client=imap_client,
|
||||
seq=seq,
|
||||
last_uid=last_uid,
|
||||
@@ -230,6 +233,9 @@ async def sync_new_messages(
|
||||
if synced and uid and uid > max_uid_synced:
|
||||
synced_count += 1
|
||||
max_uid_synced = uid
|
||||
if routing_id:
|
||||
routing_ids.append(routing_id)
|
||||
notification_log_ids.extend(notif_ids)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[{mailbox.email_address}] Errore fetch INBOX seq {seq}: {e}",
|
||||
@@ -248,6 +254,24 @@ async def sync_new_messages(
|
||||
await db.flush()
|
||||
await db.commit()
|
||||
|
||||
# Accoda i job di smistamento e notifiche DOPO il commit per evitare
|
||||
# race condition: i job aprono nuove sessioni DB e devono trovare i record.
|
||||
if redis_client:
|
||||
for msg_id in routing_ids:
|
||||
try:
|
||||
await redis_client.enqueue_job("apply_routing_rules", msg_id)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[{mailbox.email_address}] Impossibile enqueue apply_routing_rules: {e}"
|
||||
)
|
||||
for log_id in notification_log_ids:
|
||||
try:
|
||||
await redis_client.enqueue_job("dispatch_notification", log_id)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[{mailbox.email_address}] Impossibile enqueue dispatch_notification: {e}"
|
||||
)
|
||||
|
||||
return synced_count
|
||||
|
||||
|
||||
@@ -320,7 +344,8 @@ async def sync_sent_messages(
|
||||
|
||||
for seq in seq_numbers:
|
||||
try:
|
||||
uid, synced = await _fetch_and_save_message_by_seq(
|
||||
# I messaggi outbound non generano routing rules ne' notifiche: ignoriamo i valori
|
||||
uid, synced, _, _ = await _fetch_and_save_message_by_seq(
|
||||
imap_client=imap_client,
|
||||
seq=seq,
|
||||
last_uid=last_uid,
|
||||
@@ -339,37 +364,12 @@ async def sync_sent_messages(
|
||||
f"[{mailbox.email_address}] Errore fetch {sent_folder!r} seq {seq}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
# Aggiorna sent_last_sync_uid
|
||||
if max_uid_synced > last_uid:
|
||||
mailbox.sent_last_sync_uid = max_uid_synced
|
||||
await db.flush()
|
||||
await db.commit()
|
||||
|
||||
logger.info(
|
||||
f"[{mailbox.email_address}] Sync Sent completata: {synced_count} messaggi nuovi"
|
||||
)
|
||||
|
||||
# Ri-seleziona INBOX per tornare allo stato normale
|
||||
try:
|
||||
await imap_client.select("INBOX")
|
||||
except Exception as e:
|
||||
logger.warning(f"[{mailbox.email_address}] Re-SELECT INBOX dopo Sent sync: {e}")
|
||||
|
||||
return synced_count
|
||||
|
||||
|
||||
async def _fetch_and_save_message_by_seq(
|
||||
imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL,
|
||||
seq: str,
|
||||
last_uid: int,
|
||||
mailbox: Mailbox,
|
||||
db: AsyncSession,
|
||||
redis_client: aioredis.Redis,
|
||||
imap_folder: str = "INBOX",
|
||||
direction: str = "inbound",
|
||||
state: str = "received",
|
||||
) -> tuple[int | None, bool]:
|
||||
) -> tuple[int | None, bool, str | None, list[str]]:
|
||||
"""
|
||||
Fetcha un singolo messaggio per NUMERO DI SEQUENZA (non UID).
|
||||
|
||||
@@ -379,19 +379,23 @@ async def _fetch_and_save_message_by_seq(
|
||||
state: 'received' per INBOX, 'sent' per Sent
|
||||
|
||||
Returns:
|
||||
(uid, saved): UID del messaggio e True se salvato, False altrimenti.
|
||||
(uid, saved, routing_msg_id, notification_log_ids):
|
||||
- uid: UID IMAP del messaggio
|
||||
- saved: True se il messaggio e' stato salvato
|
||||
- routing_msg_id: str(message.id) se routing rules vanno applicate
|
||||
- notification_log_ids: lista di log_id per dispatch_notification
|
||||
"""
|
||||
try:
|
||||
status, fetch_data = await imap_client.fetch(seq, "(UID RFC822 RFC822.SIZE)")
|
||||
except Exception as e:
|
||||
logger.error(f"[{mailbox.email_address}] FETCH seq {seq} fallito: {e}")
|
||||
return None, False
|
||||
return None, False, None, []
|
||||
|
||||
if status != "OK" or not fetch_data:
|
||||
logger.warning(
|
||||
f"[{mailbox.email_address}] FETCH seq {seq} risposta vuota: {status}"
|
||||
)
|
||||
return None, False
|
||||
return None, False, None, []
|
||||
|
||||
items_info = [(type(x).__name__, len(x) if isinstance(x, (bytes, str)) else str(x)) for x in fetch_data]
|
||||
logger.debug(f"[{mailbox.email_address}] fetch_data seq {seq}: {items_info}")
|
||||
@@ -421,16 +425,16 @@ async def _fetch_and_save_message_by_seq(
|
||||
size_bytes = int(size_match.group(1))
|
||||
|
||||
if uid is None or uid <= last_uid:
|
||||
return uid, False
|
||||
return uid, False, None, []
|
||||
|
||||
if not raw_eml:
|
||||
logger.warning(f"[{mailbox.email_address}] seq {seq} UID {uid}: body mancante")
|
||||
return uid, False
|
||||
return uid, False, None, []
|
||||
|
||||
if size_bytes is None:
|
||||
size_bytes = len(raw_eml)
|
||||
|
||||
return uid, await _save_message(
|
||||
saved, routing_id, notif_ids = await _save_message(
|
||||
uid=uid,
|
||||
raw_eml=raw_eml,
|
||||
size_bytes=size_bytes,
|
||||
@@ -441,6 +445,7 @@ async def _fetch_and_save_message_by_seq(
|
||||
direction=direction,
|
||||
state=state,
|
||||
)
|
||||
return uid, saved, routing_id, notif_ids
|
||||
|
||||
|
||||
async def _fetch_and_save_message(
|
||||
@@ -487,7 +492,7 @@ async def _fetch_and_save_message(
|
||||
if not raw_eml:
|
||||
return False
|
||||
|
||||
return await _save_message(
|
||||
saved, _, _ = await _save_message(
|
||||
uid=uid,
|
||||
raw_eml=raw_eml,
|
||||
size_bytes=size_bytes or len(raw_eml),
|
||||
@@ -498,6 +503,7 @@ async def _fetch_and_save_message(
|
||||
direction="inbound",
|
||||
state="received",
|
||||
)
|
||||
return saved
|
||||
|
||||
|
||||
# ─── Save message (Fase 3 – con parser completo, allegati, state machine) ────
|
||||
@@ -512,10 +518,19 @@ async def _save_message(
|
||||
imap_folder: str = "INBOX",
|
||||
direction: str = "inbound",
|
||||
state: str = "received",
|
||||
) -> bool:
|
||||
) -> tuple[bool, str | None, list[str]]:
|
||||
"""
|
||||
Salva un messaggio EML in DB e su MinIO.
|
||||
|
||||
Returns:
|
||||
Tuple (saved, routing_msg_id, notification_log_ids):
|
||||
- saved: True se il messaggio e' stato salvato
|
||||
- routing_msg_id: str(message.id) se routing rules devono essere applicate, None altrimenti
|
||||
- notification_log_ids: lista di UUID stringa dei NotificationLog creati
|
||||
|
||||
IMPORTANTE: il chiamante deve accodare i job apply_routing_rules e
|
||||
dispatch_notification DOPO db.commit() per evitare race condition.
|
||||
|
||||
Args:
|
||||
imap_folder: cartella IMAP di provenienza ('INBOX', 'Sent', ecc.)
|
||||
direction: 'inbound' per posta in arrivo, 'outbound' per posta inviata
|
||||
@@ -542,7 +557,7 @@ async def _save_message(
|
||||
)
|
||||
if existing.scalar_one_or_none():
|
||||
logger.debug(f"[{mailbox.email_address}] UID {uid} in {imap_folder!r} già in DB, skip")
|
||||
return False
|
||||
return False, None, []
|
||||
|
||||
# ── Classificazione tipo messaggio da header (veloce, senza body) ────────
|
||||
# La classificazione avviene PRIMA del parsing completo perche' il parser
|
||||
@@ -632,7 +647,7 @@ async def _save_message(
|
||||
f"message_id={parsed.message_id!r} con imap_uid={uid} "
|
||||
f"folder={imap_folder!r} (evitato duplicato outbound)"
|
||||
)
|
||||
return True
|
||||
return True, None, []
|
||||
|
||||
# ── State machine: trova e aggiorna messaggio outbound ────────────────────
|
||||
# Solo per messaggi inbound che sono ricevute PEC (non per posta inviata)
|
||||
@@ -740,27 +755,18 @@ async def _save_message(
|
||||
# sia il messaggio che gli allegati dalla sessione corrente.
|
||||
await index_message(message.id, db)
|
||||
|
||||
# ── Valutazione e accodamento notifiche (non bloccante) ───────────────────
|
||||
# Solo per messaggi inbound: le ricevute PEC e la posta in arrivo
|
||||
# possono triggerare regole di notifica configurate dal tenant.
|
||||
# I messaggi outbound (Sent) non generano notifiche automatiche.
|
||||
# ── Valutazione notifiche (crea NotificationLog, NON accoda job) ──────────
|
||||
# Solo per messaggi inbound. I job dispatch_notification vengono accodati
|
||||
# dal chiamante DOPO db.commit() per evitare race condition:
|
||||
# il job apre una nuova sessione DB e deve trovare il record gia' committato.
|
||||
notification_log_ids: list[str] = []
|
||||
if direction == "inbound":
|
||||
await evaluate_and_enqueue_notifications(
|
||||
notification_log_ids = await evaluate_and_enqueue_notifications(
|
||||
message=message,
|
||||
mailbox=mailbox,
|
||||
db=db,
|
||||
redis_client=redis_client,
|
||||
)
|
||||
|
||||
# ── Regole di smistamento automatico (Feature 2) ──────────────────────────
|
||||
# Solo per messaggi inbound posta_certificata (non ricevute di sistema).
|
||||
# Valido anche per messaggi REM (REMDispatch e' equivalente a posta_certificata).
|
||||
if direction == "inbound" and _pec_type == "posta_certificata":
|
||||
try:
|
||||
await redis_client.enqueue_job("apply_routing_rules", str(message.id))
|
||||
except Exception as e:
|
||||
logger.warning(f"[{mailbox.email_address}] Impossibile enqueue apply_routing_rules: {e}")
|
||||
|
||||
# ── Auto-save mittente nella rubrica (Feature 6) ──────────────────────────
|
||||
# Per messaggi inbound di tipo posta_certificata, salva automaticamente
|
||||
# il mittente nella rubrica pec_contacts del tenant (upsert idempotente).
|
||||
@@ -779,7 +785,15 @@ async def _save_message(
|
||||
except Exception as e:
|
||||
logger.debug(f"[{mailbox.email_address}] Auto-save contatto fallito (non critico): {e}")
|
||||
|
||||
return True
|
||||
# ── Routing rules: segnala il messaggio al chiamante ──────────────────────
|
||||
# Solo per messaggi inbound posta_certificata (non ricevute di sistema).
|
||||
# Il job apply_routing_rules viene accodato dal chiamante DOPO db.commit()
|
||||
# per evitare race condition (il job apre una nuova sessione DB).
|
||||
routing_msg_id: str | None = None
|
||||
if direction == "inbound" and _pec_type == "posta_certificata":
|
||||
routing_msg_id = str(message.id)
|
||||
|
||||
return True, routing_msg_id, notification_log_ids
|
||||
|
||||
|
||||
async def _apply_outbound_state_machine(
|
||||
|
||||
@@ -8,8 +8,8 @@ Flusso completo:
|
||||
- legge le NotificationRule attive del tenant con event_type="new_message"
|
||||
- applica i filtri opzionali (mailbox_id, direction, pec_type)
|
||||
- per ogni regola che matcha: crea NotificationLog(status=pending)
|
||||
- enqueue del job arq dispatch_notification con defer 5s
|
||||
(per dare tempo al DB di committare prima che il job legga)
|
||||
- restituisce lista di log_id: il chiamante accoda i job DOPO db.commit()
|
||||
(evita race condition: il job apre una nuova sessione DB)
|
||||
4. dispatch_notification(ctx, notification_log_id):
|
||||
- legge NotificationLog + NotificationChannel dal DB
|
||||
- controlla circuit breaker
|
||||
@@ -234,38 +234,49 @@ async def evaluate_and_enqueue_notifications(
|
||||
message: Message,
|
||||
mailbox: Mailbox,
|
||||
db: Any, # AsyncSession – evito import circolare
|
||||
redis_client: Any, # ArqRedis
|
||||
) -> None:
|
||||
) -> list[str]:
|
||||
"""
|
||||
Valuta le regole di notifica per un messaggio appena salvato e accoda i job.
|
||||
Valuta le regole di notifica per un messaggio appena salvato,
|
||||
crea i NotificationLog e restituisce i loro ID.
|
||||
|
||||
NON accoda i job arq: il chiamante deve farlo DOPO db.commit()
|
||||
per evitare race condition (il job dispatch_notification apre una nuova
|
||||
sessione DB e deve trovare il NotificationLog gia' committato).
|
||||
|
||||
Chiamata da sync.py dopo _save_message e index_message.
|
||||
Non solleva eccezioni: gli errori vengono loggati ma non propagati per
|
||||
non interrompere il flusso di sincronizzazione IMAP.
|
||||
|
||||
Args:
|
||||
message: messaggio appena salvato nel DB (flush, non commit)
|
||||
mailbox: casella di appartenenza
|
||||
db: sessione DB (open, con flush del messaggio)
|
||||
redis_client: ArqRedis per enqueue_job
|
||||
message: messaggio appena salvato nel DB (flush, non commit)
|
||||
mailbox: casella di appartenenza
|
||||
db: sessione DB (open, con flush del messaggio)
|
||||
|
||||
Returns:
|
||||
Lista di notification_log_id (str) da accodare dopo il commit.
|
||||
"""
|
||||
try:
|
||||
await _do_evaluate_and_enqueue(message, mailbox, db, redis_client)
|
||||
return await _do_evaluate_and_enqueue(message, mailbox, db)
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
f"Errore evaluate_and_enqueue_notifications per messaggio "
|
||||
f"{message.id}: {exc}",
|
||||
exc_info=True,
|
||||
)
|
||||
return []
|
||||
|
||||
|
||||
async def _do_evaluate_and_enqueue(
|
||||
message: Message,
|
||||
mailbox: Mailbox,
|
||||
db: Any,
|
||||
redis_client: Any,
|
||||
) -> None:
|
||||
"""Logica interna – puo' sollevare eccezioni."""
|
||||
) -> list[str]:
|
||||
"""
|
||||
Logica interna – puo' sollevare eccezioni.
|
||||
|
||||
Crea i NotificationLog e restituisce i loro ID.
|
||||
NON accoda job arq: il chiamante lo fa dopo db.commit().
|
||||
"""
|
||||
|
||||
# Carica regole attive per questo tenant con event_type = "new_message"
|
||||
rules_result = await db.execute(
|
||||
@@ -280,9 +291,9 @@ async def _do_evaluate_and_enqueue(
|
||||
rules: list[NotificationRule] = list(rules_result.scalars().all())
|
||||
|
||||
if not rules:
|
||||
return
|
||||
return []
|
||||
|
||||
enqueued_count = 0
|
||||
log_ids: list[str] = []
|
||||
|
||||
for rule in rules:
|
||||
channel = rule.channel
|
||||
@@ -336,31 +347,22 @@ async def _do_evaluate_and_enqueue(
|
||||
db.add(log)
|
||||
await db.flush() # ottieni log.id
|
||||
|
||||
# Enqueue arq job con defer 5s per attendere il commit DB
|
||||
try:
|
||||
await redis_client.enqueue_job(
|
||||
"dispatch_notification",
|
||||
str(log.id),
|
||||
_defer_by=timedelta(seconds=5),
|
||||
)
|
||||
enqueued_count += 1
|
||||
logger.info(
|
||||
f"[notify] Enqueued dispatch_notification per regola "
|
||||
f"{rule.name!r} -> canale {channel.name!r} "
|
||||
f"(log_id={log.id})"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[notify] Impossibile enqueue dispatch_notification "
|
||||
f"per log {log.id}: {e}"
|
||||
)
|
||||
# Raccoglie log_id: il job viene accodato dal chiamante DOPO db.commit()
|
||||
log_ids.append(str(log.id))
|
||||
logger.info(
|
||||
f"[notify] NotificationLog creato per regola "
|
||||
f"{rule.name!r} -> canale {channel.name!r} "
|
||||
f"(log_id={log.id})"
|
||||
)
|
||||
|
||||
if enqueued_count > 0:
|
||||
if log_ids:
|
||||
logger.info(
|
||||
f"[notify] Messaggio {message.id}: "
|
||||
f"{enqueued_count} notifiche accodate"
|
||||
f"{len(log_ids)} notifiche pronte per dispatch"
|
||||
)
|
||||
|
||||
return log_ids
|
||||
|
||||
|
||||
# ─── Job arq principale ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
Reference in New Issue
Block a user