Files
2026-03-27 14:58:12 +01:00

178 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
User service: CRUD operations on users for tenant admins.
"""
import math
import uuid
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import ConflictError, ForbiddenError, NotFoundError, TenantLimitExceededError
from app.core.security import hash_password
from app.models.tenant import Tenant
from app.models.user import User
from app.schemas.user import UserCreateRequest, UserUpdateRequest
from app.services.audit_service import log_audit
class UserService:
    """Tenant-scoped user management on top of an async SQLAlchemy session.

    Every lookup is filtered by ``tenant_id`` so a caller can only reach users
    of the tenant it passes in. The service itself never calls ``commit``; it
    only adds/mutates ORM objects (and flushes once in ``create_user``).
    Transaction handling is presumably done by the caller -- confirm.
    """

    def __init__(self, db: AsyncSession) -> None:
        """Keep a reference to the session used by all queries of this service."""
        self.db = db

    @staticmethod
    def _page_window(page: int, page_size: int) -> tuple[int, int]:
        """Translate 1-based pagination arguments into ``(offset, limit)``.

        Out-of-range values are clamped (``page`` to at least 1, ``page_size``
        to at least 0) so that the resulting OFFSET/LIMIT are never negative.
        """
        limit = max(page_size, 0)
        offset = (max(page, 1) - 1) * limit
        return offset, limit

    async def create_user(
        self,
        tenant_id: uuid.UUID,
        data: UserCreateRequest,
        created_by: User,
    ) -> User:
        """Create a new user inside the given tenant and audit the action.

        Args:
            tenant_id: Tenant the new user belongs to.
            data: Creation payload; ``email``, ``password``, ``full_name`` and
                ``role`` are read from it.
            created_by: Acting user; recorded in the audit entry and checked
                for super_admin rights when ``data.role`` is ``"super_admin"``.

        Returns:
            The new ``User``, already flushed so that its ``id`` is populated.

        Raises:
            NotFoundError: The tenant does not exist.
            TenantLimitExceededError: The tenant already has ``max_users``
                active users.
            ConflictError: The (lower-cased) email is already used in the tenant.
            ForbiddenError: A non-super-admin tried to create a super_admin.

        NOTE(review): whether the caller is an admin at all is not verified
        here; presumably that is enforced by the route/dependency layer.
        """
        tenant = await self.db.get(Tenant, tenant_id)
        if tenant is None:
            raise NotFoundError("tenant")

        # Plan limit: only active users count against ``max_users``.
        # NOTE(review): count-then-insert is not atomic; two concurrent
        # requests can both pass this check.
        active_count_result = await self.db.execute(
            select(func.count())
            .select_from(User)
            # ``== True`` is intentional: SQLAlchemy overloads the operator to
            # build the SQL expression.
            .where(User.tenant_id == tenant_id, User.is_active == True)  # noqa: E712
        )
        active_count = active_count_result.scalar_one()
        if active_count >= tenant.max_users:
            raise TenantLimitExceededError("utenti", tenant.max_users)

        # Emails are stored and compared lower-cased; uniqueness is per tenant.
        email = data.email.lower()
        duplicate_result = await self.db.execute(
            select(User).where(
                User.tenant_id == tenant_id,
                User.email == email,
            )
        )
        if duplicate_result.scalar_one_or_none() is not None:
            raise ConflictError(f"Email '{data.email}' già registrata in questo tenant")

        # Only a super_admin may create another super_admin.
        if data.role == "super_admin" and not created_by.is_super_admin:
            raise ForbiddenError("Non puoi creare utenti super_admin")

        user = User(
            tenant_id=tenant_id,
            email=email,
            password_hash=hash_password(data.password),
            full_name=data.full_name,
            role=data.role,
        )
        self.db.add(user)
        # Flush so ``user.id`` is available for the audit record below.
        await self.db.flush()

        await log_audit(
            self.db,
            "user.created",
            tenant_id=tenant_id,
            user_id=created_by.id,
            resource_type="user",
            resource_id=user.id,
            payload={"email": user.email, "role": user.role},
        )
        return user

    async def get_user(self, user_id: uuid.UUID, tenant_id: uuid.UUID) -> User:
        """Return the user with ``user_id`` if it belongs to ``tenant_id``.

        Raises:
            NotFoundError: No such user exists in that tenant.
        """
        result = await self.db.execute(
            select(User).where(User.id == user_id, User.tenant_id == tenant_id)
        )
        user = result.scalar_one_or_none()
        if user is None:
            raise NotFoundError("utente")
        return user

    async def list_users(
        self,
        tenant_id: uuid.UUID,
        page: int = 1,
        page_size: int = 25,
    ) -> tuple[list[User], int]:
        """Return one page of the tenant's users plus the total user count.

        Args:
            tenant_id: Tenant whose users are listed (active and inactive).
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Maximum rows per page; negative values are treated as 0.

        Returns:
            ``(users, total)`` where ``users`` is ordered newest first and
            ``total`` counts all users of the tenant, not just this page.
        """
        offset, limit = self._page_window(page, page_size)

        total_result = await self.db.execute(
            select(func.count()).select_from(User).where(User.tenant_id == tenant_id)
        )
        total = total_result.scalar_one()

        users_result = await self.db.execute(
            select(User)
            .where(User.tenant_id == tenant_id)
            # ``User.id`` breaks ties between equal timestamps so consecutive
            # pages never repeat or skip a row.
            .order_by(User.created_at.desc(), User.id)
            .offset(offset)
            .limit(limit)
        )
        users = list(users_result.scalars().all())
        return users, total

    async def update_user(
        self,
        user_id: uuid.UUID,
        tenant_id: uuid.UUID,
        data: UserUpdateRequest,
        updated_by: User,
    ) -> User:
        """Apply the non-``None`` fields of ``data`` to a user and audit the change.

        Only ``full_name``, ``role`` and ``is_active`` are considered; a field
        left as ``None`` in ``data`` is not touched.

        Raises:
            NotFoundError: The user does not exist in the tenant.
            ForbiddenError: A non-super-admin tried to modify a super_admin or
                to assign the ``"super_admin"`` role.

        NOTE(review): setting ``is_active=True`` here does not re-check the
        tenant's ``max_users`` limit, and an audit entry is written even when
        ``changes`` is empty.
        """
        user = await self.get_user(user_id, tenant_id)

        # A super_admin can only be modified by another super_admin.
        if user.is_super_admin and not updated_by.is_super_admin:
            raise ForbiddenError("Non puoi modificare un super_admin")
        # Mirror the rule in create_user: the super_admin role cannot be
        # granted by a non-super-admin. Checked before any attribute is
        # mutated so a rejected request leaves the object untouched.
        if data.role == "super_admin" and not updated_by.is_super_admin:
            raise ForbiddenError("Non puoi assegnare il ruolo super_admin")

        changes: dict[str, object] = {}
        if data.full_name is not None:
            changes["full_name"] = data.full_name
            user.full_name = data.full_name
        if data.role is not None:
            changes["role"] = data.role
            user.role = data.role
        if data.is_active is not None:
            changes["is_active"] = data.is_active
            user.is_active = data.is_active

        await log_audit(
            self.db,
            "user.updated",
            tenant_id=tenant_id,
            user_id=updated_by.id,
            resource_type="user",
            resource_id=user_id,
            payload={"changes": changes},
        )
        return user

    async def reset_password(
        self,
        user_id: uuid.UUID,
        tenant_id: uuid.UUID,
        new_password: str,
    ) -> None:
        """Replace the user's password hash with the hash of ``new_password``.

        Raises:
            NotFoundError: The user does not exist in the tenant.

        NOTE(review): unlike the other mutating methods this one writes no
        audit entry and has no super_admin protection; it has no acting-user
        parameter, so both would need a signature extension.
        """
        user = await self.get_user(user_id, tenant_id)
        user.password_hash = hash_password(new_password)

    async def delete_user(
        self,
        user_id: uuid.UUID,
        tenant_id: uuid.UUID,
        deleted_by: User,
    ) -> None:
        """Soft-delete a user by marking it inactive, and audit the action.

        The row is kept; only ``is_active`` is set to ``False``.

        Raises:
            NotFoundError: The user does not exist in the tenant.
            ForbiddenError: The caller targets their own account, or the
                target is a super_admin (regardless of who is asking).
        """
        user = await self.get_user(user_id, tenant_id)
        if user.id == deleted_by.id:
            raise ForbiddenError("Non puoi eliminare il tuo stesso account")
        if user.is_super_admin:
            raise ForbiddenError("Non puoi eliminare un super_admin")

        # Soft delete: disable instead of removing the row.
        user.is_active = False

        await log_audit(
            self.db,
            "user.deleted",
            tenant_id=tenant_id,
            user_id=deleted_by.id,
            resource_type="user",
            resource_id=user_id,
            payload={"email": user.email},
        )