""" Parser MIME completo per messaggi email/PEC (Fase 3). Responsabilità: - Parsing MIME ricorsivo (multipart, nested) - Estrazione body text e html - Estrazione allegati con filename, content_type, sha256 - Decodifica header RFC 2047 (=?utf-8?b?...?=) - Gestione parti message/rfc822 (EML-in-EML per ricevute PEC) """ import email import email.header import email.message import email.utils import hashlib import logging from dataclasses import dataclass, field from datetime import UTC, datetime logger = logging.getLogger(__name__) # ─── Data classes ───────────────────────────────────────────────────────────── @dataclass class AttachmentInfo: """Dati estratti da un singolo allegato MIME.""" filename: str content_type: str content: bytes size_bytes: int checksum_sha256: str is_inline: bool = False # True per parti inline (immagini nel corpo HTML) is_pec_system: bool = False # True per file di infrastruttura PEC (daticert.xml, postacert.eml) @dataclass class ParsedEmail: """Risultato completo del parsing di un EML.""" subject: str | None = None from_address: str | None = None to_addresses: list[str] = field(default_factory=list) cc_addresses: list[str] = field(default_factory=list) message_id: str | None = None date: datetime | None = None body_text: str | None = None body_html: str | None = None attachments: list[AttachmentInfo] = field(default_factory=list) has_attachments: bool = False # Riferimento all'oggetto email.message.Message originale (per parser PEC) raw_message: email.message.Message | None = None # ─── Funzioni helper (pubbliche, usate anche dai test) ─────────────────────── def decode_header(value: str | None) -> str | None: """ Decodifica un header RFC 2047 (=?charset?encoding?text?=) in stringa Python. Esempi: =?utf-8?b?UEVDIHRlc3Q=?= → "PEC test" =?iso-8859-1?q?Multa_n=2E_123?= → "Multa n. 123" """ if not value: return None try: parts = email.header.decode_header(value) decoded = [] for part, charset in parts: if isinstance(part, bytes): decoded.append(part.decode(charset or "utf-8", errors="replace")) else: decoded.append(str(part)) return "".join(decoded).strip() except Exception: return str(value) def extract_addresses(field_value: str | None) -> list[str]: """ Estrae una lista di indirizzi email da un campo header To/Cc/From. Gestisce sia "addr@example.com" sia "Nome ". """ if not field_value: return [] try: parsed = email.utils.getaddresses([field_value]) return [addr for _, addr in parsed if addr] except Exception: return [] def parse_date(date_str: str | None) -> datetime | None: """ Converte una stringa data RFC 2822 (header Date) in datetime con timezone. Ritorna None in caso di stringa non valida. """ if not date_str: return None try: parsed = email.utils.parsedate_to_datetime(date_str) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=UTC) return parsed except Exception: return None # ─── Parser principale ──────────────────────────────────────────────────────── def parse_eml(raw_bytes: bytes) -> ParsedEmail: """ Parsing completo di un raw EML. Estrae: - Header: Subject, From, To, Cc, Message-ID, Date - Body: text/plain, text/html (primo trovato per tipo) - Allegati: tutti i parti con filename, inclusi message/rfc822 Args: raw_bytes: byte del messaggio EML grezzo Returns: ParsedEmail con tutti i campi estratti (fields None/[] se non presenti) """ if not raw_bytes: return ParsedEmail() try: msg = email.message_from_bytes(raw_bytes) except Exception as exc: logger.warning(f"Errore parsing EML bytes: {exc}") return ParsedEmail() result = ParsedEmail() result.raw_message = msg # ── Header ──────────────────────────────────────────────────────────────── result.subject = decode_header(msg.get("Subject")) result.from_address = email.utils.parseaddr(msg.get("From", ""))[1] or None result.to_addresses = extract_addresses(msg.get("To")) result.cc_addresses = extract_addresses(msg.get("Cc")) result.message_id = msg.get("Message-ID", "").strip() or None result.date = parse_date(msg.get("Date")) # ── Body e allegati ─────────────────────────────────────────────────────── if msg.is_multipart(): _walk_parts(msg, result) else: _extract_single_part_body(msg, result) result.has_attachments = any( not att.is_pec_system for att in result.attachments ) if result.attachments else False return result # ─── Helper privati ─────────────────────────────────────────────────────────── # Nomi file usati dall'infrastruttura PEC (non allegati utente) _PEC_SYSTEM_FILENAMES = frozenset({ "daticert.xml", "postacert.eml", "smime.p7s", "smime.p7m", }) # Content-type usati dall'infrastruttura PEC _PEC_SYSTEM_CONTENT_TYPES = frozenset({ "application/pkcs7-signature", "application/pkcs7-mime", }) def _is_pec_system_part(filename: str | None, content_type: str) -> bool: """Determina se un allegato è parte dell'infrastruttura PEC.""" if filename and filename.lower() in _PEC_SYSTEM_FILENAMES: return True if content_type in _PEC_SYSTEM_CONTENT_TYPES: return True return False def _get_filename(part: email.message.Message) -> str | None: """ Estrae il filename da un part MIME. Prova nell'ordine: 1. Content-Disposition: attachment; filename="..." 2. Content-Type: ...; name="..." """ filename = part.get_filename() if filename: return decode_header(filename) name = part.get_param("name") if name: return decode_header(name) return None def _walk_parts(msg: email.message.Message, result: ParsedEmail) -> None: """ Naviga ricorsivamente tutti i part MIME del messaggio. Logica: - multipart/* → salta (è un container, i figli vengono visitati nel loop) - text/plain → body_text (primo trovato, se non è "attachment") - text/html → body_html (primo trovato, se non è "attachment") - message/rfc822 → allegato di tipo EML-in-EML - tutto il resto con filename → allegato """ for part in msg.walk(): # Salta container multipart if part.get_content_maintype() == "multipart": continue ct = part.get_content_type() disp = part.get("Content-Disposition", "") filename = _get_filename(part) # ── EML-in-EML (message/rfc822) ─────────────────────────────────────── if ct == "message/rfc822": _extract_eml_in_eml(part, filename, result) continue # ── Allegato esplicito (Content-Disposition: attachment) ────────────── if "attachment" in disp: if filename: _extract_attachment(part, filename, ct, is_inline=False, result=result) continue # ── Parte inline con filename (es. immagine embeds in HTML) ─────────── if "inline" in disp and filename: _extract_attachment(part, filename, ct, is_inline=True, result=result) continue # ── Body text/plain ─────────────────────────────────────────────────── if ct == "text/plain" and result.body_text is None: body = _decode_payload(part) if body is not None: result.body_text = body continue # ── Body text/html ──────────────────────────────────────────────────── if ct == "text/html" and result.body_html is None: body = _decode_payload(part) if body is not None: result.body_html = body continue # ── Altri tipi con filename → allegato ──────────────────────────────── if filename: _extract_attachment(part, filename, ct, is_inline=False, result=result) def _extract_single_part_body(msg: email.message.Message, result: ParsedEmail) -> None: """Estrae il body per messaggi non-multipart.""" ct = msg.get_content_type() body = _decode_payload(msg) if body is None: return if ct == "text/plain": result.body_text = body elif ct == "text/html": result.body_html = body def _decode_payload(part: email.message.Message) -> str | None: """Decodifica il payload di una parte MIME come stringa testo.""" try: raw = part.get_payload(decode=True) if raw is None: return None charset = part.get_content_charset() or "utf-8" return raw.decode(charset, errors="replace") except Exception as exc: logger.debug(f"Errore decodifica payload {part.get_content_type()}: {exc}") return None def _extract_eml_in_eml( part: email.message.Message, filename: str | None, result: ParsedEmail, ) -> None: """ Estrae il messaggio EML annidato in un part message/rfc822. Nella struttura PEC, questo è tipicamente il messaggio originale allegato alle ricevute di consegna/accettazione. """ try: payload = part.get_payload() inner_bytes: bytes | None = None if isinstance(payload, list) and payload: # Forma canonica: payload è lista di Message inner_msg = payload[0] if isinstance(inner_msg, email.message.Message): inner_bytes = inner_msg.as_bytes() elif isinstance(payload, bytes): inner_bytes = payload elif isinstance(payload, str): inner_bytes = payload.encode("utf-8", errors="replace") if inner_bytes: eff_filename = filename or "inner_message.eml" is_system = _is_pec_system_part(eff_filename, "message/rfc822") att = AttachmentInfo( filename=eff_filename, content_type="message/rfc822", content=inner_bytes, size_bytes=len(inner_bytes), checksum_sha256=hashlib.sha256(inner_bytes).hexdigest(), is_inline=False, is_pec_system=is_system, ) result.attachments.append(att) except Exception as exc: logger.warning(f"Errore estrazione EML-in-EML: {exc}") def _extract_attachment( part: email.message.Message, filename: str, content_type: str, is_inline: bool, result: ParsedEmail, ) -> None: """Estrae il contenuto di un allegato e lo aggiunge al ParsedEmail.""" try: content = part.get_payload(decode=True) if content is None: return is_system = _is_pec_system_part(filename, content_type) att = AttachmentInfo( filename=filename, content_type=content_type, content=content, size_bytes=len(content), checksum_sha256=hashlib.sha256(content).hexdigest(), is_inline=is_inline, is_pec_system=is_system, ) result.attachments.append(att) except Exception as exc: logger.warning(f"Errore estrazione allegato {filename!r}: {exc}")