Test smtp

This commit is contained in:
2026-03-19 16:36:26 +01:00
parent 7fc9108d2a
commit 3bdb32e6ae
2 changed files with 540 additions and 31 deletions
+4 -31
View File
@@ -10,39 +10,12 @@ Non effettuare test da Browser, ci penso io
Queste le caselle PEC e i loro parametri IMAP/SMTP che puoi usare per test, non effettuare invii per adesso
Casella: gmgspa@pec.it
Password: Gmgbaccio1960!26
Server IMAP: imaps.pec.mail-certificata.eu
Casella: matteo.giustini@arubapec.it
Password: MadonnaPuttana1!
Server IMAP: imaps.pec.aruba.it
Porta:993
SSL: Sì
Server SMTP: smtps.pec.mail-certificata.eu
Porta: 465
SSL: Sì
Casella: akron@pec.akronservices.it
Password: Akron@PEC2026!
Server IMAP: imap.pec-email.com
Porta:993
SSL: Sì
Server SMTP: smtp.pec-email.com
Porta: 465
SSL: Sì
Casella: birindelliauto@pec.it
Password: PECBirindelli2026@
Server IMAP: imaps.pec.mail-certificata.eu
Porta:993
SSL: Sì
Server SMTP: smtps.pec.mail-certificata.eu
Porta: 465
SSL: Sì
Casella: quattrocarsrl@pec.it
Password: 4Car2026GMG!
Server IMAP: imaps.pec.mail-certificata.eu
Porta:993
SSL: Sì
Server SMTP: smtps.pec.mail-certificata.eu
Server SMTP: smtps.pec.aruba.it
Porta: 465
SSL: Sì
@@ -0,0 +1,536 @@
#!/usr/bin/env python3
"""
Test integrazione SMTP/IMAP REALE Casella Aruba PEC
======================================================
Testa:
1. Connessione SMTP SSL (porta 465) → smtps.pec.aruba.it
2. Invio PEC reale a matteo1801@spidmail.it
3. Connessione IMAP SSL (porta 993) → imaps.pec.aruba.it
4. Lista cartelle IMAP + ispezione completa messaggi recenti
5. Rilevazione ricevute accettazione/consegna
Eseguire DENTRO il container worker:
docker exec -e PYTHONPATH=/worker pecflow-worker-1 python \
/worker/tests/integration/test_smtp_real_aruba.py
"""
import asyncio
import base64
import email
import email.header
import os
import sys
from datetime import datetime
from unittest.mock import MagicMock
# ─── Variabili d'ambiente ─────────────────────────────────────────────────────
if "ENCRYPTION_KEY" not in os.environ:
os.environ["ENCRYPTION_KEY"] = "6465762d656e6372797074696f6e2d6b65792d6e6f742d666f722d70726f6400"
os.environ.setdefault("SECRET_KEY", "dev-secret-key-not-for-production-use-only-for-local-0000000000000")
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://pecflow:pecflow_dev_password@db:5432/pecflow")
os.environ.setdefault("REDIS_URL", "redis://redis:6379/0")
os.environ.setdefault("MINIO_ENDPOINT", "minio:9000")
# ─── Parametri PEC Aruba ─────────────────────────────────────────────────────
PEC_EMAIL = "matteo.giustini@arubapec.it"
PEC_PASSWORD = "MadonnaPuttana1!"
SMTP_HOST = "smtps.pec.aruba.it"
SMTP_PORT = 465
IMAP_HOST = "imaps.pec.aruba.it"
IMAP_PORT = 993
TO_ADDRESS = "matteo1801@spidmail.it"
# ─── Helpers cifratura ────────────────────────────────────────────────────────
def _encrypt(value: str) -> str:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
key = bytes.fromhex(os.environ["ENCRYPTION_KEY"])
nonce = os.urandom(12)
ct = AESGCM(key).encrypt(nonce, value.encode("utf-8"), None)
return base64.b64encode(nonce + ct).decode("ascii")
def make_fake_mailbox() -> MagicMock:
m = MagicMock()
m.email_address = PEC_EMAIL
m.smtp_host_enc = _encrypt(SMTP_HOST)
m.smtp_port_enc = _encrypt(str(SMTP_PORT))
m.smtp_user_enc = _encrypt(PEC_EMAIL)
m.smtp_pass_enc = _encrypt(PEC_PASSWORD)
m.smtp_use_tls = True
return m
def _decode_header(raw: str) -> str:
"""Decodifica un header MIME (gestisce encoded-words =?utf-8?...?=)."""
try:
parts = email.header.decode_header(raw)
decoded = []
for part, enc in parts:
if isinstance(part, bytes):
decoded.append(part.decode(enc or "utf-8", errors="replace"))
else:
decoded.append(part)
return "".join(decoded)
except Exception:
return raw
def _is_pec_receipt(headers_text: str) -> tuple[bool, str]:
"""
Determina se un messaggio è una ricevuta PEC.
Restituisce (is_receipt, tipo_ricevuta).
"""
lower = headers_text.lower()
# Header proprietari Aruba/PEC
receipt_markers = [
("x-ricevuta:", "ricevuta"),
("x-trasporto:", "posta-certificata"),
("x-tiporicevuta:", "tipo-ricevuta"),
("x-riferimentomessaggioid:", "riferimento"),
]
for marker, label in receipt_markers:
if marker in lower:
return True, label
# Pattern nel Subject
subject_keywords = [
"accettazione:",
"avvenuta consegna:",
"avvenuta-consegna:",
"errore di consegna:",
"posta certificata:",
"ricevuta di accettazione",
"ricevuta di avvenuta consegna",
]
for kw in subject_keywords:
if kw in lower:
return True, f"subject-match:{kw}"
return False, ""
# ─── STEP 0: Connettività SMTP ────────────────────────────────────────────────
async def test_smtp_connectivity() -> bool:
import aiosmtplib
_sep()
print("STEP 0 VERIFICA CONNETTIVITÀ SMTP")
_sep()
try:
smtp = aiosmtplib.SMTP(
hostname=SMTP_HOST, port=SMTP_PORT,
use_tls=True, start_tls=False, timeout=15,
)
await smtp.connect()
print(f" ✅ TCP+SSL OK → {SMTP_HOST}:{SMTP_PORT}")
await smtp.login(PEC_EMAIL, PEC_PASSWORD)
print(f" ✅ Autenticazione SMTP OK")
exts = list((smtp.esmtp_extensions or {}).keys())
print(f" ️ ESMTP: {exts}")
await smtp.quit()
print(f" ✅ QUIT OK")
return True
except Exception as exc:
print(f" ❌ ERRORE: {exc}")
import traceback; traceback.print_exc()
return False
# ─── STEP 1: Invio SMTP ───────────────────────────────────────────────────────
async def test_smtp_send() -> tuple[str | None, bool]:
try:
from app.config import get_settings
get_settings.cache_clear()
except Exception:
pass
from app.smtp.sender import SmtpSender
mailbox = make_fake_mailbox()
sender = SmtpSender(mailbox)
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
subject = f"[PecFlow TEST] Verifica SMTP+Ricevute {ts}"
body = (
f"Messaggio di test automatico PecFlow {ts}\n\n"
"Verifica:\n"
" 1. Connessione SMTP SSL porta 465 → smtps.pec.aruba.it\n"
" 2. Autenticazione e invio PEC\n"
" 3. Ricezione ricevuta di accettazione\n"
" 4. Ricezione ricevuta di avvenuta consegna\n\n"
f"Mittente : {PEC_EMAIL}\n"
f"Destinato: {TO_ADDRESS}\n\n"
"Non rispondere. Generato da PecFlow SaaS.\n"
)
_sep()
print("STEP 1 INVIO SMTP")
_sep()
print(f" Da : {PEC_EMAIL}")
print(f" A : {TO_ADDRESS}")
print(f" Host : {SMTP_HOST}:{SMTP_PORT} (SSL)")
print(f" Oggetto: {subject}")
print()
try:
message_id, raw_eml = await sender.send(
to_addresses=[TO_ADDRESS],
cc_addresses=[],
subject=subject,
body_text=body,
)
print(f" ✅ INVIO OK")
print(f" Message-ID : {message_id}")
print(f" EML size : {len(raw_eml)} bytes")
return message_id, True
except Exception as exc:
print(f" ❌ ERRORE SMTP: {exc}")
import traceback; traceback.print_exc()
return None, False
# ─── STEP 2: Ispezione IMAP completa ─────────────────────────────────────────
async def test_imap_full_inspection() -> None:
"""
Connessione IMAP completa:
1. Lista tutte le cartelle
2. Ispeziona INBOX con fetch header completo
3. Cerca ricevute PEC in tutte le cartelle rilevanti
"""
import aioimaplib
_sep()
print("STEP 2 ISPEZIONE IMAP COMPLETA")
_sep()
print(f" Host : {IMAP_HOST}:{IMAP_PORT} (SSL)")
print(f" User : {PEC_EMAIL}")
print()
try:
client = aioimaplib.IMAP4_SSL(host=IMAP_HOST, port=IMAP_PORT, timeout=30)
await asyncio.wait_for(client.wait_hello_from_server(), timeout=30)
status, _ = await client.login(PEC_EMAIL, PEC_PASSWORD)
if status != "OK":
print(f" ❌ Login IMAP fallito: {status}")
return
print(f" ✅ Login IMAP riuscito")
# ── Lista cartelle ────────────────────────────────────────────────────
status, folder_data = await client.list('""', "*")
folders = []
if status == "OK":
for item in folder_data:
if not item:
continue
raw = item.decode("utf-8", errors="replace") if isinstance(item, bytes) else str(item)
# Estrai il nome cartella (ultima parte dopo l'ultimo separatore)
if '"/"' in raw or "|" in raw or raw.strip():
# Formato tipico: (\HasChildren) "/" "INBOX"
parts = raw.split('"')
if len(parts) >= 3:
folder_name = parts[-2].strip()
if folder_name:
folders.append(folder_name)
elif raw.strip():
# Prova a estrarre diversamente
tokens = raw.split()
if tokens:
folders.append(tokens[-1].strip('"'))
# Rimuovi duplicati mantenendo ordine
seen = set()
unique_folders = []
for f in folders:
if f not in seen and f:
seen.add(f)
unique_folders.append(f)
print(f" 📂 Cartelle IMAP disponibili ({len(unique_folders)}):")
for f in unique_folders:
print(f"{f}")
print()
# ── Cartelle da ispezionare per ricevute ──────────────────────────────
# Aruba PEC mette ricevute in INBOX, ma può avere cartelle dedicate
priority_folders = ["INBOX"]
for f in unique_folders:
fl = f.lower()
if any(kw in fl for kw in ["ricevut", "certif", "sent", "inviati", "outbox", "pec"]):
if f not in priority_folders:
priority_folders.append(f)
all_receipts: list[dict] = []
for folder in priority_folders:
print(f" 📁 Ispezione cartella: {folder!r}")
print(f" {'' * 50}")
try:
status, sel_data = await client.select(folder)
if status != "OK":
print(f" ⚠️ SELECT fallito: {status}")
print()
continue
# Conta messaggi
total = 0
for line in sel_data:
if isinstance(line, bytes) and b"EXISTS" in line:
try:
total = int(line.split()[0])
except Exception:
pass
print(f" 📬 Messaggi totali: {total}")
if total == 0:
print(f" (cartella vuota)")
print()
continue
# Fetch ultimi 10 messaggi (per numero sequenza)
start_seq = max(1, total - 9)
status, fetch_data = await client.fetch(
f"{start_seq}:{total}",
"(BODY.PEEK[HEADER])"
)
if status != "OK":
print(f" ⚠️ FETCH fallito: {status}")
print()
continue
# Parsa i risultati fetch
messages_found: list[dict] = []
current_headers = []
current_seq = None
for part in fetch_data:
if not part:
continue
if isinstance(part, bytes):
raw = part.decode("utf-8", errors="replace")
else:
raw = str(part)
# Linea di risposta FETCH (es: "1 FETCH (BODY[HEADER] {1234}")
if "FETCH" in raw and "BODY" in raw:
# Salva il messaggio precedente se esiste
if current_headers and current_seq is not None:
hdr_text = "\n".join(current_headers)
messages_found.append({
"seq": current_seq,
"headers": hdr_text,
})
# Estrai numero sequenza
try:
current_seq = int(raw.split()[0])
except Exception:
current_seq = "?"
current_headers = []
elif raw.strip() == ")":
# Fine del fetch per questo messaggio
if current_headers and current_seq is not None:
hdr_text = "\n".join(current_headers)
messages_found.append({
"seq": current_seq,
"headers": hdr_text,
})
current_headers = []
current_seq = None
else:
# Corpo dell'header
if current_seq is not None:
current_headers.append(raw)
# Aggiungi l'ultimo messaggio pendente
if current_headers and current_seq is not None:
messages_found.append({
"seq": current_seq,
"headers": "\n".join(current_headers),
})
# De-duplicazione per seq
seen_seqs: set = set()
unique_msgs: list[dict] = []
for m in messages_found:
if m["seq"] not in seen_seqs:
seen_seqs.add(m["seq"])
unique_msgs.append(m)
print(f" 🔍 Analizzati {len(unique_msgs)} messaggi recenti")
print()
for msg in unique_msgs:
hdrs = msg["headers"]
is_receipt, receipt_type = _is_pec_receipt(hdrs)
# Estrai soggetto
subject_raw = ""
for line in hdrs.splitlines():
if line.lower().startswith("subject:"):
subject_raw = line[8:].strip()
break
subject_decoded = _decode_header(subject_raw) if subject_raw else "(nessun oggetto)"
# Estrai From
from_raw = ""
for line in hdrs.splitlines():
if line.lower().startswith("from:"):
from_raw = line[5:].strip()
break
# Estrai Date
date_raw = ""
for line in hdrs.splitlines():
if line.lower().startswith("date:"):
date_raw = line[5:].strip()
break
# Estrai headers PEC specifici
x_ricevuta = ""
x_trasporto = ""
for line in hdrs.splitlines():
ll = line.lower()
if ll.startswith("x-ricevuta:"):
x_ricevuta = line.split(":", 1)[1].strip()
elif ll.startswith("x-trasporto:"):
x_trasporto = line.split(":", 1)[1].strip()
if is_receipt:
marker = "🏆 RICEVUTA PEC"
all_receipts.append({
"folder": folder,
"seq": msg["seq"],
"subject": subject_decoded,
"from": from_raw,
"date": date_raw,
"x_ricevuta": x_ricevuta,
"x_trasporto": x_trasporto,
"receipt_type": receipt_type,
})
else:
marker = "📧"
print(f" {marker} [seq={msg['seq']}]")
print(f" Subject : {subject_decoded}")
print(f" From : {from_raw[:80] if from_raw else '(n/a)'}")
print(f" Date : {date_raw[:60] if date_raw else '(n/a)'}")
if x_ricevuta:
print(f" X-Ricevuta: {x_ricevuta}")
if x_trasporto:
print(f" X-Trasporto: {x_trasporto}")
print()
except Exception as folder_err:
print(f" ❌ Errore cartella {folder!r}: {folder_err}")
print()
# ── Riepilogo ricevute ────────────────────────────────────────────────
_sep()
print("RIEPILOGO RICEVUTE PEC")
_sep()
if all_receipts:
print(f" ✅ TROVATE {len(all_receipts)} RICEVUTA/E PEC:\n")
for r in all_receipts:
print(f" ── {r['folder']} / seq={r['seq']} ──")
print(f" Subject : {r['subject']}")
print(f" From : {r['from']}")
print(f" Date : {r['date']}")
print(f" X-Ricevuta : {r.get('x_ricevuta', '-')}")
print(f" X-Trasporto: {r.get('x_trasporto', '-')}")
print(f" Tipo : {r['receipt_type']}")
print()
else:
print(" ⚠️ Nessuna ricevuta PEC trovata.")
print(" Possibili cause:")
print(" • Le ricevute non sono ancora arrivate (Aruba PEC: <30s normalmente)")
print(" • La casella destinatario non è PEC → ricevuta solo di accettazione")
print(" • I messaggi in INBOX non sono ancora stati aggiornati")
print()
print(" 💡 Ri-esegui lo STEP 2 manualmente tra qualche secondo con:")
print(" docker exec -e PYTHONPATH=/worker pecflow-worker-1 python -c \"")
print(" import asyncio, sys; sys.path.insert(0,'/worker')")
print(" from tests.integration.test_smtp_real_aruba import test_imap_full_inspection")
print(" asyncio.run(test_imap_full_inspection())\"")
await client.logout()
print(f"\n ✅ Logout IMAP completato")
except asyncio.TimeoutError:
print(f" ❌ Timeout IMAP ({IMAP_HOST}:{IMAP_PORT})")
except Exception as exc:
print(f" ❌ ERRORE IMAP: {exc}")
import traceback; traceback.print_exc()
# ─── Utilities ────────────────────────────────────────────────────────────────
def _sep(char: str = "", width: int = 60) -> None:
print(char * width)
def _banner() -> None:
_sep("")
print(" PecFlow Test SMTP/IMAP Reale (Aruba PEC)")
_sep("")
print(f" Timestamp : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Casella PEC : {PEC_EMAIL}")
print(f" SMTP : {SMTP_HOST}:{SMTP_PORT} (SSL)")
print(f" IMAP : {IMAP_HOST}:{IMAP_PORT} (SSL)")
print(f" Destinatario: {TO_ADDRESS}")
_sep("")
print()
# ─── Main ─────────────────────────────────────────────────────────────────────
async def main() -> None:
_banner()
# STEP 0: connettività SMTP
conn_ok = await test_smtp_connectivity()
print()
if not conn_ok:
print("❌ Connettività SMTP fallita.")
sys.exit(1)
# STEP 1: invio
message_id, send_ok = await test_smtp_send()
print()
if not send_ok:
print("❌ Invio SMTP fallito skip verifica ricevute.")
sys.exit(1)
# Attendi ricevute
wait_secs = 45
print(f"⏳ Attesa {wait_secs}s per ricezione ricevute Aruba...")
for r in range(wait_secs, 0, -5):
print(f" {r}s...", end="\r", flush=True)
await asyncio.sleep(5)
print(f" Attesa completata. ")
print()
# STEP 2: ispezione IMAP completa
await test_imap_full_inspection()
print()
_sep("")
print(" TEST COMPLETATO")
_sep("")
if message_id:
print(f" Message-ID: {message_id}")
print()
if __name__ == "__main__":
asyncio.run(main())