mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 20:55:41 +02:00
537 lines
20 KiB
Python
537 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Test integrazione SMTP/IMAP REALE – Casella Aruba PEC
|
||
======================================================
|
||
|
||
Testa:
|
||
1. Connessione SMTP SSL (porta 465) → smtps.pec.aruba.it
|
||
2. Invio PEC reale a matteo1801@spidmail.it
|
||
3. Connessione IMAP SSL (porta 993) → imaps.pec.aruba.it
|
||
4. Lista cartelle IMAP + ispezione completa messaggi recenti
|
||
5. Rilevazione ricevute accettazione/consegna
|
||
|
||
Eseguire DENTRO il container worker:
|
||
docker exec -e PYTHONPATH=/worker pecflow-worker-1 python \
|
||
/worker/tests/integration/test_smtp_real_aruba.py
|
||
"""
|
||
|
||
import asyncio
|
||
import base64
|
||
import email
|
||
import email.header
|
||
import os
|
||
import sys
|
||
from datetime import datetime
|
||
from unittest.mock import MagicMock
|
||
|
||
# ─── Variabili d'ambiente ─────────────────────────────────────────────────────
|
||
if "ENCRYPTION_KEY" not in os.environ:
|
||
os.environ["ENCRYPTION_KEY"] = "6465762d656e6372797074696f6e2d6b65792d6e6f742d666f722d70726f6400"
|
||
os.environ.setdefault("SECRET_KEY", "dev-secret-key-not-for-production-use-only-for-local-0000000000000")
|
||
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://pecflow:pecflow_dev_password@db:5432/pecflow")
|
||
os.environ.setdefault("REDIS_URL", "redis://redis:6379/0")
|
||
os.environ.setdefault("MINIO_ENDPOINT", "minio:9000")
|
||
|
||
# ─── Parametri PEC Aruba ─────────────────────────────────────────────────────
|
||
PEC_EMAIL = "matteo.giustini@arubapec.it"
|
||
PEC_PASSWORD = "MadonnaPuttana1!"
|
||
SMTP_HOST = "smtps.pec.aruba.it"
|
||
SMTP_PORT = 465
|
||
IMAP_HOST = "imaps.pec.aruba.it"
|
||
IMAP_PORT = 993
|
||
TO_ADDRESS = "matteo1801@spidmail.it"
|
||
|
||
|
||
# ─── Helpers cifratura ────────────────────────────────────────────────────────
|
||
|
||
def _encrypt(value: str) -> str:
|
||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||
key = bytes.fromhex(os.environ["ENCRYPTION_KEY"])
|
||
nonce = os.urandom(12)
|
||
ct = AESGCM(key).encrypt(nonce, value.encode("utf-8"), None)
|
||
return base64.b64encode(nonce + ct).decode("ascii")
|
||
|
||
|
||
def make_fake_mailbox() -> MagicMock:
|
||
m = MagicMock()
|
||
m.email_address = PEC_EMAIL
|
||
m.smtp_host_enc = _encrypt(SMTP_HOST)
|
||
m.smtp_port_enc = _encrypt(str(SMTP_PORT))
|
||
m.smtp_user_enc = _encrypt(PEC_EMAIL)
|
||
m.smtp_pass_enc = _encrypt(PEC_PASSWORD)
|
||
m.smtp_use_tls = True
|
||
return m
|
||
|
||
|
||
def _decode_header(raw: str) -> str:
|
||
"""Decodifica un header MIME (gestisce encoded-words =?utf-8?...?=)."""
|
||
try:
|
||
parts = email.header.decode_header(raw)
|
||
decoded = []
|
||
for part, enc in parts:
|
||
if isinstance(part, bytes):
|
||
decoded.append(part.decode(enc or "utf-8", errors="replace"))
|
||
else:
|
||
decoded.append(part)
|
||
return "".join(decoded)
|
||
except Exception:
|
||
return raw
|
||
|
||
|
||
def _is_pec_receipt(headers_text: str) -> tuple[bool, str]:
|
||
"""
|
||
Determina se un messaggio è una ricevuta PEC.
|
||
Restituisce (is_receipt, tipo_ricevuta).
|
||
"""
|
||
lower = headers_text.lower()
|
||
|
||
# Header proprietari Aruba/PEC
|
||
receipt_markers = [
|
||
("x-ricevuta:", "ricevuta"),
|
||
("x-trasporto:", "posta-certificata"),
|
||
("x-tiporicevuta:", "tipo-ricevuta"),
|
||
("x-riferimentomessaggioid:", "riferimento"),
|
||
]
|
||
for marker, label in receipt_markers:
|
||
if marker in lower:
|
||
return True, label
|
||
|
||
# Pattern nel Subject
|
||
subject_keywords = [
|
||
"accettazione:",
|
||
"avvenuta consegna:",
|
||
"avvenuta-consegna:",
|
||
"errore di consegna:",
|
||
"posta certificata:",
|
||
"ricevuta di accettazione",
|
||
"ricevuta di avvenuta consegna",
|
||
]
|
||
for kw in subject_keywords:
|
||
if kw in lower:
|
||
return True, f"subject-match:{kw}"
|
||
|
||
return False, ""
|
||
|
||
|
||
# ─── STEP 0: Connettività SMTP ────────────────────────────────────────────────
|
||
|
||
async def test_smtp_connectivity() -> bool:
|
||
import aiosmtplib
|
||
|
||
_sep()
|
||
print("STEP 0 – VERIFICA CONNETTIVITÀ SMTP")
|
||
_sep()
|
||
|
||
try:
|
||
smtp = aiosmtplib.SMTP(
|
||
hostname=SMTP_HOST, port=SMTP_PORT,
|
||
use_tls=True, start_tls=False, timeout=15,
|
||
)
|
||
await smtp.connect()
|
||
print(f" ✅ TCP+SSL OK → {SMTP_HOST}:{SMTP_PORT}")
|
||
await smtp.login(PEC_EMAIL, PEC_PASSWORD)
|
||
print(f" ✅ Autenticazione SMTP OK")
|
||
exts = list((smtp.esmtp_extensions or {}).keys())
|
||
print(f" ℹ️ ESMTP: {exts}")
|
||
await smtp.quit()
|
||
print(f" ✅ QUIT OK")
|
||
return True
|
||
except Exception as exc:
|
||
print(f" ❌ ERRORE: {exc}")
|
||
import traceback; traceback.print_exc()
|
||
return False
|
||
|
||
|
||
# ─── STEP 1: Invio SMTP ───────────────────────────────────────────────────────
|
||
|
||
async def test_smtp_send() -> tuple[str | None, bool]:
|
||
try:
|
||
from app.config import get_settings
|
||
get_settings.cache_clear()
|
||
except Exception:
|
||
pass
|
||
|
||
from app.smtp.sender import SmtpSender
|
||
|
||
mailbox = make_fake_mailbox()
|
||
sender = SmtpSender(mailbox)
|
||
|
||
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
subject = f"[PecFlow TEST] Verifica SMTP+Ricevute – {ts}"
|
||
body = (
|
||
f"Messaggio di test automatico PecFlow – {ts}\n\n"
|
||
"Verifica:\n"
|
||
" 1. Connessione SMTP SSL porta 465 → smtps.pec.aruba.it\n"
|
||
" 2. Autenticazione e invio PEC\n"
|
||
" 3. Ricezione ricevuta di accettazione\n"
|
||
" 4. Ricezione ricevuta di avvenuta consegna\n\n"
|
||
f"Mittente : {PEC_EMAIL}\n"
|
||
f"Destinato: {TO_ADDRESS}\n\n"
|
||
"Non rispondere. Generato da PecFlow SaaS.\n"
|
||
)
|
||
|
||
_sep()
|
||
print("STEP 1 – INVIO SMTP")
|
||
_sep()
|
||
print(f" Da : {PEC_EMAIL}")
|
||
print(f" A : {TO_ADDRESS}")
|
||
print(f" Host : {SMTP_HOST}:{SMTP_PORT} (SSL)")
|
||
print(f" Oggetto: {subject}")
|
||
print()
|
||
|
||
try:
|
||
message_id, raw_eml = await sender.send(
|
||
to_addresses=[TO_ADDRESS],
|
||
cc_addresses=[],
|
||
subject=subject,
|
||
body_text=body,
|
||
)
|
||
print(f" ✅ INVIO OK")
|
||
print(f" Message-ID : {message_id}")
|
||
print(f" EML size : {len(raw_eml)} bytes")
|
||
return message_id, True
|
||
except Exception as exc:
|
||
print(f" ❌ ERRORE SMTP: {exc}")
|
||
import traceback; traceback.print_exc()
|
||
return None, False
|
||
|
||
|
||
# ─── STEP 2: Ispezione IMAP completa ─────────────────────────────────────────
|
||
|
||
async def test_imap_full_inspection() -> None:
|
||
"""
|
||
Connessione IMAP completa:
|
||
1. Lista tutte le cartelle
|
||
2. Ispeziona INBOX con fetch header completo
|
||
3. Cerca ricevute PEC in tutte le cartelle rilevanti
|
||
"""
|
||
import aioimaplib
|
||
|
||
_sep()
|
||
print("STEP 2 – ISPEZIONE IMAP COMPLETA")
|
||
_sep()
|
||
print(f" Host : {IMAP_HOST}:{IMAP_PORT} (SSL)")
|
||
print(f" User : {PEC_EMAIL}")
|
||
print()
|
||
|
||
try:
|
||
client = aioimaplib.IMAP4_SSL(host=IMAP_HOST, port=IMAP_PORT, timeout=30)
|
||
await asyncio.wait_for(client.wait_hello_from_server(), timeout=30)
|
||
|
||
status, _ = await client.login(PEC_EMAIL, PEC_PASSWORD)
|
||
if status != "OK":
|
||
print(f" ❌ Login IMAP fallito: {status}")
|
||
return
|
||
print(f" ✅ Login IMAP riuscito")
|
||
|
||
# ── Lista cartelle ────────────────────────────────────────────────────
|
||
status, folder_data = await client.list('""', "*")
|
||
folders = []
|
||
if status == "OK":
|
||
for item in folder_data:
|
||
if not item:
|
||
continue
|
||
raw = item.decode("utf-8", errors="replace") if isinstance(item, bytes) else str(item)
|
||
# Estrai il nome cartella (ultima parte dopo l'ultimo separatore)
|
||
if '"/"' in raw or "|" in raw or raw.strip():
|
||
# Formato tipico: (\HasChildren) "/" "INBOX"
|
||
parts = raw.split('"')
|
||
if len(parts) >= 3:
|
||
folder_name = parts[-2].strip()
|
||
if folder_name:
|
||
folders.append(folder_name)
|
||
elif raw.strip():
|
||
# Prova a estrarre diversamente
|
||
tokens = raw.split()
|
||
if tokens:
|
||
folders.append(tokens[-1].strip('"'))
|
||
|
||
# Rimuovi duplicati mantenendo ordine
|
||
seen = set()
|
||
unique_folders = []
|
||
for f in folders:
|
||
if f not in seen and f:
|
||
seen.add(f)
|
||
unique_folders.append(f)
|
||
|
||
print(f" 📂 Cartelle IMAP disponibili ({len(unique_folders)}):")
|
||
for f in unique_folders:
|
||
print(f" • {f}")
|
||
print()
|
||
|
||
# ── Cartelle da ispezionare per ricevute ──────────────────────────────
|
||
# Aruba PEC mette ricevute in INBOX, ma può avere cartelle dedicate
|
||
priority_folders = ["INBOX"]
|
||
for f in unique_folders:
|
||
fl = f.lower()
|
||
if any(kw in fl for kw in ["ricevut", "certif", "sent", "inviati", "outbox", "pec"]):
|
||
if f not in priority_folders:
|
||
priority_folders.append(f)
|
||
|
||
all_receipts: list[dict] = []
|
||
|
||
for folder in priority_folders:
|
||
print(f" 📁 Ispezione cartella: {folder!r}")
|
||
print(f" {'─' * 50}")
|
||
|
||
try:
|
||
status, sel_data = await client.select(folder)
|
||
if status != "OK":
|
||
print(f" ⚠️ SELECT fallito: {status}")
|
||
print()
|
||
continue
|
||
|
||
# Conta messaggi
|
||
total = 0
|
||
for line in sel_data:
|
||
if isinstance(line, bytes) and b"EXISTS" in line:
|
||
try:
|
||
total = int(line.split()[0])
|
||
except Exception:
|
||
pass
|
||
print(f" 📬 Messaggi totali: {total}")
|
||
|
||
if total == 0:
|
||
print(f" (cartella vuota)")
|
||
print()
|
||
continue
|
||
|
||
# Fetch ultimi 10 messaggi (per numero sequenza)
|
||
start_seq = max(1, total - 9)
|
||
status, fetch_data = await client.fetch(
|
||
f"{start_seq}:{total}",
|
||
"(BODY.PEEK[HEADER])"
|
||
)
|
||
|
||
if status != "OK":
|
||
print(f" ⚠️ FETCH fallito: {status}")
|
||
print()
|
||
continue
|
||
|
||
# Parsa i risultati fetch
|
||
messages_found: list[dict] = []
|
||
current_headers = []
|
||
current_seq = None
|
||
|
||
for part in fetch_data:
|
||
if not part:
|
||
continue
|
||
if isinstance(part, bytes):
|
||
raw = part.decode("utf-8", errors="replace")
|
||
else:
|
||
raw = str(part)
|
||
|
||
# Linea di risposta FETCH (es: "1 FETCH (BODY[HEADER] {1234}")
|
||
if "FETCH" in raw and "BODY" in raw:
|
||
# Salva il messaggio precedente se esiste
|
||
if current_headers and current_seq is not None:
|
||
hdr_text = "\n".join(current_headers)
|
||
messages_found.append({
|
||
"seq": current_seq,
|
||
"headers": hdr_text,
|
||
})
|
||
# Estrai numero sequenza
|
||
try:
|
||
current_seq = int(raw.split()[0])
|
||
except Exception:
|
||
current_seq = "?"
|
||
current_headers = []
|
||
elif raw.strip() == ")":
|
||
# Fine del fetch per questo messaggio
|
||
if current_headers and current_seq is not None:
|
||
hdr_text = "\n".join(current_headers)
|
||
messages_found.append({
|
||
"seq": current_seq,
|
||
"headers": hdr_text,
|
||
})
|
||
current_headers = []
|
||
current_seq = None
|
||
else:
|
||
# Corpo dell'header
|
||
if current_seq is not None:
|
||
current_headers.append(raw)
|
||
|
||
# Aggiungi l'ultimo messaggio pendente
|
||
if current_headers and current_seq is not None:
|
||
messages_found.append({
|
||
"seq": current_seq,
|
||
"headers": "\n".join(current_headers),
|
||
})
|
||
|
||
# De-duplicazione per seq
|
||
seen_seqs: set = set()
|
||
unique_msgs: list[dict] = []
|
||
for m in messages_found:
|
||
if m["seq"] not in seen_seqs:
|
||
seen_seqs.add(m["seq"])
|
||
unique_msgs.append(m)
|
||
|
||
print(f" 🔍 Analizzati {len(unique_msgs)} messaggi recenti")
|
||
print()
|
||
|
||
for msg in unique_msgs:
|
||
hdrs = msg["headers"]
|
||
is_receipt, receipt_type = _is_pec_receipt(hdrs)
|
||
|
||
# Estrai soggetto
|
||
subject_raw = ""
|
||
for line in hdrs.splitlines():
|
||
if line.lower().startswith("subject:"):
|
||
subject_raw = line[8:].strip()
|
||
break
|
||
subject_decoded = _decode_header(subject_raw) if subject_raw else "(nessun oggetto)"
|
||
|
||
# Estrai From
|
||
from_raw = ""
|
||
for line in hdrs.splitlines():
|
||
if line.lower().startswith("from:"):
|
||
from_raw = line[5:].strip()
|
||
break
|
||
|
||
# Estrai Date
|
||
date_raw = ""
|
||
for line in hdrs.splitlines():
|
||
if line.lower().startswith("date:"):
|
||
date_raw = line[5:].strip()
|
||
break
|
||
|
||
# Estrai headers PEC specifici
|
||
x_ricevuta = ""
|
||
x_trasporto = ""
|
||
for line in hdrs.splitlines():
|
||
ll = line.lower()
|
||
if ll.startswith("x-ricevuta:"):
|
||
x_ricevuta = line.split(":", 1)[1].strip()
|
||
elif ll.startswith("x-trasporto:"):
|
||
x_trasporto = line.split(":", 1)[1].strip()
|
||
|
||
if is_receipt:
|
||
marker = "🏆 RICEVUTA PEC"
|
||
all_receipts.append({
|
||
"folder": folder,
|
||
"seq": msg["seq"],
|
||
"subject": subject_decoded,
|
||
"from": from_raw,
|
||
"date": date_raw,
|
||
"x_ricevuta": x_ricevuta,
|
||
"x_trasporto": x_trasporto,
|
||
"receipt_type": receipt_type,
|
||
})
|
||
else:
|
||
marker = "📧"
|
||
|
||
print(f" {marker} [seq={msg['seq']}]")
|
||
print(f" Subject : {subject_decoded}")
|
||
print(f" From : {from_raw[:80] if from_raw else '(n/a)'}")
|
||
print(f" Date : {date_raw[:60] if date_raw else '(n/a)'}")
|
||
if x_ricevuta:
|
||
print(f" X-Ricevuta: {x_ricevuta}")
|
||
if x_trasporto:
|
||
print(f" X-Trasporto: {x_trasporto}")
|
||
print()
|
||
|
||
except Exception as folder_err:
|
||
print(f" ❌ Errore cartella {folder!r}: {folder_err}")
|
||
print()
|
||
|
||
# ── Riepilogo ricevute ────────────────────────────────────────────────
|
||
_sep()
|
||
print("RIEPILOGO RICEVUTE PEC")
|
||
_sep()
|
||
|
||
if all_receipts:
|
||
print(f" ✅ TROVATE {len(all_receipts)} RICEVUTA/E PEC:\n")
|
||
for r in all_receipts:
|
||
print(f" ── {r['folder']} / seq={r['seq']} ──")
|
||
print(f" Subject : {r['subject']}")
|
||
print(f" From : {r['from']}")
|
||
print(f" Date : {r['date']}")
|
||
print(f" X-Ricevuta : {r.get('x_ricevuta', '-')}")
|
||
print(f" X-Trasporto: {r.get('x_trasporto', '-')}")
|
||
print(f" Tipo : {r['receipt_type']}")
|
||
print()
|
||
else:
|
||
print(" ⚠️ Nessuna ricevuta PEC trovata.")
|
||
print(" Possibili cause:")
|
||
print(" • Le ricevute non sono ancora arrivate (Aruba PEC: <30s normalmente)")
|
||
print(" • La casella destinatario non è PEC → ricevuta solo di accettazione")
|
||
print(" • I messaggi in INBOX non sono ancora stati aggiornati")
|
||
print()
|
||
print(" 💡 Ri-esegui lo STEP 2 manualmente tra qualche secondo con:")
|
||
print(" docker exec -e PYTHONPATH=/worker pecflow-worker-1 python -c \"")
|
||
print(" import asyncio, sys; sys.path.insert(0,'/worker')")
|
||
print(" from tests.integration.test_smtp_real_aruba import test_imap_full_inspection")
|
||
print(" asyncio.run(test_imap_full_inspection())\"")
|
||
|
||
await client.logout()
|
||
print(f"\n ✅ Logout IMAP completato")
|
||
|
||
except asyncio.TimeoutError:
|
||
print(f" ❌ Timeout IMAP ({IMAP_HOST}:{IMAP_PORT})")
|
||
except Exception as exc:
|
||
print(f" ❌ ERRORE IMAP: {exc}")
|
||
import traceback; traceback.print_exc()
|
||
|
||
|
||
# ─── Utilities ────────────────────────────────────────────────────────────────
|
||
|
||
def _sep(char: str = "─", width: int = 60) -> None:
|
||
print(char * width)
|
||
|
||
|
||
def _banner() -> None:
|
||
_sep("═")
|
||
print(" PecFlow – Test SMTP/IMAP Reale (Aruba PEC)")
|
||
_sep("═")
|
||
print(f" Timestamp : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print(f" Casella PEC : {PEC_EMAIL}")
|
||
print(f" SMTP : {SMTP_HOST}:{SMTP_PORT} (SSL)")
|
||
print(f" IMAP : {IMAP_HOST}:{IMAP_PORT} (SSL)")
|
||
print(f" Destinatario: {TO_ADDRESS}")
|
||
_sep("═")
|
||
print()
|
||
|
||
|
||
# ─── Main ─────────────────────────────────────────────────────────────────────
|
||
|
||
async def main() -> None:
|
||
_banner()
|
||
|
||
# STEP 0: connettività SMTP
|
||
conn_ok = await test_smtp_connectivity()
|
||
print()
|
||
if not conn_ok:
|
||
print("❌ Connettività SMTP fallita.")
|
||
sys.exit(1)
|
||
|
||
# STEP 1: invio
|
||
message_id, send_ok = await test_smtp_send()
|
||
print()
|
||
if not send_ok:
|
||
print("❌ Invio SMTP fallito – skip verifica ricevute.")
|
||
sys.exit(1)
|
||
|
||
# Attendi ricevute
|
||
wait_secs = 45
|
||
print(f"⏳ Attesa {wait_secs}s per ricezione ricevute Aruba...")
|
||
for r in range(wait_secs, 0, -5):
|
||
print(f" {r}s...", end="\r", flush=True)
|
||
await asyncio.sleep(5)
|
||
print(f" Attesa completata. ")
|
||
print()
|
||
|
||
# STEP 2: ispezione IMAP completa
|
||
await test_imap_full_inspection()
|
||
|
||
print()
|
||
_sep("═")
|
||
print(" TEST COMPLETATO")
|
||
_sep("═")
|
||
if message_id:
|
||
print(f" Message-ID: {message_id}")
|
||
print()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|