Files
2026-03-27 14:58:12 +01:00

389 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Servizio caselle PEC CRUD con cifratura AES-256-GCM delle credenziali (ADR-002).
"""
import time
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ConflictError, ForbiddenError, NotFoundError
from app.core.security import decrypt_credential, encrypt_credential
from app.models.mailbox import Mailbox
from app.models.tenant import Tenant
from app.services.audit_service import log_audit
from app.schemas.mailbox import (
ConnectionTestRequest,
ConnectionTestResult,
MailboxCreateRequest,
MailboxUpdateRequest,
)
class MailboxService:
def __init__(self, db: AsyncSession) -> None:
self.db = db
# ─── CRUD ─────────────────────────────────────────────────────────────────
async def create_mailbox(
self,
tenant_id: uuid.UUID,
data: MailboxCreateRequest,
created_by: uuid.UUID,
) -> Mailbox:
"""Crea una nuova casella PEC cifrando le credenziali."""
# Verifica limite caselle del tenant
tenant = await self.db.get(Tenant, tenant_id)
if not tenant:
raise NotFoundError("tenant")
count_result = await self.db.execute(
select(func.count(Mailbox.id)).where(
Mailbox.tenant_id == tenant_id,
Mailbox.status != "deleted",
)
)
current_count = count_result.scalar_one()
if current_count >= tenant.max_mailboxes:
raise ForbiddenError(
f"Limite caselle raggiunto ({tenant.max_mailboxes}). "
"Aggiorna il piano per aggiungerne altre."
)
# Verifica unicità email nel tenant
existing = await self.db.execute(
select(Mailbox.id).where(
Mailbox.tenant_id == tenant_id,
Mailbox.email_address == str(data.email_address),
Mailbox.status != "deleted",
)
)
if existing.scalar_one_or_none():
raise ConflictError("Casella con questo indirizzo già presente nel tenant")
mailbox = Mailbox(
tenant_id=tenant_id,
email_address=str(data.email_address),
display_name=data.display_name,
provider=data.provider,
# Cifra tutte le credenziali IMAP
imap_host_enc=encrypt_credential(data.imap_host),
imap_port_enc=encrypt_credential(str(data.imap_port)),
imap_user_enc=encrypt_credential(data.imap_user),
imap_pass_enc=encrypt_credential(data.imap_pass),
imap_use_ssl=data.imap_use_ssl,
# Cifra tutte le credenziali SMTP
smtp_host_enc=encrypt_credential(data.smtp_host),
smtp_port_enc=encrypt_credential(str(data.smtp_port)),
smtp_user_enc=encrypt_credential(data.smtp_user),
smtp_pass_enc=encrypt_credential(data.smtp_pass),
smtp_use_tls=data.smtp_use_tls,
created_by=created_by,
status="active",
)
self.db.add(mailbox)
await self.db.flush()
await log_audit(
self.db,
"mailbox.created",
tenant_id=tenant_id,
user_id=created_by,
resource_type="mailbox",
resource_id=mailbox.id,
payload={"email_address": mailbox.email_address},
)
return mailbox
async def list_mailboxes(
self,
tenant_id: uuid.UUID,
page: int = 1,
page_size: int = 50,
include_deleted: bool = False,
) -> tuple[list[Mailbox], int]:
"""Elenca le caselle di un tenant con paginazione."""
base_q = select(Mailbox).where(Mailbox.tenant_id == tenant_id)
if not include_deleted:
base_q = base_q.where(Mailbox.status != "deleted")
# Count totale
count_q = select(func.count()).select_from(base_q.subquery())
total = (await self.db.execute(count_q)).scalar_one()
# Pagina
items_q = (
base_q.order_by(Mailbox.created_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
)
result = await self.db.execute(items_q)
items = list(result.scalars().all())
return items, total
async def get_mailbox(
self,
mailbox_id: uuid.UUID,
tenant_id: uuid.UUID,
) -> Mailbox:
"""Carica una singola casella verificando l'appartenenza al tenant."""
mailbox = await self.db.get(Mailbox, mailbox_id)
if not mailbox or mailbox.tenant_id != tenant_id or mailbox.status == "deleted":
raise NotFoundError("casella")
return mailbox
async def update_mailbox(
self,
mailbox_id: uuid.UUID,
tenant_id: uuid.UUID,
data: MailboxUpdateRequest,
) -> Mailbox:
"""Aggiornamento parziale di una casella. Ri-cifra le credenziali se modificate."""
mailbox = await self.get_mailbox(mailbox_id, tenant_id)
if data.display_name is not None:
mailbox.display_name = data.display_name
if data.provider is not None:
mailbox.provider = data.provider
if data.status is not None:
mailbox.status = data.status
# IMAP
if data.imap_host is not None:
mailbox.imap_host_enc = encrypt_credential(data.imap_host)
if data.imap_port is not None:
mailbox.imap_port_enc = encrypt_credential(str(data.imap_port))
if data.imap_user is not None:
mailbox.imap_user_enc = encrypt_credential(data.imap_user)
if data.imap_pass is not None:
mailbox.imap_pass_enc = encrypt_credential(data.imap_pass)
if data.imap_use_ssl is not None:
mailbox.imap_use_ssl = data.imap_use_ssl
# SMTP
if data.smtp_host is not None:
mailbox.smtp_host_enc = encrypt_credential(data.smtp_host)
if data.smtp_port is not None:
mailbox.smtp_port_enc = encrypt_credential(str(data.smtp_port))
if data.smtp_user is not None:
mailbox.smtp_user_enc = encrypt_credential(data.smtp_user)
if data.smtp_pass is not None:
mailbox.smtp_pass_enc = encrypt_credential(data.smtp_pass)
if data.smtp_use_tls is not None:
mailbox.smtp_use_tls = data.smtp_use_tls
# Reset error state se il tenant ha aggiornato le credenziali
if any(
v is not None
for v in [data.imap_host, data.imap_pass, data.imap_user, data.imap_port]
):
mailbox.sync_error_count = 0
mailbox.sync_error_msg = None
if mailbox.status == "error":
mailbox.status = "active"
await self.db.flush()
await log_audit(
self.db,
"mailbox.updated",
tenant_id=tenant_id,
resource_type="mailbox",
resource_id=mailbox_id,
payload={"mailbox_id": str(mailbox_id)},
)
return mailbox
async def delete_mailbox(
self,
mailbox_id: uuid.UUID,
tenant_id: uuid.UUID,
) -> None:
"""Soft-delete: imposta status=deleted."""
mailbox = await self.get_mailbox(mailbox_id, tenant_id)
email = mailbox.email_address
mailbox.status = "deleted"
await self.db.flush()
await log_audit(
self.db,
"mailbox.deleted",
tenant_id=tenant_id,
resource_type="mailbox",
resource_id=mailbox_id,
payload={"email_address": email},
)
# ─── Decrypt helpers (usati internamente e dal worker) ───────────────────
@staticmethod
def decrypt_imap_credentials(mailbox: Mailbox) -> dict:
"""Decifra le credenziali IMAP per uso interno."""
return {
"host": decrypt_credential(mailbox.imap_host_enc),
"port": int(decrypt_credential(mailbox.imap_port_enc)),
"user": decrypt_credential(mailbox.imap_user_enc),
"password": decrypt_credential(mailbox.imap_pass_enc),
"use_ssl": mailbox.imap_use_ssl,
}
@staticmethod
def decrypt_smtp_credentials(mailbox: Mailbox) -> dict:
"""Decifra le credenziali SMTP per uso interno."""
return {
"host": decrypt_credential(mailbox.smtp_host_enc),
"port": int(decrypt_credential(mailbox.smtp_port_enc)),
"user": decrypt_credential(mailbox.smtp_user_enc),
"password": decrypt_credential(mailbox.smtp_pass_enc),
"use_tls": mailbox.smtp_use_tls,
}
# ─── Test connessione ─────────────────────────────────────────────────────
async def test_connection(
self,
mailbox_id: uuid.UUID,
tenant_id: uuid.UUID,
data: ConnectionTestRequest,
) -> ConnectionTestResult:
"""
Testa la connessione IMAP o SMTP della casella.
NON invia messaggi (conforme alle istruzioni: solo test connessione).
"""
mailbox = await self.get_mailbox(mailbox_id, tenant_id)
if data.protocol == "imap":
return await self._test_imap(mailbox)
else:
return await self._test_smtp(mailbox)
async def _test_imap(self, mailbox: Mailbox) -> ConnectionTestResult:
"""Testa connessione IMAP LOGIN + LIST + LOGOUT."""
import asyncio
try:
import aioimaplib
except ImportError:
return ConnectionTestResult(
success=False,
message="aioimaplib non installato nel worker. Eseguire dal worker container.",
)
creds = self.decrypt_imap_credentials(mailbox)
start = time.monotonic()
try:
if creds["use_ssl"]:
client = aioimaplib.IMAP4_SSL(
host=creds["host"], port=creds["port"], timeout=15
)
else:
client = aioimaplib.IMAP4(
host=creds["host"], port=creds["port"], timeout=15
)
await client.wait_hello_from_server()
status, _ = await client.login(creds["user"], creds["password"])
if status != "OK":
await client.logout()
return ConnectionTestResult(
success=False,
message=f"Login fallito: {status}",
latency_ms=round((time.monotonic() - start) * 1000, 1),
)
caps = list(client.protocol.capabilities) if hasattr(client.protocol, "capabilities") else []
await client.logout()
latency = round((time.monotonic() - start) * 1000, 1)
return ConnectionTestResult(
success=True,
message="Connessione IMAP riuscita",
latency_ms=latency,
capabilities=caps,
)
except asyncio.TimeoutError:
return ConnectionTestResult(
success=False,
message="Timeout connessione IMAP (15s)",
latency_ms=round((time.monotonic() - start) * 1000, 1),
)
except Exception as e:
return ConnectionTestResult(
success=False,
message=f"Errore connessione IMAP: {e}",
latency_ms=round((time.monotonic() - start) * 1000, 1),
)
async def _test_smtp(self, mailbox: Mailbox) -> ConnectionTestResult:
"""Testa connessione SMTP solo EHLO/NOOP, nessun invio (ADR-002)."""
try:
import aiosmtplib
except ImportError:
return ConnectionTestResult(
success=False,
message="aiosmtplib non installato. Installare nella fase SMTP.",
)
creds = self.decrypt_smtp_credentials(mailbox)
start = time.monotonic()
try:
smtp = aiosmtplib.SMTP(
hostname=creds["host"],
port=creds["port"],
use_tls=creds["use_tls"],
timeout=15,
)
await smtp.connect()
await smtp.login(creds["user"], creds["password"])
await smtp.quit()
return ConnectionTestResult(
success=True,
message="Connessione SMTP riuscita",
latency_ms=round((time.monotonic() - start) * 1000, 1),
)
except Exception as e:
return ConnectionTestResult(
success=False,
message=f"Errore connessione SMTP: {e}",
latency_ms=round((time.monotonic() - start) * 1000, 1),
)
# ─── Helper per costruire MailboxResponse ─────────────────────────────────
@staticmethod
def to_response_dict(mailbox: Mailbox) -> dict:
"""
Costruisce un dict con i dati decrittati per la risposta API.
Le credenziali (user/pass) non sono incluse.
"""
imap_host = decrypt_credential(mailbox.imap_host_enc)
imap_port = int(decrypt_credential(mailbox.imap_port_enc))
smtp_host = decrypt_credential(mailbox.smtp_host_enc)
smtp_port = int(decrypt_credential(mailbox.smtp_port_enc))
return {
"id": mailbox.id,
"tenant_id": mailbox.tenant_id,
"email_address": mailbox.email_address,
"display_name": mailbox.display_name,
"provider": mailbox.provider,
"imap_host": imap_host,
"imap_port": imap_port,
"imap_use_ssl": mailbox.imap_use_ssl,
"smtp_host": smtp_host,
"smtp_port": smtp_port,
"smtp_use_tls": mailbox.smtp_use_tls,
"status": mailbox.status,
"last_sync_at": mailbox.last_sync_at,
"last_sync_uid": mailbox.last_sync_uid,
"sync_error_msg": mailbox.sync_error_msg,
"sync_error_count": mailbox.sync_error_count,
"created_by": mailbox.created_by,
"created_at": mailbox.created_at,
"updated_at": mailbox.updated_at,
}