mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
1208 lines
44 KiB
Python
1208 lines
44 KiB
Python
"""
|
||
PEC message router.

Provides:
- GET /messages - list messages with filters (inbox/sent/search/...)
- GET /messages/{id} - single message
- PATCH /messages/{id} - update flags (is_read, is_starred, is_archived, is_trashed,
  is_pending_conservation, is_conserved)
- PATCH /messages/bulk - update several messages at once
- GET /messages/{id}/attachments - list attachments
- GET /messages/{id}/attachments/{att_id}/download - download an attachment from MinIO
- GET /messages/{id}/attachments/{att_id}/preview-url - presigned URL for inline preview
- GET /messages/{id}/download-package - ZIP archive with the original files of the PEC
- GET /messages/{id}/receipts - receipts (child messages)
- GET /messages/{id}/thread - whole conversation the message belongs to

Permissions:
- Admin: can access every message of their own tenant.
- Operator/Supervisor/Readonly: only the messages of the mailboxes on which
  they hold at least the can_read permission.
- is_pending_conservation / is_conserved: require can_conserve
  (implicit for admin, explicit for supervisor/operator).
|
||
"""
|
||
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, Query, Request, status
|
||
from fastapi.responses import StreamingResponse
|
||
from sqlalchemy import func, or_, select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy.orm import selectinload
|
||
|
||
from app.services.audit_service import get_real_ip
|
||
from app.services.search_service import SearchService
|
||
|
||
from app.config import get_settings
|
||
from app.core.exceptions import ForbiddenError, NotFoundError
|
||
from app.database import get_db
|
||
from app.dependencies import CurrentUser, DB
|
||
from app.models.label import Label
|
||
from app.models.message import Attachment, Message
|
||
from app.schemas.message import (
|
||
AttachmentMatchInfo,
|
||
AttachmentResponse,
|
||
MessageBulkUpdateRequest,
|
||
MessageBulkUpdateResponse,
|
||
MessageListResponse,
|
||
MessageResponse,
|
||
MessageUpdateRequest,
|
||
SearchMatchInfo,
|
||
)
|
||
|
||
# Router for the PEC message endpoints; every route declared on it is served
# under the /messages prefix.
router = APIRouter(
    prefix="/messages",
    tags=["Messages"],
)

# Application settings, resolved once when the module is imported.
settings = get_settings()
|
||
|
||
|
||
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
||
|
||
def _apply_vbox_rule(q, field: str, operator: str, value: str):
    """
    Apply a single Virtual Box rule to a SQLAlchemy query.

    field    : subject | from_address | to_address | imap_folder
    operator : contains | equals | starts_with | ends_with | regex

    For every operator except ``regex`` the value is matched literally and
    case-insensitively: the LIKE wildcards ``%`` and ``_`` (and the escape
    character itself) are escaped before the pattern is built, so a rule
    such as "contains 100%" no longer behaves like a wildcard search.

    Args:
        q: the SELECT statement to restrict.
        field: name of the message attribute the rule applies to.
        operator: comparison operator of the rule.
        value: text to compare against (a PostgreSQL regular expression
            when ``operator`` is ``regex``).

    Returns:
        The statement with one extra WHERE clause, or ``q`` unchanged when
        the field or the operator is not recognised.
    """
    is_recipient_rule = field == "to_address"

    if is_recipient_rule:
        # Recipients are stored as an array: flatten it into one
        # comma-separated string so the text operators can be applied.
        target = func.array_to_string(Message.to_addresses, ",")
    elif field == "subject":
        target = Message.subject
    elif field == "from_address":
        target = Message.from_address
    elif field == "imap_folder":
        target = Message.imap_folder
    else:
        return q

    # Escape the escape character first, then the two LIKE wildcards.
    literal = (
        value.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )

    if operator == "contains":
        return q.where(target.ilike(f"%{literal}%", escape="\\"))

    if operator == "equals":
        if is_recipient_rule:
            # NOTE(review): this compares the whole comma-joined recipient
            # string, so it only matches messages with exactly one
            # recipient - confirm this is the intended meaning of "equals".
            return q.where(target.ilike(literal, escape="\\"))
        return q.where(func.lower(target) == value.lower())

    if operator == "starts_with":
        return q.where(target.ilike(f"{literal}%", escape="\\"))

    if operator == "ends_with":
        return q.where(target.ilike(f"%{literal}", escape="\\"))

    if operator == "regex":
        # Case-insensitive PostgreSQL regex match; the pattern is passed
        # through as-is (it is sent as a bound parameter, not inlined).
        return q.where(target.op("~*")(value))

    return q
|
||
|
||
|
||
async def _get_visible_mailbox_ids(
    user, db: AsyncSession
) -> Optional[list[uuid.UUID]]:
    """
    Return the mailbox ids the user is allowed to see.

    ``None`` means "no restriction": it is returned when
    ``user.is_supervisor_or_admin`` is true. For every other user the
    result of ``PermissionService.get_visible_mailboxes`` is returned.
    """
    if not user.is_supervisor_or_admin:
        # Imported here rather than at module level, as in the rest of the file.
        from app.services.permission_service import PermissionService

        return await PermissionService(db).get_visible_mailboxes(user)

    return None
|
||
|
||
|
||
async def _resolve_message(
    message_id: uuid.UUID,
    current_user,
    db: AsyncSession,
) -> Message:
    """
    Load a message of the caller's tenant and check that the caller may read it.

    Access is granted when the user is an admin, when the permission service
    reports read access on the message's mailbox, or when the user is
    assigned to an active Virtual Box of the same tenant that includes that
    mailbox.

    Raises:
        NotFoundError: no message with this id exists in the user's tenant.
        ForbiddenError: the message exists but none of the access paths apply.
    """
    lookup = (
        select(Message)
        .where(
            Message.id == message_id,
            Message.tenant_id == current_user.tenant_id,
        )
        .options(selectinload(Message.labels))
    )
    message = (await db.execute(lookup)).scalar_one_or_none()
    if not message:
        raise NotFoundError(f"Messaggio {message_id} non trovato")

    # Admins need no further check.
    if current_user.is_admin:
        return message

    from app.services.permission_service import PermissionService

    if await PermissionService(db).check_can_read(current_user, message.mailbox_id):
        return message

    # No direct permission: fall back to Virtual Box assignments.
    from app.models.virtual_box import (
        VirtualBox,
        VirtualBoxAssignment,
        virtual_box_mailboxes,
    )

    vbox_lookup = (
        select(VirtualBox.id)
        .join(
            VirtualBoxAssignment,
            VirtualBox.id == VirtualBoxAssignment.virtual_box_id,
        )
        .join(
            virtual_box_mailboxes,
            VirtualBox.id == virtual_box_mailboxes.c.virtual_box_id,
        )
        .where(
            VirtualBoxAssignment.user_id == current_user.id,
            virtual_box_mailboxes.c.mailbox_id == message.mailbox_id,
            VirtualBox.tenant_id == current_user.tenant_id,
            VirtualBox.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
        )
        .limit(1)
    )
    granting_vbox_id = (await db.execute(vbox_lookup)).scalar_one_or_none()
    if granting_vbox_id is None:
        raise ForbiddenError("Accesso al messaggio non autorizzato")

    return message
