Files
2026-03-19 18:06:44 +01:00

143 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Tenant service: organization management (super_admin only).
"""
import uuid
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ConflictError, NotFoundError
from app.core.security import hash_password
from app.models.mailbox import Mailbox
from app.models.tenant import Tenant
from app.models.user import User
from app.schemas.tenant import TenantCreateRequest, TenantResponse, TenantUpdateRequest
class TenantService:
    """Create, read, list and update tenants through an async SQLAlchemy session.

    Methods add objects to the session and flush where an id is needed, but
    nothing in this class commits.
    NOTE(review): presumably the session owner (request dependency / caller)
    commits or rolls back — confirm against the call sites.
    """

    # Fields of TenantUpdateRequest that update_tenant copies onto the Tenant
    # row, in the order they are applied.
    _UPDATABLE_FIELDS = ("name", "plan", "is_active", "max_mailboxes", "max_users")

    def __init__(self, db: AsyncSession) -> None:
        """Keep the session that every method of this service queries through."""
        self.db = db

    @staticmethod
    def _build_response(
        tenant: Tenant, user_count: int, mailbox_count: int
    ) -> TenantResponse:
        """Validate *tenant* into a TenantResponse and attach the two counters."""
        response = TenantResponse.model_validate(tenant)
        response.user_count = user_count
        response.mailbox_count = mailbox_count
        return response

    async def create_tenant(self, data: TenantCreateRequest) -> tuple[Tenant, User]:
        """Create a new tenant together with its initial admin user.

        Args:
            data: Creation payload. Supplies the tenant columns (slug, name,
                plan, max_mailboxes, max_users) and the admin's email,
                password and full name.

        Returns:
            The ``(tenant, admin)`` pair. Both have been flushed to the
            session; this method does not commit.

        Raises:
            ConflictError: If a tenant with the same slug already exists.
        """
        # Application-level uniqueness check on the slug.
        # NOTE(review): this is check-then-insert, so two concurrent requests
        # can both pass it — presumably a unique constraint on Tenant.slug
        # backs this up; confirm in the model/migrations.
        slug_lookup = await self.db.execute(
            select(Tenant).where(Tenant.slug == data.slug)
        )
        if slug_lookup.scalar_one_or_none():
            raise ConflictError(f"Slug '{data.slug}' già in uso")

        tenant = Tenant(
            slug=data.slug,
            name=data.name,
            plan=data.plan,
            max_mailboxes=data.max_mailboxes,
            max_users=data.max_users,
        )
        self.db.add(tenant)
        # Flush so tenant.id is populated before the admin row references it.
        await self.db.flush()

        # Initial admin user; the email is stored lower-cased and the password
        # only as the output of hash_password().
        admin = User(
            tenant_id=tenant.id,
            email=data.admin_email.lower(),
            password_hash=hash_password(data.admin_password),
            full_name=data.admin_full_name,
            role="admin",
        )
        self.db.add(admin)
        await self.db.flush()
        return tenant, admin

    async def get_tenant(self, tenant_id: uuid.UUID) -> Tenant:
        """Load a tenant by primary key.

        Raises:
            NotFoundError: If no tenant has the given id.
        """
        tenant = await self.db.get(Tenant, tenant_id)
        if not tenant:
            raise NotFoundError("tenant")
        return tenant

    async def get_tenant_with_stats(self, tenant_id: uuid.UUID) -> TenantResponse:
        """Return one tenant with its active-user and non-deleted-mailbox counts.

        Raises:
            NotFoundError: If no tenant has the given id (via get_tenant).
        """
        tenant = await self.get_tenant(tenant_id)

        active_users_stmt = select(func.count(User.id)).where(
            User.tenant_id == tenant_id,
            User.is_active == True,  # noqa: E712
        )
        live_mailboxes_stmt = select(func.count(Mailbox.id)).where(
            Mailbox.tenant_id == tenant_id,
            Mailbox.status != "deleted",
        )
        user_count = (await self.db.execute(active_users_stmt)).scalar_one()
        mailbox_count = (await self.db.execute(live_mailboxes_stmt)).scalar_one()

        return self._build_response(tenant, user_count, mailbox_count)

    async def list_tenants(self) -> list[Tenant]:
        """Return every tenant, newest first (ordered by created_at descending)."""
        newest_first = select(Tenant).order_by(Tenant.created_at.desc())
        result = await self.db.execute(newest_first)
        return list(result.scalars().all())

    async def list_tenants_with_stats(self) -> list[TenantResponse]:
        """Return every tenant, newest first, with user and mailbox counts.

        Runs a single SELECT. Each count is a correlated scalar subquery
        rather than a join: joining both User and Mailbox onto Tenant in the
        same statement makes every tenant produce (users x mailboxes)
        intermediate rows that then have to be de-duplicated with
        COUNT(DISTINCT ...). The subqueries count each table independently
        and need no GROUP BY.

        Counted rows match get_tenant_with_stats: users with is_active true,
        mailboxes whose status is not "deleted".
        """
        # Tenant is not in either subquery's own FROM list, so SQLAlchemy
        # correlates Tenant.id to the enclosing SELECT below.
        active_users = (
            select(func.count(User.id))
            .where(
                User.tenant_id == Tenant.id,
                User.is_active == True,  # noqa: E712
            )
            .scalar_subquery()
        )
        live_mailboxes = (
            select(func.count(Mailbox.id))
            .where(
                Mailbox.tenant_id == Tenant.id,
                Mailbox.status != "deleted",
            )
            .scalar_subquery()
        )
        stmt = select(
            Tenant,
            active_users.label("user_count"),
            live_mailboxes.label("mailbox_count"),
        ).order_by(Tenant.created_at.desc())

        rows = (await self.db.execute(stmt)).all()
        # `or 0` is kept from the previous implementation as a guard against a
        # NULL counter, even though COUNT itself yields 0 for no rows.
        return [
            self._build_response(tenant, users or 0, mailboxes or 0)
            for tenant, users, mailboxes in rows
        ]

    async def update_tenant(
        self, tenant_id: uuid.UUID, data: TenantUpdateRequest
    ) -> Tenant:
        """Apply a partial update to a tenant and return it.

        Only name, plan, is_active, max_mailboxes and max_users are
        considered. A field whose value in *data* is None is left unchanged,
        so None cannot be used to clear a column through this method.

        The tenant is mutated in place on the session; this method neither
        flushes nor commits.

        Raises:
            NotFoundError: If no tenant has the given id (via get_tenant).
        """
        tenant = await self.get_tenant(tenant_id)
        for field_name in self._UPDATABLE_FIELDS:
            new_value = getattr(data, field_name)
            if new_value is not None:
                setattr(tenant, field_name, new_value)
        return tenant