diff --git a/backend/alembic/versions/0023_add_riferimento_message_id.py b/backend/alembic/versions/0023_add_riferimento_message_id.py new file mode 100644 index 0000000..23708fa --- /dev/null +++ b/backend/alembic/versions/0023_add_riferimento_message_id.py @@ -0,0 +1,49 @@ +""" +Migrazione 0023: aggiunge colonna riferimento_message_id alla tabella messages. + +La colonna memorizza il valore di X-Riferimento-Message-ID presente nelle +ricevute PEC inbound (accettazione, avvenuta_consegna, ecc.). Serve per: + + 1. Binding retroattivo: se il binding fallisce durante la sync (race condition + con send_pec che non ha ancora committato message_id_header), il job di + rebinding puo' usare questa colonna per ricollegare le ricevute orfane + al messaggio outbound originale senza dover ri-leggere l'EML da MinIO. + + 2. Diagnostica: permette di verificare rapidamente quali ricevute hanno un + X-Riferimento valorizzato ma non hanno trovato il corrispondente outbound. + +Revision ID: 0023_add_riferimento_message_id +Revises: 0022_partial_unique_mailbox_email +""" + +from alembic import op +import sqlalchemy as sa + +# ── Identificatori migrazione ───────────────────────────────────────────────── +revision = "0023" +down_revision = "0022" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Colonna nullable: solo le ricevute inbound la hanno valorizzata. + # I messaggi posta_certificata (inbound e outbound) la lasciano NULL. + op.add_column( + "messages", + sa.Column("riferimento_message_id", sa.Text(), nullable=True), + ) + + # Indice parziale per le query di binding retroattivo: + # cerca ricevute orfane (parent_message_id IS NULL) con riferimento valorizzato. + op.create_index( + "idx_messages_riferimento", + "messages", + ["riferimento_message_id"], + postgresql_where=sa.text("riferimento_message_id IS NOT NULL"), + ) + + +def downgrade() -> None: + op.drop_index("idx_messages_riferimento", table_name="messages") + op.drop_column("messages", "riferimento_message_id") diff --git a/backend/app/api/v1/messages.py b/backend/app/api/v1/messages.py index e9afe24..584cf7d 100644 --- a/backend/app/api/v1/messages.py +++ b/backend/app/api/v1/messages.py @@ -244,6 +244,24 @@ async def list_messages( Message.parent_message_id.is_(None), ) + # ── Auto-filtro ricevute ─────────────────────────────────────────────────── + # Esclude automaticamente i messaggi inbound di tipo ricevuta (accettazione, + # avvenuta_consegna, ecc.) quando pec_type non e' specificato esplicitamente. + # Le ricevute correttamente bindate hanno parent_message_id != NULL e sono + # gia' escluse dal filtro sopra. Quelle non bindato (race condition o invii + # da client esterni) verrebbero mostrate in inbox come messaggi normali senza + # questo filtro aggiuntivo. + # I messaggi outbound hanno sempre pec_type='posta_certificata' in questo + # sistema (non creiamo mai record outbound di tipo ricevuta) quindi il filtro + # e' trasparente per la vista posta inviata. + if pec_type is None: + q = q.where( + or_( + Message.direction == "outbound", + Message.pec_type == "posta_certificata", + ) + ) + if visible_mailbox_ids is not None: if not visible_mailbox_ids: return MessageListResponse(items=[], total=0, page=page, page_size=page_size) diff --git a/backend/app/models/message.py b/backend/app/models/message.py index 39f9ee1..907ce69 100644 --- a/backend/app/models/message.py +++ b/backend/app/models/message.py @@ -92,6 +92,12 @@ class Message(Base): parent_message_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), ForeignKey("messages.id"), nullable=True ) + # X-Riferimento-Message-ID estratto dalle ricevute inbound PEC. + # Permette il binding retroattivo se la race condition ha impedito il binding + # live (send_pec non aveva ancora committato message_id_header quando la + # ricevuta e' stata processata dalla sync IMAP). + # Solo le ricevute (pec_type != posta_certificata) la hanno valorizzata. + riferimento_message_id: Mapped[str | None] = mapped_column(Text, nullable=True) # Flag operativi is_read: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) diff --git a/worker/app/imap/sync.py b/worker/app/imap/sync.py index 0425cc0..bc6c40c 100644 --- a/worker/app/imap/sync.py +++ b/worker/app/imap/sync.py @@ -364,6 +364,32 @@ async def sync_sent_messages( f"[{mailbox.email_address}] Errore fetch {sent_folder!r} seq {seq}: {e}", exc_info=True, ) + + if synced_count > 0: + logger.info( + f"[{mailbox.email_address}] Sincronizzati {synced_count} messaggi nuovi da {sent_folder!r}" + ) + + # Aggiorna sent_last_sync_uid e torna in INBOX + if max_uid_synced > last_uid: + mailbox.sent_last_sync_uid = max_uid_synced + mailbox.last_sync_at = datetime.now(UTC) + await db.flush() + await db.commit() + + try: + await imap_client.select("INBOX") + except Exception: + pass + + return synced_count + + +async def _fetch_and_save_message_by_seq( + imap_client: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL, + seq: str, + last_uid: int, + mailbox: Mailbox, db: AsyncSession, redis_client: aioredis.Redis, imap_folder: str = "INBOX", @@ -711,6 +737,10 @@ async def _save_message( body_html=parsed.body_html, has_attachments=parsed.has_attachments, parent_message_id=parent_message_id, + # Salva il X-Riferimento-Message-ID per il binding retroattivo. + # Permette allo script rebind_receipts.py di ricollegare ricevute orfane + # senza dover ri-leggere l'EML da MinIO. + riferimento_message_id=_riferimento_message_id if _is_receipt else None, raw_eml_path=eml_path, # Messaggi outbound (Sent) sono già stati letti dal mittente is_read=(direction == "outbound"), diff --git a/worker/app/jobs/send_pec.py b/worker/app/jobs/send_pec.py index 476cf74..7519f70 100644 --- a/worker/app/jobs/send_pec.py +++ b/worker/app/jobs/send_pec.py @@ -24,6 +24,7 @@ import json import logging import uuid as uuid_module from datetime import datetime, timedelta, timezone +from email.utils import make_msgid from typing import Any from sqlalchemy import select @@ -135,6 +136,35 @@ async def send_pec(ctx: dict[str, Any], send_job_id: str) -> dict: ) # Continua senza questo allegato + # ── Pre-genera Message-ID e commita PRIMA dell'invio SMTP ───────────── + # RACE CONDITION FIX: il Message-ID viene scritto nel DB e committato + # PRIMA dell'handshake SMTP, in modo che, se il ciclo IMAP sync del + # worker gira durante l'invio (finestra di secondi), trovi gia' il + # record con message_id_header valorizzato e possa bindare correttamente + # le ricevute di accettazione (da Aruba: entro 1-2s dalla ricezione). + # Senza questo fix la ricevuta di accettazione veniva sempre processata + # con message_id_header=NULL nel DB → binding fallisce → parent_message_id + # non impostato sulla ricevuta → ricevuta compare in inbox come messaggio + # ordinario e lo stato outbound non avanza ad "accepted". + if not msg.message_id_header: + # Primo tentativo: genera e persiste un nuovo Message-ID + pre_generated_id = make_msgid(domain="pechub.local") + msg.message_id_header = pre_generated_id + await db.commit() + logger.info( + f"[send_pec] Message-ID pre-committato prima dell'SMTP: " + f"{pre_generated_id}" + ) + else: + # Retry successivo: riusa il Message-ID gia' persistito dal + # tentativo precedente. In questo modo eventuali ricevute gia' + # salvate (ma non bindato per la race) possono essere ricollegate. + pre_generated_id = msg.message_id_header + logger.debug( + f"[send_pec] Retry - riutilizzo Message-ID esistente: " + f"{pre_generated_id}" + ) + # ── Tenta invio SMTP ────────────────────────────────────────────────── try: sender = SmtpSender(mailbox) @@ -145,11 +175,14 @@ async def send_pec(ctx: dict[str, Any], send_job_id: str) -> dict: body_text=msg.body_text or "", body_html=msg.body_html, attachments=attachments_data, + preset_message_id=pre_generated_id, ) # ── Successo: aggiorna DB ───────────────────────────────────────── + # msg.message_id_header e' gia' impostato e committato sopra. + # message_id_header restituito dal sender deve essere identico a + # pre_generated_id (stessa stringa che abbiamo passato). now = datetime.now(tz=timezone.utc) - msg.message_id_header = message_id_header msg.state = "sent" msg.sent_at = now diff --git a/worker/app/models.py b/worker/app/models.py index 3207bab..f7540bf 100644 --- a/worker/app/models.py +++ b/worker/app/models.py @@ -119,6 +119,9 @@ class Message(Base): parent_message_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), ForeignKey("messages.id"), nullable=True ) + # X-Riferimento-Message-ID estratto dalle ricevute inbound PEC. + # Usato dal binding retroattivo per ricollegare ricevute orfane. + riferimento_message_id: Mapped[str | None] = mapped_column(Text, nullable=True) is_read: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) is_starred: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) diff --git a/worker/app/parsers/pec_parser.py b/worker/app/parsers/pec_parser.py index 501b5d1..0fb68ca 100644 --- a/worker/app/parsers/pec_parser.py +++ b/worker/app/parsers/pec_parser.py @@ -174,23 +174,38 @@ def detect_protocol(msg: email.message.Message) -> str: """ Determina il protocollo di un messaggio in arrivo. - Logica di rilevamento automatico: - - Se il messaggio contiene almeno un header X-REM-*, il protocollo e' REM europea - - Altrimenti e' PEC italiana (default) + Logica di rilevamento automatico (priorita' decrescente): + 1. Se presenti header PEC-IT specifici (X-Ricevuta, X-TipoRicevuta, X-Trasporto) + → protocollo e' PEC italiana, indipendentemente da altri header. + 2. Se presenti header X-REM-* SENZA header PEC-IT + → protocollo e' REM europea (ETSI EN 319 532-4) + 3. Default → PEC italiana - Questo permette al worker di usare il parser corretto (classify_rem_message vs - classify_pec_message) anche per caselle configurate come 'pec_it' che potrebbero - ricevere messaggi REM da partner europei (caso edge). + IMPORTANTE: Aruba PEC aggiunge header X-REM-* (es. X-REM-Subject) anche ai + messaggi PEC-IT standard, come parte di una migrazione verso lo standard ETSI + REM. Questi header non devono essere interpretati come protocollo REM europeo + quando gli header PEC-IT (X-Ricevuta, X-TipoRicevuta) sono anch'essi presenti. + In tal caso la classificazione PEC-IT ha priorita'. Args: msg: oggetto email.message.Message gia' parsato dagli header Returns: - 'rem_eu' se header X-REM-* rilevati, 'pec_it' altrimenti. + 'pec_it' se header PEC-IT specifici presenti (o nessun header REM), + 'rem_eu' se header X-REM-* presenti senza header PEC-IT specifici. """ + # Controlla prima gli header PEC-IT specifici: hanno priorita' assoluta. + # Questi header identificano in modo univoco un messaggio PEC italiana. + _PEC_IT_SPECIFIC = {"X-RICEVUTA", "X-TIPORICEVUTA", "X-TRASPORTO"} + for header_name in msg.keys(): + if header_name.upper() in _PEC_IT_SPECIFIC: + return "pec_it" + + # Nessun header PEC-IT trovato: verifica header REM europea for header_name in msg.keys(): if header_name.upper().startswith("X-REM-"): return "rem_eu" + return "pec_it" diff --git a/worker/app/smtp/sender.py b/worker/app/smtp/sender.py index 4ca7815..9fb6dcc 100644 --- a/worker/app/smtp/sender.py +++ b/worker/app/smtp/sender.py @@ -86,17 +86,23 @@ class SmtpSender: body_text: str, body_html: str | None = None, attachments: list[dict] | None = None, + preset_message_id: str | None = None, ) -> tuple[MIMEMultipart, str]: """ Costruisce il messaggio MIME per la PEC. Args: - to_addresses: destinatari principali - cc_addresses: destinatari in copia (può essere vuoto) - subject: oggetto del messaggio - body_text: corpo in testo semplice - body_html: corpo HTML opzionale - attachments: lista di dict {filename, content: bytes, content_type} + to_addresses: destinatari principali + cc_addresses: destinatari in copia (può essere vuoto) + subject: oggetto del messaggio + body_text: corpo in testo semplice + body_html: corpo HTML opzionale + attachments: lista di dict {filename, content: bytes, content_type} + preset_message_id: Message-ID pre-generato (opzionale). Se fornito viene + usato direttamente invece di generarne uno nuovo con + make_msgid(). Permette di settare message_id_header + nel DB PRIMA dell'invio SMTP, eliminando la race + condition con il binding delle ricevute di accettazione. Returns: (msg MIME, message_id_header) @@ -115,7 +121,9 @@ class SmtpSender: body_container = msg # Headers obbligatori - message_id = make_msgid(domain="pechub.local") + # Se il chiamante ha pre-generato il Message-ID (per committarlo nel DB prima + # dell'invio SMTP), lo usiamo direttamente. Altrimenti ne generiamo uno nuovo. + message_id = preset_message_id if preset_message_id else make_msgid(domain="pechub.local") msg["From"] = self.mailbox.email_address msg["To"] = ", ".join(to_addresses) if cc_addresses: @@ -171,6 +179,7 @@ class SmtpSender: body_text: str, body_html: str | None = None, attachments: list[dict] | None = None, + preset_message_id: str | None = None, ) -> tuple[str, bytes]: """ Invia la PEC via SMTP. @@ -180,6 +189,12 @@ class SmtpSender: - Porta 587 con STARTTLS (use_tls=False, porta 587) - Porta 25 plain (uso sconsigliato) + Args: + preset_message_id: Message-ID pre-generato da usare nell'header Message-ID. + Deve essere gia' stato committato nel DB su msg.message_id_header + prima di chiamare questo metodo, per eliminare la race condition + con il binding delle ricevute PEC. + Returns: (message_id_header, raw_eml_bytes) @@ -194,6 +209,7 @@ class SmtpSender: body_text=body_text, body_html=body_html, attachments=attachments, + preset_message_id=preset_message_id, ) raw_eml: bytes = msg.as_bytes() diff --git a/worker/scripts/rebind_receipts.py b/worker/scripts/rebind_receipts.py new file mode 100644 index 0000000..ec0b552 --- /dev/null +++ b/worker/scripts/rebind_receipts.py @@ -0,0 +1,286 @@ +""" +Script di binding retroattivo per le ricevute PEC orfane. + +Problema risolto: + La race condition tra send_pec (che committava message_id_header DOPO l'invio + SMTP) e il ciclo IMAP sync (che processava le ricevute di accettazione arrivate + in pochi secondi) causava il salvataggio di ricevute con parent_message_id=NULL. + Queste ricevute "orfane" apparivano in inbox come messaggi normali. + +Questo script: + 1. Trova tutte le ricevute inbound (pec_type != 'posta_certificata') con + parent_message_id IS NULL e riferimento_message_id valorizzato. + 2. Per ciascuna cerca il messaggio outbound con message_id_header corrispondente + nello stesso tenant. + 3. Se trovato, aggiorna parent_message_id e applica la state machine outbound. + +Esecuzione: + # Sul server remoto: + docker exec pechub-worker-1 python scripts/rebind_receipts.py + + # Con dry-run (solo stampa cosa farebbe, senza modificare il DB): + docker exec pechub-worker-1 python scripts/rebind_receipts.py --dry-run + + # Limita ai messaggi degli ultimi N giorni: + docker exec pechub-worker-1 python scripts/rebind_receipts.py --days 30 + + # Stampa statistiche finali: + docker exec pechub-worker-1 python scripts/rebind_receipts.py --verbose +""" + +import argparse +import asyncio +import logging +import sys +import os +from datetime import UTC, datetime, timedelta + +# Aggiunge la root del worker al path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from sqlalchemy import and_, select, text +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker + +from app.models import Message +from app.parsers.pec_parser import apply_outbound_transition + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +logger = logging.getLogger("rebind_receipts") + +# Tipi di ricevuta che non sono posta_certificata +RECEIPT_PEC_TYPES = { + "accettazione", + "non_accettazione", + "presa_in_carico", + "avvenuta_consegna", + "mancata_consegna", + "errore_consegna", + "preavviso_mancata_consegna", + "rilevazione_virus", +} + + +async def rebind_receipts( + db: AsyncSession, + dry_run: bool = False, + days: int | None = None, + verbose: bool = False, +) -> dict: + """ + Esegue il binding retroattivo delle ricevute orfane. + + Args: + db: sessione DB asincrona + dry_run: se True, non modifica il DB (solo log) + days: se impostato, processa solo ricevute degli ultimi N giorni + verbose: log dettagliato per ogni ricevuta + + Returns: + dict con statistiche: total, bound, already_bound, not_found, errors + """ + stats = { + "total": 0, + "bound": 0, + "already_bound": 0, + "not_found": 0, + "errors": 0, + "state_transitions": 0, + } + + # ── Costruisci query base ───────────────────────────────────────────────── + # Ricevute orfane: parent_message_id IS NULL ma riferimento_message_id valorizzato + conditions = [ + Message.direction == "inbound", + Message.pec_type.in_(list(RECEIPT_PEC_TYPES)), + Message.parent_message_id.is_(None), + Message.riferimento_message_id.isnot(None), + ] + + if days is not None: + cutoff = datetime.now(UTC) - timedelta(days=days) + conditions.append(Message.created_at >= cutoff) + + result = await db.execute( + select(Message) + .where(and_(*conditions)) + .order_by(Message.created_at.asc()) + ) + orphan_receipts = list(result.scalars().all()) + stats["total"] = len(orphan_receipts) + + logger.info( + f"Trovate {stats['total']} ricevute orfane da processare" + + (f" (ultimi {days} giorni)" if days else "") + + (" [DRY-RUN]" if dry_run else "") + ) + + if not orphan_receipts: + logger.info("Nessuna ricevuta orfana trovata.") + return stats + + # ── Processa ogni ricevuta ──────────────────────────────────────────────── + for receipt in orphan_receipts: + try: + riferimento = receipt.riferimento_message_id + + if verbose: + logger.info( + f"Elaboro ricevuta {receipt.id} " + f"pec_type={receipt.pec_type!r} " + f"riferimento={riferimento!r}" + ) + + # Cerca il messaggio outbound corrispondente nello stesso tenant + outbound_result = await db.execute( + select(Message).where( + Message.tenant_id == receipt.tenant_id, + Message.message_id_header == riferimento, + Message.direction == "outbound", + ) + ) + candidates = list(outbound_result.scalars().all()) + + if not candidates: + if verbose: + logger.warning( + f" Nessun outbound trovato per riferimento={riferimento!r}" + ) + stats["not_found"] += 1 + continue + + # Prioritizza il record con imap_uid=NULL (canonico di send_pec) + parent_msg: Message | None = None + if len(candidates) == 1: + parent_msg = candidates[0] + else: + for m in candidates: + if m.imap_uid is None: + parent_msg = m + break + if parent_msg is None: + parent_msg = candidates[0] + + if verbose: + logger.info( + f" Trovato outbound {parent_msg.id} " + f"state={parent_msg.state!r} " + f"(da {len(candidates)} candidati)" + ) + + # Aggiorna parent_message_id sulla ricevuta + if not dry_run: + receipt.parent_message_id = parent_msg.id + receipt.updated_at = datetime.now(UTC) + + stats["bound"] += 1 + + # Applica state machine outbound + new_state = apply_outbound_transition(parent_msg.state, receipt.pec_type) + if new_state and not dry_run: + old_state = parent_msg.state + parent_msg.state = new_state + parent_msg.updated_at = datetime.now(UTC) + stats["state_transitions"] += 1 + logger.info( + f" State machine: {parent_msg.id} {old_state!r} -> {new_state!r} " + f"(ricevuta: {receipt.pec_type!r})" + ) + elif new_state: + # Dry-run: solo log + logger.info( + f" [DRY-RUN] State machine applicherebbe: " + f"{parent_msg.id} {parent_msg.state!r} -> {new_state!r}" + ) + + logger.info( + f"{'[DRY-RUN] ' if dry_run else ''}" + f"Bindato: ricevuta {receipt.id} ({receipt.pec_type}) " + f"-> outbound {parent_msg.id}" + ) + + except Exception as e: + logger.error( + f"Errore processando ricevuta {receipt.id}: {e}", + exc_info=True, + ) + stats["errors"] += 1 + continue + + # ── Commit se non dry-run ───────────────────────────────────────────────── + if not dry_run and stats["bound"] > 0: + try: + await db.commit() + logger.info(f"Commit eseguito: {stats['bound']} ricevute bindato") + except Exception as e: + logger.error(f"Errore durante il commit: {e}", exc_info=True) + await db.rollback() + stats["errors"] += stats["bound"] + stats["bound"] = 0 + + return stats + + +async def main() -> None: + parser = argparse.ArgumentParser( + description="Binding retroattivo delle ricevute PEC orfane" + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Mostra cosa verrebbe fatto senza modificare il DB", + ) + parser.add_argument( + "--days", + type=int, + default=None, + metavar="N", + help="Processa solo le ricevute degli ultimi N giorni (default: tutte)", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Log dettagliato per ogni ricevuta", + ) + args = parser.parse_args() + + # ── Connessione DB ──────────────────────────────────────────────────────── + from app.config import get_settings + settings = get_settings() + + engine = create_async_engine(settings.database_url, echo=False) + SessionLocal = async_sessionmaker(engine, expire_on_commit=False) + + logger.info("=== Rebind ricevute PEC orfane ===") + if args.dry_run: + logger.info("MODALITA' DRY-RUN: nessuna modifica al DB") + + try: + async with SessionLocal() as db: + stats = await rebind_receipts( + db=db, + dry_run=args.dry_run, + days=args.days, + verbose=args.verbose, + ) + finally: + await engine.dispose() + + # ── Riepilogo ───────────────────────────────────────────────────────────── + print("\n=== Riepilogo ===") + print(f"Ricevute orfane trovate: {stats['total']}") + print(f"Bindato con successo: {stats['bound']}") + print(f"Outbound non trovato: {stats['not_found']}") + print(f"Transizioni stato: {stats['state_transitions']}") + print(f"Errori: {stats['errors']}") + if args.dry_run: + print("\n[DRY-RUN] Nessuna modifica al DB effettuata.") + + if stats["errors"] > 0: + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/worker/scripts/reclassify_pec.py b/worker/scripts/reclassify_pec.py new file mode 100644 index 0000000..19daab9 --- /dev/null +++ b/worker/scripts/reclassify_pec.py @@ -0,0 +1,260 @@ +""" +Script di riclassificazione messaggi PEC mal classificati (Aruba REM header bug). + +Problema: + Aruba PEC aggiunge header X-REM-Subject (e altri X-REM-*) anche ai messaggi + PEC-IT standard. La vecchia versione di detect_protocol() vedeva X-REM-Subject + e classificava il messaggio come REM europea, bypassando il classify_pec_message() + che avrebbe correttamente letto X-Ricevuta: accettazione. + Risultato: ricevute di accettazione salvate con pec_type='posta_certificata' + invece di 'accettazione', e quindi non bindate al messaggio outbound. + +Fix applicato in pec_parser.py: + detect_protocol() ora controlla prima X-Ricevuta/X-TipoRicevuta/X-Trasporto + (PEC-IT) prima di guardare X-REM-* (REM europea). + +Questo script: + 1. Trova messaggi inbound con pec_type='posta_certificata' E raw_eml_path + 2. Re-legge l'EML da MinIO + 3. Riclassifica con il parser corretto + 4. Se pec_type cambia, aggiorna il record + 5. Se il nuovo tipo e' una ricevuta, tenta il binding al messaggio outbound + +Esecuzione: + docker exec pechub-worker-1 python scripts/reclassify_pec.py + docker exec pechub-worker-1 python scripts/reclassify_pec.py --dry-run + docker exec pechub-worker-1 python scripts/reclassify_pec.py --days 7 +""" + +import argparse +import asyncio +import logging +import os +import sys +from datetime import UTC, datetime, timedelta + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from sqlalchemy import and_, select +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker + +from app.models import Message +from app.parsers.pec_parser import classify_pec_message, apply_outbound_transition, detect_protocol +from app.parsers.rem_parser import classify_rem_message + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +logger = logging.getLogger("reclassify_pec") + + +async def reclassify_messages( + db: AsyncSession, + dry_run: bool = False, + days: int | None = None, +) -> dict: + """ + Riclassifica i messaggi inbound posta_certificata che potrebbero essere + ricevute mal classificate a causa del bug X-REM-Subject. + """ + import email as _email + + stats = { + "total_candidates": 0, + "reclassified": 0, + "bound": 0, + "state_transitions": 0, + "errors": 0, + "skipped_no_eml": 0, + } + + # MinIO client + from miniopy_async import Minio + from app.config import get_settings + settings = get_settings() + + minio = Minio( + endpoint=settings.minio_endpoint, + access_key=settings.minio_access_key, + secret_key=settings.minio_secret_key, + secure=settings.minio_use_ssl, + ) + + # Query: messaggi inbound posta_certificata con EML disponibile + conditions = [ + Message.direction == "inbound", + Message.pec_type == "posta_certificata", + Message.raw_eml_path.isnot(None), + ] + + if days is not None: + cutoff = datetime.now(UTC) - timedelta(days=days) + conditions.append(Message.created_at >= cutoff) + + result = await db.execute( + select(Message) + .where(and_(*conditions)) + .order_by(Message.created_at.asc()) + ) + candidates = list(result.scalars().all()) + stats["total_candidates"] = len(candidates) + + logger.info( + f"Trovati {stats['total_candidates']} messaggi inbound posta_certificata" + + (f" (ultimi {days} giorni)" if days else "") + + (" [DRY-RUN]" if dry_run else "") + ) + + for msg in candidates: + try: + # Scarica EML da MinIO + try: + resp = await minio.get_object(settings.minio_bucket, msg.raw_eml_path) + raw_eml = await resp.content.read() + resp.close() + except Exception as e: + logger.debug(f" EML non disponibile per {msg.id}: {e}") + stats["skipped_no_eml"] += 1 + continue + + # Re-classifica con il parser corretto + quick_msg = _email.message_from_bytes(raw_eml) + protocol = detect_protocol(quick_msg) + + if protocol == "rem_eu": + rem_class = classify_rem_message(quick_msg) + new_pec_type = rem_class.pec_type + new_is_receipt = rem_class.is_receipt + new_riferimento = rem_class.riferimento_message_id + else: + pec_class = classify_pec_message(quick_msg) + new_pec_type = pec_class.pec_type + new_is_receipt = pec_class.is_receipt + new_riferimento = pec_class.riferimento_message_id + + # Se il tipo non e' cambiato, skip + if new_pec_type == "posta_certificata": + continue + + logger.info( + f"Riclassificazione: {msg.id} " + f"'{msg.subject[:50] if msg.subject else ''}' " + f"posta_certificata → {new_pec_type}" + ) + + if not dry_run: + msg.pec_type = new_pec_type + msg.updated_at = datetime.now(UTC) + # Aggiorna anche riferimento_message_id se disponibile + if new_riferimento and not msg.riferimento_message_id: + msg.riferimento_message_id = new_riferimento + + stats["reclassified"] += 1 + + # Tenta binding al messaggio outbound se e' una ricevuta + if new_is_receipt and new_riferimento and msg.parent_message_id is None: + outbound_result = await db.execute( + select(Message).where( + Message.tenant_id == msg.tenant_id, + Message.message_id_header == new_riferimento, + Message.direction == "outbound", + ) + ) + candidates_out = list(outbound_result.scalars().all()) + + if candidates_out: + # Priorita' a imap_uid=NULL (send_pec canonical record) + parent = None + for m in candidates_out: + if m.imap_uid is None: + parent = m + break + if parent is None: + parent = candidates_out[0] + + if not dry_run: + msg.parent_message_id = parent.id + new_state = apply_outbound_transition(parent.state, new_pec_type) + if new_state: + old_state = parent.state + parent.state = new_state + parent.updated_at = datetime.now(UTC) + stats["state_transitions"] += 1 + logger.info( + f" State machine: {parent.id} {old_state!r} → {new_state!r}" + ) + + stats["bound"] += 1 + logger.info( + f" Bindato a outbound {parent.id}" + + (" [DRY-RUN]" if dry_run else "") + ) + else: + logger.debug( + f" Nessun outbound trovato per riferimento={new_riferimento!r}" + ) + + except Exception as e: + logger.error(f"Errore processando {msg.id}: {e}", exc_info=True) + stats["errors"] += 1 + continue + + # Commit + if not dry_run and (stats["reclassified"] > 0 or stats["bound"] > 0): + try: + await db.commit() + logger.info( + f"Commit: {stats['reclassified']} riclassificati, " + f"{stats['bound']} bindati" + ) + except Exception as e: + logger.error(f"Errore commit: {e}", exc_info=True) + await db.rollback() + stats["errors"] += stats["reclassified"] + stats["reclassified"] = 0 + stats["bound"] = 0 + + return stats + + +async def main() -> None: + parser = argparse.ArgumentParser( + description="Riclassifica messaggi PEC mal classificati (bug X-REM-Subject Aruba)" + ) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--days", type=int, default=None, metavar="N") + args = parser.parse_args() + + from app.config import get_settings + settings = get_settings() + + engine = create_async_engine(settings.database_url, echo=False) + SessionLocal = async_sessionmaker(engine, expire_on_commit=False) + + logger.info("=== Riclassificazione messaggi PEC (fix X-REM-Subject Aruba) ===") + if args.dry_run: + logger.info("MODALITA' DRY-RUN") + + try: + async with SessionLocal() as db: + stats = await reclassify_messages(db=db, dry_run=args.dry_run, days=args.days) + finally: + await engine.dispose() + + print("\n=== Riepilogo ===") + print(f"Candidati analizzati: {stats['total_candidates']}") + print(f"Riclassificati: {stats['reclassified']}") + print(f"Bindati a outbound: {stats['bound']}") + print(f"Transizioni stato: {stats['state_transitions']}") + print(f"EML non disponibili: {stats['skipped_no_eml']}") + print(f"Errori: {stats['errors']}") + if args.dry_run: + print("\n[DRY-RUN] Nessuna modifica effettuata.") + + if stats["errors"] > 0: + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main())