""" Dependency FastAPI – get_db, get_current_user, get_current_tenant, role guards. """ import uuid from typing import Annotated from fastapi import Depends, Header, Request from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from app.config import get_settings from app.core.exceptions import ForbiddenError, TenantSuspendedError, TokenInvalidError from app.core.security import decode_token from app.database import get_db from app.models.tenant import Tenant from app.models.user import User from sqlalchemy import select security = HTTPBearer() # ─── Database con RLS ───────────────────────────────────────────────────────── async def _set_rls_tenant_id(db: AsyncSession, tenant_id: uuid.UUID) -> None: """ Imposta la variabile di sessione PostgreSQL per RLS. È un no-op su SQLite (test environment) poiché SQLite non supporta il comando SET LOCAL né il concetto di Row Level Security. """ try: await db.execute( text(f"SET LOCAL app.current_tenant_id = '{tenant_id!s}'") ) except Exception: # SQLite (usato nei test di integrazione) non supporta SET LOCAL. # In produzione (PostgreSQL) questo comando funziona sempre. pass async def get_db_with_rls( tenant_id: uuid.UUID, db: AsyncSession = Depends(get_db), ) -> AsyncSession: """ Imposta la variabile di sessione PostgreSQL per RLS. Da usare dopo aver estratto il tenant_id dall'utente autenticato. """ await _set_rls_tenant_id(db, tenant_id) return db # ─── Utente corrente ────────────────────────────────────────────────────────── async def get_current_user( credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], db: AsyncSession = Depends(get_db), ) -> User: """ Estrae e valida il JWT dall'header Authorization: Bearer . Carica il tenant (verifica is_active), l'utente dal DB e imposta RLS. """ token = credentials.credentials try: payload = decode_token(token) except JWTError: raise TokenInvalidError() if payload.get("type") != "access": raise TokenInvalidError() user_id_str = payload.get("sub") tenant_id_str = payload.get("tid") if not user_id_str or not tenant_id_str: raise TokenInvalidError() try: user_id = uuid.UUID(user_id_str) tenant_id = uuid.UUID(tenant_id_str) except ValueError: raise TokenInvalidError() # Imposta RLS per questo tenant (no-op su SQLite/test) await _set_rls_tenant_id(db, tenant_id) # Verifica tenant esiste e non è sospeso tenant = await db.get(Tenant, tenant_id) if not tenant: raise TokenInvalidError() if not tenant.is_active: raise TenantSuspendedError() # Carica utente result = await db.execute( select(User).where(User.id == user_id, User.tenant_id == tenant_id) ) user = result.scalar_one_or_none() if not user: raise TokenInvalidError() if not user.is_active: from app.core.exceptions import AccountDisabledError raise AccountDisabledError() # Attacca il tenant all'utente per uso nei router (evita doppio caricamento) user._current_tenant = tenant # type: ignore[attr-defined] return user # ─── Tenant corrente ────────────────────────────────────────────────────────── async def get_current_tenant( current_user: Annotated[User, Depends(get_current_user)], db: AsyncSession = Depends(get_db), ) -> Tenant: """ Restituisce l'oggetto Tenant dell'utente autenticato. Il Tenant è già stato validato in get_current_user() – questa dependency utilizza la cache della sessione SQLAlchemy (identity map) per evitare un secondo accesso al DB. """ # Usa la cache della sessione ORM (non emette una seconda query) tenant = await db.get(Tenant, current_user.tenant_id) if not tenant: raise TokenInvalidError() return tenant # ─── Role guards ────────────────────────────────────────────────────────────── async def require_admin( current_user: Annotated[User, Depends(get_current_user)], ) -> User: """Richiede ruolo admin o super_admin.""" if not current_user.is_admin: raise ForbiddenError("Richiesto ruolo amministratore") return current_user async def require_super_admin( current_user: Annotated[User, Depends(get_current_user)], ) -> User: """Richiede ruolo super_admin.""" if not current_user.is_super_admin: raise ForbiddenError("Richiesto ruolo super_admin") return current_user async def require_supervisor_or_admin( current_user: Annotated[User, Depends(get_current_user)], ) -> User: """ Richiede ruolo supervisor, admin o super_admin. Il supervisor ha accesso in lettura implicito a tutte le caselle del tenant ma non puo' gestire la configurazione (caselle, utenti, permessi, impostazioni). """ if not current_user.is_supervisor_or_admin: raise ForbiddenError("Richiesto ruolo supervisore o amministratore") return current_user # ─── Protezione endpoint admin con X-Admin-Key header ───────────────────────── async def verify_admin_key( x_admin_key: str = Header(default="", alias="X-Admin-Key"), ) -> None: """ Verifica l'header X-Admin-Key per gli endpoint di amministrazione tenant. Se ADMIN_SECRET_KEY non è configurata (ambiente di sviluppo), il check viene saltato per facilitare lo sviluppo locale. """ settings = get_settings() if not settings.admin_secret_key: # Sviluppo: nessuna chiave configurata → accesso libero return if x_admin_key != settings.admin_secret_key: raise ForbiddenError("X-Admin-Key non valida o mancante") # ─── Tipo annotato per ridurre boilerplate negli endpoint ───────────────────── CurrentUser = Annotated[User, Depends(get_current_user)] CurrentTenant = Annotated[Tenant, Depends(get_current_tenant)] AdminUser = Annotated[User, Depends(require_admin)] SuperAdminUser = Annotated[User, Depends(require_super_admin)] SupervisorOrAdminUser = Annotated[User, Depends(require_supervisor_or_admin)] DB = Annotated[AsyncSession, Depends(get_db)]