""" Logica di sincronizzazione messaggi IMAP – Fase 3 aggiornata + Sent folder. Responsabilità: 1. Fetch della lista UID > last_sync_uid (INBOX e Sent) 2. Download envelope + raw EML per ogni UID 3. Parsing completo EML tramite app.parsers (Fase 3): - Classificazione tipo PEC (X-Ricevuta / X-TipoRicevuta) - Estrazione allegati (body text/html + allegati file) - EML-in-EML per ricevute PEC 4. Salvataggio messaggio in tabella messages 5. Upload raw EML su MinIO 6. Upload allegati su MinIO + inserimento in tabella attachments 7. State machine messaggi outbound (sent→accepted→delivered/anomaly) tramite X-Riferimento-Message-ID 8. Aggiornamento last_sync_uid / sent_last_sync_uid sulla mailbox 9. Pubblicazione evento Redis per notifica WebSocket Nota: ogni cartella IMAP ha un namespace UID separato, quindi la chiave di idempotenza è (mailbox_id, imap_uid, imap_folder). """ import email import email.header import email.utils import json import logging import re import uuid from datetime import UTC, datetime import aioimaplib import redis.asyncio as aioredis from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.jobs.dispatch_notification import evaluate_and_enqueue_notifications from app.jobs.index_message import index_message from app.models import Attachment, Mailbox, Message from app.parsers.eml_parser import parse_eml from app.parsers.pec_parser import apply_outbound_transition, classify_pec_message from app.storage.minio_client import upload_attachment, upload_eml logger = logging.getLogger(__name__) settings = get_settings() # Nomi comuni della cartella Sent nei provider PEC italiani (in ordine di priorità) # Aruba PEC usa INBOX.Inviata (\Sent), altri provider usano varianti diverse SENT_FOLDER_CANDIDATES = [ "INBOX.Inviata", # Aruba PEC (verificato via LIST) "INBOX.Sent", # Variante comune "INBOX.Inviati", # Variante italiana "INBOX.Sent Items", "Sent", "Sent Items", "Sent Messages", "Inviati", "Posta inviata", "INBOX.Posta inviata", ] # ─── Helper legacy (mantenuti per backward compatibility con i test) ────────── def _decode_header(header_value: str | None) -> str | None: """Decodifica header RFC 2047 (es. =?utf-8?b?...?=) in stringa Python.""" if not header_value: return None try: parts = email.header.decode_header(header_value) decoded = [] for part, charset in parts: if isinstance(part, bytes): decoded.append(part.decode(charset or "utf-8", errors="replace")) else: decoded.append(part) return "".join(decoded).strip() except Exception: return str(header_value) def _extract_addresses(field: str | None) -> list[str]: """Estrae lista di indirizzi email da un campo To/Cc.""" if not field: return [] try: addresses = email.utils.getaddresses([field]) return [addr for _, addr in addresses if addr] except Exception: return [] def _parse_date(date_str: str | None) -> datetime | None: """Converte stringa data RFC 2822 in datetime con timezone.""" if not date_str: return None try: parsed = email.utils.parsedate_to_datetime(date_str) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=UTC) return parsed except Exception: return None def _classify_pec_type(msg: email.message.Message) -> str: """ Classifica il tipo PEC dal header X-Ricevuta / X-TipoRicevuta. Mantenuto per backward compatibility – usa il parser completo internamente. """ pec_class = classify_pec_message(msg) return pec_class.pec_type def _parse_eml(raw_bytes: bytes) -> dict: """ Parsing di base di un EML. Wrapper del nuovo parser completo (Fase 3). Mantenuto per backward compatibility con i test esistenti. Restituisce un dict con i campi base necessari per la tabella messages. """ parsed = parse_eml(raw_bytes) pec_type = "posta_certificata" if parsed.raw_message is not None: pec_class = classify_pec_message(parsed.raw_message) pec_type = pec_class.pec_type return { "subject": parsed.subject, "from_address": parsed.from_address, "to_addresses": parsed.to_addresses if parsed.to_addresses else None, "cc_addresses": parsed.cc_addresses if parsed.cc_addresses else None, "message_id_header": parsed.message_id, "sent_at": parsed.date, "pec_type": pec_type, "body_text": parsed.body_text, "body_html": parsed.body_html, "has_attachments": parsed.has_attachments, } # ─── Sent folder discovery ──────────────────────────────────────────────────── async def _select_sent_folder( imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, email_address: str, ) -> str | None: """ Prova i nomi comuni della cartella Sent finché uno funziona (SELECT OK). Se trovata, il client è già selezionato su quella cartella. Restituisce il nome della cartella trovata, o None. """ for candidate in SENT_FOLDER_CANDIDATES: try: status, _ = await imap_client.select(candidate) if status == "OK": logger.debug(f"[{email_address}] Cartella Sent trovata: {candidate!r}") return candidate except Exception: continue logger.info(f"[{email_address}] Nessuna cartella Sent trovata (tentativi: {SENT_FOLDER_CANDIDATES})") return None # ─── Core sync functions ────────────────────────────────────────────────────── async def sync_new_messages( imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, mailbox: Mailbox, db: AsyncSession, redis_client: aioredis.Redis, ) -> int: """ Sincronizza i messaggi nuovi (UID > last_sync_uid) dalla cartella INBOX. ATTENZIONE: il client IMAP deve essere già selezionato su INBOX. Returns: Numero di nuovi messaggi sincronizzati. """ last_uid = mailbox.last_sync_uid or 0 search_range = f"{last_uid + 1}:*" # ── SEARCH UID > last_sync_uid ───────────────────────────────────────────── try: status, search_data = await imap_client.search("UID", search_range) except Exception as e: logger.warning(f"[{mailbox.email_address}] SEARCH INBOX fallito: {e}") return 0 if status != "OK": logger.warning( f"[{mailbox.email_address}] SEARCH INBOX status={status} data={search_data}" ) return 0 raw_seqs = b" ".join( d if isinstance(d, bytes) else d.encode() for d in search_data ).decode("ascii", errors="ignore").split() seq_numbers = [s for s in raw_seqs if s.isdigit()] if not seq_numbers: return 0 seq_numbers = seq_numbers[: settings.imap_max_fetch_per_cycle] logger.debug( f"[{mailbox.email_address}] Candidati INBOX da verificare: {len(seq_numbers)} seq" ) synced_count = 0 max_uid_synced = last_uid for seq in seq_numbers: try: uid, synced = await _fetch_and_save_message_by_seq( imap_client=imap_client, seq=seq, last_uid=last_uid, mailbox=mailbox, db=db, redis_client=redis_client, imap_folder="INBOX", direction="inbound", state="received", ) if synced and uid and uid > max_uid_synced: synced_count += 1 max_uid_synced = uid except Exception as e: logger.error( f"[{mailbox.email_address}] Errore fetch INBOX seq {seq}: {e}", exc_info=True, ) if synced_count > 0: logger.info( f"[{mailbox.email_address}] Trovati {synced_count} messaggi nuovi in INBOX" ) # Aggiorna last_sync_uid e last_sync_at if max_uid_synced > last_uid: mailbox.last_sync_uid = max_uid_synced mailbox.last_sync_at = datetime.now(UTC) await db.flush() await db.commit() return synced_count async def sync_sent_messages( imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, mailbox: Mailbox, db: AsyncSession, redis_client: aioredis.Redis, ) -> int: """ Sincronizza i messaggi nuovi dalla cartella Sent (posta inviata) del server IMAP. Salva i messaggi come direction='outbound', state='sent'. Dopo la sync ri-seleziona INBOX per ripristinare il client allo stato normale. Returns: Numero di nuovi messaggi inviati sincronizzati. """ # Trova e seleziona la cartella Sent sent_folder = await _select_sent_folder(imap_client, mailbox.email_address) if not sent_folder: # Assicurati di tornare in INBOX try: await imap_client.select("INBOX") except Exception: pass return 0 # Siamo ora selezionati nella cartella Sent last_uid = mailbox.sent_last_sync_uid or 0 search_range = f"{last_uid + 1}:*" try: status, search_data = await imap_client.search("UID", search_range) except Exception as e: logger.warning(f"[{mailbox.email_address}] SEARCH Sent fallito: {e}") try: await imap_client.select("INBOX") except Exception: pass return 0 if status != "OK": try: await imap_client.select("INBOX") except Exception: pass return 0 raw_seqs = b" ".join( d if isinstance(d, bytes) else d.encode() for d in search_data ).decode("ascii", errors="ignore").split() seq_numbers = [s for s in raw_seqs if s.isdigit()] if not seq_numbers: logger.debug(f"[{mailbox.email_address}] Nessun messaggio nuovo in {sent_folder!r}") try: await imap_client.select("INBOX") except Exception: pass return 0 seq_numbers = seq_numbers[: settings.imap_max_fetch_per_cycle] logger.info( f"[{mailbox.email_address}] Trovati {len(seq_numbers)} messaggi nuovi in {sent_folder!r}" ) synced_count = 0 max_uid_synced = last_uid for seq in seq_numbers: try: uid, synced = await _fetch_and_save_message_by_seq( imap_client=imap_client, seq=seq, last_uid=last_uid, mailbox=mailbox, db=db, redis_client=redis_client, imap_folder=sent_folder, direction="outbound", state="sent", ) if synced and uid and uid > max_uid_synced: synced_count += 1 max_uid_synced = uid except Exception as e: logger.error( f"[{mailbox.email_address}] Errore fetch {sent_folder!r} seq {seq}: {e}", exc_info=True, ) # Aggiorna sent_last_sync_uid if max_uid_synced > last_uid: mailbox.sent_last_sync_uid = max_uid_synced await db.flush() await db.commit() logger.info( f"[{mailbox.email_address}] Sync Sent completata: {synced_count} messaggi nuovi" ) # Ri-seleziona INBOX per tornare allo stato normale try: await imap_client.select("INBOX") except Exception as e: logger.warning(f"[{mailbox.email_address}] Re-SELECT INBOX dopo Sent sync: {e}") return synced_count async def _fetch_and_save_message_by_seq( imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, seq: str, last_uid: int, mailbox: Mailbox, db: AsyncSession, redis_client: aioredis.Redis, imap_folder: str = "INBOX", direction: str = "inbound", state: str = "received", ) -> tuple[int | None, bool]: """ Fetcha un singolo messaggio per NUMERO DI SEQUENZA (non UID). Args: imap_folder: cartella IMAP corrente (per idempotenza e salvataggio) direction: 'inbound' per INBOX, 'outbound' per Sent state: 'received' per INBOX, 'sent' per Sent Returns: (uid, saved): UID del messaggio e True se salvato, False altrimenti. """ try: status, fetch_data = await imap_client.fetch(seq, "(UID RFC822 RFC822.SIZE)") except Exception as e: logger.error(f"[{mailbox.email_address}] FETCH seq {seq} fallito: {e}") return None, False if status != "OK" or not fetch_data: logger.warning( f"[{mailbox.email_address}] FETCH seq {seq} risposta vuota: {status}" ) return None, False items_info = [(type(x).__name__, len(x) if isinstance(x, (bytes, str)) else str(x)) for x in fetch_data] logger.debug(f"[{mailbox.email_address}] fetch_data seq {seq}: {items_info}") uid: int | None = None raw_eml: bytes | None = None size_bytes: int | None = None for item in fetch_data: if isinstance(item, bytearray): if len(item) > 200: raw_eml = bytes(item) elif isinstance(item, bytes): item_str = item.decode("ascii", errors="ignore") uid_match = re.search(r"UID\s+(\d+)", item_str) if uid_match: uid = int(uid_match.group(1)) size_match = re.search(r"RFC822\.SIZE\s+(\d+)", item_str) if size_match: size_bytes = int(size_match.group(1)) elif isinstance(item, str): uid_match = re.search(r"UID\s+(\d+)", item) if uid_match: uid = int(uid_match.group(1)) size_match = re.search(r"RFC822\.SIZE\s+(\d+)", item) if size_match: size_bytes = int(size_match.group(1)) if uid is None or uid <= last_uid: return uid, False if not raw_eml: logger.warning(f"[{mailbox.email_address}] seq {seq} UID {uid}: body mancante") return uid, False if size_bytes is None: size_bytes = len(raw_eml) return uid, await _save_message( uid=uid, raw_eml=raw_eml, size_bytes=size_bytes, mailbox=mailbox, db=db, redis_client=redis_client, imap_folder=imap_folder, direction=direction, state=state, ) async def _fetch_and_save_message( imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, uid: int, mailbox: Mailbox, db: AsyncSession, redis_client: aioredis.Redis, ) -> bool: """ Fetcha un singolo messaggio per UID (usato dal job sync_mailbox one-shot). Sincronizza solo dalla cartella INBOX. """ existing = await db.execute( select(Message.id).where( Message.mailbox_id == mailbox.id, Message.imap_uid == uid, Message.imap_folder == "INBOX", ) ) if existing.scalar_one_or_none(): return False try: status, fetch_data = await imap_client.uid("FETCH", str(uid), "(RFC822 RFC822.SIZE)") except Exception as e: logger.error(f"[{mailbox.email_address}] UID FETCH {uid} fallito: {e}") return False if status != "OK" or not fetch_data: return False raw_eml: bytes | None = None size_bytes: int | None = None for item in fetch_data: if isinstance(item, bytes) and len(item) > 100: raw_eml = item elif isinstance(item, (bytes, str)): s = item.decode("ascii", errors="ignore") if isinstance(item, bytes) else item m = re.search(r"RFC822\.SIZE\s+(\d+)", s) if m: size_bytes = int(m.group(1)) if not raw_eml: return False return await _save_message( uid=uid, raw_eml=raw_eml, size_bytes=size_bytes or len(raw_eml), mailbox=mailbox, db=db, redis_client=redis_client, imap_folder="INBOX", direction="inbound", state="received", ) # ─── Save message (Fase 3 – con parser completo, allegati, state machine) ──── async def _save_message( uid: int, raw_eml: bytes, size_bytes: int, mailbox: Mailbox, db: AsyncSession, redis_client: aioredis.Redis, imap_folder: str = "INBOX", direction: str = "inbound", state: str = "received", ) -> bool: """ Salva un messaggio EML in DB e su MinIO. Args: imap_folder: cartella IMAP di provenienza ('INBOX', 'Sent', ecc.) direction: 'inbound' per posta in arrivo, 'outbound' per posta inviata state: stato iniziale del messaggio ('received' per inbound, 'sent' per outbound) Fase 3 – aggiornato per: - Idempotenza basata su (mailbox_id, imap_uid, imap_folder) – le UID sono per-cartella in IMAP, quindi lo stesso UID può esistere in INBOX e Sent - Parser completo (body, allegati, EML-in-EML) - Classificazione precisa tipo PEC (tutti i provider) - Salvataggio allegati su MinIO + tabella attachments - State machine outbound: solo per messaggi inbound (ricevute PEC) - Collegamento parent_message_id via X-Riferimento-Message-ID - Dedup outbound: evita duplicati quando un messaggio inviato via send_pec viene poi trovato anche nella cartella Sent del server IMAP """ # ── Idempotenza: chiave composta (mailbox_id + imap_uid + imap_folder) ──── existing = await db.execute( select(Message.id).where( Message.mailbox_id == mailbox.id, Message.imap_uid == uid, Message.imap_folder == imap_folder, ) ) if existing.scalar_one_or_none(): logger.debug(f"[{mailbox.email_address}] UID {uid} in {imap_folder!r} già in DB, skip") return False # ── Classificazione PEC da header (veloce, senza body) ─────────────────── # La classificazione avviene PRIMA del parsing completo perche' il parser # deve sapere se il messaggio e' una ricevuta per evitare di sovrascrivere # il body_text (testo della ricevuta) con il contenuto di postacert.eml. quick_msg = email.message_from_bytes(raw_eml) pec_class = classify_pec_message(quick_msg) # ── Parsing completo EML (con is_receipt per proteggere il body) ────────── parsed = parse_eml(raw_eml, is_receipt=pec_class.is_receipt) received_at = datetime.now(UTC) # ── Dedup outbound: upsert sul record send_pec invece di creare duplicato ─ # Problema: send_pec crea un record outbound con imap_uid=NULL e poi # la sync della cartella Sent trova lo stesso messaggio e vorrebbe creare # un secondo record con lo stesso message_id_header. I duplicati rompono # il binding delle ricevute (_apply_outbound_state_machine usava # scalar_one_or_none() che esplode con MultipleResultsFound). # Soluzione: se esiste già un record outbound con lo stesso message_id_header # e imap_uid=NULL (il record canonico di send_pec), aggiorniamo quel record # con l'imap_uid/imap_folder della Sent folder invece di crearne uno nuovo. if direction == "outbound" and parsed.message_id: existing_outbound = await db.execute( select(Message).where( Message.mailbox_id == mailbox.id, Message.message_id_header == parsed.message_id, Message.direction == "outbound", Message.imap_uid.is_(None), ) ) send_pec_record = existing_outbound.scalar_one_or_none() if send_pec_record: # Aggiorna il record esistente con i dati IMAP della cartella Sent send_pec_record.imap_uid = uid send_pec_record.imap_folder = imap_folder send_pec_record.updated_at = datetime.now(UTC) # Aggiorna anche il raw_eml_path se non è già impostato if not send_pec_record.raw_eml_path: try: eml_path = await upload_eml( tenant_id=str(mailbox.tenant_id), mailbox_id=str(mailbox.id), uid=uid, eml_bytes=raw_eml, ) send_pec_record.raw_eml_path = eml_path except Exception as e: logger.warning( f"[{mailbox.email_address}] Upload EML MinIO per record send_pec " f"UID {uid}: {e}" ) await db.flush() logger.info( f"[{mailbox.email_address}] Sent-sync: aggiornato record send_pec " f"message_id={parsed.message_id!r} con imap_uid={uid} " f"folder={imap_folder!r} (evitato duplicato outbound)" ) return True # ── State machine: trova e aggiorna messaggio outbound ──────────────────── # Solo per messaggi inbound che sono ricevute PEC (non per posta inviata) parent_message_id: uuid.UUID | None = None if direction == "inbound" and pec_class.is_receipt and pec_class.riferimento_message_id: try: parent_message_id = await _apply_outbound_state_machine( riferimento_message_id=pec_class.riferimento_message_id, pec_type=pec_class.pec_type, tenant_id=mailbox.tenant_id, db=db, ) except Exception as bind_err: logger.error( f"[{mailbox.email_address}] [receipt-binding] Errore aggiornamento stato " f"outbound per ricevuta UID={uid} tipo={pec_class.pec_type!r}: {bind_err}", exc_info=True, ) # Non interrompere il salvataggio della ricevuta: il record viene # comunque inserito, ma senza parent_message_id. # ── Upload raw EML su MinIO ─────────────────────────────────────────────── eml_path: str | None = None try: eml_path = await upload_eml( tenant_id=str(mailbox.tenant_id), mailbox_id=str(mailbox.id), uid=uid, eml_bytes=raw_eml, ) except Exception as e: logger.error(f"[{mailbox.email_address}] Upload EML MinIO UID {uid}: {e}") # ── Determina received_at / sent_at per messaggi outbound ───────────────── # Per posta inviata: il campo "date" dell'EML è la data di invio msg_received_at = received_at if direction == "inbound" else None msg_sent_at = parsed.date if direction == "outbound" else parsed.date # ── Salva messaggio in DB ───────────────────────────────────────────────── message = Message( id=uuid.uuid4(), tenant_id=mailbox.tenant_id, mailbox_id=mailbox.id, imap_uid=uid, imap_folder=imap_folder, direction=direction, state=state, pec_type=pec_class.pec_type, subject=parsed.subject, from_address=parsed.from_address, to_addresses=parsed.to_addresses if parsed.to_addresses else None, cc_addresses=parsed.cc_addresses if parsed.cc_addresses else None, message_id_header=parsed.message_id, sent_at=msg_sent_at, received_at=msg_received_at, size_bytes=size_bytes, body_text=parsed.body_text, body_html=parsed.body_html, has_attachments=parsed.has_attachments, parent_message_id=parent_message_id, raw_eml_path=eml_path, # Messaggi outbound (Sent) sono già stati letti dal mittente is_read=(direction == "outbound"), ) db.add(message) await db.flush() # ottieni message.id prima di salvare gli allegati # ── Salva allegati su MinIO + tabella attachments ───────────────────────── if parsed.attachments: await _save_attachments( attachments=parsed.attachments, message=message, mailbox=mailbox, db=db, ) # ── Pubblica evento Redis per WebSocket ─────────────────────────────────── try: event = { "type": "mailbox:new_message", "mailbox_id": str(mailbox.id), "message_id": str(message.id), "subject": message.subject or "", "from_address": message.from_address or "", "pec_type": message.pec_type, "direction": direction, "is_receipt": pec_class.is_receipt, "received_at": received_at.isoformat(), } await redis_client.publish(f"ws:tenant:{mailbox.tenant_id}", json.dumps(event)) except Exception as e: logger.warning(f"[{mailbox.email_address}] Redis publish UID {uid}: {e}") logger.info( f"[{mailbox.email_address}] Nuovo messaggio: UID={uid} folder={imap_folder!r} " f"direction={direction!r} pec_type={pec_class.pec_type!r} " f"subject={message.subject!r} allegati={len(parsed.attachments)}" ) # ── Indicizzazione full-text (non bloccante, non interrompe la sync) ───── # Chiamata dopo il flush degli allegati: index_message puo' leggere # sia il messaggio che gli allegati dalla sessione corrente. await index_message(message.id, db) # ── Valutazione e accodamento notifiche (non bloccante) ─────────────────── # Solo per messaggi inbound: le ricevute PEC e la posta in arrivo # possono triggerare regole di notifica configurate dal tenant. # I messaggi outbound (Sent) non generano notifiche automatiche. if direction == "inbound": await evaluate_and_enqueue_notifications( message=message, mailbox=mailbox, db=db, redis_client=redis_client, ) # ── Regole di smistamento automatico (Feature 2) ────────────────────────── # Solo per messaggi inbound posta_certificata (non ricevute di sistema). if direction == "inbound" and pec_class.pec_type == "posta_certificata": try: await redis_client.enqueue_job("apply_routing_rules", str(message.id)) except Exception as e: logger.warning(f"[{mailbox.email_address}] Impossibile enqueue apply_routing_rules: {e}") # ── Auto-save mittente nella rubrica (Feature 6) ────────────────────────── # Per messaggi inbound di tipo posta_certificata, salva automaticamente # il mittente nella rubrica pec_contacts del tenant (upsert idempotente). if direction == "inbound" and pec_class.pec_type == "posta_certificata" and message.from_address: try: from sqlalchemy import text as _text await db.execute( _text(""" INSERT INTO pec_contacts (id, tenant_id, email, auto_saved, created_at, updated_at) VALUES (gen_random_uuid(), :tenant_id, :email, true, now(), now()) ON CONFLICT (tenant_id, email) DO NOTHING """), {"tenant_id": str(mailbox.tenant_id), "email": message.from_address.lower().strip()}, ) await db.flush() except Exception as e: logger.debug(f"[{mailbox.email_address}] Auto-save contatto fallito (non critico): {e}") return True async def _apply_outbound_state_machine( riferimento_message_id: str, pec_type: str, tenant_id: uuid.UUID, db: AsyncSession, ) -> uuid.UUID | None: """ Aggiorna lo stato del messaggio outbound originale in base alla ricevuta. Cerca il messaggio outbound con message_id_header == riferimento_message_id, applica la transizione di stato se valida. Gestisce il caso di messaggi outbound duplicati (uno creato da send_pec con imap_uid=NULL e uno creato dalla sync della cartella Sent): in caso di multipli, prioritizza quello con imap_uid=NULL (il record canonico creato da send_pec). Il dedup in _save_message riduce drasticamente la probabilità di multipli, ma questa funzione gestisce anche i casi residui per robustezza. Returns: UUID del messaggio originale se trovato, None altrimenti. """ result = await db.execute( select(Message).where( Message.tenant_id == tenant_id, Message.message_id_header == riferimento_message_id, Message.direction == "outbound", ) ) candidates = result.scalars().all() if not candidates: logger.warning( f"[receipt-binding] Messaggio outbound non trovato per " f"riferimento={riferimento_message_id!r} (ricevuta: {pec_type!r}). " f"Potrebbe essere stato inviato da un client esterno o il message_id_header " f"non e' ancora stato persistito." ) return None # In presenza di duplicati (es. record send_pec + record Sent-sync), # prioritizza il messaggio con imap_uid=NULL (quello canonico di send_pec). parent_msg: Message | None = None if len(candidates) == 1: parent_msg = candidates[0] else: logger.warning( f"[receipt-binding] Trovati {len(candidates)} messaggi outbound con " f"message_id_header={riferimento_message_id!r}. " f"Prioritizzo il record con imap_uid=NULL (send_pec)." ) # Priorità 1: imap_uid IS NULL (creato da send_pec) for m in candidates: if m.imap_uid is None: parent_msg = m break # Priorità 2: qualsiasi altro (creato dalla sync Sent) if parent_msg is None: parent_msg = candidates[0] new_state = apply_outbound_transition(parent_msg.state, pec_type) if new_state: old_state = parent_msg.state parent_msg.state = new_state parent_msg.updated_at = datetime.now(UTC) await db.flush() logger.info( f"[receipt-binding] State machine outbound: {riferimento_message_id!r} " f"{old_state!r} -> {new_state!r} (ricevuta: {pec_type!r}, " f"msg_id={parent_msg.id})" ) else: logger.debug( f"[receipt-binding] Nessuna transizione valida per {riferimento_message_id!r} " f"state={parent_msg.state!r} ricevuta={pec_type!r}" ) return parent_msg.id async def _save_attachments( attachments: list, message: Message, mailbox: Mailbox, db: AsyncSession, ) -> None: """ Carica gli allegati su MinIO e inserisce i record in tabella attachments. Args: attachments: lista di AttachmentInfo dal parser EML message: messaggio DB a cui appartengono gli allegati mailbox: casella (per il path MinIO) db: sessione DB """ for att in attachments: # Salta i file di sistema PEC (daticert.xml, postacert.eml, smime.p7s, ecc.) # L'EML grezzo è già conservato su MinIO tramite upload_eml if att.is_pec_system: continue storage_path: str | None = None try: storage_path = await upload_attachment( tenant_id=str(mailbox.tenant_id), mailbox_id=str(mailbox.id), message_id=str(message.id), filename=att.filename, content=att.content, content_type=att.content_type, ) except Exception as e: logger.error( f"Upload allegato {att.filename!r} per messaggio {message.id}: {e}" ) # Continua con gli altri allegati anche se uno fallisce continue # Inserisci record in DB att_record = Attachment( id=uuid.uuid4(), tenant_id=message.tenant_id, message_id=message.id, filename=att.filename, content_type=att.content_type, size_bytes=att.size_bytes, storage_path=storage_path, checksum_sha256=att.checksum_sha256, ) db.add(att_record) # flush per persistere tutti gli allegati nella transazione corrente try: await db.flush() except Exception as e: logger.error(f"Errore flush allegati per messaggio {message.id}: {e}")