Files
2026-03-19 14:28:09 +01:00

90 lines
2.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Client MinIO per il backend upload allegati outbound.
Percorso allegati outbound:
tenants/{tenant_id}/mailboxes/{mailbox_id}/attachments/{message_id}/{filename}
"""
import io
import logging
import re
from functools import lru_cache
from miniopy_async import Minio
from app.config import get_settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def get_minio_client() -> Minio:
    """Return the shared MinIO client, creating it on the first call.

    The function takes no arguments and is wrapped in ``lru_cache``, so the
    client built on the first call is reused by every later call.
    Connection parameters are read from the application settings.
    """
    cfg = get_settings()
    connection_options = {
        "endpoint": cfg.minio_endpoint,
        "access_key": cfg.minio_access_key,
        "secret_key": cfg.minio_secret_key,
        "secure": cfg.minio_use_ssl,
    }
    return Minio(**connection_options)
async def upload_attachment(
    tenant_id: str,
    mailbox_id: str,
    message_id: str,
    filename: str,
    content: bytes,
    content_type: str = "application/octet-stream",
) -> str:
    """Upload an outbound attachment to MinIO.

    The object is written to the bucket named by ``settings.minio_bucket``
    under the key
    ``tenants/{tenant_id}/mailboxes/{mailbox_id}/attachments/{message_id}/{filename}``.
    Only the file name is sanitized (via ``_sanitize_filename``); the three
    identifiers are interpolated into the key as given.

    Args:
        tenant_id: Tenant UUID, as a string.
        mailbox_id: UUID of the sending mailbox.
        message_id: UUID of the message the attachment belongs to.
        filename: Original file name.
        content: Raw bytes of the file.
        content_type: MIME type stored with the object.

    Returns:
        The object path inside the bucket (bucket name not included).
    """
    settings = get_settings()
    client = get_minio_client()
    bucket = settings.minio_bucket

    safe_filename = _sanitize_filename(filename)
    object_path = (
        f"tenants/{tenant_id}/mailboxes/{mailbox_id}"
        f"/attachments/{message_id}/{safe_filename}"
    )

    size = len(content)
    await client.put_object(
        bucket_name=bucket,
        object_name=object_path,
        data=io.BytesIO(content),
        length=size,
        content_type=content_type,
    )

    # Lazy %-style arguments: the message is only formatted when DEBUG
    # logging is actually enabled. The emitted text is unchanged.
    logger.debug(
        "Allegato outbound caricato: %s (%d bytes, %s)",
        object_path,
        size,
        content_type,
    )
    return object_path
def _sanitize_filename(filename: str) -> str:
"""Sanitizza il nome file per uso sicuro come path MinIO."""
safe = filename.replace("/", "_").replace("\\", "_").replace("\x00", "")
safe = re.sub(r"[^\w.\-() ]", "_", safe, flags=re.UNICODE)
if len(safe) > 200:
parts = safe.rsplit(".", 1)
if len(parts) == 2:
safe = parts[0][:196] + "." + parts[1]
else:
safe = safe[:200]
return safe or "attachment"