""" Servizio permessi granulari – gestione accessi utente × casella (Fase 1-A). Gerarchia: super_admin / admin → accesso implicito a tutto (no record in mailbox_permissions) supervisor / operator / readonly → richiedono record esplicito """ import uuid from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.exceptions import ConflictError, ForbiddenError, NotFoundError, PermissionDeniedError from app.models.mailbox import Mailbox from app.models.permission import MailboxPermission from app.models.user import User from app.schemas.permission import PermissionGrantRequest class PermissionService: def __init__(self, db: AsyncSession) -> None: self.db = db # ─── Verifica accessi ───────────────────────────────────────────────────── async def get_visible_mailboxes( self, user: User ) -> list[uuid.UUID]: """Restituisce gli UUID delle caselle visibili all'utente. Admin e supervisor vedono tutte le caselle del tenant. Operator e readonly vedono solo le caselle con can_read=True esplicito. """ if user.role in ("super_admin", "admin", "supervisor"): # Admin e supervisor vedono tutte le caselle del tenant result = await self.db.execute( select(Mailbox.id).where( Mailbox.tenant_id == user.tenant_id, Mailbox.status != "deleted", ) ) return [row[0] for row in result.all()] # Operator e readonly: solo caselle con can_read=True esplicito result = await self.db.execute( select(MailboxPermission.mailbox_id).where( MailboxPermission.user_id == user.id, MailboxPermission.can_read == True, ) ) return [row[0] for row in result.all()] async def check_can_read( self, user: User, mailbox_id: uuid.UUID ) -> bool: """Verifica se l'utente puo' leggere i messaggi della casella. Admin e supervisor hanno accesso implicito a tutte le caselle del tenant. Operator e readonly richiedono permesso esplicito can_read. """ if user.role in ("super_admin", "admin", "supervisor"): # Admin e supervisor: verifica solo che la casella appartenga al tenant return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id) perm = await self._get_permission(user.id, mailbox_id) return perm is not None and perm.can_read async def check_can_send( self, user: User, mailbox_id: uuid.UUID ) -> bool: """ Verifica se l'utente puo' inviare dalla casella. L'accesso in invio e' concesso se: 1. L'utente e' admin del tenant, oppure 2. L'utente ha un permesso diretto can_send sulla casella, oppure 3. L'utente e' assegnato a una Virtual Box attiva che include la casella. Nota: il supervisor NON ha invio implicito – richiede can_send esplicito come operator, ma diversamente da operator vede tutte le caselle. """ if user.role in ("super_admin", "admin"): return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id) perm = await self._get_permission(user.id, mailbox_id) if perm is not None and perm.can_send: return True # Fallback: verifica accesso tramite Virtual Box return await self._check_vbox_mailbox_access(user.id, mailbox_id, user.tenant_id) async def check_can_manage( self, user: User, mailbox_id: uuid.UUID ) -> bool: """Verifica se l'utente può gestire la configurazione della casella.""" if user.role in ("super_admin", "admin"): return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id) perm = await self._get_permission(user.id, mailbox_id) return perm is not None and perm.can_manage async def check_can_conserve( self, user: User, mailbox_id: uuid.UUID ) -> bool: """Verifica se l'utente puo' spostare messaggi nella cartella Conservazione. Admin/super_admin: accesso implicito sempre. Supervisor: richiede permesso esplicito can_conserve=True. Operator/readonly: non autorizzati (richiedono permesso esplicito). """ if user.role in ("super_admin", "admin"): return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id) # Supervisor, operator e readonly richiedono record esplicito perm = await self._get_permission(user.id, mailbox_id) return perm is not None and perm.can_conserve async def require_can_read(self, user: User, mailbox_id: uuid.UUID) -> None: """Solleva 403 se l'utente non può leggere.""" if not await self.check_can_read(user, mailbox_id): raise PermissionDeniedError("casella") async def require_can_send(self, user: User, mailbox_id: uuid.UUID) -> None: if not await self.check_can_send(user, mailbox_id): raise PermissionDeniedError("casella (invio)") async def require_can_conserve(self, user: User, mailbox_id: uuid.UUID) -> None: """Solleva 403 se l'utente non puo' spostare messaggi in Conservazione.""" if not await self.check_can_conserve(user, mailbox_id): raise PermissionDeniedError("casella (conservazione)") # ─── CRUD permessi ──────────────────────────────────────────────────────── async def grant_permission( self, tenant_id: uuid.UUID, mailbox_id: uuid.UUID, user_id: uuid.UUID, data: PermissionGrantRequest, granted_by: User, ) -> MailboxPermission: """ Crea o aggiorna un permesso utente su una casella. Solo admin può gestire i permessi. """ if not granted_by.is_admin: raise ForbiddenError("Solo gli amministratori possono gestire i permessi") # Verifica che casella e utente appartengano al tenant mailbox = await self.db.get(Mailbox, mailbox_id) if not mailbox or mailbox.tenant_id != tenant_id: raise NotFoundError("casella") target_user = await self.db.get(User, user_id) if not target_user or target_user.tenant_id != tenant_id: raise NotFoundError("utente") # Non serve permesso esplicito per admin if target_user.role in ("super_admin", "admin"): raise ForbiddenError("Gli admin hanno già accesso implicito a tutte le caselle") # Cerca permesso esistente (upsert) existing = await self._get_permission(user_id, mailbox_id) if existing: existing.can_read = data.can_read existing.can_send = data.can_send existing.can_manage = data.can_manage existing.can_conserve = data.can_conserve existing.granted_by = granted_by.id return existing perm = MailboxPermission( tenant_id=tenant_id, user_id=user_id, mailbox_id=mailbox_id, can_read=data.can_read, can_send=data.can_send, can_manage=data.can_manage, can_conserve=data.can_conserve, granted_by=granted_by.id, ) self.db.add(perm) await self.db.flush() return perm async def revoke_permission( self, mailbox_id: uuid.UUID, user_id: uuid.UUID, revoked_by: User, ) -> None: if not revoked_by.is_admin: raise ForbiddenError("Solo gli amministratori possono revocare i permessi") result = await self.db.execute( delete(MailboxPermission).where( MailboxPermission.mailbox_id == mailbox_id, MailboxPermission.user_id == user_id, ) ) if result.rowcount == 0: raise NotFoundError("permesso") async def list_mailbox_users( self, mailbox_id: uuid.UUID, tenant_id: uuid.UUID ) -> list[dict]: """Ritorna tutti gli utenti con permesso esplicito su questa casella.""" result = await self.db.execute( select(MailboxPermission, User) .join(User, MailboxPermission.user_id == User.id) .where( MailboxPermission.mailbox_id == mailbox_id, MailboxPermission.tenant_id == tenant_id, ) ) rows = result.all() return [ { "user_id": perm.user_id, "user_email": user.email, "user_full_name": user.full_name, "user_role": user.role, "can_read": perm.can_read, "can_send": perm.can_send, "can_manage": perm.can_manage, "can_conserve": perm.can_conserve, "granted_at": perm.granted_at, } for perm, user in rows ] async def list_user_mailboxes( self, user_id: uuid.UUID, tenant_id: uuid.UUID ) -> list[dict]: """Ritorna tutte le caselle accessibili a un utente (permessi espliciti).""" result = await self.db.execute( select(MailboxPermission, Mailbox) .join(Mailbox, MailboxPermission.mailbox_id == Mailbox.id) .where( MailboxPermission.user_id == user_id, MailboxPermission.tenant_id == tenant_id, MailboxPermission.can_read == True, ) ) rows = result.all() return [ { "mailbox_id": perm.mailbox_id, "mailbox_email": mailbox.email_address, "mailbox_display_name": mailbox.display_name, "can_read": perm.can_read, "can_send": perm.can_send, "can_manage": perm.can_manage, "can_conserve": perm.can_conserve, } for perm, mailbox in rows ] # ─── Private ────────────────────────────────────────────────────────────── async def _get_permission( self, user_id: uuid.UUID, mailbox_id: uuid.UUID ) -> MailboxPermission | None: result = await self.db.execute( select(MailboxPermission).where( MailboxPermission.user_id == user_id, MailboxPermission.mailbox_id == mailbox_id, ) ) return result.scalar_one_or_none() async def _mailbox_belongs_to_tenant( self, mailbox_id: uuid.UUID, tenant_id: uuid.UUID ) -> bool: result = await self.db.execute( select(Mailbox.id).where( Mailbox.id == mailbox_id, Mailbox.tenant_id == tenant_id, Mailbox.status != "deleted", ) ) return result.scalar_one_or_none() is not None async def _check_vbox_mailbox_access( self, user_id: uuid.UUID, mailbox_id: uuid.UUID, tenant_id: uuid.UUID, ) -> bool: """ Verifica se l'utente ha accesso a una casella tramite Virtual Box. Restituisce True se l'utente è assegnato ad almeno una VBox attiva che include la casella specificata. """ from app.models.virtual_box import ( VirtualBox, VirtualBoxAssignment, virtual_box_mailboxes, ) result = await self.db.execute( select(VirtualBox.id) .join( VirtualBoxAssignment, VirtualBox.id == VirtualBoxAssignment.virtual_box_id, ) .join( virtual_box_mailboxes, VirtualBox.id == virtual_box_mailboxes.c.virtual_box_id, ) .where( VirtualBoxAssignment.user_id == user_id, virtual_box_mailboxes.c.mailbox_id == mailbox_id, VirtualBox.tenant_id == tenant_id, VirtualBox.is_active == True, ) .limit(1) ) return result.scalar_one_or_none() is not None