|
||
|
||
|
||
# ─── Endpoints ───────────────────────────────────────────────────────────────
|
||
|
||
@router.get("", response_model=MessageListResponse)
async def list_messages(
    current_user: CurrentUser,
    db: DB,
    # Filters
    vbox_id: Optional[uuid.UUID] = Query(None, description="Filtra per Virtual Box assegnata"),
    mailbox_id: Optional[uuid.UUID] = Query(None),
    direction: Optional[str] = Query(None, pattern="^(inbound|outbound)$"),
    state: Optional[str] = Query(None),
    is_read: Optional[bool] = Query(None),
    is_starred: Optional[bool] = Query(None),
    is_archived: Optional[bool] = Query(False),
    is_trashed: Optional[bool] = Query(False),
    is_pending_conservation: Optional[bool] = Query(None, description="Filtra per messaggi in attesa di conservazione"),
    is_conserved: Optional[bool] = Query(None, description="Filtra per messaggi gia' conservati"),
    search: Optional[str] = Query(None, max_length=500),
    pec_type: Optional[str] = Query(None),
    date_from: Optional[datetime] = Query(None, description="Data minima (received_at o sent_at)"),
    date_to: Optional[datetime] = Query(None, description="Data massima (received_at o sent_at)"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
) -> MessageListResponse:
    """
    List PEC messages with optional filters.

    - `is_archived=False` (default) excludes archived messages.
    - `is_trashed=False` (default) excludes messages in the trash.
    - `is_pending_conservation` filters on the pending-conservation flag.
    - `is_conserved` filters on the conserved flag.
    - `search` uses full-text search (tsvector); rows without a search
      vector fall back to a literal, case-insensitive substring match on
      subject, sender and body.

    Only top-level messages (no parent) of the caller's tenant are listed.

    Raises:
        NotFoundError: `vbox_id` does not identify an active Virtual Box of
            the tenant.
        ForbiddenError: the Virtual Box is not assigned to a non-admin
            caller, or `mailbox_id` is outside the visible mailboxes.
    """
    visible_mailbox_ids = await _get_visible_mailbox_ids(current_user, db)

    # ── Virtual Box filter ────────────────────────────────────────────────────
    vbox_rules: list = []
    if vbox_id is not None:
        from app.models.virtual_box import VirtualBox, VirtualBoxAssignment

        vbox_result = await db.execute(
            select(VirtualBox)
            .where(
                VirtualBox.id == vbox_id,
                VirtualBox.tenant_id == current_user.tenant_id,
                VirtualBox.is_active == True,  # noqa: E712 - SQL expression
            )
            .options(
                selectinload(VirtualBox.rules),
                selectinload(VirtualBox.mailboxes),
            )
        )
        vbox = vbox_result.scalar_one_or_none()
        if not vbox:
            raise NotFoundError("Virtual Box")

        if not current_user.is_admin:
            assign_result = await db.execute(
                select(VirtualBoxAssignment).where(
                    VirtualBoxAssignment.virtual_box_id == vbox_id,
                    VirtualBoxAssignment.user_id == current_user.id,
                )
            )
            if not assign_result.scalar_one_or_none():
                raise ForbiddenError("Virtual Box non accessibile")

        # A Virtual Box with explicit mailboxes replaces the visible set.
        if vbox.mailboxes:
            visible_mailbox_ids = [m.id for m in vbox.mailboxes]

        vbox_rules = vbox.rules or []

    # Base query
    q = select(Message).where(
        Message.tenant_id == current_user.tenant_id,
        Message.parent_message_id.is_(None),
    )

    if visible_mailbox_ids is not None:
        if not visible_mailbox_ids:
            # Nothing is visible: skip the database round trips entirely.
            return MessageListResponse(items=[], total=0, page=page, page_size=page_size)
        q = q.where(Message.mailbox_id.in_(visible_mailbox_ids))

    if mailbox_id is not None:
        if visible_mailbox_ids is not None and mailbox_id not in visible_mailbox_ids:
            raise ForbiddenError("Accesso alla casella non autorizzato")
        q = q.where(Message.mailbox_id == mailbox_id)

    if direction is not None:
        q = q.where(Message.direction == direction)

    if state is not None:
        q = q.where(Message.state == state)

    if pec_type is not None:
        q = q.where(Message.pec_type == pec_type)

    if is_read is not None:
        q = q.where(Message.is_read == is_read)

    if is_starred is not None:
        q = q.where(Message.is_starred == is_starred)

    if is_archived is not None:
        q = q.where(Message.is_archived == is_archived)

    if is_trashed is not None:
        q = q.where(Message.is_trashed == is_trashed)

    # ── Conservation filters ──────────────────────────────────────────────────
    if is_pending_conservation is not None:
        q = q.where(Message.is_pending_conservation == is_pending_conservation)

    if is_conserved is not None:
        q = q.where(Message.is_conserved == is_conserved)

    # ── Full-text search ──────────────────────────────────────────────────────
    search_tsquery = None
    search_like = None
    if search:
        search_tsquery = func.websearch_to_tsquery("italian", search)
        # Escape LIKE wildcards so the fallback is a literal substring match,
        # consistent with the in_subject / in_body flags computed below.
        escaped_search = (
            search.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        search_like = f"%{escaped_search}%"
        q = q.where(
            or_(
                Message.search_vector.op("@@")(search_tsquery),
                Message.search_vector.is_(None) & or_(
                    Message.subject.ilike(search_like, escape="\\"),
                    Message.from_address.ilike(search_like, escape="\\"),
                    Message.body_text.ilike(search_like, escape="\\"),
                ),
            )
        )

    # ── Date filters ──────────────────────────────────────────────────────────
    if date_from:
        q = q.where(or_(Message.received_at >= date_from, Message.sent_at >= date_from))
    if date_to:
        q = q.where(or_(Message.received_at <= date_to, Message.sent_at <= date_to))

    for rule in vbox_rules:
        q = _apply_vbox_rule(q, rule.field, rule.operator, rule.value)

    # Total is computed on the filtered query before ordering/pagination.
    count_q = select(func.count()).select_from(q.subquery())
    total = (await db.execute(count_q)).scalar_one()

    if search:
        from sqlalchemy import case as sa_case

        # Rows without a search vector (ILIKE fallback hits) rank last.
        rank_expr = sa_case(
            (Message.search_vector.isnot(None), func.ts_rank(Message.search_vector, search_tsquery)),
            else_=0.0,
        )
        order_clauses = [rank_expr.desc(), Message.received_at.desc().nullslast(), Message.created_at.desc()]
    else:
        order_clauses = [Message.received_at.desc().nullslast(), Message.created_at.desc()]

    q = (
        q.options(selectinload(Message.labels))
        .order_by(*order_clauses)
        .offset((page - 1) * page_size)
        .limit(page_size)
    )

    result = await db.execute(q)
    items = list(result.scalars().all())

    # ── Fill search_match for search results ──────────────────────────────────
    if search and items:
        term_lower = search.lower()
        msg_ids = [m.id for m in items]

        # Batch query: attachments of this page whose extracted text
        # contains the search term.
        att_result = await db.execute(
            select(Attachment.id, Attachment.message_id, Attachment.filename)
            .where(
                Attachment.message_id.in_(msg_ids),
                Attachment.extracted_text.ilike(search_like, escape="\\"),
            )
        )
        # message_id -> matching attachments
        att_matches: dict[uuid.UUID, list[AttachmentMatchInfo]] = {}
        for att_id, msg_id, filename in att_result.all():
            att_matches.setdefault(msg_id, []).append(
                AttachmentMatchInfo(id=att_id, filename=filename)
            )

        message_responses: list[MessageResponse] = []
        for m in items:
            resp = MessageResponse.model_validate(m)
            resp.search_match = SearchMatchInfo(
                in_subject=bool(m.subject and term_lower in m.subject.lower()),
                in_body=bool(m.body_text and term_lower in m.body_text.lower()),
                in_attachments=att_matches.get(m.id, []),
            )
            message_responses.append(resp)
    else:
        message_responses = [MessageResponse.model_validate(m) for m in items]

    return MessageListResponse(
        items=message_responses,
        total=total,
        page=page,
        page_size=page_size,
    )
|
||
|
||
|
||
@router.patch("/bulk", response_model=MessageBulkUpdateResponse)
async def bulk_update_messages(
    request: Request,
    data: MessageBulkUpdateRequest,
    current_user: CurrentUser,
    db: DB,
) -> MessageBulkUpdateResponse:
    """
    Update the operational flags of several messages in one call.

    Messages outside the caller's tenant are ignored, and for non-admin
    callers so are messages in mailboxes not returned by
    ``PermissionService.get_visible_mailboxes``. When either conservation
    flag is present in the request, ``check_can_conserve`` must succeed for
    every mailbox involved, otherwise ``ForbiddenError`` is raised and
    nothing is changed. A single ``message.bulk_updated`` audit event is
    recorded when at least one message is updated.
    """
    from app.services.audit_service import log_audit

    if not data.ids:
        return MessageBulkUpdateResponse(updated=0, items=[])

    lookup = select(Message).where(
        Message.id.in_(data.ids),
        Message.tenant_id == current_user.tenant_id,
    )
    messages = list((await db.execute(lookup)).scalars().all())

    # Keep only the messages the caller can read.
    if not current_user.is_admin:
        from app.services.permission_service import PermissionService

        readable = await PermissionService(db).get_visible_mailboxes(current_user)
        readable_set = set(readable) if readable else set()
        messages = [m for m in messages if m.mailbox_id in readable_set]

    # Changing a conservation flag requires can_conserve on every mailbox involved.
    touches_conservation = not (
        data.is_pending_conservation is None and data.is_conserved is None
    )
    if touches_conservation and messages:
        from app.services.permission_service import PermissionService

        conserve_checker = PermissionService(db)
        for mb_id in {m.mailbox_id for m in messages}:
            if not await conserve_checker.check_can_conserve(current_user, mb_id):
                raise ForbiddenError(
                    "Permesso 'conservazione' non assegnato per questa casella"
                )

    now = datetime.now(timezone.utc)

    # Flags that carry a companion timestamp column, in application order.
    stamped_flags = (
        ("is_archived", "archived_at"),
        ("is_trashed", "trashed_at"),
        ("is_pending_conservation", "pending_conservation_at"),
        ("is_conserved", "conserved_at"),
    )

    for message in messages:
        if data.is_read is not None:
            message.is_read = data.is_read
        if data.is_starred is not None:
            message.is_starred = data.is_starred

        for flag_name, stamp_name in stamped_flags:
            requested = getattr(data, flag_name)
            if requested is None:
                continue
            setattr(message, flag_name, requested)
            if not requested:
                # Clearing the flag also clears its timestamp.
                setattr(message, stamp_name, None)
            elif not getattr(message, stamp_name):
                # Setting the flag stamps it only if no timestamp exists yet.
                setattr(message, stamp_name, now)

    # Record one audit event for the whole batch.
    if messages:
        audited_fields = (
            "is_read",
            "is_starred",
            "is_archived",
            "is_trashed",
            "is_pending_conservation",
            "is_conserved",
        )
        changes: dict = {
            name: getattr(data, name, None)
            for name in audited_fields
            if getattr(data, name, None) is not None
        }
        await log_audit(
            db,
            "message.bulk_updated",
            tenant_id=current_user.tenant_id,
            user_id=current_user.id,
            resource_type="message",
            ip_address=get_real_ip(request),
            user_agent=request.headers.get("user-agent"),
            payload={
                "count": len(messages),
                "message_ids": [str(m.id) for m in messages],
                "changes": changes,
            },
        )

    await db.commit()

    if messages:
        # Reload after the commit, with labels eagerly loaded for the response.
        reload_stmt = (
            select(Message)
            .where(Message.id.in_([m.id for m in messages]))
            .options(selectinload(Message.labels))
        )
        messages = list((await db.execute(reload_stmt)).scalars().all())

    return MessageBulkUpdateResponse(
        updated=len(messages),
        items=[MessageResponse.model_validate(m) for m in messages],
    )
|
||
|
||
|
||
@router.get("/{message_id}", response_model=MessageResponse)
async def get_message(
    request: Request,
    message_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> MessageResponse:
    """Return a single message by id and record a ``message.opened`` audit event."""
    from app.services.audit_service import log_audit

    message = await _resolve_message(message_id, current_user, db)

    audit_payload = {
        "subject": message.subject,
        "from_address": message.from_address,
        "direction": message.direction,
        "mailbox_id": str(message.mailbox_id),
    }
    await log_audit(
        db,
        "message.opened",
        tenant_id=current_user.tenant_id,
        user_id=current_user.id,
        resource_type="message",
        resource_id=message.id,
        ip_address=get_real_ip(request),
        user_agent=request.headers.get("user-agent"),
        payload=audit_payload,
    )
    await db.commit()

    return MessageResponse.model_validate(message)
|
||
|
||
|
||
@router.patch("/{message_id}", response_model=MessageResponse)
async def update_message(
    request: Request,
    message_id: uuid.UUID,
    data: MessageUpdateRequest,
    current_user: CurrentUser,
    db: DB,
) -> MessageResponse:
    """
    Update the operational flags of one message.

    When either conservation flag is present in the request,
    ``PermissionService.require_can_conserve`` is invoked for the message's
    mailbox first. One audit event is recorded for every flag supplied in
    the request, then the message is reloaded and returned.
    """
    from app.services.audit_service import log_audit

    message = await _resolve_message(message_id, current_user, db)

    # Conservation flags need the dedicated permission.
    if not (data.is_pending_conservation is None and data.is_conserved is None):
        from app.services.permission_service import PermissionService

        await PermissionService(db).require_can_conserve(current_user, message.mailbox_id)

    now = datetime.now(timezone.utc)
    ip = get_real_ip(request)
    ua = request.headers.get("user-agent")
    base_payload = {"subject": message.subject, "mailbox_id": str(message.mailbox_id)}

    # flag -> (audit action when set to true, audit action when set to false)
    audit_actions: dict[str, tuple[str, str]] = {
        "is_read": ("message.read", "message.unread"),
        "is_starred": ("message.starred", "message.unstarred"),
        "is_archived": ("message.archived", "message.unarchived"),
        "is_trashed": ("message.trashed", "message.restored"),
        "is_pending_conservation": ("message.pending_conservation","message.conservation_cancelled"),
        "is_conserved": ("message.conserved", "message.conservation_removed"),
    }

    # Flags that carry a companion timestamp column, in application order.
    stamped_flags = (
        ("is_archived", "archived_at"),
        ("is_trashed", "trashed_at"),
        ("is_pending_conservation", "pending_conservation_at"),
        ("is_conserved", "conserved_at"),
    )

    if data.is_read is not None:
        message.is_read = data.is_read
    if data.is_starred is not None:
        message.is_starred = data.is_starred

    for flag_name, stamp_name in stamped_flags:
        requested = getattr(data, flag_name)
        if requested is None:
            continue
        setattr(message, flag_name, requested)
        if not requested:
            # Clearing the flag also clears its timestamp.
            setattr(message, stamp_name, None)
        elif not getattr(message, stamp_name):
            # Setting the flag stamps it only if no timestamp exists yet.
            setattr(message, stamp_name, now)

    # One audit event per flag present in the request.
    for flag_name, (when_true, when_false) in audit_actions.items():
        requested = getattr(data, flag_name, None)
        if requested is None:
            continue
        await log_audit(
            db,
            when_true if requested else when_false,
            tenant_id=current_user.tenant_id,
            user_id=current_user.id,
            resource_type="message",
            resource_id=message_id,
            ip_address=ip,
            user_agent=ua,
            payload=base_payload,
        )

    await db.commit()

    # Reload after the commit, with labels eagerly loaded for the response.
    reload_stmt = (
        select(Message)
        .where(Message.id == message_id)
        .options(selectinload(Message.labels))
    )
    message = (await db.execute(reload_stmt)).scalar_one()
    return MessageResponse.model_validate(message)
|
||
|
||
|
||
@router.get("/{message_id}/attachments", response_model=list[AttachmentResponse])
async def list_attachments(
    message_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> list[AttachmentResponse]:
    """Return the attachments of a message, ordered by creation time."""
    message = await _resolve_message(message_id, current_user, db)

    stmt = (
        select(Attachment)
        .where(Attachment.message_id == message.id)
        .order_by(Attachment.created_at)
    )
    rows = (await db.execute(stmt)).scalars().all()

    return [AttachmentResponse.model_validate(row) for row in rows]
|
||
|
||
|
||
@router.get("/{message_id}/attachments/{attachment_id}/download")
async def download_attachment(
    request: Request,
    message_id: uuid.UUID,
    attachment_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> StreamingResponse:
    """
    Stream an attachment straight from MinIO.

    The download is audited (``message.attachment_downloaded``) before the
    object is fetched.

    Raises:
        NotFoundError: the attachment does not belong to the message, or the
            object could not be opened on MinIO.
        ForbiddenError / NotFoundError: propagated from the message access
            check.
    """
    from urllib.parse import quote

    from app.services.audit_service import log_audit

    await _resolve_message(message_id, current_user, db)

    result = await db.execute(
        select(Attachment).where(
            Attachment.id == attachment_id,
            Attachment.message_id == message_id,
        )
    )
    attachment = result.scalar_one_or_none()
    if not attachment:
        raise NotFoundError(f"Allegato {attachment_id} non trovato")

    await log_audit(
        db,
        "message.attachment_downloaded",
        tenant_id=current_user.tenant_id,
        user_id=current_user.id,
        resource_type="attachment",
        resource_id=attachment.id,
        ip_address=get_real_ip(request),
        user_agent=request.headers.get("user-agent"),
        payload={
            "filename": attachment.filename,
            "message_id": str(message_id),
            "size_bytes": attachment.size_bytes,
        },
    )
    await db.commit()

    # Build the response headers before opening the MinIO object, so that
    # nothing can fail between opening the object and handing it to the
    # streaming generator that closes it.
    content_type = attachment.content_type or "application/octet-stream"

    # The filename comes from an e-mail: drop control characters (CR/LF
    # would allow header injection) and neutralise double quotes.
    raw_name = attachment.filename or "allegato"
    clean_name = "".join(
        ch for ch in raw_name if ord(ch) >= 32 and ord(ch) != 127
    ).replace('"', "'")
    ascii_name = clean_name.encode("ascii", "replace").decode("ascii")
    disposition = f'attachment; filename="{ascii_name}"'
    if ascii_name != clean_name:
        # RFC 6266 / RFC 5987: carry the real name in filename*, keeping the
        # ASCII fallback for clients that do not understand it.
        disposition += f"; filename*=UTF-8''{quote(clean_name, safe='')}"

    headers = {"Content-Disposition": disposition}
    if attachment.size_bytes:
        # Only advertise a length we actually know; an empty Content-Length
        # value is not a valid header.
        headers["Content-Length"] = str(attachment.size_bytes)

    try:
        from miniopy_async import Minio

        client = Minio(
            endpoint=settings.minio_endpoint,
            access_key=settings.minio_access_key,
            secret_key=settings.minio_secret_key,
            secure=settings.minio_use_ssl,
        )
        response = await client.get_object(settings.minio_bucket, attachment.storage_path)
    except Exception as e:
        from app.core.logging import get_logger

        logger = get_logger(__name__)
        logger.error(f"Errore download allegato {attachment_id}: {e}")
        raise NotFoundError("File non disponibile al momento") from e

    async def _stream():
        # finally: release the MinIO response even when the client
        # disconnects and the generator is closed before exhaustion.
        try:
            async for chunk in response.content.iter_chunked(65536):
                yield chunk
        finally:
            response.close()

    return StreamingResponse(
        _stream(),
        media_type=content_type,
        headers=headers,
    )
|
||
|
||
|
||
@router.get("/{message_id}/download-package")
async def download_package(
    request: Request,
    message_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> StreamingResponse:
    """
    Download a ZIP archive with all the original files of the PEC.

    The archive contains the message attachments, the raw EML when
    available and, for outbound messages, the attachments and raw EML of
    every receipt under ``ricevute/<pec_type>/``. Objects that cannot be
    read from MinIO (or are empty) are skipped and the failure is logged.
    Entry names are reduced to their last path component and made unique
    inside the archive. The whole archive is built in memory.
    """
    import io
    import zipfile as _zipfile
    from urllib.parse import quote

    from app.core.logging import get_logger
    from app.services.audit_service import log_audit
    from miniopy_async import Minio

    logger = get_logger(__name__)

    message = await _resolve_message(message_id, current_user, db)

    client = Minio(
        endpoint=settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=settings.minio_use_ssl,
    )

    buf = io.BytesIO()
    # Names already written to the archive (avoids rescanning zf.infolist()).
    used_names: set[str] = set()

    async def _read_minio(path: str) -> bytes:
        """Best-effort read of one object; returns b"" when it cannot be read."""
        try:
            resp = await client.get_object(settings.minio_bucket, path)
        except Exception as exc:
            logger.error(f"Errore lettura oggetto {path} per pacchetto {message_id}: {exc}")
            return b""
        try:
            return await resp.content.read()
        except Exception as exc:
            logger.error(f"Errore lettura oggetto {path} per pacchetto {message_id}: {exc}")
            return b""
        finally:
            resp.close()

    def _safe_component(name: Optional[str], fallback: str) -> str:
        """Reduce an untrusted name to a single, non-traversing path component."""
        last = (name or "").replace("\\", "/").rsplit("/", 1)[-1].strip()
        return fallback if last in ("", ".", "..") else last

    def _reserve(folder: str, filename: str) -> str:
        """Return a unique archive path for filename inside folder and mark it used."""
        prefix = f"{folder}/" if folder else ""
        candidate = f"{prefix}{filename}"
        stem, dot, ext = filename.rpartition(".")
        counter = 1
        while candidate in used_names:
            if dot and stem:
                # "report.pdf" -> "report_1.pdf"
                candidate = f"{prefix}{stem}_{counter}.{ext}"
            else:
                # No extension (or a dot-file): append the counter at the end.
                candidate = f"{prefix}{filename}_{counter}"
            counter += 1
        used_names.add(candidate)
        return candidate

    with _zipfile.ZipFile(buf, mode="w", compression=_zipfile.ZIP_DEFLATED) as zf:
        att_result = await db.execute(
            select(Attachment)
            .where(Attachment.message_id == message.id)
            .order_by(Attachment.created_at)
        )
        for att in att_result.scalars().all():
            data = await _read_minio(att.storage_path)
            if data:
                zf.writestr(_reserve("", _safe_component(att.filename, "allegato")), data)

        if message.raw_eml_path:
            data = await _read_minio(message.raw_eml_path)
            if data:
                eml_name = message.raw_eml_path.rsplit("/", 1)[-1]
                if not eml_name.endswith(".eml"):
                    eml_name = "messaggio_originale.eml"
                # The raw EML is skipped when an entry with that name exists already.
                if eml_name not in used_names:
                    zf.writestr(eml_name, data)
                    used_names.add(eml_name)

        if message.direction == "outbound":
            receipts_result = await db.execute(
                select(Message)
                .where(Message.parent_message_id == message.id)
                .order_by(Message.received_at.asc().nullslast(), Message.created_at.asc())
            )

            for receipt in receipts_result.scalars().all():
                pec_type = _safe_component(receipt.pec_type, "ricevuta")
                folder = f"ricevute/{pec_type}"

                r_att_result = await db.execute(
                    select(Attachment)
                    .where(Attachment.message_id == receipt.id)
                    .order_by(Attachment.created_at)
                )
                for att in r_att_result.scalars().all():
                    data = await _read_minio(att.storage_path)
                    if data:
                        zf.writestr(
                            _reserve(folder, _safe_component(att.filename, "allegato")),
                            data,
                        )

                if receipt.raw_eml_path:
                    data = await _read_minio(receipt.raw_eml_path)
                    if data:
                        eml_name = receipt.raw_eml_path.rsplit("/", 1)[-1]
                        if not eml_name.endswith(".eml"):
                            eml_name = f"{pec_type}.eml"
                        zip_path = f"{folder}/{eml_name}"
                        if zip_path not in used_names:
                            zf.writestr(zip_path, data)
                            used_names.add(zip_path)

    zip_bytes = buf.getvalue()

    safe_subject = (message.subject or "pec").replace("/", "_").replace("\\", "_")[:50]
    zip_filename = f"pec_{safe_subject}.zip"

    await log_audit(
        db,
        "message.package_downloaded",
        tenant_id=current_user.tenant_id,
        user_id=current_user.id,
        resource_type="message",
        resource_id=message.id,
        ip_address=get_real_ip(request),
        user_agent=request.headers.get("user-agent"),
        payload={
            "subject": message.subject,
            "zip_filename": zip_filename,
            "zip_size_bytes": len(zip_bytes),
        },
    )
    await db.commit()

    # The subject is free text: drop control characters and double quotes,
    # and send non-ASCII names through filename* (RFC 6266 / RFC 5987) with
    # an ASCII fallback.
    header_name = "".join(
        ch for ch in zip_filename if ord(ch) >= 32 and ord(ch) != 127
    ).replace('"', "'")
    ascii_name = header_name.encode("ascii", "replace").decode("ascii")
    disposition = f'attachment; filename="{ascii_name}"'
    if ascii_name != header_name:
        disposition += f"; filename*=UTF-8''{quote(header_name, safe='')}"

    return StreamingResponse(
        iter([zip_bytes]),
        media_type="application/zip",
        headers={
            "Content-Disposition": disposition,
            "Content-Length": str(len(zip_bytes)),
        },
    )
|
||
|
||
|
||
@router.get("/{message_id}/receipts", response_model=list[MessageResponse])
async def list_receipts(
    message_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> list[MessageResponse]:
    """Return the child messages (receipts) of a message, oldest first."""
    # Only used for the existence / permission check; the result is not needed.
    await _resolve_message(message_id, current_user, db)

    stmt = (
        select(Message)
        .where(Message.parent_message_id == message_id)
        .options(selectinload(Message.labels))
        .order_by(Message.received_at.asc().nullslast(), Message.created_at.asc())
    )
    rows = (await db.execute(stmt)).scalars().all()

    return [MessageResponse.model_validate(row) for row in rows]
|
||
|
||
|
||
# ─── Feature 3: Thread/conversazioni ─────────────────────────────────────────
|
||
|
||
@router.get("/{message_id}/thread", response_model=list[MessageResponse])
async def get_thread(
    message_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> list[MessageResponse]:
    """
    Return the whole conversation (thread) the message belongs to.

    Walks up the ``parent_message_id`` chain to the root of the
    conversation, then loads every descendant level by level and returns
    the messages in chronological order. Only messages whose ``pec_type``
    is ``posta_certificata`` are included, which leaves out PEC receipts.
    Both walks keep a set of visited ids, so a cycle in the parent links
    cannot loop forever.

    NOTE(review): read access is checked only for the requested message,
    not for the other messages of the thread - confirm this is intended.
    """
    message = await _resolve_message(message_id, current_user, db)

    # Walk up to the root of the thread.
    root_id = message.id
    visited: set[uuid.UUID] = {message.id}
    current = message
    while current.parent_message_id and current.parent_message_id not in visited:
        visited.add(current.parent_message_id)
        parent_result = await db.execute(
            select(Message).where(
                Message.id == current.parent_message_id,
                Message.tenant_id == current_user.tenant_id,
            )
        )
        parent = parent_result.scalar_one_or_none()
        if not parent:
            break
        current = parent
        root_id = current.id

    thread_messages: list[Message] = []

    # The root is part of the result only when it is a posta_certificata
    # message, but its children are explored in any case.
    root_result = await db.execute(
        select(Message)
        .where(
            Message.id == root_id,
            Message.tenant_id == current_user.tenant_id,
            Message.pec_type == "posta_certificata",
        )
        .options(selectinload(Message.labels))
    )
    root = root_result.scalar_one_or_none()
    if root:
        thread_messages.append(root)

    # Walk down one level at a time: one query per level instead of two
    # queries per message.
    expanded: set[uuid.UUID] = {root_id}
    frontier: list[uuid.UUID] = [root_id]
    while frontier:
        children_result = await db.execute(
            select(Message)
            .where(
                Message.parent_message_id.in_(frontier),
                Message.tenant_id == current_user.tenant_id,
                Message.pec_type == "posta_certificata",
            )
            .options(selectinload(Message.labels))
            .order_by(Message.received_at.asc().nullslast(), Message.created_at.asc())
        )
        frontier = []
        for child in children_result.scalars().all():
            if child.id in expanded:
                # Already collected: the parent links contain a cycle.
                continue
            expanded.add(child.id)
            thread_messages.append(child)
            frontier.append(child.id)

    # Chronological order.
    thread_messages.sort(
        key=lambda m: m.received_at or m.sent_at or m.created_at
    )

    return [MessageResponse.model_validate(m) for m in thread_messages]
|
||
|
||
|
||
# ─── Feature 7: Preview allegati (presigned URL) ──────────────────────────────
|
||
|
||
@router.get("/{message_id}/attachments/{attachment_id}/preview-url")
async def get_attachment_preview_url(
    message_id: uuid.UUID,
    attachment_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> dict:
    """
    Return a MinIO presigned URL for the inline preview of an attachment.

    The URL is valid for 5 minutes.  Only PDF and image attachments are
    previewable; for any other content type, or when the URL cannot be
    generated, the response carries ``"previewable": False`` and no ``url``
    key, so the caller can fall back to the regular download.

    Raises:
        NotFoundError: if the attachment does not exist on this message.
    """
    await _resolve_message(message_id, current_user, db)

    result = await db.execute(
        select(Attachment).where(
            Attachment.id == attachment_id,
            Attachment.message_id == message_id,
        )
    )
    attachment = result.scalar_one_or_none()
    if not attachment:
        raise NotFoundError(f"Allegato {attachment_id} non trovato")

    content_type = attachment.content_type or "application/octet-stream"

    # Response used for every "no preview" outcome.
    not_previewable = {
        "previewable": False,
        "content_type": content_type,
        "filename": attachment.filename,
    }

    # MIME types are case-insensitive and may carry parameters
    # (e.g. "application/pdf; name=doc.pdf"): compare the bare type only.
    mime_type = content_type.split(";", 1)[0].strip().lower()
    if not (mime_type.startswith("image/") or mime_type == "application/pdf"):
        return not_previewable

    try:
        from datetime import timedelta as _timedelta
        from miniopy_async import Minio

        client = Minio(
            endpoint=settings.minio_endpoint,
            access_key=settings.minio_access_key,
            secret_key=settings.minio_secret_key,
            secure=settings.minio_use_ssl,
        )
        presigned_url = await client.presigned_get_object(
            settings.minio_bucket,
            attachment.storage_path,
            expires=_timedelta(minutes=5),
        )
    except Exception as e:
        # Best effort: a storage failure degrades to "not previewable"
        # instead of failing the request.
        from app.core.logging import get_logger

        logger = get_logger(__name__)
        logger.error(f"Errore generazione presigned URL allegato {attachment_id}: {e}")
        return not_previewable

    return {**not_previewable, "previewable": True, "url": presigned_url}
|
||
|
||
|
||
# ─── Feature 7b: Serve allegato inline per preview ───────────────────────────
|
||
|
||
@router.get("/{message_id}/attachments/{attachment_id}/inline")
async def inline_attachment(
    message_id: uuid.UUID,
    attachment_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
):
    """
    Serve an attachment with ``Content-Disposition: inline`` for in-browser preview.

    The file is read from MinIO by the backend and returned in the response
    body, which avoids the reachability problems of MinIO presigned URLs in
    Docker environments.

    Only PDF and image attachments are served; any other content type yields
    a 404.  This matters for security: attachments come from external
    senders, and serving e.g. ``text/html`` inline from the application
    origin would let them run script there.

    Raises:
        NotFoundError: if the attachment does not exist on this message, is
            not a previewable type, or cannot be read from storage.
    """
    from urllib.parse import quote

    await _resolve_message(message_id, current_user, db)

    result = await db.execute(
        select(Attachment).where(
            Attachment.id == attachment_id,
            Attachment.message_id == message_id,
        )
    )
    attachment = result.scalar_one_or_none()
    if not attachment:
        raise NotFoundError(f"Allegato {attachment_id} non trovato")

    content_type = attachment.content_type or "application/octet-stream"

    # Enforce the documented allow-list.  MIME types are case-insensitive and
    # may carry parameters, so compare the bare type only.
    mime_type = content_type.split(";", 1)[0].strip().lower()
    if not (mime_type.startswith("image/") or mime_type == "application/pdf"):
        raise NotFoundError(f"Allegato {attachment_id} non trovato")

    # Build a header-safe filename: drop control characters (CR/LF would
    # allow header injection) and keep the quoted form ASCII-only, because
    # header values must be latin-1 encodable.
    display_name = "".join(
        ch for ch in (attachment.filename or "allegato") if ch.isprintable()
    )
    ascii_name = (
        display_name.encode("ascii", "replace").decode("ascii").replace('"', "'")
    )
    disposition = f'inline; filename="{ascii_name}"'
    if not display_name.isascii():
        # RFC 6266 / RFC 5987 extended parameter carries the real UTF-8 name.
        disposition += f"; filename*=UTF-8''{quote(display_name, safe='')}"

    try:
        from miniopy_async import Minio

        client = Minio(
            endpoint=settings.minio_endpoint,
            access_key=settings.minio_access_key,
            secret_key=settings.minio_secret_key,
            secure=settings.minio_use_ssl,
        )
        response = await client.get_object(settings.minio_bucket, attachment.storage_path)
        try:
            data = await response.content.read()
        finally:
            # Always release the storage connection, even if the read fails.
            response.close()

        headers = {
            "Content-Disposition": disposition,
            "Content-Length": str(len(data)),
            # Stop browsers from sniffing the body into an executable type.
            "X-Content-Type-Options": "nosniff",
        }
        if mime_type == "image/svg+xml":
            # SVG can embed script; sandbox it when opened as a document.
            headers["Content-Security-Policy"] = (
                "default-src 'none'; style-src 'unsafe-inline'; sandbox"
            )

        return StreamingResponse(
            iter([data]),
            media_type=content_type,
            headers=headers,
        )
    except Exception as e:
        from app.core.logging import get_logger

        logger = get_logger(__name__)
        logger.error(f"Errore inline allegato {attachment_id}: {e}")
        raise NotFoundError("File non disponibile al momento") from e
|
||
|
||
|
||
# ─── Feature 8: Stampa/export HTML ────────────────────────────────────────────
|
||
|
||
@router.get("/{message_id}/print")
async def print_message(
    message_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> "HTMLResponse":
    """
    Return a print-optimised HTML representation of a message.

    The page contains the header fields, the optional deadline, the body,
    the attachment list and, for outbound messages, the table of receipts.
    It is meant for ``window.print()`` or for saving as PDF from the browser.

    Every text field (subject, addresses, file names, plain-text body,
    deadline note, state, type) is HTML-escaped, because these values come
    from external senders.  The response also carries a
    ``Content-Security-Policy`` header that disables script execution.
    """
    from html import escape

    from fastapi.responses import HTMLResponse

    def esc(value: object) -> str:
        """HTML-escape *value*; ``None`` renders as an empty string."""
        # f-string formatting (not str()) keeps the rendering that the
        # template interpolation gives for enum-like values.
        return "" if value is None else escape(f"{value}", quote=True)

    message = await _resolve_message(message_id, current_user, db)

    att_result = await db.execute(
        select(Attachment).where(Attachment.message_id == message.id).order_by(Attachment.created_at)
    )
    attachments = list(att_result.scalars().all())

    # --- Receipts table (outbound messages only) ---------------------------
    receipts_html = ""
    if message.direction == "outbound":
        rec_result = await db.execute(
            select(Message)
            .where(Message.parent_message_id == message.id)
            .order_by(Message.received_at.asc().nullslast(), Message.created_at.asc())
        )
        receipts = list(rec_result.scalars().all())

        pec_type_labels = {
            "accettazione": "Accettazione",
            "avvenuta_consegna": "Avvenuta consegna",
            "non_accettazione": "Non accettazione",
            "mancata_consegna": "Mancata consegna",
            "errore_consegna": "Errore consegna",
            "presa_in_carico": "Presa in carico",
            "preavviso_mancata_consegna": "Preavviso mancata consegna",
            "rilevazione_virus": "Rilevazione virus",
        }

        row_parts: list[str] = []
        for receipt in receipts:
            # Unknown receipt types fall back to the raw pec_type value.
            label = pec_type_labels.get(receipt.pec_type, receipt.pec_type)
            date_str = (
                receipt.received_at.strftime("%d/%m/%Y %H:%M:%S")
                if receipt.received_at
                else ""
            )
            row_parts.append(f"<tr><td>{esc(label)}</td><td>{date_str}</td></tr>")
        receipt_rows = "".join(row_parts)

        if receipt_rows:
            receipts_html = f"""
<section>
  <h3>Tracciamento invio</h3>
  <table>
    <thead><tr><th>Tipo ricevuta</th><th>Data</th></tr></thead>
    <tbody>{receipt_rows}</tbody>
  </table>
</section>"""

    # --- Attachment list ---------------------------------------------------
    att_items: list[str] = []
    for att in attachments:
        size_str = f"{att.size_bytes:,} byte" if att.size_bytes else ""
        att_items.append(
            f"<li>{esc(att.filename)} ({esc(att.content_type or '')}) {size_str}</li>"
        )
    att_html = (
        f"<section><h3>Allegati ({len(attachments)})</h3><ul>{''.join(att_items)}</ul></section>"
        if attachments
        else ""
    )

    # --- Header fields -----------------------------------------------------
    if message.direction == "inbound":
        primary_label = "Da"
        primary_value = message.from_address
    else:
        primary_label = "A"
        primary_value = ", ".join(message.to_addresses or [])

    date_val = ""
    date_field = message.received_at or message.sent_at or message.created_at
    if date_field:
        date_val = date_field.strftime("%d/%m/%Y %H:%M:%S")

    meta_rows = [f"<p><strong>{primary_label}:</strong> {esc(primary_value)}</p>"]
    if message.direction == "outbound" and message.from_address:
        meta_rows.append(f"<p><strong>Da:</strong> {esc(message.from_address)}</p>")
    if message.direction == "inbound" and message.to_addresses:
        meta_rows.append(
            f"<p><strong>A:</strong> {esc(', '.join(message.to_addresses))}</p>"
        )
    if message.cc_addresses:
        meta_rows.append(
            f"<p><strong>Cc:</strong> {esc(', '.join(message.cc_addresses))}</p>"
        )
    meta_rows.append(f"<p><strong>Data:</strong> {date_val}</p>")
    meta_rows.append(f"<p><strong>Stato:</strong> {esc(message.state)}</p>")
    meta_rows.append(f"<p><strong>Tipo:</strong> {esc(message.pec_type)}</p>")
    meta_html = "\n  ".join(meta_rows)

    # --- Body --------------------------------------------------------------
    body_html = ""
    if message.body_html:
        # NOTE(review): the sender-supplied HTML body is embedded verbatim so
        # that its formatting is kept.  Script execution is blocked only by
        # the CSP response header below; if this markup is ever injected into
        # another document client-side, it must be sanitised first.
        body_html = f"<div class='body'>{message.body_html}</div>"
    elif message.body_text:
        body_html = f"<pre class='body'>{esc(message.body_text)}</pre>"

    # --- Deadline ----------------------------------------------------------
    deadline_html = ""
    if message.deadline_at:
        dl_str = message.deadline_at.strftime("%d/%m/%Y %H:%M")
        deadline_html = f"<p class='deadline'><strong>Scadenza:</strong> {dl_str}</p>"
        if message.deadline_note:
            deadline_html += f"<p><em>Nota scadenza: {esc(message.deadline_note)}</em></p>"

    subject = esc(message.subject or "(nessun oggetto)")
    # The footer states when the document was generated, which is "now"
    # (UTC), not the date of the message itself.
    generated_at = datetime.now(timezone.utc).strftime("%d/%m/%Y %H:%M:%S")

    document = f"""<!DOCTYPE html>
<html lang="it">
<head>
<meta charset="UTF-8">
<title>PEC - {subject}</title>
<style>
  body {{ font-family: Arial, sans-serif; font-size: 12pt; margin: 2cm; color: #000; }}
  h1 {{ font-size: 16pt; border-bottom: 2px solid #333; padding-bottom: 8px; }}
  h3 {{ font-size: 13pt; margin-top: 20px; border-bottom: 1px solid #ccc; }}
  .meta {{ background: #f5f5f5; border: 1px solid #ddd; padding: 12px; margin-bottom: 16px; }}
  .meta p {{ margin: 4px 0; }}
  .body {{ border: 1px solid #ddd; padding: 12px; white-space: pre-wrap; word-wrap: break-word; }}
  table {{ border-collapse: collapse; width: 100%; }}
  th, td {{ border: 1px solid #ccc; padding: 6px 10px; text-align: left; }}
  th {{ background: #eee; }}
  ul {{ padding-left: 20px; }}
  .deadline {{ color: #c00; font-weight: bold; }}
  @media print {{ body {{ margin: 1cm; }} }}
</style>
</head>
<body>
<h1>{subject}</h1>
<div class="meta">
  {meta_html}
</div>
{deadline_html}
{body_html}
{att_html}
{receipts_html}
<p style="margin-top: 30px; font-size: 9pt; color: #888;">
  Documento generato da PEChub il {generated_at} – ID messaggio: {message.id}
</p>
</body>
</html>"""

    return HTMLResponse(
        content=document,
        media_type="text/html; charset=utf-8",
        # Defence in depth: the page needs no script, so forbid it outright.
        headers={
            "Content-Security-Policy": "script-src 'none'; object-src 'none'; base-uri 'none'",
        },
    )
|