diff --git a/KnowledgeBaseCline.md b/KnowledgeBaseCline.md index 4935cdc..dcf3133 100644 --- a/KnowledgeBaseCline.md +++ b/KnowledgeBaseCline.md @@ -10,39 +10,12 @@ Non effettuare test da Browser, ci penso io Queste le caselle PEC e i loro parametri IMAP/SMTP che puoi usare per test, non effettuare invii per adesso -Casella: gmgspa@pec.it -Password: Gmgbaccio1960!26 -Server IMAP: imaps.pec.mail-certificata.eu +Casella: matteo.giustini@arubapec.it +Password: MadonnaPuttana1! +Server IMAP: imaps.pec.aruba.it Porta:993 SSL: Sì -Server SMTP: smtps.pec.mail-certificata.eu -Porta: 465 -SSL: Sì - -Casella: akron@pec.akronservices.it -Password: Akron@PEC2026! -Server IMAP: imap.pec-email.com -Porta:993 -SSL: Sì -Server SMTP: smtp.pec-email.com -Porta: 465 -SSL: Sì - -Casella: birindelliauto@pec.it -Password: PECBirindelli2026@ -Server IMAP: imaps.pec.mail-certificata.eu -Porta:993 -SSL: Sì -Server SMTP: smtps.pec.mail-certificata.eu -Porta: 465 -SSL: Sì - -Casella: quattrocarsrl@pec.it -Password: 4Car2026GMG! -Server IMAP: imaps.pec.mail-certificata.eu -Porta:993 -SSL: Sì -Server SMTP: smtps.pec.mail-certificata.eu +Server SMTP: smtps.pec.aruba.it Porta: 465 SSL: Sì diff --git a/worker/tests/integration/test_smtp_real_aruba.py b/worker/tests/integration/test_smtp_real_aruba.py new file mode 100644 index 0000000..23eaddc --- /dev/null +++ b/worker/tests/integration/test_smtp_real_aruba.py @@ -0,0 +1,536 @@ +#!/usr/bin/env python3 +""" +Test integrazione SMTP/IMAP REALE – Casella Aruba PEC +====================================================== + +Testa: +1. Connessione SMTP SSL (porta 465) → smtps.pec.aruba.it +2. Invio PEC reale a matteo1801@spidmail.it +3. Connessione IMAP SSL (porta 993) → imaps.pec.aruba.it +4. Lista cartelle IMAP + ispezione completa messaggi recenti +5. Rilevazione ricevute accettazione/consegna + +Eseguire DENTRO il container worker: + docker exec -e PYTHONPATH=/worker pecflow-worker-1 python \ + /worker/tests/integration/test_smtp_real_aruba.py +""" + +import asyncio +import base64 +import email +import email.header +import os +import sys +from datetime import datetime +from unittest.mock import MagicMock + +# ─── Variabili d'ambiente ───────────────────────────────────────────────────── +if "ENCRYPTION_KEY" not in os.environ: + os.environ["ENCRYPTION_KEY"] = "6465762d656e6372797074696f6e2d6b65792d6e6f742d666f722d70726f6400" +os.environ.setdefault("SECRET_KEY", "dev-secret-key-not-for-production-use-only-for-local-0000000000000") +os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://pecflow:pecflow_dev_password@db:5432/pecflow") +os.environ.setdefault("REDIS_URL", "redis://redis:6379/0") +os.environ.setdefault("MINIO_ENDPOINT", "minio:9000") + +# ─── Parametri PEC Aruba ───────────────────────────────────────────────────── +PEC_EMAIL = "matteo.giustini@arubapec.it" +PEC_PASSWORD = "MadonnaPuttana1!" +SMTP_HOST = "smtps.pec.aruba.it" +SMTP_PORT = 465 +IMAP_HOST = "imaps.pec.aruba.it" +IMAP_PORT = 993 +TO_ADDRESS = "matteo1801@spidmail.it" + + +# ─── Helpers cifratura ──────────────────────────────────────────────────────── + +def _encrypt(value: str) -> str: + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + key = bytes.fromhex(os.environ["ENCRYPTION_KEY"]) + nonce = os.urandom(12) + ct = AESGCM(key).encrypt(nonce, value.encode("utf-8"), None) + return base64.b64encode(nonce + ct).decode("ascii") + + +def make_fake_mailbox() -> MagicMock: + m = MagicMock() + m.email_address = PEC_EMAIL + m.smtp_host_enc = _encrypt(SMTP_HOST) + m.smtp_port_enc = _encrypt(str(SMTP_PORT)) + m.smtp_user_enc = _encrypt(PEC_EMAIL) + m.smtp_pass_enc = _encrypt(PEC_PASSWORD) + m.smtp_use_tls = True + return m + + +def _decode_header(raw: str) -> str: + """Decodifica un header MIME (gestisce encoded-words =?utf-8?...?=).""" + try: + parts = email.header.decode_header(raw) + decoded = [] + for part, enc in parts: + if isinstance(part, bytes): + decoded.append(part.decode(enc or "utf-8", errors="replace")) + else: + decoded.append(part) + return "".join(decoded) + except Exception: + return raw + + +def _is_pec_receipt(headers_text: str) -> tuple[bool, str]: + """ + Determina se un messaggio è una ricevuta PEC. + Restituisce (is_receipt, tipo_ricevuta). + """ + lower = headers_text.lower() + + # Header proprietari Aruba/PEC + receipt_markers = [ + ("x-ricevuta:", "ricevuta"), + ("x-trasporto:", "posta-certificata"), + ("x-tiporicevuta:", "tipo-ricevuta"), + ("x-riferimentomessaggioid:", "riferimento"), + ] + for marker, label in receipt_markers: + if marker in lower: + return True, label + + # Pattern nel Subject + subject_keywords = [ + "accettazione:", + "avvenuta consegna:", + "avvenuta-consegna:", + "errore di consegna:", + "posta certificata:", + "ricevuta di accettazione", + "ricevuta di avvenuta consegna", + ] + for kw in subject_keywords: + if kw in lower: + return True, f"subject-match:{kw}" + + return False, "" + + +# ─── STEP 0: Connettività SMTP ──────────────────────────────────────────────── + +async def test_smtp_connectivity() -> bool: + import aiosmtplib + + _sep() + print("STEP 0 – VERIFICA CONNETTIVITÀ SMTP") + _sep() + + try: + smtp = aiosmtplib.SMTP( + hostname=SMTP_HOST, port=SMTP_PORT, + use_tls=True, start_tls=False, timeout=15, + ) + await smtp.connect() + print(f" ✅ TCP+SSL OK → {SMTP_HOST}:{SMTP_PORT}") + await smtp.login(PEC_EMAIL, PEC_PASSWORD) + print(f" ✅ Autenticazione SMTP OK") + exts = list((smtp.esmtp_extensions or {}).keys()) + print(f" ℹ️ ESMTP: {exts}") + await smtp.quit() + print(f" ✅ QUIT OK") + return True + except Exception as exc: + print(f" ❌ ERRORE: {exc}") + import traceback; traceback.print_exc() + return False + + +# ─── STEP 1: Invio SMTP ─────────────────────────────────────────────────────── + +async def test_smtp_send() -> tuple[str | None, bool]: + try: + from app.config import get_settings + get_settings.cache_clear() + except Exception: + pass + + from app.smtp.sender import SmtpSender + + mailbox = make_fake_mailbox() + sender = SmtpSender(mailbox) + + ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + subject = f"[PecFlow TEST] Verifica SMTP+Ricevute – {ts}" + body = ( + f"Messaggio di test automatico PecFlow – {ts}\n\n" + "Verifica:\n" + " 1. Connessione SMTP SSL porta 465 → smtps.pec.aruba.it\n" + " 2. Autenticazione e invio PEC\n" + " 3. Ricezione ricevuta di accettazione\n" + " 4. Ricezione ricevuta di avvenuta consegna\n\n" + f"Mittente : {PEC_EMAIL}\n" + f"Destinato: {TO_ADDRESS}\n\n" + "Non rispondere. Generato da PecFlow SaaS.\n" + ) + + _sep() + print("STEP 1 – INVIO SMTP") + _sep() + print(f" Da : {PEC_EMAIL}") + print(f" A : {TO_ADDRESS}") + print(f" Host : {SMTP_HOST}:{SMTP_PORT} (SSL)") + print(f" Oggetto: {subject}") + print() + + try: + message_id, raw_eml = await sender.send( + to_addresses=[TO_ADDRESS], + cc_addresses=[], + subject=subject, + body_text=body, + ) + print(f" ✅ INVIO OK") + print(f" Message-ID : {message_id}") + print(f" EML size : {len(raw_eml)} bytes") + return message_id, True + except Exception as exc: + print(f" ❌ ERRORE SMTP: {exc}") + import traceback; traceback.print_exc() + return None, False + + +# ─── STEP 2: Ispezione IMAP completa ───────────────────────────────────────── + +async def test_imap_full_inspection() -> None: + """ + Connessione IMAP completa: + 1. Lista tutte le cartelle + 2. Ispeziona INBOX con fetch header completo + 3. Cerca ricevute PEC in tutte le cartelle rilevanti + """ + import aioimaplib + + _sep() + print("STEP 2 – ISPEZIONE IMAP COMPLETA") + _sep() + print(f" Host : {IMAP_HOST}:{IMAP_PORT} (SSL)") + print(f" User : {PEC_EMAIL}") + print() + + try: + client = aioimaplib.IMAP4_SSL(host=IMAP_HOST, port=IMAP_PORT, timeout=30) + await asyncio.wait_for(client.wait_hello_from_server(), timeout=30) + + status, _ = await client.login(PEC_EMAIL, PEC_PASSWORD) + if status != "OK": + print(f" ❌ Login IMAP fallito: {status}") + return + print(f" ✅ Login IMAP riuscito") + + # ── Lista cartelle ──────────────────────────────────────────────────── + status, folder_data = await client.list('""', "*") + folders = [] + if status == "OK": + for item in folder_data: + if not item: + continue + raw = item.decode("utf-8", errors="replace") if isinstance(item, bytes) else str(item) + # Estrai il nome cartella (ultima parte dopo l'ultimo separatore) + if '"/"' in raw or "|" in raw or raw.strip(): + # Formato tipico: (\HasChildren) "/" "INBOX" + parts = raw.split('"') + if len(parts) >= 3: + folder_name = parts[-2].strip() + if folder_name: + folders.append(folder_name) + elif raw.strip(): + # Prova a estrarre diversamente + tokens = raw.split() + if tokens: + folders.append(tokens[-1].strip('"')) + + # Rimuovi duplicati mantenendo ordine + seen = set() + unique_folders = [] + for f in folders: + if f not in seen and f: + seen.add(f) + unique_folders.append(f) + + print(f" 📂 Cartelle IMAP disponibili ({len(unique_folders)}):") + for f in unique_folders: + print(f" • {f}") + print() + + # ── Cartelle da ispezionare per ricevute ────────────────────────────── + # Aruba PEC mette ricevute in INBOX, ma può avere cartelle dedicate + priority_folders = ["INBOX"] + for f in unique_folders: + fl = f.lower() + if any(kw in fl for kw in ["ricevut", "certif", "sent", "inviati", "outbox", "pec"]): + if f not in priority_folders: + priority_folders.append(f) + + all_receipts: list[dict] = [] + + for folder in priority_folders: + print(f" 📁 Ispezione cartella: {folder!r}") + print(f" {'─' * 50}") + + try: + status, sel_data = await client.select(folder) + if status != "OK": + print(f" ⚠️ SELECT fallito: {status}") + print() + continue + + # Conta messaggi + total = 0 + for line in sel_data: + if isinstance(line, bytes) and b"EXISTS" in line: + try: + total = int(line.split()[0]) + except Exception: + pass + print(f" 📬 Messaggi totali: {total}") + + if total == 0: + print(f" (cartella vuota)") + print() + continue + + # Fetch ultimi 10 messaggi (per numero sequenza) + start_seq = max(1, total - 9) + status, fetch_data = await client.fetch( + f"{start_seq}:{total}", + "(BODY.PEEK[HEADER])" + ) + + if status != "OK": + print(f" ⚠️ FETCH fallito: {status}") + print() + continue + + # Parsa i risultati fetch + messages_found: list[dict] = [] + current_headers = [] + current_seq = None + + for part in fetch_data: + if not part: + continue + if isinstance(part, bytes): + raw = part.decode("utf-8", errors="replace") + else: + raw = str(part) + + # Linea di risposta FETCH (es: "1 FETCH (BODY[HEADER] {1234}") + if "FETCH" in raw and "BODY" in raw: + # Salva il messaggio precedente se esiste + if current_headers and current_seq is not None: + hdr_text = "\n".join(current_headers) + messages_found.append({ + "seq": current_seq, + "headers": hdr_text, + }) + # Estrai numero sequenza + try: + current_seq = int(raw.split()[0]) + except Exception: + current_seq = "?" + current_headers = [] + elif raw.strip() == ")": + # Fine del fetch per questo messaggio + if current_headers and current_seq is not None: + hdr_text = "\n".join(current_headers) + messages_found.append({ + "seq": current_seq, + "headers": hdr_text, + }) + current_headers = [] + current_seq = None + else: + # Corpo dell'header + if current_seq is not None: + current_headers.append(raw) + + # Aggiungi l'ultimo messaggio pendente + if current_headers and current_seq is not None: + messages_found.append({ + "seq": current_seq, + "headers": "\n".join(current_headers), + }) + + # De-duplicazione per seq + seen_seqs: set = set() + unique_msgs: list[dict] = [] + for m in messages_found: + if m["seq"] not in seen_seqs: + seen_seqs.add(m["seq"]) + unique_msgs.append(m) + + print(f" 🔍 Analizzati {len(unique_msgs)} messaggi recenti") + print() + + for msg in unique_msgs: + hdrs = msg["headers"] + is_receipt, receipt_type = _is_pec_receipt(hdrs) + + # Estrai soggetto + subject_raw = "" + for line in hdrs.splitlines(): + if line.lower().startswith("subject:"): + subject_raw = line[8:].strip() + break + subject_decoded = _decode_header(subject_raw) if subject_raw else "(nessun oggetto)" + + # Estrai From + from_raw = "" + for line in hdrs.splitlines(): + if line.lower().startswith("from:"): + from_raw = line[5:].strip() + break + + # Estrai Date + date_raw = "" + for line in hdrs.splitlines(): + if line.lower().startswith("date:"): + date_raw = line[5:].strip() + break + + # Estrai headers PEC specifici + x_ricevuta = "" + x_trasporto = "" + for line in hdrs.splitlines(): + ll = line.lower() + if ll.startswith("x-ricevuta:"): + x_ricevuta = line.split(":", 1)[1].strip() + elif ll.startswith("x-trasporto:"): + x_trasporto = line.split(":", 1)[1].strip() + + if is_receipt: + marker = "🏆 RICEVUTA PEC" + all_receipts.append({ + "folder": folder, + "seq": msg["seq"], + "subject": subject_decoded, + "from": from_raw, + "date": date_raw, + "x_ricevuta": x_ricevuta, + "x_trasporto": x_trasporto, + "receipt_type": receipt_type, + }) + else: + marker = "📧" + + print(f" {marker} [seq={msg['seq']}]") + print(f" Subject : {subject_decoded}") + print(f" From : {from_raw[:80] if from_raw else '(n/a)'}") + print(f" Date : {date_raw[:60] if date_raw else '(n/a)'}") + if x_ricevuta: + print(f" X-Ricevuta: {x_ricevuta}") + if x_trasporto: + print(f" X-Trasporto: {x_trasporto}") + print() + + except Exception as folder_err: + print(f" ❌ Errore cartella {folder!r}: {folder_err}") + print() + + # ── Riepilogo ricevute ──────────────────────────────────────────────── + _sep() + print("RIEPILOGO RICEVUTE PEC") + _sep() + + if all_receipts: + print(f" ✅ TROVATE {len(all_receipts)} RICEVUTA/E PEC:\n") + for r in all_receipts: + print(f" ── {r['folder']} / seq={r['seq']} ──") + print(f" Subject : {r['subject']}") + print(f" From : {r['from']}") + print(f" Date : {r['date']}") + print(f" X-Ricevuta : {r.get('x_ricevuta', '-')}") + print(f" X-Trasporto: {r.get('x_trasporto', '-')}") + print(f" Tipo : {r['receipt_type']}") + print() + else: + print(" ⚠️ Nessuna ricevuta PEC trovata.") + print(" Possibili cause:") + print(" • Le ricevute non sono ancora arrivate (Aruba PEC: <30s normalmente)") + print(" • La casella destinatario non è PEC → ricevuta solo di accettazione") + print(" • I messaggi in INBOX non sono ancora stati aggiornati") + print() + print(" 💡 Ri-esegui lo STEP 2 manualmente tra qualche secondo con:") + print(" docker exec -e PYTHONPATH=/worker pecflow-worker-1 python -c \"") + print(" import asyncio, sys; sys.path.insert(0,'/worker')") + print(" from tests.integration.test_smtp_real_aruba import test_imap_full_inspection") + print(" asyncio.run(test_imap_full_inspection())\"") + + await client.logout() + print(f"\n ✅ Logout IMAP completato") + + except asyncio.TimeoutError: + print(f" ❌ Timeout IMAP ({IMAP_HOST}:{IMAP_PORT})") + except Exception as exc: + print(f" ❌ ERRORE IMAP: {exc}") + import traceback; traceback.print_exc() + + +# ─── Utilities ──────────────────────────────────────────────────────────────── + +def _sep(char: str = "─", width: int = 60) -> None: + print(char * width) + + +def _banner() -> None: + _sep("═") + print(" PecFlow – Test SMTP/IMAP Reale (Aruba PEC)") + _sep("═") + print(f" Timestamp : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f" Casella PEC : {PEC_EMAIL}") + print(f" SMTP : {SMTP_HOST}:{SMTP_PORT} (SSL)") + print(f" IMAP : {IMAP_HOST}:{IMAP_PORT} (SSL)") + print(f" Destinatario: {TO_ADDRESS}") + _sep("═") + print() + + +# ─── Main ───────────────────────────────────────────────────────────────────── + +async def main() -> None: + _banner() + + # STEP 0: connettività SMTP + conn_ok = await test_smtp_connectivity() + print() + if not conn_ok: + print("❌ Connettività SMTP fallita.") + sys.exit(1) + + # STEP 1: invio + message_id, send_ok = await test_smtp_send() + print() + if not send_ok: + print("❌ Invio SMTP fallito – skip verifica ricevute.") + sys.exit(1) + + # Attendi ricevute + wait_secs = 45 + print(f"⏳ Attesa {wait_secs}s per ricezione ricevute Aruba...") + for r in range(wait_secs, 0, -5): + print(f" {r}s...", end="\r", flush=True) + await asyncio.sleep(5) + print(f" Attesa completata. ") + print() + + # STEP 2: ispezione IMAP completa + await test_imap_full_inspection() + + print() + _sep("═") + print(" TEST COMPLETATO") + _sep("═") + if message_id: + print(f" Message-ID: {message_id}") + print() + + +if __name__ == "__main__": + asyncio.run(main())