Files
2026-03-27 20:59:06 +01:00

374 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
SendService logica di business per l'invio PEC.
Responsabilità:
1. Valida permessi (check_can_send) sulla casella selezionata
2. Crea il record Message (direction=outbound, state=queued)
3. [Opzionale] Carica gli allegati su MinIO e crea i record Attachment
4. Crea il record SendJob (status=pending)
5. Enqueue il job arq 'send_pec' tramite il pool Redis/arq
6. Ritorna SendJob
Il worker (arq) gestisce la connessione SMTP, il retry e la scrittura
del raw EML su MinIO.
"""
import asyncio
import hashlib
import uuid
from datetime import datetime, timezone
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ForbiddenError, NotFoundError
from app.models.mailbox import Mailbox
from app.models.message import Attachment, Message, SendJob
from app.models.user import User
from app.schemas.send import SendJobResponse, SendPecRequest
from app.services.permission_service import PermissionService
# ─── Pool arq (singleton lazy) ────────────────────────────────────────────────
_arq_pool = None
_arq_pool_lock = asyncio.Lock()
async def _get_arq_pool():
"""Restituisce il pool arq condiviso, creandolo se necessario."""
global _arq_pool
if _arq_pool is not None:
return _arq_pool
async with _arq_pool_lock:
if _arq_pool is None:
from arq import create_pool
from arq.connections import RedisSettings
from app.config import get_settings
settings = get_settings()
import urllib.parse
parsed = urllib.parse.urlparse(settings.redis_url)
rs = RedisSettings(
host=parsed.hostname or "localhost",
port=parsed.port or 6379,
database=int(parsed.path.lstrip("/") or "0"),
password=parsed.password or None,
)
_arq_pool = await create_pool(rs)
return _arq_pool
async def close_arq_pool() -> None:
"""Chiude il pool arq alla shutdown dell'applicazione."""
global _arq_pool
if _arq_pool is not None:
await _arq_pool.aclose()
_arq_pool = None
# ─── Helper: HTML → testo semplice ────────────────────────────────────────────
def _html_to_plain(html: str) -> str:
"""
Converte HTML in testo semplice usando html.parser della stdlib.
Usato per generare la parte text/plain quando viene fornito solo body_html.
"""
from html.parser import HTMLParser
BLOCK_TAGS = {
"p", "div", "br", "li", "h1", "h2", "h3", "h4", "h5", "h6",
"blockquote", "pre", "hr", "tr",
}
class _Extractor(HTMLParser):
def __init__(self) -> None:
super().__init__()
self._parts: list[str] = []
def handle_data(self, data: str) -> None:
self._parts.append(data)
def handle_starttag(self, tag: str, attrs) -> None: # noqa: ANN001
if tag.lower() in BLOCK_TAGS:
self._parts.append("\n")
def text(self) -> str:
return "".join(self._parts).strip()
extractor = _Extractor()
extractor.feed(html)
return extractor.text()
# ─── SendService ──────────────────────────────────────────────────────────────
class SendService:
def __init__(self, db: AsyncSession) -> None:
self.db = db
# ── Crea e accoda un invio ────────────────────────────────────────────────
async def create_send_job(
self,
current_user: User,
data: SendPecRequest,
attachments: list[dict] | None = None,
) -> SendJob:
"""
Crea Message + (Attachment*) + SendJob e accoda il job di invio.
Args:
current_user: utente autenticato che richiede l'invio
data: dati della PEC da inviare
attachments: lista opzionale di dict {filename, content: bytes, content_type}
Returns:
SendJob appena creato
Raises:
NotFoundError: casella non trovata o non appartenente al tenant
ForbiddenError: utente senza can_send sulla casella
"""
# ── Verifica casella ──────────────────────────────────────────────────
mailbox = await self.db.get(Mailbox, data.mailbox_id)
if (
not mailbox
or mailbox.tenant_id != current_user.tenant_id
or mailbox.status == "deleted"
):
raise NotFoundError("casella PEC mittente")
if mailbox.status != "active":
raise ForbiddenError(
f"La casella è in stato '{mailbox.status}' e non può inviare"
)
# ── Verifica permesso can_send ────────────────────────────────────────
if not current_user.is_admin:
perm_svc = PermissionService(self.db)
if not await perm_svc.check_can_send(current_user, data.mailbox_id):
raise ForbiddenError(
"Non hai il permesso di inviare da questa casella"
)
# ── Ricava body_text ──────────────────────────────────────────────────
body_text = data.body_text or ""
body_html: str | None = getattr(data, "body_html", None) or data.body_html # type: ignore[attr-defined]
# Se il body_text è vuoto ma abbiamo HTML, genera il plain text
if not body_text and body_html:
body_text = _html_to_plain(body_html)
# ── Crea il messaggio outbound ────────────────────────────────────────
now = datetime.now(tz=timezone.utc)
has_files = bool(attachments)
# Invio differito: il messaggio parte in stato 'draft' se programmato
scheduled_at = getattr(data, "scheduled_at", None)
is_scheduled = scheduled_at is not None and scheduled_at > now
message = Message(
tenant_id=current_user.tenant_id,
mailbox_id=data.mailbox_id,
direction="outbound",
pec_type="posta_certificata",
state="draft" if is_scheduled else "queued",
subject=data.subject,
from_address=mailbox.email_address,
to_addresses=[str(a) for a in data.to_addresses],
cc_addresses=[str(a) for a in data.cc_addresses] if data.cc_addresses else [],
body_text=body_text,
body_html=body_html,
has_attachments=has_files,
sent_at=None,
received_at=None,
)
# Collegamento a messaggio originale (per risposta/threading)
if data.reply_to_message_id:
parent = await self.db.get(Message, data.reply_to_message_id)
if parent and parent.tenant_id == current_user.tenant_id:
message.parent_message_id = data.reply_to_message_id
self.db.add(message)
await self.db.flush() # Ottieni message.id
# ── Carica allegati su MinIO e crea record Attachment ─────────────────
if has_files:
await self._upload_attachments(
message=message,
attachments=attachments, # type: ignore[arg-type]
tenant_id=current_user.tenant_id,
mailbox_id=data.mailbox_id,
)
# ── Crea il SendJob ───────────────────────────────────────────────────
job = SendJob(
tenant_id=current_user.tenant_id,
mailbox_id=data.mailbox_id,
message_id=message.id,
status="pending",
attempt_count=0,
max_attempts=5,
created_by=current_user.id,
queued_at=now,
scheduled_at=scheduled_at if is_scheduled else None,
)
self.db.add(job)
await self.db.flush()
# ── Enqueue job arq ───────────────────────────────────────────────────
try:
arq_pool = await _get_arq_pool()
if is_scheduled and scheduled_at:
# Invio differito: defer_until = scheduled_at
await arq_pool.enqueue_job(
"send_pec",
str(job.id),
_defer_until=scheduled_at,
)
else:
await arq_pool.enqueue_job("send_pec", str(job.id))
except Exception as e:
from app.core.logging import get_logger
logger = get_logger(__name__)
logger.warning(
f"[send_service] Impossibile enqueue send_pec job {job.id}: {e}. "
"Il job resterà in stato 'pending' per pickup manuale."
)
return job
# ── Upload allegati ────────────────────────────────────────────────────────
async def _upload_attachments(
self,
message: Message,
attachments: list[dict],
tenant_id: uuid.UUID,
mailbox_id: uuid.UUID,
) -> None:
"""
Carica ogni allegato su MinIO e crea il record Attachment nel DB.
Gestisce gli errori di upload singolarmente: un allegato che non
riesce a caricarsi non blocca gli altri.
"""
from app.core.logging import get_logger
from app.storage.minio_client import upload_attachment as minio_upload
logger = get_logger(__name__)
for att in attachments:
filename: str = att.get("filename") or "allegato"
content: bytes = att.get("content") or b""
content_type: str = att.get("content_type") or "application/octet-stream"
if not content:
continue
try:
storage_path = await minio_upload(
tenant_id=str(tenant_id),
mailbox_id=str(mailbox_id),
message_id=str(message.id),
filename=filename,
content=content,
content_type=content_type,
)
checksum = hashlib.sha256(content).hexdigest()
attachment_record = Attachment(
tenant_id=tenant_id,
message_id=message.id,
filename=filename,
content_type=content_type,
size_bytes=len(content),
storage_path=storage_path,
checksum_sha256=checksum,
)
self.db.add(attachment_record)
logger.debug(
f"[send_service] Allegato caricato: {storage_path} "
f"({len(content)} bytes)"
)
except Exception as upload_err:
logger.error(
f"[send_service] Errore upload allegato '{filename}': {upload_err}"
)
# Non sollevare: l'invio continuerà senza questo allegato
await self.db.flush()
# ── Lista job di invio ────────────────────────────────────────────────────
async def list_send_jobs(
self,
tenant_id: uuid.UUID,
page: int = 1,
page_size: int = 50,
mailbox_id: uuid.UUID | None = None,
status_filter: str | None = None,
) -> tuple[list[SendJob], int]:
"""Lista i job di invio del tenant con filtri opzionali."""
base_q = select(SendJob).where(SendJob.tenant_id == tenant_id)
if mailbox_id:
base_q = base_q.where(SendJob.mailbox_id == mailbox_id)
if status_filter:
base_q = base_q.where(SendJob.status == status_filter)
count_q = select(func.count()).select_from(base_q.subquery())
total = (await self.db.execute(count_q)).scalar_one()
items_q = (
base_q.order_by(SendJob.queued_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
)
result = await self.db.execute(items_q)
items = list(result.scalars().all())
return items, total
# ── Get singolo job ───────────────────────────────────────────────────────
async def get_send_job(
self,
job_id: uuid.UUID,
tenant_id: uuid.UUID,
) -> SendJob:
"""Carica un singolo SendJob verificando l'appartenenza al tenant."""
job = await self.db.get(SendJob, job_id)
if not job or job.tenant_id != tenant_id:
raise NotFoundError("job di invio")
return job
# ── Annulla job (solo se pending) ─────────────────────────────────────────
async def cancel_send_job(
self,
job_id: uuid.UUID,
tenant_id: uuid.UUID,
) -> SendJob:
"""Annulla un job di invio se è ancora in stato 'pending' o 'retrying'."""
job = await self.get_send_job(job_id, tenant_id)
if job.status not in ("pending", "retrying"):
raise ForbiddenError(
f"Impossibile annullare: il job è in stato '{job.status}'"
)
job.status = "failed"
job.last_error = "Annullato dall'utente"
if job.message_id:
msg = await self.db.get(Message, job.message_id)
if msg and msg.state in ("queued", "draft"):
msg.state = "failed"
await self.db.flush()
return job