""" Servizio permessi granulari – gestione accessi utente × casella (Fase 1-A). Gerarchia: super_admin / admin → accesso implicito a tutto (no record in mailbox_permissions) supervisor / operator / readonly → richiedono record esplicito """ import uuid from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.exceptions import ConflictError, ForbiddenError, NotFoundError, PermissionDeniedError from app.models.mailbox import Mailbox from app.models.permission import MailboxPermission from app.models.user import User from app.schemas.permission import PermissionGrantRequest class PermissionService: def __init__(self, db: AsyncSession) -> None: self.db = db # ─── Verifica accessi ───────────────────────────────────────────────────── async def get_visible_mailboxes( self, user: User ) -> list[uuid.UUID]: """Restituisce gli UUID delle caselle visibili all'utente.""" if user.role in ("super_admin", "admin"): # Admin vede tutte le caselle del tenant result = await self.db.execute( select(Mailbox.id).where( Mailbox.tenant_id == user.tenant_id, Mailbox.status != "deleted", ) ) return [row[0] for row in result.all()] # Operatori: solo caselle con can_read=True result = await self.db.execute( select(MailboxPermission.mailbox_id).where( MailboxPermission.user_id == user.id, MailboxPermission.can_read == True, ) ) return [row[0] for row in result.all()] async def check_can_read( self, user: User, mailbox_id: uuid.UUID ) -> bool: """Verifica se l'utente può leggere i messaggi della casella.""" if user.role in ("super_admin", "admin"): # Verifica solo che la casella appartenga al tenant return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id) perm = await self._get_permission(user.id, mailbox_id) return perm is not None and perm.can_read async def check_can_send( self, user: User, mailbox_id: uuid.UUID ) -> bool: """Verifica se l'utente può inviare dalla casella.""" if user.role in ("super_admin", "admin"): return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id) perm = await self._get_permission(user.id, mailbox_id) return perm is not None and perm.can_send async def check_can_manage( self, user: User, mailbox_id: uuid.UUID ) -> bool: """Verifica se l'utente può gestire la configurazione della casella.""" if user.role in ("super_admin", "admin"): return await self._mailbox_belongs_to_tenant(mailbox_id, user.tenant_id) perm = await self._get_permission(user.id, mailbox_id) return perm is not None and perm.can_manage async def require_can_read(self, user: User, mailbox_id: uuid.UUID) -> None: """Solleva 403 se l'utente non può leggere.""" if not await self.check_can_read(user, mailbox_id): raise PermissionDeniedError("casella") async def require_can_send(self, user: User, mailbox_id: uuid.UUID) -> None: if not await self.check_can_send(user, mailbox_id): raise PermissionDeniedError("casella (invio)") # ─── CRUD permessi ──────────────────────────────────────────────────────── async def grant_permission( self, tenant_id: uuid.UUID, mailbox_id: uuid.UUID, user_id: uuid.UUID, data: PermissionGrantRequest, granted_by: User, ) -> MailboxPermission: """ Crea o aggiorna un permesso utente su una casella. Solo admin può gestire i permessi. """ if not granted_by.is_admin: raise ForbiddenError("Solo gli amministratori possono gestire i permessi") # Verifica che casella e utente appartengano al tenant mailbox = await self.db.get(Mailbox, mailbox_id) if not mailbox or mailbox.tenant_id != tenant_id: raise NotFoundError("casella") target_user = await self.db.get(User, user_id) if not target_user or target_user.tenant_id != tenant_id: raise NotFoundError("utente") # Non serve permesso esplicito per admin if target_user.role in ("super_admin", "admin"): raise ForbiddenError("Gli admin hanno già accesso implicito a tutte le caselle") # Cerca permesso esistente (upsert) existing = await self._get_permission(user_id, mailbox_id) if existing: existing.can_read = data.can_read existing.can_send = data.can_send existing.can_manage = data.can_manage existing.granted_by = granted_by.id return existing perm = MailboxPermission( tenant_id=tenant_id, user_id=user_id, mailbox_id=mailbox_id, can_read=data.can_read, can_send=data.can_send, can_manage=data.can_manage, granted_by=granted_by.id, ) self.db.add(perm) await self.db.flush() return perm async def revoke_permission( self, mailbox_id: uuid.UUID, user_id: uuid.UUID, revoked_by: User, ) -> None: if not revoked_by.is_admin: raise ForbiddenError("Solo gli amministratori possono revocare i permessi") result = await self.db.execute( delete(MailboxPermission).where( MailboxPermission.mailbox_id == mailbox_id, MailboxPermission.user_id == user_id, ) ) if result.rowcount == 0: raise NotFoundError("permesso") async def list_mailbox_users( self, mailbox_id: uuid.UUID, tenant_id: uuid.UUID ) -> list[dict]: """Ritorna tutti gli utenti con permesso esplicito su questa casella.""" result = await self.db.execute( select(MailboxPermission, User) .join(User, MailboxPermission.user_id == User.id) .where( MailboxPermission.mailbox_id == mailbox_id, MailboxPermission.tenant_id == tenant_id, ) ) rows = result.all() return [ { "user_id": perm.user_id, "user_email": user.email, "user_full_name": user.full_name, "user_role": user.role, "can_read": perm.can_read, "can_send": perm.can_send, "can_manage": perm.can_manage, "granted_at": perm.granted_at, } for perm, user in rows ] async def list_user_mailboxes( self, user_id: uuid.UUID, tenant_id: uuid.UUID ) -> list[dict]: """Ritorna tutte le caselle accessibili a un utente (permessi espliciti).""" result = await self.db.execute( select(MailboxPermission, Mailbox) .join(Mailbox, MailboxPermission.mailbox_id == Mailbox.id) .where( MailboxPermission.user_id == user_id, MailboxPermission.tenant_id == tenant_id, MailboxPermission.can_read == True, ) ) rows = result.all() return [ { "mailbox_id": perm.mailbox_id, "mailbox_email": mailbox.email_address, "mailbox_display_name": mailbox.display_name, "can_read": perm.can_read, "can_send": perm.can_send, "can_manage": perm.can_manage, } for perm, mailbox in rows ] # ─── Private ────────────────────────────────────────────────────────────── async def _get_permission( self, user_id: uuid.UUID, mailbox_id: uuid.UUID ) -> MailboxPermission | None: result = await self.db.execute( select(MailboxPermission).where( MailboxPermission.user_id == user_id, MailboxPermission.mailbox_id == mailbox_id, ) ) return result.scalar_one_or_none() async def _mailbox_belongs_to_tenant( self, mailbox_id: uuid.UUID, tenant_id: uuid.UUID ) -> bool: result = await self.db.execute( select(Mailbox.id).where( Mailbox.id == mailbox_id, Mailbox.tenant_id == tenant_id, Mailbox.status != "deleted", ) ) return result.scalar_one_or_none() is not None