Files
PecHub/worker/app/parsers/receipt_extractor.py
T
2026-03-18 17:43:03 +01:00

175 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Estrazione EML annidato nelle ricevute PEC (EML-in-EML) Fase 3.
Una ricevuta PEC è strutturalmente un messaggio multipart che contiene:
- text/plain → testo leggibile della ricevuta
- application/xml → dati strutturati XML (daticert.xml, opzionale)
- message/rfc822 → il messaggio originale allegato (o la busta PEC)
Questo modulo estrae e analizza l'EML annidato per identificare
il messaggio originale a cui si riferisce la ricevuta.
Struttura tipica ricevuta Aruba PEC:
multipart/mixed
├── text/plain (descrizione ricevuta)
├── application/xml (daticert.xml campi strutturati)
└── message/rfc822 (messaggio originale)
├── From: mittente@pec.it
├── To: destinatario@pec.it
└── Subject: ...
"""
import email
import email.header
import email.message
import email.utils
import logging
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class NestedEmlInfo:
"""Informazioni estratte dall'EML annidato in una ricevuta PEC."""
raw_bytes: bytes
message_id: str | None
subject: str | None
from_address: str | None
to_addresses: list[str] = field(default_factory=list)
def extract_nested_eml(msg: email.message.Message) -> NestedEmlInfo | None:
"""
Estrae l'EML annidato (message/rfc822) da una ricevuta PEC.
Naviga il messaggio alla ricerca della prima parte di tipo message/rfc822
e restituisce le informazioni estratte dall'EML interno.
Returns:
NestedEmlInfo se trovato un EML annidato, None altrimenti.
"""
if not msg.is_multipart():
return None
for part in msg.walk():
if part.get_content_type() != "message/rfc822":
continue
try:
payload = part.get_payload()
inner_bytes: bytes | None = None
inner_msg: email.message.Message | None = None
if isinstance(payload, list) and payload:
# Forma canonica RFC: payload è lista di Message
candidate = payload[0]
if isinstance(candidate, email.message.Message):
inner_msg = candidate
inner_bytes = candidate.as_bytes()
elif isinstance(payload, bytes):
inner_bytes = payload
try:
inner_msg = email.message_from_bytes(payload)
except Exception:
pass
elif isinstance(payload, str):
inner_bytes = payload.encode("utf-8", errors="replace")
try:
inner_msg = email.message_from_bytes(inner_bytes)
except Exception:
pass
if inner_bytes and inner_msg:
return _extract_info_from_inner(inner_msg, inner_bytes)
except Exception as exc:
logger.warning(f"Errore estrazione EML annidato: {exc}")
return None
def extract_receipt_xml(msg: email.message.Message) -> str | None:
"""
Estrae il contenuto XML strutturato da una ricevuta PEC.
Le ricevute PEC conformi al DM 2 novembre 2005 contengono un allegato XML
(tipicamente "daticert.xml") con i campi strutturati della ricevuta:
tipo, timestamp, identificatore messaggio, mittente, destinatario, ecc.
Returns:
Stringa XML se trovata, None altrimenti.
"""
for part in msg.walk():
ct = part.get_content_type()
if ct not in ("application/xml", "text/xml"):
continue
# Verifica che sia il daticert (opzionale: controlla filename)
filename = part.get_filename() or part.get_param("name") or ""
# Accetta qualsiasi XML nelle ricevute PEC (non solo daticert.xml)
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
return payload.decode(charset, errors="replace")
except Exception as exc:
logger.debug(f"Errore estrazione XML ricevuta (filename={filename!r}): {exc}")
return None
# ─── Helper privati ───────────────────────────────────────────────────────────
def _extract_info_from_inner(
msg: email.message.Message,
raw_bytes: bytes,
) -> NestedEmlInfo:
"""Estrae i campi identificativi dall'EML annidato."""
message_id = msg.get("Message-ID", "").strip() or None
subject = _decode_hdr(msg.get("Subject"))
from_address = email.utils.parseaddr(msg.get("From", ""))[1] or None
to_addresses = _extract_addrs(msg.get("To"))
return NestedEmlInfo(
raw_bytes=raw_bytes,
message_id=message_id,
subject=subject,
from_address=from_address,
to_addresses=to_addresses,
)
def _decode_hdr(value: str | None) -> str | None:
"""Decodifica un header RFC 2047."""
if not value:
return None
try:
parts = email.header.decode_header(value)
decoded = []
for part, charset in parts:
if isinstance(part, bytes):
decoded.append(part.decode(charset or "utf-8", errors="replace"))
else:
decoded.append(str(part))
return "".join(decoded).strip()
except Exception:
return str(value)
def _extract_addrs(field_value: str | None) -> list[str]:
"""Estrae lista di indirizzi email da un campo header."""
if not field_value:
return []
try:
parsed = email.utils.getaddresses([field_value])
return [addr for _, addr in parsed if addr]
except Exception:
return []