Files
2026-06-04 20:54:49 +02:00

885 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Logica di sincronizzazione messaggi IMAP Fase 3 aggiornata + Sent folder.
Responsabilità:
1. Fetch della lista UID > last_sync_uid (INBOX e Sent)
2. Download envelope + raw EML per ogni UID
3. Parsing completo EML tramite app.parsers (Fase 3):
- Classificazione tipo PEC (X-Ricevuta / X-TipoRicevuta)
- Estrazione allegati (body text/html + allegati file)
- EML-in-EML per ricevute PEC
4. Salvataggio messaggio in tabella messages
5. Upload raw EML su MinIO
6. Upload allegati su MinIO + inserimento in tabella attachments
7. State machine messaggi outbound (sent→accepted→delivered/anomaly)
tramite X-Riferimento-Message-ID
8. Aggiornamento last_sync_uid / sent_last_sync_uid sulla mailbox
9. Pubblicazione evento Redis per notifica WebSocket
Nota: ogni cartella IMAP ha un namespace UID separato, quindi la chiave
di idempotenza è (mailbox_id, imap_uid, imap_folder).
"""
import email
import email.header
import email.utils
import json
import logging
import re
import uuid
from datetime import UTC, datetime
import aioimaplib
import redis.asyncio as aioredis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import get_settings
from app.jobs.dispatch_notification import evaluate_and_enqueue_notifications
from app.jobs.index_message import index_message
from app.models import Attachment, Mailbox, Message
from app.parsers.eml_parser import parse_eml
from app.parsers.pec_parser import apply_outbound_transition, classify_pec_message
from app.storage.minio_client import upload_attachment, upload_eml
logger = logging.getLogger(__name__)
settings = get_settings()
# Nomi comuni della cartella Sent nei provider PEC italiani (in ordine di priorità)
# Aruba PEC usa INBOX.Inviata (\Sent), altri provider usano varianti diverse
SENT_FOLDER_CANDIDATES = [
"INBOX.Inviata", # Aruba PEC (verificato via LIST)
"INBOX.Sent", # Variante comune
"INBOX.Inviati", # Variante italiana
"INBOX.Sent Items",
"Sent",
"Sent Items",
"Sent Messages",
"Inviati",
"Posta inviata",
"INBOX.Posta inviata",
]
# ─── Helper legacy (mantenuti per backward compatibility con i test) ──────────
def _decode_header(header_value: str | None) -> str | None:
"""Decodifica header RFC 2047 (es. =?utf-8?b?...?=) in stringa Python."""
if not header_value:
return None
try:
parts = email.header.decode_header(header_value)
decoded = []
for part, charset in parts:
if isinstance(part, bytes):
decoded.append(part.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(part)
return "".join(decoded).strip()
except Exception:
return str(header_value)
def _extract_addresses(field: str | None) -> list[str]:
"""Estrae lista di indirizzi email da un campo To/Cc."""
if not field:
return []
try:
addresses = email.utils.getaddresses([field])
return [addr for _, addr in addresses if addr]
except Exception:
return []
def _parse_date(date_str: str | None) -> datetime | None:
"""Converte stringa data RFC 2822 in datetime con timezone."""
if not date_str:
return None
try:
parsed = email.utils.parsedate_to_datetime(date_str)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=UTC)
return parsed
except Exception:
return None
def _classify_pec_type(msg: email.message.Message) -> str:
"""
Classifica il tipo PEC dal header X-Ricevuta / X-TipoRicevuta.
Mantenuto per backward compatibility usa il parser completo internamente.
"""
pec_class = classify_pec_message(msg)
return pec_class.pec_type
def _parse_eml(raw_bytes: bytes) -> dict:
"""
Parsing di base di un EML.
Wrapper del nuovo parser completo (Fase 3).
Mantenuto per backward compatibility con i test esistenti.
Restituisce un dict con i campi base necessari per la tabella messages.
"""
parsed = parse_eml(raw_bytes)
pec_type = "posta_certificata"
if parsed.raw_message is not None:
pec_class = classify_pec_message(parsed.raw_message)
pec_type = pec_class.pec_type
return {
"subject": parsed.subject,
"from_address": parsed.from_address,
"to_addresses": parsed.to_addresses if parsed.to_addresses else None,
"cc_addresses": parsed.cc_addresses if parsed.cc_addresses else None,
"message_id_header": parsed.message_id,
"sent_at": parsed.date,
"pec_type": pec_type,
"body_text": parsed.body_text,
"body_html": parsed.body_html,
"has_attachments": parsed.has_attachments,
}
# ─── Sent folder discovery ────────────────────────────────────────────────────
async def _select_sent_folder(
imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL,
email_address: str,
) -> str | None:
"""
Prova i nomi comuni della cartella Sent finché uno funziona (SELECT OK).
Se trovata, il client è già selezionato su quella cartella.
Restituisce il nome della cartella trovata, o None.
"""
for candidate in SENT_FOLDER_CANDIDATES:
try:
status, _ = await imap_client.select(candidate)
if status == "OK":
logger.debug(f"[{email_address}] Cartella Sent trovata: {candidate!r}")
return candidate
except Exception:
continue
logger.info(f"[{email_address}] Nessuna cartella Sent trovata (tentativi: {SENT_FOLDER_CANDIDATES})")
return None
# ─── Core sync functions ──────────────────────────────────────────────────────
async def sync_new_messages(
imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL,
mailbox: Mailbox,
db: AsyncSession,
redis_client: aioredis.Redis,
) -> int:
"""
Sincronizza i messaggi nuovi (UID > last_sync_uid) dalla cartella INBOX.
ATTENZIONE: il client IMAP deve essere già selezionato su INBOX.
Returns:
Numero di nuovi messaggi sincronizzati.
"""
last_uid = mailbox.last_sync_uid or 0
search_range = f"{last_uid + 1}:*"
# ── SEARCH UID > last_sync_uid ─────────────────────────────────────────────
try:
status, search_data = await imap_client.search("UID", search_range)
except Exception as e:
logger.warning(f"[{mailbox.email_address}] SEARCH INBOX fallito: {e}")
return 0
if status != "OK":
logger.warning(
f"[{mailbox.email_address}] SEARCH INBOX status={status} data={search_data}"
)
return 0
raw_seqs = b" ".join(
d if isinstance(d, bytes) else d.encode() for d in search_data
).decode("ascii", errors="ignore").split()
seq_numbers = [s for s in raw_seqs if s.isdigit()]
if not seq_numbers:
return 0
seq_numbers = seq_numbers[: settings.imap_max_fetch_per_cycle]
logger.debug(
f"[{mailbox.email_address}] Candidati INBOX da verificare: {len(seq_numbers)} seq"
)
synced_count = 0
max_uid_synced = last_uid
for seq in seq_numbers:
try:
uid, synced = await _fetch_and_save_message_by_seq(
imap_client=imap_client,
seq=seq,
last_uid=last_uid,
mailbox=mailbox,
db=db,
redis_client=redis_client,
imap_folder="INBOX",
direction="inbound",
state="received",
)
if synced and uid and uid > max_uid_synced:
synced_count += 1
max_uid_synced = uid
except Exception as e:
logger.error(
f"[{mailbox.email_address}] Errore fetch INBOX seq {seq}: {e}",
exc_info=True,
)
if synced_count > 0:
logger.info(
f"[{mailbox.email_address}] Trovati {synced_count} messaggi nuovi in INBOX"
)
# Aggiorna last_sync_uid e last_sync_at
if max_uid_synced > last_uid:
mailbox.last_sync_uid = max_uid_synced
mailbox.last_sync_at = datetime.now(UTC)
await db.flush()
await db.commit()
return synced_count
async def sync_sent_messages(
imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL,
mailbox: Mailbox,
db: AsyncSession,
redis_client: aioredis.Redis,
) -> int:
"""
Sincronizza i messaggi nuovi dalla cartella Sent (posta inviata) del server IMAP.
Salva i messaggi come direction='outbound', state='sent'.
Dopo la sync ri-seleziona INBOX per ripristinare il client allo stato normale.
Returns:
Numero di nuovi messaggi inviati sincronizzati.
"""
# Trova e seleziona la cartella Sent
sent_folder = await _select_sent_folder(imap_client, mailbox.email_address)
if not sent_folder:
# Assicurati di tornare in INBOX
try:
await imap_client.select("INBOX")
except Exception:
pass
return 0
# Siamo ora selezionati nella cartella Sent
last_uid = mailbox.sent_last_sync_uid or 0
search_range = f"{last_uid + 1}:*"
try:
status, search_data = await imap_client.search("UID", search_range)
except Exception as e:
logger.warning(f"[{mailbox.email_address}] SEARCH Sent fallito: {e}")
try:
await imap_client.select("INBOX")
except Exception:
pass
return 0
if status != "OK":
try:
await imap_client.select("INBOX")
except Exception:
pass
return 0
raw_seqs = b" ".join(
d if isinstance(d, bytes) else d.encode() for d in search_data
).decode("ascii", errors="ignore").split()
seq_numbers = [s for s in raw_seqs if s.isdigit()]
if not seq_numbers:
logger.debug(f"[{mailbox.email_address}] Nessun messaggio nuovo in {sent_folder!r}")
try:
await imap_client.select("INBOX")
except Exception:
pass
return 0
seq_numbers = seq_numbers[: settings.imap_max_fetch_per_cycle]
logger.info(
f"[{mailbox.email_address}] Trovati {len(seq_numbers)} messaggi nuovi in {sent_folder!r}"
)
synced_count = 0
max_uid_synced = last_uid
for seq in seq_numbers:
try:
uid, synced = await _fetch_and_save_message_by_seq(
imap_client=imap_client,
seq=seq,
last_uid=last_uid,
mailbox=mailbox,
db=db,
redis_client=redis_client,
imap_folder=sent_folder,
direction="outbound",
state="sent",
)
if synced and uid and uid > max_uid_synced:
synced_count += 1
max_uid_synced = uid
except Exception as e:
logger.error(
f"[{mailbox.email_address}] Errore fetch {sent_folder!r} seq {seq}: {e}",
exc_info=True,
)
# Aggiorna sent_last_sync_uid
if max_uid_synced > last_uid:
mailbox.sent_last_sync_uid = max_uid_synced
await db.flush()
await db.commit()
logger.info(
f"[{mailbox.email_address}] Sync Sent completata: {synced_count} messaggi nuovi"
)
# Ri-seleziona INBOX per tornare allo stato normale
try:
await imap_client.select("INBOX")
except Exception as e:
logger.warning(f"[{mailbox.email_address}] Re-SELECT INBOX dopo Sent sync: {e}")
return synced_count
async def _fetch_and_save_message_by_seq(
imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL,
seq: str,
last_uid: int,
mailbox: Mailbox,
db: AsyncSession,
redis_client: aioredis.Redis,
imap_folder: str = "INBOX",
direction: str = "inbound",
state: str = "received",
) -> tuple[int | None, bool]:
"""
Fetcha un singolo messaggio per NUMERO DI SEQUENZA (non UID).
Args:
imap_folder: cartella IMAP corrente (per idempotenza e salvataggio)
direction: 'inbound' per INBOX, 'outbound' per Sent
state: 'received' per INBOX, 'sent' per Sent
Returns:
(uid, saved): UID del messaggio e True se salvato, False altrimenti.
"""
try:
status, fetch_data = await imap_client.fetch(seq, "(UID RFC822 RFC822.SIZE)")
except Exception as e:
logger.error(f"[{mailbox.email_address}] FETCH seq {seq} fallito: {e}")
return None, False
if status != "OK" or not fetch_data:
logger.warning(
f"[{mailbox.email_address}] FETCH seq {seq} risposta vuota: {status}"
)
return None, False
items_info = [(type(x).__name__, len(x) if isinstance(x, (bytes, str)) else str(x)) for x in fetch_data]
logger.debug(f"[{mailbox.email_address}] fetch_data seq {seq}: {items_info}")
uid: int | None = None
raw_eml: bytes | None = None
size_bytes: int | None = None
for item in fetch_data:
if isinstance(item, bytearray):
if len(item) > 200:
raw_eml = bytes(item)
elif isinstance(item, bytes):
item_str = item.decode("ascii", errors="ignore")
uid_match = re.search(r"UID\s+(\d+)", item_str)
if uid_match:
uid = int(uid_match.group(1))
size_match = re.search(r"RFC822\.SIZE\s+(\d+)", item_str)
if size_match:
size_bytes = int(size_match.group(1))
elif isinstance(item, str):
uid_match = re.search(r"UID\s+(\d+)", item)
if uid_match:
uid = int(uid_match.group(1))
size_match = re.search(r"RFC822\.SIZE\s+(\d+)", item)
if size_match:
size_bytes = int(size_match.group(1))
if uid is None or uid <= last_uid:
return uid, False
if not raw_eml:
logger.warning(f"[{mailbox.email_address}] seq {seq} UID {uid}: body mancante")
return uid, False
if size_bytes is None:
size_bytes = len(raw_eml)
return uid, await _save_message(
uid=uid,
raw_eml=raw_eml,
size_bytes=size_bytes,
mailbox=mailbox,
db=db,
redis_client=redis_client,
imap_folder=imap_folder,
direction=direction,
state=state,
)
async def _fetch_and_save_message(
imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL,
uid: int,
mailbox: Mailbox,
db: AsyncSession,
redis_client: aioredis.Redis,
) -> bool:
"""
Fetcha un singolo messaggio per UID (usato dal job sync_mailbox one-shot).
Sincronizza solo dalla cartella INBOX.
"""
existing = await db.execute(
select(Message.id).where(
Message.mailbox_id == mailbox.id,
Message.imap_uid == uid,
Message.imap_folder == "INBOX",
)
)
if existing.scalar_one_or_none():
return False
try:
status, fetch_data = await imap_client.uid("FETCH", str(uid), "(RFC822 RFC822.SIZE)")
except Exception as e:
logger.error(f"[{mailbox.email_address}] UID FETCH {uid} fallito: {e}")
return False
if status != "OK" or not fetch_data:
return False
raw_eml: bytes | None = None
size_bytes: int | None = None
for item in fetch_data:
if isinstance(item, bytes) and len(item) > 100:
raw_eml = item
elif isinstance(item, (bytes, str)):
s = item.decode("ascii", errors="ignore") if isinstance(item, bytes) else item
m = re.search(r"RFC822\.SIZE\s+(\d+)", s)
if m:
size_bytes = int(m.group(1))
if not raw_eml:
return False
return await _save_message(
uid=uid,
raw_eml=raw_eml,
size_bytes=size_bytes or len(raw_eml),
mailbox=mailbox,
db=db,
redis_client=redis_client,
imap_folder="INBOX",
direction="inbound",
state="received",
)
# ─── Save message (Fase 3 con parser completo, allegati, state machine) ────
async def _save_message(
uid: int,
raw_eml: bytes,
size_bytes: int,
mailbox: Mailbox,
db: AsyncSession,
redis_client: aioredis.Redis,
imap_folder: str = "INBOX",
direction: str = "inbound",
state: str = "received",
) -> bool:
"""
Salva un messaggio EML in DB e su MinIO.
Args:
imap_folder: cartella IMAP di provenienza ('INBOX', 'Sent', ecc.)
direction: 'inbound' per posta in arrivo, 'outbound' per posta inviata
state: stato iniziale del messaggio ('received' per inbound, 'sent' per outbound)
Fase 3 aggiornato per:
- Idempotenza basata su (mailbox_id, imap_uid, imap_folder) le UID sono
per-cartella in IMAP, quindi lo stesso UID può esistere in INBOX e Sent
- Parser completo (body, allegati, EML-in-EML)
- Classificazione precisa tipo PEC (tutti i provider)
- Salvataggio allegati su MinIO + tabella attachments
- State machine outbound: solo per messaggi inbound (ricevute PEC)
- Collegamento parent_message_id via X-Riferimento-Message-ID
- Dedup outbound: evita duplicati quando un messaggio inviato via send_pec
viene poi trovato anche nella cartella Sent del server IMAP
"""
# ── Idempotenza: chiave composta (mailbox_id + imap_uid + imap_folder) ────
existing = await db.execute(
select(Message.id).where(
Message.mailbox_id == mailbox.id,
Message.imap_uid == uid,
Message.imap_folder == imap_folder,
)
)
if existing.scalar_one_or_none():
logger.debug(f"[{mailbox.email_address}] UID {uid} in {imap_folder!r} già in DB, skip")
return False
# ── Classificazione PEC da header (veloce, senza body) ───────────────────
# La classificazione avviene PRIMA del parsing completo perche' il parser
# deve sapere se il messaggio e' una ricevuta per evitare di sovrascrivere
# il body_text (testo della ricevuta) con il contenuto di postacert.eml.
quick_msg = email.message_from_bytes(raw_eml)
pec_class = classify_pec_message(quick_msg)
# ── Parsing completo EML (con is_receipt per proteggere il body) ──────────
parsed = parse_eml(raw_eml, is_receipt=pec_class.is_receipt)
received_at = datetime.now(UTC)
# ── Dedup outbound: upsert sul record send_pec invece di creare duplicato ─
# Problema: send_pec crea un record outbound con imap_uid=NULL e poi
# la sync della cartella Sent trova lo stesso messaggio e vorrebbe creare
# un secondo record con lo stesso message_id_header. I duplicati rompono
# il binding delle ricevute (_apply_outbound_state_machine usava
# scalar_one_or_none() che esplode con MultipleResultsFound).
# Soluzione: se esiste già un record outbound con lo stesso message_id_header
# e imap_uid=NULL (il record canonico di send_pec), aggiorniamo quel record
# con l'imap_uid/imap_folder della Sent folder invece di crearne uno nuovo.
if direction == "outbound" and parsed.message_id:
existing_outbound = await db.execute(
select(Message).where(
Message.mailbox_id == mailbox.id,
Message.message_id_header == parsed.message_id,
Message.direction == "outbound",
Message.imap_uid.is_(None),
)
)
send_pec_record = existing_outbound.scalar_one_or_none()
if send_pec_record:
# Aggiorna il record esistente con i dati IMAP della cartella Sent
send_pec_record.imap_uid = uid
send_pec_record.imap_folder = imap_folder
send_pec_record.updated_at = datetime.now(UTC)
# Aggiorna anche il raw_eml_path se non è già impostato
if not send_pec_record.raw_eml_path:
try:
eml_path = await upload_eml(
tenant_id=str(mailbox.tenant_id),
mailbox_id=str(mailbox.id),
uid=uid,
eml_bytes=raw_eml,
)
send_pec_record.raw_eml_path = eml_path
except Exception as e:
logger.warning(
f"[{mailbox.email_address}] Upload EML MinIO per record send_pec "
f"UID {uid}: {e}"
)
await db.flush()
logger.info(
f"[{mailbox.email_address}] Sent-sync: aggiornato record send_pec "
f"message_id={parsed.message_id!r} con imap_uid={uid} "
f"folder={imap_folder!r} (evitato duplicato outbound)"
)
return True
# ── State machine: trova e aggiorna messaggio outbound ────────────────────
# Solo per messaggi inbound che sono ricevute PEC (non per posta inviata)
parent_message_id: uuid.UUID | None = None
if direction == "inbound" and pec_class.is_receipt and pec_class.riferimento_message_id:
try:
parent_message_id = await _apply_outbound_state_machine(
riferimento_message_id=pec_class.riferimento_message_id,
pec_type=pec_class.pec_type,
tenant_id=mailbox.tenant_id,
db=db,
)
except Exception as bind_err:
logger.error(
f"[{mailbox.email_address}] [receipt-binding] Errore aggiornamento stato "
f"outbound per ricevuta UID={uid} tipo={pec_class.pec_type!r}: {bind_err}",
exc_info=True,
)
# Non interrompere il salvataggio della ricevuta: il record viene
# comunque inserito, ma senza parent_message_id.
# ── Upload raw EML su MinIO ───────────────────────────────────────────────
eml_path: str | None = None
try:
eml_path = await upload_eml(
tenant_id=str(mailbox.tenant_id),
mailbox_id=str(mailbox.id),
uid=uid,
eml_bytes=raw_eml,
)
except Exception as e:
logger.error(f"[{mailbox.email_address}] Upload EML MinIO UID {uid}: {e}")
# ── Determina received_at / sent_at per messaggi outbound ─────────────────
# Per posta inviata: il campo "date" dell'EML è la data di invio
msg_received_at = received_at if direction == "inbound" else None
msg_sent_at = parsed.date if direction == "outbound" else parsed.date
# ── Salva messaggio in DB ─────────────────────────────────────────────────
message = Message(
id=uuid.uuid4(),
tenant_id=mailbox.tenant_id,
mailbox_id=mailbox.id,
imap_uid=uid,
imap_folder=imap_folder,
direction=direction,
state=state,
pec_type=pec_class.pec_type,
subject=parsed.subject,
from_address=parsed.from_address,
to_addresses=parsed.to_addresses if parsed.to_addresses else None,
cc_addresses=parsed.cc_addresses if parsed.cc_addresses else None,
message_id_header=parsed.message_id,
sent_at=msg_sent_at,
received_at=msg_received_at,
size_bytes=size_bytes,
body_text=parsed.body_text,
body_html=parsed.body_html,
has_attachments=parsed.has_attachments,
parent_message_id=parent_message_id,
raw_eml_path=eml_path,
# Messaggi outbound (Sent) sono già stati letti dal mittente
is_read=(direction == "outbound"),
)
db.add(message)
await db.flush() # ottieni message.id prima di salvare gli allegati
# ── Salva allegati su MinIO + tabella attachments ─────────────────────────
if parsed.attachments:
await _save_attachments(
attachments=parsed.attachments,
message=message,
mailbox=mailbox,
db=db,
)
# ── Pubblica evento Redis per WebSocket ───────────────────────────────────
try:
event = {
"type": "mailbox:new_message",
"mailbox_id": str(mailbox.id),
"message_id": str(message.id),
"subject": message.subject or "",
"from_address": message.from_address or "",
"pec_type": message.pec_type,
"direction": direction,
"is_receipt": pec_class.is_receipt,
"received_at": received_at.isoformat(),
}
await redis_client.publish(f"ws:tenant:{mailbox.tenant_id}", json.dumps(event))
except Exception as e:
logger.warning(f"[{mailbox.email_address}] Redis publish UID {uid}: {e}")
logger.info(
f"[{mailbox.email_address}] Nuovo messaggio: UID={uid} folder={imap_folder!r} "
f"direction={direction!r} pec_type={pec_class.pec_type!r} "
f"subject={message.subject!r} allegati={len(parsed.attachments)}"
)
# ── Indicizzazione full-text (non bloccante, non interrompe la sync) ─────
# Chiamata dopo il flush degli allegati: index_message puo' leggere
# sia il messaggio che gli allegati dalla sessione corrente.
await index_message(message.id, db)
# ── Valutazione e accodamento notifiche (non bloccante) ───────────────────
# Solo per messaggi inbound: le ricevute PEC e la posta in arrivo
# possono triggerare regole di notifica configurate dal tenant.
# I messaggi outbound (Sent) non generano notifiche automatiche.
if direction == "inbound":
await evaluate_and_enqueue_notifications(
message=message,
mailbox=mailbox,
db=db,
redis_client=redis_client,
)
# ── Regole di smistamento automatico (Feature 2) ──────────────────────────
# Solo per messaggi inbound posta_certificata (non ricevute di sistema).
if direction == "inbound" and pec_class.pec_type == "posta_certificata":
try:
await redis_client.enqueue_job("apply_routing_rules", str(message.id))
except Exception as e:
logger.warning(f"[{mailbox.email_address}] Impossibile enqueue apply_routing_rules: {e}")
# ── Auto-save mittente nella rubrica (Feature 6) ──────────────────────────
# Per messaggi inbound di tipo posta_certificata, salva automaticamente
# il mittente nella rubrica pec_contacts del tenant (upsert idempotente).
if direction == "inbound" and pec_class.pec_type == "posta_certificata" and message.from_address:
try:
from sqlalchemy import text as _text
await db.execute(
_text("""
INSERT INTO pec_contacts (id, tenant_id, email, auto_saved, created_at, updated_at)
VALUES (gen_random_uuid(), :tenant_id, :email, true, now(), now())
ON CONFLICT (tenant_id, email) DO NOTHING
"""),
{"tenant_id": str(mailbox.tenant_id), "email": message.from_address.lower().strip()},
)
await db.flush()
except Exception as e:
logger.debug(f"[{mailbox.email_address}] Auto-save contatto fallito (non critico): {e}")
return True
async def _apply_outbound_state_machine(
riferimento_message_id: str,
pec_type: str,
tenant_id: uuid.UUID,
db: AsyncSession,
) -> uuid.UUID | None:
"""
Aggiorna lo stato del messaggio outbound originale in base alla ricevuta.
Cerca il messaggio outbound con message_id_header == riferimento_message_id,
applica la transizione di stato se valida.
Gestisce il caso di messaggi outbound duplicati (uno creato da send_pec con
imap_uid=NULL e uno creato dalla sync della cartella Sent): in caso di multipli,
prioritizza quello con imap_uid=NULL (il record canonico creato da send_pec).
Il dedup in _save_message riduce drasticamente la probabilità di multipli,
ma questa funzione gestisce anche i casi residui per robustezza.
Returns:
UUID del messaggio originale se trovato, None altrimenti.
"""
result = await db.execute(
select(Message).where(
Message.tenant_id == tenant_id,
Message.message_id_header == riferimento_message_id,
Message.direction == "outbound",
)
)
candidates = result.scalars().all()
if not candidates:
logger.warning(
f"[receipt-binding] Messaggio outbound non trovato per "
f"riferimento={riferimento_message_id!r} (ricevuta: {pec_type!r}). "
f"Potrebbe essere stato inviato da un client esterno o il message_id_header "
f"non e' ancora stato persistito."
)
return None
# In presenza di duplicati (es. record send_pec + record Sent-sync),
# prioritizza il messaggio con imap_uid=NULL (quello canonico di send_pec).
parent_msg: Message | None = None
if len(candidates) == 1:
parent_msg = candidates[0]
else:
logger.warning(
f"[receipt-binding] Trovati {len(candidates)} messaggi outbound con "
f"message_id_header={riferimento_message_id!r}. "
f"Prioritizzo il record con imap_uid=NULL (send_pec)."
)
# Priorità 1: imap_uid IS NULL (creato da send_pec)
for m in candidates:
if m.imap_uid is None:
parent_msg = m
break
# Priorità 2: qualsiasi altro (creato dalla sync Sent)
if parent_msg is None:
parent_msg = candidates[0]
new_state = apply_outbound_transition(parent_msg.state, pec_type)
if new_state:
old_state = parent_msg.state
parent_msg.state = new_state
parent_msg.updated_at = datetime.now(UTC)
await db.flush()
logger.info(
f"[receipt-binding] State machine outbound: {riferimento_message_id!r} "
f"{old_state!r} -> {new_state!r} (ricevuta: {pec_type!r}, "
f"msg_id={parent_msg.id})"
)
else:
logger.debug(
f"[receipt-binding] Nessuna transizione valida per {riferimento_message_id!r} "
f"state={parent_msg.state!r} ricevuta={pec_type!r}"
)
return parent_msg.id
async def _save_attachments(
attachments: list,
message: Message,
mailbox: Mailbox,
db: AsyncSession,
) -> None:
"""
Carica gli allegati su MinIO e inserisce i record in tabella attachments.
Args:
attachments: lista di AttachmentInfo dal parser EML
message: messaggio DB a cui appartengono gli allegati
mailbox: casella (per il path MinIO)
db: sessione DB
"""
for att in attachments:
# Salta i file di sistema PEC (daticert.xml, postacert.eml, smime.p7s, ecc.)
# L'EML grezzo è già conservato su MinIO tramite upload_eml
if att.is_pec_system:
continue
storage_path: str | None = None
try:
storage_path = await upload_attachment(
tenant_id=str(mailbox.tenant_id),
mailbox_id=str(mailbox.id),
message_id=str(message.id),
filename=att.filename,
content=att.content,
content_type=att.content_type,
)
except Exception as e:
logger.error(
f"Upload allegato {att.filename!r} per messaggio {message.id}: {e}"
)
# Continua con gli altri allegati anche se uno fallisce
continue
# Inserisci record in DB
att_record = Attachment(
id=uuid.uuid4(),
tenant_id=message.tenant_id,
message_id=message.id,
filename=att.filename,
content_type=att.content_type,
size_bytes=att.size_bytes,
storage_path=storage_path,
checksum_sha256=att.checksum_sha256,
)
db.add(att_record)
# flush per persistere tutti gli allegati nella transazione corrente
try:
await db.flush()
except Exception as e:
logger.error(f"Errore flush allegati per messaggio {message.id}: {e}")