Multitenancy

This commit is contained in:
2026-03-19 18:06:44 +01:00
parent 106ed50361
commit e594defc00
15 changed files with 1090 additions and 37 deletions
@@ -0,0 +1,153 @@
"""RLS completa su tutte le tabelle applicative + ruolo pechub_app
Revision ID: 0006
Revises: 0005
Create Date: 2026-03-19 00:00:00.000000
Aggiunge:
- ENABLE ROW LEVEL SECURITY + policy USING (tenant_id = ...) su tutte le
tabelle applicative che mancavano (attachments, send_jobs, archival_batches,
archival_dips, labels, mailbox_permissions, virtual_boxes,
notification_channels, notification_rules, notification_log, tenant_settings)
- Crea il ruolo PostgreSQL applicativo 'pechub_app' (non-superuser) con
GRANT SELECT/INSERT/UPDATE/DELETE su tutte le tabelle.
Il ruolo viene usato dalla connessione applicativa (DATABASE_URL) mentre
Alembic continua ad usare l'utente superuser (DATABASE_URL_SYNC).
Nota: la migrazione usa DO $$ BEGIN ... EXCEPTION WHEN ... END $$ per
rendere idempotente la creazione del ruolo e delle policy.
"""
from alembic import op
revision = "0006"
down_revision = "0005"
branch_labels = None
depends_on = None
# Tabelle con tenant_id che devono avere RLS (escluse quelle già gestite in 0001)
_RLS_TABLES = [
"attachments",
"send_jobs",
"archival_batches",
"archival_dips",
"labels",
"mailbox_permissions",
"virtual_boxes",
"notification_channels",
"notification_rules",
"notification_log",
"tenant_settings",
]
# Tutte le tabelle su cui pechub_app ha SELECT/INSERT/UPDATE/DELETE
_ALL_APP_TABLES = [
"tenants",
"users",
"refresh_tokens",
"mailboxes",
"messages",
"attachments",
"send_jobs",
"archival_batches",
"archival_batch_messages",
"archival_dips",
"labels",
"message_labels",
"mailbox_permissions",
"virtual_boxes",
"virtual_box_rules",
"virtual_box_assignments",
"virtual_box_mailboxes",
"notification_channels",
"notification_rules",
"notification_log",
"audit_log",
"tenant_settings",
]
def upgrade() -> None:
# ── 1. Abilita RLS + policy su tabelle mancanti ───────────────────────────
for table in _RLS_TABLES:
op.execute(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY")
# Politica idempotente: elimina la policy se esiste e la ricrea
op.execute(
f"""
DO $$
BEGIN
-- Elimina policy esistente (se presente da un run precedente)
DROP POLICY IF EXISTS {table}_tenant_isolation ON {table};
-- Crea la nuova policy
CREATE POLICY {table}_tenant_isolation ON {table}
USING (
tenant_id = NULLIF(
current_setting('app.current_tenant_id', TRUE), ''
)::UUID
);
END
$$;
"""
)
# ── 2. Crea ruolo applicativo pechub_app (non-superuser) ──────────────────
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_roles WHERE rolname = 'pechub_app'
) THEN
CREATE USER pechub_app WITH PASSWORD 'pechub_app_password'
NOSUPERUSER NOCREATEDB NOCREATEROLE;
RAISE NOTICE 'Ruolo pechub_app creato';
ELSE
RAISE NOTICE 'Ruolo pechub_app già esistente skip';
END IF;
END
$$;
"""
)
# ── 3. Permessi al ruolo applicativo ──────────────────────────────────────
op.execute("GRANT CONNECT ON DATABASE pechub TO pechub_app")
op.execute("GRANT USAGE ON SCHEMA public TO pechub_app")
for table in _ALL_APP_TABLES:
op.execute(
f"GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE {table} TO pechub_app"
)
# Sequenze (per BIGSERIAL come audit_log.id)
op.execute("GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO pechub_app")
# Default privileges: le tabelle create nelle migrazioni future ricevono
# automaticamente i permessi per pechub_app
op.execute(
"""
ALTER DEFAULT PRIVILEGES FOR ROLE pechub IN SCHEMA public
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO pechub_app;
"""
)
op.execute(
"""
ALTER DEFAULT PRIVILEGES FOR ROLE pechub IN SCHEMA public
GRANT USAGE, SELECT ON SEQUENCES TO pechub_app;
"""
)
def downgrade() -> None:
# Rimuovi policy e disabilita RLS
for table in _RLS_TABLES:
op.execute(
f"DROP POLICY IF EXISTS {table}_tenant_isolation ON {table}"
)
op.execute(f"ALTER TABLE {table} DISABLE ROW LEVEL SECURITY")
# Revoca permessi e rimuovi ruolo
op.execute("REVOKE ALL ON ALL TABLES IN SCHEMA public FROM pechub_app")
op.execute("REVOKE ALL ON ALL SEQUENCES IN SCHEMA public FROM pechub_app")
op.execute("REVOKE USAGE ON SCHEMA public FROM pechub_app")
op.execute("REVOKE CONNECT ON DATABASE pechub FROM pechub_app")
op.execute("DROP USER IF EXISTS pechub_app")
+20 -13
View File
@@ -1,36 +1,44 @@
"""
Router tenant gestione organizzazioni (solo super_admin).
Protezione doppia:
1. require_super_admin JWT con ruolo super_admin
2. verify_admin_key Header X-Admin-Key (se configurata in produzione)
Endpoint:
GET /api/v1/tenants → lista tenant
GET /api/v1/tenants → lista tenant con statistiche
POST /api/v1/tenants → crea tenant + admin
GET /api/v1/tenants/{id} → dettaglio tenant
PATCH /api/v1/tenants/{id} → modifica tenant
GET /api/v1/tenants/{id} → dettaglio tenant con statistiche
PATCH /api/v1/tenants/{id} → modifica tenant (incluso is_active per sospensione)
"""
import uuid
from typing import Annotated
from fastapi import APIRouter
from fastapi import APIRouter, Depends
from app.dependencies import SuperAdminUser, DB
from app.dependencies import DB, SuperAdminUser, verify_admin_key
from app.schemas.tenant import TenantCreateRequest, TenantResponse, TenantUpdateRequest
from app.services.tenant_service import TenantService
router = APIRouter(prefix="/tenants", tags=["Tenant (super-admin)"])
router = APIRouter(
prefix="/tenants",
tags=["Tenant (super-admin)"],
dependencies=[Depends(verify_admin_key)], # X-Admin-Key su tutti gli endpoint
)
@router.get(
"",
response_model=list[TenantResponse],
summary="Lista tutti i tenant",
summary="Lista tutti i tenant con statistiche",
)
async def list_tenants(
_: SuperAdminUser,
db: DB,
) -> list[TenantResponse]:
service = TenantService(db)
tenants = await service.list_tenants()
return [TenantResponse.model_validate(t) for t in tenants]
return await service.list_tenants_with_stats()
@router.post(
@@ -52,7 +60,7 @@ async def create_tenant(
@router.get(
"/{tenant_id}",
response_model=TenantResponse,
summary="Dettaglio tenant",
summary="Dettaglio tenant con statistiche",
)
async def get_tenant(
tenant_id: uuid.UUID,
@@ -60,14 +68,13 @@ async def get_tenant(
db: DB,
) -> TenantResponse:
service = TenantService(db)
tenant = await service.get_tenant(tenant_id)
return TenantResponse.model_validate(tenant)
return await service.get_tenant_with_stats(tenant_id)
@router.patch(
"/{tenant_id}",
response_model=TenantResponse,
summary="Modifica tenant",
summary="Modifica tenant (nome, piano, limiti, sospensione)",
)
async def update_tenant(
tenant_id: uuid.UUID,
+5
View File
@@ -47,6 +47,11 @@ class Settings(BaseSettings):
minio_bucket: str = "pechub"
minio_use_ssl: bool = False
# ── Admin sicurezza ───────────────────────────────────────────────────────
# Header X-Admin-Key richiesto sugli endpoint /api/v1/tenants
# Se vuoto → protezione disabilitata (solo sviluppo)
admin_secret_key: str = ""
# ── CORS ──────────────────────────────────────────────────────────────────
cors_origins: str = "http://localhost:3000,http://localhost:5173"
+8
View File
@@ -53,6 +53,14 @@ class AccountDisabledError(HTTPException):
)
class TenantSuspendedError(HTTPException):
def __init__(self) -> None:
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
detail="Tenant sospeso",
)
class TOTPRequiredError(HTTPException):
def __init__(self) -> None:
super().__init__(
+55 -4
View File
@@ -1,19 +1,21 @@
"""
Dependency FastAPI get_db, get_current_user, require_admin, RLS middleware.
Dependency FastAPI get_db, get_current_user, get_current_tenant, role guards.
"""
import uuid
from typing import Annotated
from fastapi import Depends, Request
from fastapi import Depends, Header, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ForbiddenError, TokenInvalidError
from app.config import get_settings
from app.core.exceptions import ForbiddenError, TenantSuspendedError, TokenInvalidError
from app.core.security import decode_token
from app.database import get_db
from app.models.tenant import Tenant
from app.models.user import User
from sqlalchemy import select
@@ -58,7 +60,7 @@ async def get_current_user(
) -> User:
"""
Estrae e valida il JWT dall'header Authorization: Bearer <token>.
Carica l'utente dal DB e imposta RLS.
Carica il tenant (verifica is_active), l'utente dal DB e imposta RLS.
"""
token = credentials.credentials
@@ -85,6 +87,13 @@ async def get_current_user(
# Imposta RLS per questo tenant (no-op su SQLite/test)
await _set_rls_tenant_id(db, tenant_id)
# Verifica tenant esiste e non è sospeso
tenant = await db.get(Tenant, tenant_id)
if not tenant:
raise TokenInvalidError()
if not tenant.is_active:
raise TenantSuspendedError()
# Carica utente
result = await db.execute(
select(User).where(User.id == user_id, User.tenant_id == tenant_id)
@@ -98,9 +107,32 @@ async def get_current_user(
from app.core.exceptions import AccountDisabledError
raise AccountDisabledError()
# Attacca il tenant all'utente per uso nei router (evita doppio caricamento)
user._current_tenant = tenant # type: ignore[attr-defined]
return user
# ─── Tenant corrente ──────────────────────────────────────────────────────────
async def get_current_tenant(
current_user: Annotated[User, Depends(get_current_user)],
db: AsyncSession = Depends(get_db),
) -> Tenant:
"""
Restituisce l'oggetto Tenant dell'utente autenticato.
Il Tenant è già stato validato in get_current_user() questa dependency
utilizza la cache della sessione SQLAlchemy (identity map) per evitare
un secondo accesso al DB.
"""
# Usa la cache della sessione ORM (non emette una seconda query)
tenant = await db.get(Tenant, current_user.tenant_id)
if not tenant:
raise TokenInvalidError()
return tenant
# ─── Role guards ──────────────────────────────────────────────────────────────
async def require_admin(
@@ -121,8 +153,27 @@ async def require_super_admin(
return current_user
# ─── Protezione endpoint admin con X-Admin-Key header ─────────────────────────
async def verify_admin_key(
x_admin_key: str = Header(default="", alias="X-Admin-Key"),
) -> None:
"""
Verifica l'header X-Admin-Key per gli endpoint di amministrazione tenant.
Se ADMIN_SECRET_KEY non è configurata (ambiente di sviluppo), il check
viene saltato per facilitare lo sviluppo locale.
"""
settings = get_settings()
if not settings.admin_secret_key:
# Sviluppo: nessuna chiave configurata → accesso libero
return
if x_admin_key != settings.admin_secret_key:
raise ForbiddenError("X-Admin-Key non valida o mancante")
# ─── Tipo annotato per ridurre boilerplate negli endpoint ─────────────────────
CurrentUser = Annotated[User, Depends(get_current_user)]
CurrentTenant = Annotated[Tenant, Depends(get_current_tenant)]
AdminUser = Annotated[User, Depends(require_admin)]
SuperAdminUser = Annotated[User, Depends(require_super_admin)]
DB = Annotated[AsyncSession, Depends(get_db)]
+4
View File
@@ -51,4 +51,8 @@ class TenantResponse(BaseModel):
created_at: datetime
updated_at: datetime
# Statistiche opzionali (popolate dalla lista)
user_count: int = 0
mailbox_count: int = 0
model_config = {"from_attributes": True}
+63 -2
View File
@@ -4,14 +4,15 @@ Servizio tenant gestione organizzazioni (solo super_admin).
import uuid
from sqlalchemy import select
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ConflictError, NotFoundError
from app.core.security import hash_password
from app.models.mailbox import Mailbox
from app.models.tenant import Tenant
from app.models.user import User
from app.schemas.tenant import TenantCreateRequest, TenantUpdateRequest
from app.schemas.tenant import TenantCreateRequest, TenantResponse, TenantUpdateRequest
class TenantService:
@@ -56,12 +57,72 @@ class TenantService:
raise NotFoundError("tenant")
return tenant
async def get_tenant_with_stats(self, tenant_id: uuid.UUID) -> TenantResponse:
"""Restituisce il tenant con conteggi utenti e caselle."""
tenant = await self.get_tenant(tenant_id)
user_count = (
await self.db.execute(
select(func.count(User.id)).where(
User.tenant_id == tenant_id,
User.is_active == True, # noqa: E712
)
)
).scalar_one()
mailbox_count = (
await self.db.execute(
select(func.count(Mailbox.id)).where(
Mailbox.tenant_id == tenant_id,
Mailbox.status != "deleted",
)
)
).scalar_one()
resp = TenantResponse.model_validate(tenant)
resp.user_count = user_count
resp.mailbox_count = mailbox_count
return resp
async def list_tenants(self) -> list[Tenant]:
result = await self.db.execute(
select(Tenant).order_by(Tenant.created_at.desc())
)
return list(result.scalars().all())
async def list_tenants_with_stats(self) -> list[TenantResponse]:
"""
Restituisce tutti i tenant con conteggi utenti e caselle in una
singola query efficiente (LEFT JOIN con GROUP BY).
"""
stmt = (
select(
Tenant,
func.count(User.id.distinct()).label("user_count"),
func.count(Mailbox.id.distinct()).label("mailbox_count"),
)
.outerjoin(
User,
(User.tenant_id == Tenant.id) & (User.is_active == True), # noqa: E712
)
.outerjoin(
Mailbox,
(Mailbox.tenant_id == Tenant.id) & (Mailbox.status != "deleted"),
)
.group_by(Tenant.id)
.order_by(Tenant.created_at.desc())
)
rows = (await self.db.execute(stmt)).all()
result = []
for row in rows:
tenant_obj, user_count, mailbox_count = row
resp = TenantResponse.model_validate(tenant_obj)
resp.user_count = user_count or 0
resp.mailbox_count = mailbox_count or 0
result.append(resp)
return result
async def update_tenant(
self, tenant_id: uuid.UUID, data: TenantUpdateRequest
) -> Tenant: