Files
PecHub/backend/app/api/v1/mailboxes.py
T
2026-03-27 14:20:59 +01:00

300 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PEC mailbox router — CRUD + connection test.
Permissions:
- admin: full CRUD on all mailboxes of the tenant
- other roles: read-only (mailboxes accessible via PermissionService)
"""
import uuid
from typing import Annotated
from fastapi import APIRouter, Depends, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ForbiddenError
from app.database import get_db
from app.dependencies import AdminUser, CurrentUser, DB
from app.schemas.mailbox import (
ConnectionTestRequest,
ConnectionTestResult,
MailboxCreateRequest,
MailboxListResponse,
MailboxResponse,
MailboxSyncResponse,
MailboxUnreadCountsResponse,
MailboxUpdateRequest,
)
from app.services.mailbox_service import MailboxService
router = APIRouter(prefix="/mailboxes", tags=["Mailboxes"])
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _svc(db: AsyncSession) -> MailboxService:
    """Build a ``MailboxService`` bound to the given database session."""
    service = MailboxService(db)
    return service
def _build_response(mailbox, svc: MailboxService) -> MailboxResponse:
    """Convert a mailbox object into the ``MailboxResponse`` API schema.

    ``svc`` is accepted for call-site symmetry but is not used: the
    conversion goes through ``MailboxService.to_response_dict`` invoked on
    the class itself.
    """
    payload = MailboxService.to_response_dict(mailbox)
    return MailboxResponse(**payload)
# ─── Endpoints ───────────────────────────────────────────────────────────────
@router.post("", response_model=MailboxResponse, status_code=status.HTTP_201_CREATED)
async def create_mailbox(
    data: MailboxCreateRequest,
    current_user: AdminUser,
    db: DB,
) -> MailboxResponse:
    """
    Create a new PEC mailbox.

    Requires the **admin** or **super_admin** role.
    Credentials are encrypted with AES-256-GCM before being persisted.
    """
    service = _svc(db)
    created = await service.create_mailbox(
        tenant_id=current_user.tenant_id,
        data=data,
        created_by=current_user.id,
    )
    # Persist the new mailbox before serialising the response.
    await db.commit()
    response = _build_response(created, service)
    return response
@router.get("", response_model=MailboxListResponse)
async def list_mailboxes(
    current_user: CurrentUser,
    db: DB,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
) -> MailboxListResponse:
    """
    List PEC mailboxes, paginated.

    - Admins see every mailbox of their tenant.
    - Operators see only the mailboxes returned by
      ``PermissionService.get_visible_mailboxes``.
    """
    service = _svc(db)

    if not current_user.is_admin:
        # Operators: restrict to the mailboxes they are permitted to see.
        from app.services.permission_service import PermissionService

        allowed_ids = await PermissionService(db).get_visible_mailboxes(current_user)
        if not allowed_ids:
            return MailboxListResponse(items=[], total=0, page=page, page_size=page_size)

        from sqlalchemy import select
        from app.models.mailbox import Mailbox
        from sqlalchemy import func

        # Same filter for both the page query and the count query.
        criteria = (
            Mailbox.id.in_(allowed_ids),
            Mailbox.status != "deleted",
        )
        skip = (page - 1) * page_size
        page_stmt = (
            select(Mailbox)
            .where(*criteria)
            .order_by(Mailbox.created_at.desc())
            .offset(skip)
            .limit(page_size)
        )
        matching = select(Mailbox.id).where(*criteria).subquery()
        total_stmt = select(func.count()).select_from(matching)

        page_rows = await db.execute(page_stmt)
        mailboxes = list(page_rows.scalars().all())
        total_rows = await db.execute(total_stmt)
        total = total_rows.scalar_one()
    else:
        # Admin: every mailbox of the tenant, paginated by the service.
        mailboxes, total = await service.list_mailboxes(
            tenant_id=current_user.tenant_id,
            page=page,
            page_size=page_size,
        )

    serialised = [_build_response(entry, service) for entry in mailboxes]
    return MailboxListResponse(
        items=serialised,
        total=total,
        page=page,
        page_size=page_size,
    )
@router.get("/unread-counts", response_model=MailboxUnreadCountsResponse)
async def get_unread_counts(
    current_user: CurrentUser,
    db: DB,
) -> MailboxUnreadCountsResponse:
    """
    Return the number of unread messages for each accessible mailbox.

    Used by the sidebar to show per-mailbox badges.
    - Admins: counts over all messages of the tenant (no mailbox filter).
    - Operators: only mailboxes returned by
      ``PermissionService.get_visible_mailboxes``; an empty mapping is
      returned when there are none.

    A message is counted when it is unread, inbound, neither trashed nor
    archived, and has no parent message. Mailboxes without such messages do
    not appear in the result.
    """
    from sqlalchemy import func, select
    from app.models.message import Message

    # Determine the visible mailboxes; ``None`` means "no filter" (admin).
    if current_user.is_admin:
        visible_ids = None
    else:
        from app.services.permission_service import PermissionService

        perm_svc = PermissionService(db)
        visible_ids = await perm_svc.get_visible_mailboxes(current_user)
        if not visible_ids:
            return MailboxUnreadCountsResponse(counts={})

    q = (
        select(Message.mailbox_id, func.count().label("cnt"))
        .where(
            Message.tenant_id == current_user.tenant_id,
            Message.is_read == False,  # noqa: E712
            Message.direction == "inbound",
            Message.is_trashed == False,  # noqa: E712
            Message.is_archived == False,  # noqa: E712
            Message.parent_message_id.is_(None),
        )
        .group_by(Message.mailbox_id)
    )
    if visible_ids is not None:
        # The Mailbox model is not needed here: filtering is done directly
        # on the message's mailbox_id column.
        q = q.where(Message.mailbox_id.in_(visible_ids))

    rows = (await db.execute(q)).all()
    # Keys are stringified mailbox ids.
    counts = {str(row.mailbox_id): row.cnt for row in rows}
    return MailboxUnreadCountsResponse(counts=counts)
@router.get("/{mailbox_id}", response_model=MailboxResponse)
async def get_mailbox(
    mailbox_id: uuid.UUID,
    current_user: CurrentUser,
    db: DB,
) -> MailboxResponse:
    """
    Load a single PEC mailbox by ID.

    Non-admin users must pass ``PermissionService.check_can_read`` for the
    mailbox, otherwise ``ForbiddenError`` is raised before the lookup.
    """
    service = _svc(db)

    is_admin = current_user.is_admin
    if not is_admin:
        from app.services.permission_service import PermissionService

        readable = await PermissionService(db).check_can_read(current_user, mailbox_id)
        if not readable:
            raise ForbiddenError("Accesso alla casella non autorizzato")

    found = await service.get_mailbox(mailbox_id, current_user.tenant_id)
    return _build_response(found, service)
@router.put("/{mailbox_id}", response_model=MailboxResponse)
async def update_mailbox(
    mailbox_id: uuid.UUID,
    data: MailboxUpdateRequest,
    current_user: AdminUser,
    db: DB,
) -> MailboxResponse:
    """
    Update a PEC mailbox.

    Requires the **admin** or **super_admin** role.
    If new credentials are supplied, they are re-encrypted.
    """
    service = _svc(db)
    updated = await service.update_mailbox(
        mailbox_id=mailbox_id,
        tenant_id=current_user.tenant_id,
        data=data,
    )
    # Persist the changes before serialising the response.
    await db.commit()
    response = _build_response(updated, service)
    return response
@router.delete("/{mailbox_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_mailbox(
    mailbox_id: uuid.UUID,
    current_user: AdminUser,
    db: DB,
) -> None:
    """
    Soft-delete a PEC mailbox (status=deleted).

    Requires the **admin** role.
    """
    service = _svc(db)
    tenant = current_user.tenant_id
    await service.delete_mailbox(mailbox_id, tenant)
    await db.commit()
@router.post(
    "/{mailbox_id}/test-connection",
    response_model=ConnectionTestResult,
)
async def test_mailbox_connection(
    mailbox_id: uuid.UUID,
    data: ConnectionTestRequest,
    current_user: AdminUser,
    db: DB,
) -> ConnectionTestResult:
    """
    Test the IMAP or SMTP connection of the mailbox.

    Does not send any messages — it only verifies the connection.
    Requires the **admin** role.
    """
    service = _svc(db)
    outcome = await service.test_connection(
        mailbox_id=mailbox_id,
        tenant_id=current_user.tenant_id,
        data=data,
    )
    return outcome
@router.post(
    "/{mailbox_id}/sync",
    response_model=MailboxSyncResponse,
)
async def force_sync_mailbox(
    mailbox_id: uuid.UUID,
    current_user: AdminUser,
    db: DB,
) -> MailboxSyncResponse:
    """
    Force an immediate IMAP synchronisation of the mailbox.

    Enqueues the ``sync_mailbox`` job for the worker via arq/Redis.
    Useful after a connection error or to force a refresh.
    Requires the **admin** role.

    Returns a response with ``status="enqueued"`` on success, or
    ``status="error"`` (with the failure reason in ``message``) when the job
    could not be enqueued.

    Raises:
        NotFoundError: if the mailbox has status ``"deleted"``.
    """
    from app.core.exceptions import NotFoundError

    svc = _svc(db)
    mailbox = await svc.get_mailbox(mailbox_id, current_user.tenant_id)
    if mailbox.status == "deleted":
        raise NotFoundError("Casella non trovata o eliminata")

    try:
        from arq.connections import RedisSettings, create_pool as arq_create_pool
        from app.config import get_settings

        cfg = get_settings()
        arq_settings = RedisSettings.from_dsn(cfg.redis_url)
        arq_redis = await arq_create_pool(arq_settings)
        try:
            await arq_redis.enqueue_job("sync_mailbox", str(mailbox_id))
        finally:
            # Always release the pool, even when enqueueing fails; otherwise
            # every failed request would leave a Redis connection open.
            await arq_redis.aclose()
    except Exception as exc:
        # Best-effort endpoint: report the failure in the response body
        # instead of propagating it.
        from app.core.logging import get_logger

        logger = get_logger(__name__)
        logger.error(f"[force_sync] Impossibile accodare job per {mailbox_id}: {exc}")
        # NOTE(review): the raw exception text is returned to the client;
        # confirm it cannot expose connection details.
        return MailboxSyncResponse(
            status="error",
            mailbox_id=mailbox_id,
            message=f"Impossibile accodare il job: {exc}",
        )

    return MailboxSyncResponse(
        status="enqueued",
        mailbox_id=mailbox_id,
        message=f"Sincronizzazione avviata per {mailbox.email_address}",
    )