mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 20:55:41 +02:00
530 lines
19 KiB
Python
530 lines
19 KiB
Python
"""
|
||
Router messaggi PEC.
|
||
|
||
Fornisce:
|
||
- GET /messages – lista messaggi con filtri (inbox/sent/search/...)
|
||
- GET /messages/{id} – singolo messaggio
|
||
- PATCH /messages/{id} – aggiorna flags (is_read, is_starred, is_archived)
|
||
- GET /messages/{id}/attachments – lista allegati
|
||
- GET /messages/{id}/attachments/{att_id}/download – scarica allegato da MinIO
|
||
- GET /messages/{id}/receipts – ricevute (messaggi figlio)
|
||
|
||
Permessi:
|
||
- Admin: accede a tutti i messaggi del proprio tenant.
|
||
- Operator/Supervisor/Readonly: solo i messaggi delle caselle su cui
|
||
hanno almeno il permesso can_read.
|
||
"""
|
||
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, Query, status
|
||
from fastapi.responses import StreamingResponse
|
||
from sqlalchemy import func, or_, select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy.orm import selectinload
|
||
|
||
from app.config import get_settings
|
||
from app.core.exceptions import ForbiddenError, NotFoundError
|
||
from app.database import get_db
|
||
from app.dependencies import CurrentUser, DB
|
||
from app.models.label import Label
|
||
from app.models.message import Attachment, Message
|
||
from app.schemas.message import (
|
||
AttachmentResponse,
|
||
MessageBulkUpdateRequest,
|
||
MessageBulkUpdateResponse,
|
||
MessageListResponse,
|
||
MessageResponse,
|
||
MessageUpdateRequest,
|
||
)
|
||
|
||
router = APIRouter(prefix="/messages", tags=["Messages"])
|
||
settings = get_settings()
|
||
|
||
|
||
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
||
|
||
def _apply_vbox_rule(q, field: str, operator: str, value: str):
|
||
"""
|
||
Applica una singola regola di Virtual Box alla query SQLAlchemy.
|
||
|
||
field : subject | from_address | to_address | imap_folder
|
||
operator : contains | equals | starts_with | ends_with | regex
|
||
"""
|
||
if field == "subject":
|
||
col = Message.subject
|
||
elif field == "from_address":
|
||
col = Message.from_address
|
||
elif field == "to_address":
|
||
# to_addresses è ARRAY(Text) – converte in stringa per il confronto
|
||
arr_text = func.array_to_string(Message.to_addresses, ",")
|
||
if operator == "contains":
|
||
return q.where(arr_text.ilike(f"%{value}%"))
|
||
elif operator == "equals":
|
||
return q.where(arr_text.ilike(value))
|
||
elif operator == "starts_with":
|
||
return q.where(arr_text.ilike(f"{value}%"))
|
||
elif operator == "ends_with":
|
||
return q.where(arr_text.ilike(f"%{value}"))
|
||
elif operator == "regex":
|
||
return q.where(arr_text.op("~*")(value))
|
||
return q
|
||
elif field == "imap_folder":
|
||
col = Message.imap_folder
|
||
else:
|
||
return q # campo non supportato – ignorato
|
||
|
||
if operator == "contains":
|
||
return q.where(col.ilike(f"%{value}%"))
|
||
elif operator == "equals":
|
||
return q.where(func.lower(col) == value.lower())
|
||
elif operator == "starts_with":
|
||
return q.where(col.ilike(f"{value}%"))
|
||
elif operator == "ends_with":
|
||
return q.where(col.ilike(f"%{value}"))
|
||
elif operator == "regex":
|
||
return q.where(col.op("~*")(value))
|
||
return q
|
||
|
||
|
||
async def _get_visible_mailbox_ids(
|
||
user, db: AsyncSession
|
||
) -> Optional[list[uuid.UUID]]:
|
||
"""
|
||
Per utenti non-admin restituisce la lista di mailbox_id accessibili.
|
||
Restituisce None se l'utente è admin (accesso illimitato al tenant).
|
||
"""
|
||
if user.is_admin:
|
||
return None # nessun filtro per admin
|
||
|
||
from app.services.permission_service import PermissionService
|
||
perm_svc = PermissionService(db)
|
||
return await perm_svc.get_visible_mailboxes(user)
|
||
|
||
|
||
async def _resolve_message(
|
||
message_id: uuid.UUID,
|
||
current_user,
|
||
db: AsyncSession,
|
||
) -> Message:
|
||
"""Carica il messaggio e verifica i permessi di accesso.
|
||
|
||
L'accesso è consentito se:
|
||
1. L'utente è admin del tenant, oppure
|
||
2. L'utente ha un permesso diretto can_read sulla casella, oppure
|
||
3. L'utente è assegnato a una Virtual Box attiva che include la casella.
|
||
"""
|
||
result = await db.execute(
|
||
select(Message)
|
||
.where(
|
||
Message.id == message_id,
|
||
Message.tenant_id == current_user.tenant_id,
|
||
)
|
||
.options(selectinload(Message.labels))
|
||
)
|
||
message = result.scalar_one_or_none()
|
||
if not message:
|
||
raise NotFoundError(f"Messaggio {message_id} non trovato")
|
||
|
||
if not current_user.is_admin:
|
||
from app.services.permission_service import PermissionService
|
||
perm_svc = PermissionService(db)
|
||
has_direct_access = await perm_svc.check_can_read(current_user, message.mailbox_id)
|
||
|
||
if not has_direct_access:
|
||
# Verifica accesso tramite Virtual Box:
|
||
# l'utente deve essere assegnato a una VBox attiva
|
||
# che include la casella del messaggio.
|
||
from app.models.virtual_box import (
|
||
VirtualBox,
|
||
VirtualBoxAssignment,
|
||
virtual_box_mailboxes,
|
||
)
|
||
vbox_result = await db.execute(
|
||
select(VirtualBox.id)
|
||
.join(
|
||
VirtualBoxAssignment,
|
||
VirtualBox.id == VirtualBoxAssignment.virtual_box_id,
|
||
)
|
||
.join(
|
||
virtual_box_mailboxes,
|
||
VirtualBox.id == virtual_box_mailboxes.c.virtual_box_id,
|
||
)
|
||
.where(
|
||
VirtualBoxAssignment.user_id == current_user.id,
|
||
virtual_box_mailboxes.c.mailbox_id == message.mailbox_id,
|
||
VirtualBox.tenant_id == current_user.tenant_id,
|
||
VirtualBox.is_active == True,
|
||
)
|
||
.limit(1)
|
||
)
|
||
has_vbox_access = vbox_result.scalar_one_or_none() is not None
|
||
|
||
if not has_vbox_access:
|
||
raise ForbiddenError("Accesso al messaggio non autorizzato")
|
||
|
||
return message
|
||
|
||
|
||
# ─── Endpoints ───────────────────────────────────────────────────────────────
|
||
|
||
@router.get("", response_model=MessageListResponse)
|
||
async def list_messages(
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
# Filtri
|
||
vbox_id: Optional[uuid.UUID] = Query(None, description="Filtra per Virtual Box assegnata"),
|
||
mailbox_id: Optional[uuid.UUID] = Query(None),
|
||
direction: Optional[str] = Query(None, pattern="^(inbound|outbound)$"),
|
||
state: Optional[str] = Query(None),
|
||
is_read: Optional[bool] = Query(None),
|
||
is_starred: Optional[bool] = Query(None),
|
||
is_archived: Optional[bool] = Query(False),
|
||
search: Optional[str] = Query(None, max_length=200),
|
||
pec_type: Optional[str] = Query(None),
|
||
# Paginazione
|
||
page: int = Query(1, ge=1),
|
||
page_size: int = Query(50, ge=1, le=200),
|
||
) -> MessageListResponse:
|
||
"""
|
||
Elenca i messaggi PEC con filtri opzionali.
|
||
|
||
- `is_archived=False` (default) esclude i messaggi archiviati.
|
||
- `search` cerca su subject, from_address, to_addresses.
|
||
- `vbox_id` filtra per Virtual Box assegnata all'utente corrente.
|
||
"""
|
||
# Determinare le caselle visibili (normale check permessi)
|
||
visible_mailbox_ids = await _get_visible_mailbox_ids(current_user, db)
|
||
|
||
# ── Filtro Virtual Box ────────────────────────────────────────────────────
|
||
vbox_rules: list = []
|
||
if vbox_id is not None:
|
||
from app.models.virtual_box import VirtualBox, VirtualBoxAssignment
|
||
|
||
vbox_result = await db.execute(
|
||
select(VirtualBox)
|
||
.where(
|
||
VirtualBox.id == vbox_id,
|
||
VirtualBox.tenant_id == current_user.tenant_id,
|
||
VirtualBox.is_active == True,
|
||
)
|
||
.options(
|
||
selectinload(VirtualBox.rules),
|
||
selectinload(VirtualBox.mailboxes),
|
||
)
|
||
)
|
||
vbox = vbox_result.scalar_one_or_none()
|
||
if not vbox:
|
||
raise NotFoundError("Virtual Box")
|
||
|
||
# Non-admin: verifica che l'utente sia assegnato alla VBox
|
||
if not current_user.is_admin:
|
||
assign_result = await db.execute(
|
||
select(VirtualBoxAssignment).where(
|
||
VirtualBoxAssignment.virtual_box_id == vbox_id,
|
||
VirtualBoxAssignment.user_id == current_user.id,
|
||
)
|
||
)
|
||
if not assign_result.scalar_one_or_none():
|
||
raise ForbiddenError("Virtual Box non accessibile")
|
||
|
||
# L'assegnazione alla VBox garantisce accesso alle sue caselle:
|
||
# sovrascrive il filtro permessi normali per questa query.
|
||
if vbox.mailboxes:
|
||
visible_mailbox_ids = [m.id for m in vbox.mailboxes]
|
||
# Se la VBox non ha caselle esplicitamente associate,
|
||
# si mantiene il filtro permessi normale (visible_mailbox_ids invariato).
|
||
|
||
vbox_rules = vbox.rules or []
|
||
# ─────────────────────────────────────────────────────────────────────────
|
||
|
||
# Query base
|
||
q = select(Message).where(
|
||
Message.tenant_id == current_user.tenant_id,
|
||
Message.parent_message_id.is_(None), # escludi ricevute (messaggi figlio)
|
||
)
|
||
|
||
# Filtro caselle visibili per non-admin (o dopo override VBox)
|
||
if visible_mailbox_ids is not None:
|
||
if not visible_mailbox_ids:
|
||
# Nessuna casella accessibile → lista vuota
|
||
return MessageListResponse(items=[], total=0, page=page, page_size=page_size)
|
||
q = q.where(Message.mailbox_id.in_(visible_mailbox_ids))
|
||
|
||
# Filtri opzionali
|
||
if mailbox_id is not None:
|
||
# Verifica che l'utente abbia accesso a questa casella specifica
|
||
if visible_mailbox_ids is not None and mailbox_id not in visible_mailbox_ids:
|
||
raise ForbiddenError("Accesso alla casella non autorizzato")
|
||
q = q.where(Message.mailbox_id == mailbox_id)
|
||
|
||
if direction is not None:
|
||
q = q.where(Message.direction == direction)
|
||
|
||
if state is not None:
|
||
q = q.where(Message.state == state)
|
||
|
||
if pec_type is not None:
|
||
q = q.where(Message.pec_type == pec_type)
|
||
|
||
if is_read is not None:
|
||
q = q.where(Message.is_read == is_read)
|
||
|
||
if is_starred is not None:
|
||
q = q.where(Message.is_starred == is_starred)
|
||
|
||
if is_archived is not None:
|
||
q = q.where(Message.is_archived == is_archived)
|
||
|
||
if search:
|
||
term = f"%{search}%"
|
||
q = q.where(
|
||
or_(
|
||
Message.subject.ilike(term),
|
||
Message.from_address.ilike(term),
|
||
Message.body_text.ilike(term),
|
||
)
|
||
)
|
||
|
||
# Applica le regole della Virtual Box (AND tra le regole)
|
||
for rule in vbox_rules:
|
||
q = _apply_vbox_rule(q, rule.field, rule.operator, rule.value)
|
||
|
||
# Conteggio totale
|
||
count_q = select(func.count()).select_from(q.subquery())
|
||
total = (await db.execute(count_q)).scalar_one()
|
||
|
||
# Ordinamento e paginazione
|
||
q = (
|
||
q.options(selectinload(Message.labels))
|
||
.order_by(
|
||
Message.received_at.desc().nullslast(),
|
||
Message.created_at.desc(),
|
||
)
|
||
.offset((page - 1) * page_size)
|
||
.limit(page_size)
|
||
)
|
||
|
||
result = await db.execute(q)
|
||
items = list(result.scalars().all())
|
||
|
||
return MessageListResponse(
|
||
items=[MessageResponse.model_validate(m) for m in items],
|
||
total=total,
|
||
page=page,
|
||
page_size=page_size,
|
||
)
|
||
|
||
|
||
@router.patch("/bulk", response_model=MessageBulkUpdateResponse)
|
||
async def bulk_update_messages(
|
||
data: MessageBulkUpdateRequest,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> MessageBulkUpdateResponse:
|
||
"""
|
||
Aggiorna in blocco i flag operativi (is_starred, is_archived) di più messaggi.
|
||
|
||
Restituisce il numero di messaggi aggiornati e la lista aggiornata.
|
||
I messaggi non trovati o non accessibili vengono silenziosamente ignorati.
|
||
"""
|
||
if not data.ids:
|
||
return MessageBulkUpdateResponse(updated=0, items=[])
|
||
|
||
# Carica tutti i messaggi del tenant
|
||
result = await db.execute(
|
||
select(Message).where(
|
||
Message.id.in_(data.ids),
|
||
Message.tenant_id == current_user.tenant_id,
|
||
)
|
||
)
|
||
messages = list(result.scalars().all())
|
||
|
||
# Filtra per permessi se non admin
|
||
if not current_user.is_admin:
|
||
from app.services.permission_service import PermissionService
|
||
perm_svc = PermissionService(db)
|
||
visible = await perm_svc.get_visible_mailboxes(current_user)
|
||
visible_set = set(visible) if visible else set()
|
||
messages = [m for m in messages if m.mailbox_id in visible_set]
|
||
|
||
now = datetime.now(timezone.utc)
|
||
for message in messages:
|
||
if data.is_starred is not None:
|
||
message.is_starred = data.is_starred
|
||
if data.is_archived is not None:
|
||
message.is_archived = data.is_archived
|
||
if data.is_archived and not message.archived_at:
|
||
message.archived_at = now
|
||
elif not data.is_archived:
|
||
message.archived_at = None
|
||
|
||
await db.commit()
|
||
|
||
# Ricarica i messaggi aggiornati con selectinload per evitare MissingGreenlet sui labels
|
||
if messages:
|
||
updated_ids = [m.id for m in messages]
|
||
refreshed_result = await db.execute(
|
||
select(Message)
|
||
.where(Message.id.in_(updated_ids))
|
||
.options(selectinload(Message.labels))
|
||
)
|
||
messages = list(refreshed_result.scalars().all())
|
||
|
||
return MessageBulkUpdateResponse(
|
||
updated=len(messages),
|
||
items=[MessageResponse.model_validate(m) for m in messages],
|
||
)
|
||
|
||
|
||
@router.get("/{message_id}", response_model=MessageResponse)
|
||
async def get_message(
|
||
message_id: uuid.UUID,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> MessageResponse:
|
||
"""Carica un messaggio per ID."""
|
||
message = await _resolve_message(message_id, current_user, db)
|
||
return MessageResponse.model_validate(message)
|
||
|
||
|
||
@router.patch("/{message_id}", response_model=MessageResponse)
|
||
async def update_message(
|
||
message_id: uuid.UUID,
|
||
data: MessageUpdateRequest,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> MessageResponse:
|
||
"""
|
||
Aggiorna i flag operativi di un messaggio:
|
||
is_read, is_starred, is_archived.
|
||
"""
|
||
message = await _resolve_message(message_id, current_user, db)
|
||
|
||
if data.is_read is not None:
|
||
message.is_read = data.is_read
|
||
if data.is_starred is not None:
|
||
message.is_starred = data.is_starred
|
||
if data.is_archived is not None:
|
||
message.is_archived = data.is_archived
|
||
if data.is_archived and not message.archived_at:
|
||
message.archived_at = datetime.now(timezone.utc)
|
||
elif not data.is_archived:
|
||
message.archived_at = None
|
||
|
||
await db.commit()
|
||
# Re-query con selectinload per evitare MissingGreenlet sui labels
|
||
refreshed = await db.execute(
|
||
select(Message)
|
||
.where(Message.id == message_id)
|
||
.options(selectinload(Message.labels))
|
||
)
|
||
message = refreshed.scalar_one()
|
||
return MessageResponse.model_validate(message)
|
||
|
||
|
||
@router.get("/{message_id}/attachments", response_model=list[AttachmentResponse])
|
||
async def list_attachments(
|
||
message_id: uuid.UUID,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> list[AttachmentResponse]:
|
||
"""Elenca gli allegati di un messaggio."""
|
||
message = await _resolve_message(message_id, current_user, db)
|
||
|
||
result = await db.execute(
|
||
select(Attachment)
|
||
.where(Attachment.message_id == message.id)
|
||
.order_by(Attachment.created_at)
|
||
)
|
||
attachments = list(result.scalars().all())
|
||
return [AttachmentResponse.model_validate(a) for a in attachments]
|
||
|
||
|
||
@router.get("/{message_id}/attachments/{attachment_id}/download")
|
||
async def download_attachment(
|
||
message_id: uuid.UUID,
|
||
attachment_id: uuid.UUID,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> StreamingResponse:
|
||
"""
|
||
Scarica un allegato direttamente da MinIO.
|
||
Il file viene streamato al client con i header Content-Disposition corretti.
|
||
"""
|
||
# Verifica accesso al messaggio
|
||
await _resolve_message(message_id, current_user, db)
|
||
|
||
# Carica allegato
|
||
result = await db.execute(
|
||
select(Attachment).where(
|
||
Attachment.id == attachment_id,
|
||
Attachment.message_id == message_id,
|
||
)
|
||
)
|
||
attachment = result.scalar_one_or_none()
|
||
if not attachment:
|
||
raise NotFoundError(f"Allegato {attachment_id} non trovato")
|
||
|
||
# Stream da MinIO
|
||
try:
|
||
from miniopy_async import Minio
|
||
|
||
client = Minio(
|
||
endpoint=settings.minio_endpoint,
|
||
access_key=settings.minio_access_key,
|
||
secret_key=settings.minio_secret_key,
|
||
secure=settings.minio_use_ssl,
|
||
)
|
||
|
||
# storage_path è del tipo "tenant_id/attachments/filename"
|
||
storage_path = attachment.storage_path
|
||
response = await client.get_object(settings.minio_bucket, storage_path)
|
||
|
||
content_type = attachment.content_type or "application/octet-stream"
|
||
filename = attachment.filename.replace('"', "'")
|
||
|
||
async def _stream():
|
||
async for chunk in response.content.iter_chunked(65536):
|
||
yield chunk
|
||
response.close()
|
||
|
||
return StreamingResponse(
|
||
_stream(),
|
||
media_type=content_type,
|
||
headers={
|
||
"Content-Disposition": f'attachment; filename="{filename}"',
|
||
"Content-Length": str(attachment.size_bytes or ""),
|
||
},
|
||
)
|
||
except Exception as e:
|
||
from app.core.logging import get_logger
|
||
logger = get_logger(__name__)
|
||
logger.error(f"Errore download allegato {attachment_id}: {e}")
|
||
raise NotFoundError("File non disponibile al momento")
|
||
|
||
|
||
@router.get("/{message_id}/receipts", response_model=list[MessageResponse])
|
||
async def list_receipts(
|
||
message_id: uuid.UUID,
|
||
current_user: CurrentUser,
|
||
db: DB,
|
||
) -> list[MessageResponse]:
|
||
"""
|
||
Elenca le ricevute associate a un messaggio outbound
|
||
(messaggi con parent_message_id = message_id).
|
||
"""
|
||
# Verifica accesso al messaggio padre
|
||
await _resolve_message(message_id, current_user, db)
|
||
|
||
result = await db.execute(
|
||
select(Message)
|
||
.where(Message.parent_message_id == message_id)
|
||
.options(selectinload(Message.labels))
|
||
.order_by(Message.received_at.asc().nullslast(), Message.created_at.asc())
|
||
)
|
||
receipts = list(result.scalars().all())
|
||
return [MessageResponse.model_validate(r) for r in receipts]
|