""" Estrazione EML annidato nelle ricevute PEC (EML-in-EML) – Fase 3. Una ricevuta PEC è strutturalmente un messaggio multipart che contiene: - text/plain → testo leggibile della ricevuta - application/xml → dati strutturati XML (daticert.xml, opzionale) - message/rfc822 → il messaggio originale allegato (o la busta PEC) Questo modulo estrae e analizza l'EML annidato per identificare il messaggio originale a cui si riferisce la ricevuta. Struttura tipica ricevuta Aruba PEC: multipart/mixed ├── text/plain (descrizione ricevuta) ├── application/xml (daticert.xml – campi strutturati) └── message/rfc822 (messaggio originale) ├── From: mittente@pec.it ├── To: destinatario@pec.it └── Subject: ... """ import email import email.header import email.message import email.utils import logging from dataclasses import dataclass, field logger = logging.getLogger(__name__) @dataclass class NestedEmlInfo: """Informazioni estratte dall'EML annidato in una ricevuta PEC.""" raw_bytes: bytes message_id: str | None subject: str | None from_address: str | None to_addresses: list[str] = field(default_factory=list) def extract_nested_eml(msg: email.message.Message) -> NestedEmlInfo | None: """ Estrae l'EML annidato (message/rfc822) da una ricevuta PEC. Naviga il messaggio alla ricerca della prima parte di tipo message/rfc822 e restituisce le informazioni estratte dall'EML interno. Returns: NestedEmlInfo se trovato un EML annidato, None altrimenti. """ if not msg.is_multipart(): return None for part in msg.walk(): if part.get_content_type() != "message/rfc822": continue try: payload = part.get_payload() inner_bytes: bytes | None = None inner_msg: email.message.Message | None = None if isinstance(payload, list) and payload: # Forma canonica RFC: payload è lista di Message candidate = payload[0] if isinstance(candidate, email.message.Message): inner_msg = candidate inner_bytes = candidate.as_bytes() elif isinstance(payload, bytes): inner_bytes = payload try: inner_msg = email.message_from_bytes(payload) except Exception: pass elif isinstance(payload, str): inner_bytes = payload.encode("utf-8", errors="replace") try: inner_msg = email.message_from_bytes(inner_bytes) except Exception: pass if inner_bytes and inner_msg: return _extract_info_from_inner(inner_msg, inner_bytes) except Exception as exc: logger.warning(f"Errore estrazione EML annidato: {exc}") return None def extract_receipt_xml(msg: email.message.Message) -> str | None: """ Estrae il contenuto XML strutturato da una ricevuta PEC. Le ricevute PEC conformi al DM 2 novembre 2005 contengono un allegato XML (tipicamente "daticert.xml") con i campi strutturati della ricevuta: tipo, timestamp, identificatore messaggio, mittente, destinatario, ecc. Returns: Stringa XML se trovata, None altrimenti. """ for part in msg.walk(): ct = part.get_content_type() if ct not in ("application/xml", "text/xml"): continue # Verifica che sia il daticert (opzionale: controlla filename) filename = part.get_filename() or part.get_param("name") or "" # Accetta qualsiasi XML nelle ricevute PEC (non solo daticert.xml) try: payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" return payload.decode(charset, errors="replace") except Exception as exc: logger.debug(f"Errore estrazione XML ricevuta (filename={filename!r}): {exc}") return None # ─── Helper privati ─────────────────────────────────────────────────────────── def _extract_info_from_inner( msg: email.message.Message, raw_bytes: bytes, ) -> NestedEmlInfo: """Estrae i campi identificativi dall'EML annidato.""" message_id = msg.get("Message-ID", "").strip() or None subject = _decode_hdr(msg.get("Subject")) from_address = email.utils.parseaddr(msg.get("From", ""))[1] or None to_addresses = _extract_addrs(msg.get("To")) return NestedEmlInfo( raw_bytes=raw_bytes, message_id=message_id, subject=subject, from_address=from_address, to_addresses=to_addresses, ) def _decode_hdr(value: str | None) -> str | None: """Decodifica un header RFC 2047.""" if not value: return None try: parts = email.header.decode_header(value) decoded = [] for part, charset in parts: if isinstance(part, bytes): decoded.append(part.decode(charset or "utf-8", errors="replace")) else: decoded.append(str(part)) return "".join(decoded).strip() except Exception: return str(value) def _extract_addrs(field_value: str | None) -> list[str]: """Estrae lista di indirizzi email da un campo header.""" if not field_value: return [] try: parsed = email.utils.getaddresses([field_value]) return [addr for _, addr in parsed if addr] except Exception: return []