""" SendService – logica di business per l'invio PEC. Responsabilità: 1. Valida permessi (check_can_send) sulla casella selezionata 2. Crea il record Message (direction=outbound, state=queued) 3. [Opzionale] Carica gli allegati su MinIO e crea i record Attachment 4. Crea il record SendJob (status=pending) 5. Enqueue il job arq 'send_pec' tramite il pool Redis/arq 6. Ritorna SendJob Il worker (arq) gestisce la connessione SMTP, il retry e la scrittura del raw EML su MinIO. """ import asyncio import hashlib import uuid from datetime import datetime, timezone from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.exceptions import ForbiddenError, NotFoundError from app.models.mailbox import Mailbox from app.models.message import Attachment, Message, SendJob from app.models.user import User from app.schemas.send import SendJobResponse, SendPecRequest from app.services.permission_service import PermissionService # ─── Pool arq (singleton lazy) ──────────────────────────────────────────────── _arq_pool = None _arq_pool_lock = asyncio.Lock() async def _get_arq_pool(): """Restituisce il pool arq condiviso, creandolo se necessario.""" global _arq_pool if _arq_pool is not None: return _arq_pool async with _arq_pool_lock: if _arq_pool is None: from arq import create_pool from arq.connections import RedisSettings from app.config import get_settings settings = get_settings() import urllib.parse parsed = urllib.parse.urlparse(settings.redis_url) rs = RedisSettings( host=parsed.hostname or "localhost", port=parsed.port or 6379, database=int(parsed.path.lstrip("/") or "0"), password=parsed.password or None, ) _arq_pool = await create_pool(rs) return _arq_pool async def close_arq_pool() -> None: """Chiude il pool arq alla shutdown dell'applicazione.""" global _arq_pool if _arq_pool is not None: await _arq_pool.aclose() _arq_pool = None # ─── Helper: HTML → testo semplice ──────────────────────────────────────────── def _html_to_plain(html: str) -> str: """ Converte HTML in testo semplice usando html.parser della stdlib. Usato per generare la parte text/plain quando viene fornito solo body_html. """ from html.parser import HTMLParser BLOCK_TAGS = { "p", "div", "br", "li", "h1", "h2", "h3", "h4", "h5", "h6", "blockquote", "pre", "hr", "tr", } class _Extractor(HTMLParser): def __init__(self) -> None: super().__init__() self._parts: list[str] = [] def handle_data(self, data: str) -> None: self._parts.append(data) def handle_starttag(self, tag: str, attrs) -> None: # noqa: ANN001 if tag.lower() in BLOCK_TAGS: self._parts.append("\n") def text(self) -> str: return "".join(self._parts).strip() extractor = _Extractor() extractor.feed(html) return extractor.text() # ─── SendService ────────────────────────────────────────────────────────────── class SendService: def __init__(self, db: AsyncSession) -> None: self.db = db # ── Crea e accoda un invio ──────────────────────────────────────────────── async def create_send_job( self, current_user: User, data: SendPecRequest, attachments: list[dict] | None = None, ) -> SendJob: """ Crea Message + (Attachment*) + SendJob e accoda il job di invio. Args: current_user: utente autenticato che richiede l'invio data: dati della PEC da inviare attachments: lista opzionale di dict {filename, content: bytes, content_type} Returns: SendJob appena creato Raises: NotFoundError: casella non trovata o non appartenente al tenant ForbiddenError: utente senza can_send sulla casella """ # ── Verifica casella ────────────────────────────────────────────────── mailbox = await self.db.get(Mailbox, data.mailbox_id) if ( not mailbox or mailbox.tenant_id != current_user.tenant_id or mailbox.status == "deleted" ): raise NotFoundError("casella PEC mittente") if mailbox.status != "active": raise ForbiddenError( f"La casella è in stato '{mailbox.status}' e non può inviare" ) # ── Verifica permesso can_send ──────────────────────────────────────── if not current_user.is_admin: perm_svc = PermissionService(self.db) if not await perm_svc.check_can_send(current_user, data.mailbox_id): raise ForbiddenError( "Non hai il permesso di inviare da questa casella" ) # ── Ricava body_text ────────────────────────────────────────────────── body_text = data.body_text or "" body_html: str | None = getattr(data, "body_html", None) or data.body_html # type: ignore[attr-defined] # Se il body_text è vuoto ma abbiamo HTML, genera il plain text if not body_text and body_html: body_text = _html_to_plain(body_html) # ── Crea il messaggio outbound ──────────────────────────────────────── now = datetime.now(tz=timezone.utc) has_files = bool(attachments) message = Message( tenant_id=current_user.tenant_id, mailbox_id=data.mailbox_id, direction="outbound", pec_type="posta_certificata", state="queued", subject=data.subject, from_address=mailbox.email_address, to_addresses=[str(a) for a in data.to_addresses], cc_addresses=[str(a) for a in data.cc_addresses] if data.cc_addresses else [], body_text=body_text, body_html=body_html, has_attachments=has_files, sent_at=None, received_at=None, ) # Collegamento a messaggio originale (per risposta/threading) if data.reply_to_message_id: parent = await self.db.get(Message, data.reply_to_message_id) if parent and parent.tenant_id == current_user.tenant_id: message.parent_message_id = data.reply_to_message_id self.db.add(message) await self.db.flush() # Ottieni message.id # ── Carica allegati su MinIO e crea record Attachment ───────────────── if has_files: await self._upload_attachments( message=message, attachments=attachments, # type: ignore[arg-type] tenant_id=current_user.tenant_id, mailbox_id=data.mailbox_id, ) # ── Crea il SendJob ─────────────────────────────────────────────────── job = SendJob( tenant_id=current_user.tenant_id, mailbox_id=data.mailbox_id, message_id=message.id, status="pending", attempt_count=0, max_attempts=5, created_by=current_user.id, queued_at=now, ) self.db.add(job) await self.db.flush() # ── Enqueue job arq ─────────────────────────────────────────────────── try: arq_pool = await _get_arq_pool() await arq_pool.enqueue_job("send_pec", str(job.id)) except Exception as e: from app.core.logging import get_logger logger = get_logger(__name__) logger.warning( f"[send_service] Impossibile enqueue send_pec job {job.id}: {e}. " "Il job resterà in stato 'pending' per pickup manuale." ) return job # ── Upload allegati ──────────────────────────────────────────────────────── async def _upload_attachments( self, message: Message, attachments: list[dict], tenant_id: uuid.UUID, mailbox_id: uuid.UUID, ) -> None: """ Carica ogni allegato su MinIO e crea il record Attachment nel DB. Gestisce gli errori di upload singolarmente: un allegato che non riesce a caricarsi non blocca gli altri. """ from app.core.logging import get_logger from app.storage.minio_client import upload_attachment as minio_upload logger = get_logger(__name__) for att in attachments: filename: str = att.get("filename") or "allegato" content: bytes = att.get("content") or b"" content_type: str = att.get("content_type") or "application/octet-stream" if not content: continue try: storage_path = await minio_upload( tenant_id=str(tenant_id), mailbox_id=str(mailbox_id), message_id=str(message.id), filename=filename, content=content, content_type=content_type, ) checksum = hashlib.sha256(content).hexdigest() attachment_record = Attachment( tenant_id=tenant_id, message_id=message.id, filename=filename, content_type=content_type, size_bytes=len(content), storage_path=storage_path, checksum_sha256=checksum, ) self.db.add(attachment_record) logger.debug( f"[send_service] Allegato caricato: {storage_path} " f"({len(content)} bytes)" ) except Exception as upload_err: logger.error( f"[send_service] Errore upload allegato '{filename}': {upload_err}" ) # Non sollevare: l'invio continuerà senza questo allegato await self.db.flush() # ── Lista job di invio ──────────────────────────────────────────────────── async def list_send_jobs( self, tenant_id: uuid.UUID, page: int = 1, page_size: int = 50, mailbox_id: uuid.UUID | None = None, status_filter: str | None = None, ) -> tuple[list[SendJob], int]: """Lista i job di invio del tenant con filtri opzionali.""" base_q = select(SendJob).where(SendJob.tenant_id == tenant_id) if mailbox_id: base_q = base_q.where(SendJob.mailbox_id == mailbox_id) if status_filter: base_q = base_q.where(SendJob.status == status_filter) count_q = select(func.count()).select_from(base_q.subquery()) total = (await self.db.execute(count_q)).scalar_one() items_q = ( base_q.order_by(SendJob.queued_at.desc()) .offset((page - 1) * page_size) .limit(page_size) ) result = await self.db.execute(items_q) items = list(result.scalars().all()) return items, total # ── Get singolo job ─────────────────────────────────────────────────────── async def get_send_job( self, job_id: uuid.UUID, tenant_id: uuid.UUID, ) -> SendJob: """Carica un singolo SendJob verificando l'appartenenza al tenant.""" job = await self.db.get(SendJob, job_id) if not job or job.tenant_id != tenant_id: raise NotFoundError("job di invio") return job # ── Annulla job (solo se pending) ───────────────────────────────────────── async def cancel_send_job( self, job_id: uuid.UUID, tenant_id: uuid.UUID, ) -> SendJob: """Annulla un job di invio se è ancora in stato 'pending' o 'retrying'.""" job = await self.get_send_job(job_id, tenant_id) if job.status not in ("pending", "retrying"): raise ForbiddenError( f"Impossibile annullare: il job è in stato '{job.status}'" ) job.status = "failed" job.last_error = "Annullato dall'utente" if job.message_id: msg = await self.db.get(Message, job.message_id) if msg and msg.state in ("queued", "draft"): msg.state = "failed" await self.db.flush() return job