""" Script di riclassificazione messaggi PEC mal classificati (Aruba REM header bug). Problema: Aruba PEC aggiunge header X-REM-Subject (e altri X-REM-*) anche ai messaggi PEC-IT standard. La vecchia versione di detect_protocol() vedeva X-REM-Subject e classificava il messaggio come REM europea, bypassando il classify_pec_message() che avrebbe correttamente letto X-Ricevuta: accettazione. Risultato: ricevute di accettazione salvate con pec_type='posta_certificata' invece di 'accettazione', e quindi non bindate al messaggio outbound. Fix applicato in pec_parser.py: detect_protocol() ora controlla prima X-Ricevuta/X-TipoRicevuta/X-Trasporto (PEC-IT) prima di guardare X-REM-* (REM europea). Questo script: 1. Trova messaggi inbound con pec_type='posta_certificata' E raw_eml_path 2. Re-legge l'EML da MinIO 3. Riclassifica con il parser corretto 4. Se pec_type cambia, aggiorna il record 5. Se il nuovo tipo e' una ricevuta, tenta il binding al messaggio outbound Esecuzione: docker exec pechub-worker-1 python scripts/reclassify_pec.py docker exec pechub-worker-1 python scripts/reclassify_pec.py --dry-run docker exec pechub-worker-1 python scripts/reclassify_pec.py --days 7 """ import argparse import asyncio import logging import os import sys from datetime import UTC, datetime, timedelta sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from sqlalchemy import and_, select from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from app.models import Message from app.parsers.pec_parser import classify_pec_message, apply_outbound_transition, detect_protocol from app.parsers.rem_parser import classify_rem_message logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) logger = logging.getLogger("reclassify_pec") async def reclassify_messages( db: AsyncSession, dry_run: bool = False, days: int | None = None, ) -> dict: """ Riclassifica i messaggi inbound posta_certificata che potrebbero essere ricevute mal classificate a causa del bug X-REM-Subject. """ import email as _email stats = { "total_candidates": 0, "reclassified": 0, "bound": 0, "state_transitions": 0, "errors": 0, "skipped_no_eml": 0, } # MinIO client from miniopy_async import Minio from app.config import get_settings settings = get_settings() minio = Minio( endpoint=settings.minio_endpoint, access_key=settings.minio_access_key, secret_key=settings.minio_secret_key, secure=settings.minio_use_ssl, ) # Query: messaggi inbound posta_certificata con EML disponibile conditions = [ Message.direction == "inbound", Message.pec_type == "posta_certificata", Message.raw_eml_path.isnot(None), ] if days is not None: cutoff = datetime.now(UTC) - timedelta(days=days) conditions.append(Message.created_at >= cutoff) result = await db.execute( select(Message) .where(and_(*conditions)) .order_by(Message.created_at.asc()) ) candidates = list(result.scalars().all()) stats["total_candidates"] = len(candidates) logger.info( f"Trovati {stats['total_candidates']} messaggi inbound posta_certificata" + (f" (ultimi {days} giorni)" if days else "") + (" [DRY-RUN]" if dry_run else "") ) for msg in candidates: try: # Scarica EML da MinIO try: resp = await minio.get_object(settings.minio_bucket, msg.raw_eml_path) raw_eml = await resp.content.read() resp.close() except Exception as e: logger.debug(f" EML non disponibile per {msg.id}: {e}") stats["skipped_no_eml"] += 1 continue # Re-classifica con il parser corretto quick_msg = _email.message_from_bytes(raw_eml) protocol = detect_protocol(quick_msg) if protocol == "rem_eu": rem_class = classify_rem_message(quick_msg) new_pec_type = rem_class.pec_type new_is_receipt = rem_class.is_receipt new_riferimento = rem_class.riferimento_message_id else: pec_class = classify_pec_message(quick_msg) new_pec_type = pec_class.pec_type new_is_receipt = pec_class.is_receipt new_riferimento = pec_class.riferimento_message_id # Se il tipo non e' cambiato, skip if new_pec_type == "posta_certificata": continue logger.info( f"Riclassificazione: {msg.id} " f"'{msg.subject[:50] if msg.subject else ''}' " f"posta_certificata → {new_pec_type}" ) if not dry_run: msg.pec_type = new_pec_type msg.updated_at = datetime.now(UTC) # Aggiorna anche riferimento_message_id se disponibile if new_riferimento and not msg.riferimento_message_id: msg.riferimento_message_id = new_riferimento stats["reclassified"] += 1 # Tenta binding al messaggio outbound se e' una ricevuta if new_is_receipt and new_riferimento and msg.parent_message_id is None: outbound_result = await db.execute( select(Message).where( Message.tenant_id == msg.tenant_id, Message.message_id_header == new_riferimento, Message.direction == "outbound", ) ) candidates_out = list(outbound_result.scalars().all()) if candidates_out: # Priorita' a imap_uid=NULL (send_pec canonical record) parent = None for m in candidates_out: if m.imap_uid is None: parent = m break if parent is None: parent = candidates_out[0] if not dry_run: msg.parent_message_id = parent.id new_state = apply_outbound_transition(parent.state, new_pec_type) if new_state: old_state = parent.state parent.state = new_state parent.updated_at = datetime.now(UTC) stats["state_transitions"] += 1 logger.info( f" State machine: {parent.id} {old_state!r} → {new_state!r}" ) stats["bound"] += 1 logger.info( f" Bindato a outbound {parent.id}" + (" [DRY-RUN]" if dry_run else "") ) else: logger.debug( f" Nessun outbound trovato per riferimento={new_riferimento!r}" ) except Exception as e: logger.error(f"Errore processando {msg.id}: {e}", exc_info=True) stats["errors"] += 1 continue # Commit if not dry_run and (stats["reclassified"] > 0 or stats["bound"] > 0): try: await db.commit() logger.info( f"Commit: {stats['reclassified']} riclassificati, " f"{stats['bound']} bindati" ) except Exception as e: logger.error(f"Errore commit: {e}", exc_info=True) await db.rollback() stats["errors"] += stats["reclassified"] stats["reclassified"] = 0 stats["bound"] = 0 return stats async def main() -> None: parser = argparse.ArgumentParser( description="Riclassifica messaggi PEC mal classificati (bug X-REM-Subject Aruba)" ) parser.add_argument("--dry-run", action="store_true") parser.add_argument("--days", type=int, default=None, metavar="N") args = parser.parse_args() from app.config import get_settings settings = get_settings() engine = create_async_engine(settings.database_url, echo=False) SessionLocal = async_sessionmaker(engine, expire_on_commit=False) logger.info("=== Riclassificazione messaggi PEC (fix X-REM-Subject Aruba) ===") if args.dry_run: logger.info("MODALITA' DRY-RUN") try: async with SessionLocal() as db: stats = await reclassify_messages(db=db, dry_run=args.dry_run, days=args.days) finally: await engine.dispose() print("\n=== Riepilogo ===") print(f"Candidati analizzati: {stats['total_candidates']}") print(f"Riclassificati: {stats['reclassified']}") print(f"Bindati a outbound: {stats['bound']}") print(f"Transizioni stato: {stats['state_transitions']}") print(f"EML non disponibili: {stats['skipped_no_eml']}") print(f"Errori: {stats['errors']}") if args.dry_run: print("\n[DRY-RUN] Nessuna modifica effettuata.") if stats["errors"] > 0: sys.exit(1) if __name__ == "__main__": asyncio.run(main())