Files
PecHub/backend/app/services/permission_service.py
T
2026-03-27 14:43:42 +01:00

296 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Servizio permessi granulari gestione accessi utente × casella (Fase 1-A).
Gerarchia:
super_admin / admin → accesso implicito a tutto (no record in mailbox_permissions)
supervisor / operator / readonly → richiedono record esplicito
"""
import uuid
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ConflictError, ForbiddenError, NotFoundError, PermissionDeniedError
from app.models.mailbox import Mailbox
from app.models.permission import MailboxPermission
from app.models.user import User
from app.schemas.permission import PermissionGrantRequest
class PermissionService:
    """Granular user x mailbox permission checks and permission CRUD.

    Role model implemented by the methods below:

    * ``super_admin`` / ``admin``: implicit read, send and manage rights on
      every non-deleted mailbox of their own tenant (no permission row).
    * ``supervisor``: implicit read on every non-deleted mailbox of the
      tenant; send and manage still require an explicit permission row.
    * any other role (operator, readonly): every right requires an explicit
      ``MailboxPermission`` row; sending is additionally allowed through an
      active Virtual Box assignment.

    A mailbox whose ``status`` is ``"deleted"`` or that belongs to another
    tenant is never accessible, whatever the role.
    """

    # Tuples (not sets) on purpose: membership is tested with ``==``, so the
    # comparison semantics of the original inline tuples are preserved.
    _ADMIN_ROLES = ("super_admin", "admin")
    _READ_ALL_ROLES = ("super_admin", "admin", "supervisor")

    def __init__(self, db: AsyncSession) -> None:
        """Bind the service to the async session used for every query."""
        self.db = db

    # ─── Access checks ────────────────────────────────────────────────────────

    async def get_visible_mailboxes(
        self, user: User
    ) -> list[uuid.UUID]:
        """Return the ids of the mailboxes visible to ``user``.

        Admins and supervisors see every non-deleted mailbox of their tenant.
        Every other role only sees mailboxes for which an explicit permission
        row with ``can_read=True`` exists; those rows are joined against the
        mailbox table so that deleted or foreign-tenant mailboxes are
        filtered out exactly as they are for privileged roles.
        """
        if user.role in self._READ_ALL_ROLES:
            stmt = select(Mailbox.id).where(
                Mailbox.tenant_id == user.tenant_id,
                Mailbox.status != "deleted",
            )
        else:
            stmt = (
                select(MailboxPermission.mailbox_id)
                .join(Mailbox, Mailbox.id == MailboxPermission.mailbox_id)
                .where(
                    MailboxPermission.user_id == user.id,
                    MailboxPermission.can_read == True,  # noqa: E712 (SQL expression)
                    Mailbox.tenant_id == user.tenant_id,
                    Mailbox.status != "deleted",
                )
            )
        result = await self.db.execute(stmt)
        return [row[0] for row in result.all()]

    async def check_can_read(
        self, user: User, mailbox_id: uuid.UUID
    ) -> bool:
        """Tell whether ``user`` may read the messages of the mailbox.

        Admins and supervisors have implicit access to every non-deleted
        mailbox of their tenant; other roles need an explicit ``can_read``
        permission on a non-deleted mailbox of their tenant.
        """
        if user.role in self._READ_ALL_ROLES:
            return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id)
        perm = await self._get_active_permission(user.id, mailbox_id, user.tenant_id)
        return perm is not None and bool(perm.can_read)

    async def check_can_send(
        self, user: User, mailbox_id: uuid.UUID
    ) -> bool:
        """Tell whether ``user`` may send from the mailbox.

        Sending is allowed when, in this order:

        1. the user is an admin of the tenant owning the mailbox, or
        2. the user holds an explicit ``can_send`` permission on it, or
        3. the user is assigned to an active Virtual Box that includes it.

        Note: a supervisor has NO implicit send right; like an operator it
        needs an explicit ``can_send`` (or a Virtual Box), even though,
        unlike an operator, it can see every mailbox.
        """
        if user.role in self._ADMIN_ROLES:
            return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id)
        perm = await self._get_active_permission(user.id, mailbox_id, user.tenant_id)
        if perm is not None and perm.can_send:
            return True
        # Fallback: access granted through a Virtual Box assignment.
        return await self._check_vbox_mailbox_access(user.id, mailbox_id, user.tenant_id)

    async def check_can_manage(
        self, user: User, mailbox_id: uuid.UUID
    ) -> bool:
        """Tell whether ``user`` may manage the mailbox configuration."""
        if user.role in self._ADMIN_ROLES:
            return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id)
        perm = await self._get_active_permission(user.id, mailbox_id, user.tenant_id)
        return perm is not None and bool(perm.can_manage)

    async def require_can_read(self, user: User, mailbox_id: uuid.UUID) -> None:
        """Raise ``PermissionDeniedError`` when ``user`` cannot read the mailbox."""
        if not await self.check_can_read(user, mailbox_id):
            raise PermissionDeniedError("casella")

    async def require_can_send(self, user: User, mailbox_id: uuid.UUID) -> None:
        """Raise ``PermissionDeniedError`` when ``user`` cannot send from the mailbox."""
        if not await self.check_can_send(user, mailbox_id):
            raise PermissionDeniedError("casella (invio)")

    # ─── Permission CRUD ──────────────────────────────────────────────────────

    async def grant_permission(
        self,
        tenant_id: uuid.UUID,
        mailbox_id: uuid.UUID,
        user_id: uuid.UUID,
        data: PermissionGrantRequest,
        granted_by: User,
    ) -> MailboxPermission:
        """Create or update (upsert) a user's permission on a mailbox.

        Raises:
            ForbiddenError: ``granted_by`` is not an admin, or the target
                user is an admin (admins already have implicit access).
            NotFoundError: the mailbox or the target user does not belong to
                ``tenant_id``, or the mailbox is soft-deleted.

        Returns:
            The updated existing row (mutated in place, left for the session
            to flush) or the newly created row (added and flushed).
        """
        if not granted_by.is_admin:
            raise ForbiddenError("Solo gli amministratori possono gestire i permessi")

        # Both the mailbox and the target user must live in the tenant. A
        # soft-deleted mailbox is treated as missing, consistently with the
        # access checks which never expose deleted mailboxes.
        mailbox = await self.db.get(Mailbox, mailbox_id)
        if (
            not mailbox
            or mailbox.tenant_id != tenant_id
            or mailbox.status == "deleted"
        ):
            raise NotFoundError("casella")
        target_user = await self.db.get(User, user_id)
        if not target_user or target_user.tenant_id != tenant_id:
            raise NotFoundError("utente")

        # Admins never need an explicit permission row.
        if target_user.role in self._ADMIN_ROLES:
            raise ForbiddenError("Gli admin hanno già accesso implicito a tutte le caselle")

        # Upsert: update the existing row when there is one.
        existing = await self._get_permission(user_id, mailbox_id)
        if existing:
            existing.can_read = data.can_read
            existing.can_send = data.can_send
            existing.can_manage = data.can_manage
            existing.granted_by = granted_by.id
            return existing

        perm = MailboxPermission(
            tenant_id=tenant_id,
            user_id=user_id,
            mailbox_id=mailbox_id,
            can_read=data.can_read,
            can_send=data.can_send,
            can_manage=data.can_manage,
            granted_by=granted_by.id,
        )
        self.db.add(perm)
        await self.db.flush()
        return perm

    async def revoke_permission(
        self,
        mailbox_id: uuid.UUID,
        user_id: uuid.UUID,
        revoked_by: User,
    ) -> None:
        """Delete the explicit permission of ``user_id`` on ``mailbox_id``.

        The delete is scoped to the tenant of ``revoked_by`` so that an admin
        cannot remove a permission row owned by another tenant.

        Raises:
            ForbiddenError: ``revoked_by`` is not an admin.
            NotFoundError: no matching permission row exists in the tenant.
        """
        if not revoked_by.is_admin:
            raise ForbiddenError("Solo gli amministratori possono revocare i permessi")
        result = await self.db.execute(
            delete(MailboxPermission).where(
                MailboxPermission.mailbox_id == mailbox_id,
                MailboxPermission.user_id == user_id,
                MailboxPermission.tenant_id == revoked_by.tenant_id,
            )
        )
        if result.rowcount == 0:
            raise NotFoundError("permesso")

    async def list_mailbox_users(
        self, mailbox_id: uuid.UUID, tenant_id: uuid.UUID
    ) -> list[dict]:
        """Return every user holding an explicit permission on the mailbox.

        Each item is a dict with the user's identity fields, the three
        permission flags and the ``granted_at`` value of the permission row.
        """
        result = await self.db.execute(
            select(MailboxPermission, User)
            .join(User, MailboxPermission.user_id == User.id)
            .where(
                MailboxPermission.mailbox_id == mailbox_id,
                MailboxPermission.tenant_id == tenant_id,
            )
        )
        return [
            {
                "user_id": perm.user_id,
                "user_email": user.email,
                "user_full_name": user.full_name,
                "user_role": user.role,
                "can_read": perm.can_read,
                "can_send": perm.can_send,
                "can_manage": perm.can_manage,
                "granted_at": perm.granted_at,
            }
            for perm, user in result.all()
        ]

    async def list_user_mailboxes(
        self, user_id: uuid.UUID, tenant_id: uuid.UUID
    ) -> list[dict]:
        """Return the mailboxes a user can access through explicit permissions.

        Only rows with ``can_read=True`` on non-deleted mailboxes are
        returned; implicit (role-based) access is not listed here.
        """
        result = await self.db.execute(
            select(MailboxPermission, Mailbox)
            .join(Mailbox, MailboxPermission.mailbox_id == Mailbox.id)
            .where(
                MailboxPermission.user_id == user_id,
                MailboxPermission.tenant_id == tenant_id,
                MailboxPermission.can_read == True,  # noqa: E712 (SQL expression)
                Mailbox.status != "deleted",
            )
        )
        return [
            {
                "mailbox_id": perm.mailbox_id,
                "mailbox_email": mailbox.email_address,
                "mailbox_display_name": mailbox.display_name,
                "can_read": perm.can_read,
                "can_send": perm.can_send,
                "can_manage": perm.can_manage,
            }
            for perm, mailbox in result.all()
        ]

    # ─── Private ──────────────────────────────────────────────────────────────

    async def _get_permission(
        self, user_id: uuid.UUID, mailbox_id: uuid.UUID
    ) -> MailboxPermission | None:
        """Fetch the raw permission row for (user, mailbox), if any.

        No tenant or mailbox-status filter is applied; this is the lookup
        used by the upsert in ``grant_permission``.
        """
        result = await self.db.execute(
            select(MailboxPermission).where(
                MailboxPermission.user_id == user_id,
                MailboxPermission.mailbox_id == mailbox_id,
            )
        )
        return result.scalar_one_or_none()

    async def _get_active_permission(
        self,
        user_id: uuid.UUID,
        mailbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
    ) -> MailboxPermission | None:
        """Fetch the permission row only if the mailbox is usable.

        Same lookup as ``_get_permission`` but joined against the mailbox
        table, so the row is returned only when the mailbox belongs to
        ``tenant_id`` and is not soft-deleted. Used by the access checks.
        """
        result = await self.db.execute(
            select(MailboxPermission)
            .join(Mailbox, Mailbox.id == MailboxPermission.mailbox_id)
            .where(
                MailboxPermission.user_id == user_id,
                MailboxPermission.mailbox_id == mailbox_id,
                Mailbox.tenant_id == tenant_id,
                Mailbox.status != "deleted",
            )
        )
        return result.scalar_one_or_none()

    async def _mailbox_belongs_to_tenant(
        self, mailbox_id: uuid.UUID, tenant_id: uuid.UUID
    ) -> bool:
        """Tell whether a non-deleted mailbox with this id exists in the tenant."""
        result = await self.db.execute(
            select(Mailbox.id).where(
                Mailbox.id == mailbox_id,
                Mailbox.tenant_id == tenant_id,
                Mailbox.status != "deleted",
            )
        )
        return result.scalar_one_or_none() is not None

    async def _check_vbox_mailbox_access(
        self,
        user_id: uuid.UUID,
        mailbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
    ) -> bool:
        """Tell whether the user reaches the mailbox through a Virtual Box.

        Returns True when the user is assigned to at least one active
        Virtual Box of the tenant that includes the mailbox, and the mailbox
        itself belongs to the tenant and is not soft-deleted.
        """
        # Imported locally as in the original code — presumably to avoid a
        # circular import; confirm before hoisting to module level.
        from app.models.virtual_box import (
            VirtualBox,
            VirtualBoxAssignment,
            virtual_box_mailboxes,
        )

        result = await self.db.execute(
            select(VirtualBox.id)
            .join(
                VirtualBoxAssignment,
                VirtualBox.id == VirtualBoxAssignment.virtual_box_id,
            )
            .join(
                virtual_box_mailboxes,
                VirtualBox.id == virtual_box_mailboxes.c.virtual_box_id,
            )
            .join(
                Mailbox,
                Mailbox.id == virtual_box_mailboxes.c.mailbox_id,
            )
            .where(
                VirtualBoxAssignment.user_id == user_id,
                virtual_box_mailboxes.c.mailbox_id == mailbox_id,
                VirtualBox.tenant_id == tenant_id,
                VirtualBox.is_active == True,  # noqa: E712 (SQL expression)
                Mailbox.tenant_id == tenant_id,
                Mailbox.status != "deleted",
            )
            # One matching Virtual Box is enough to grant access.
            .limit(1)
        )
        return result.scalar_one_or_none() is not None