Files
PecHub/backend/app/services/virtual_box_service.py
T
2026-03-19 11:41:10 +01:00

349 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Servizio Virtual Box CRUD, gestione assegnazioni utente e caselle reali.
"""
import uuid
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.exceptions import ConflictError, ForbiddenError, NotFoundError
from app.models.mailbox import Mailbox
from app.models.user import User
from app.models.virtual_box import VirtualBox, VirtualBoxAssignment, VirtualBoxRule
from app.schemas.virtual_box import (
AssignedUserResponse,
VirtualBoxCreate,
VirtualBoxResponse,
VirtualBoxRuleResponse,
VirtualBoxUpdate,
)
class VirtualBoxService:
    """Service for Virtual Boxes: CRUD, rules, real mailboxes and user assignments.

    Methods that address a single Virtual Box by id raise ``NotFoundError`` when
    the row does not exist or its ``tenant_id`` differs from the one supplied.
    No method in this class calls ``commit``; changes are only flushed to the
    session passed to the constructor.
    """

    def __init__(self, db: AsyncSession) -> None:
        """Store the async session used for every query in this service."""
        self.db = db

    # ─── CRUD VirtualBox ──────────────────────────────────────────────────────

    async def create(
        self,
        tenant_id: uuid.UUID,
        data: VirtualBoxCreate,
        created_by: uuid.UUID,
    ) -> VirtualBox:
        """Create a Virtual Box together with its rules and mailbox links.

        Args:
            tenant_id: Tenant that will own the new Virtual Box.
            data: Payload providing name, description, label, rules and
                mailbox_ids.
            created_by: Id stored in the ``created_by`` field.

        Returns:
            The new Virtual Box reloaded with rules, assignments and mailboxes.

        Raises:
            ConflictError: A Virtual Box with the same name already exists in
                the tenant.
        """
        await self._ensure_name_available(tenant_id, data.name)

        vbox = VirtualBox(
            tenant_id=tenant_id,
            name=data.name,
            description=data.description,
            label=data.label,
            created_by=created_by,
        )
        self.db.add(vbox)
        # Flush before creating the rules, which reference vbox.id.
        await self.db.flush()

        for rule_data in data.rules:
            self.db.add(self._build_rule(vbox.id, rule_data))

        # Link the real mailboxes (only those belonging to the tenant are kept).
        if data.mailbox_ids:
            vbox.mailboxes = await self._load_mailboxes(data.mailbox_ids, tenant_id)

        await self.db.flush()
        return await self._load_full(vbox.id)

    async def list_vboxes(
        self,
        tenant_id: uuid.UUID,
        page: int = 1,
        page_size: int = 20,
        active_only: bool = False,
    ) -> tuple[list[VirtualBox], int]:
        """Return one page of the tenant's Virtual Boxes plus the total count.

        Args:
            tenant_id: Tenant whose Virtual Boxes are listed.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Maximum number of items in the page.
            active_only: When true, only rows with ``is_active`` set are
                returned and counted.

        Returns:
            ``(items, total)`` where ``items`` is ordered newest first and
            ``total`` counts every matching row regardless of pagination.
        """
        filters = [VirtualBox.tenant_id == tenant_id]
        if active_only:
            filters.append(VirtualBox.is_active == True)  # noqa: E712 (SQL expression)

        # Count directly on the table: ordering is irrelevant for a count.
        count_result = await self.db.execute(
            select(func.count()).select_from(VirtualBox).where(*filters)
        )
        total = count_result.scalar_one()

        # Clamp the page so a value below 1 cannot produce a negative OFFSET.
        offset = max(page - 1, 0) * page_size
        query = (
            select(VirtualBox)
            .where(*filters)
            # The id tie-breaker keeps pages stable when created_at values are equal.
            .order_by(VirtualBox.created_at.desc(), VirtualBox.id)
            .offset(offset)
            .limit(page_size)
            .options(
                selectinload(VirtualBox.rules),
                selectinload(VirtualBox.assignments),
                selectinload(VirtualBox.mailboxes),
            )
        )
        result = await self.db.execute(query)
        items = list(result.scalars().all())
        return items, total

    async def get(self, vbox_id: uuid.UUID, tenant_id: uuid.UUID) -> VirtualBox:
        """Return the Virtual Box with rules, assignments and mailboxes loaded.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
        """
        return await self._get_owned(vbox_id, tenant_id, full=True)

    async def update(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
        data: VirtualBoxUpdate,
    ) -> VirtualBox:
        """Apply a partial update; fields left as ``None`` are not touched.

        Args:
            vbox_id: Id of the Virtual Box to update.
            tenant_id: Tenant expected to own the Virtual Box.
            data: Payload with optional name, description, label, is_active
                and mailbox_ids.

        Returns:
            The Virtual Box reloaded after the flush.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
            ConflictError: The new name is already used by another Virtual Box
                of the same tenant.
        """
        vbox = await self._get_owned(vbox_id, tenant_id, full=True)

        if data.name is not None:
            await self._ensure_name_available(tenant_id, data.name, exclude_id=vbox_id)
            vbox.name = data.name
        if data.description is not None:
            vbox.description = data.description
        if data.label is not None:
            vbox.label = data.label
        if data.is_active is not None:
            vbox.is_active = data.is_active

        # An explicit (possibly empty) list replaces the linked mailboxes;
        # None means "leave them as they are".
        if data.mailbox_ids is not None:
            vbox.mailboxes = await self._load_mailboxes(data.mailbox_ids, tenant_id)

        await self.db.flush()
        return await self._load_full(vbox_id)

    async def delete(self, vbox_id: uuid.UUID, tenant_id: uuid.UUID) -> None:
        """Mark the Virtual Box for deletion in the session.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
        """
        vbox = await self._get_owned(vbox_id, tenant_id)
        await self.db.delete(vbox)

    # ─── Rule management ──────────────────────────────────────────────────────

    async def replace_rules(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
        rules_data: list,
    ) -> VirtualBox:
        """Replace every rule of a Virtual Box with the given ones.

        Args:
            vbox_id: Id of the Virtual Box whose rules are replaced.
            tenant_id: Tenant expected to own the Virtual Box.
            rules_data: Items exposing field, operator, value, date_from and
                date_to.

        Returns:
            The Virtual Box reloaded after the flush.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
        """
        await self._get_owned(vbox_id, tenant_id)

        # Remove the existing rules with a single bulk DELETE.
        # NOTE(review): if this Virtual Box's rules collection was already
        # loaded in the session, it may not reflect the bulk DELETE — confirm.
        await self.db.execute(
            delete(VirtualBoxRule).where(VirtualBoxRule.virtual_box_id == vbox_id)
        )

        for rule_data in rules_data:
            self.db.add(self._build_rule(vbox_id, rule_data))

        await self.db.flush()
        return await self._load_full(vbox_id)

    # ─── Real mailbox management ──────────────────────────────────────────────

    async def set_mailboxes(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
        mailbox_ids: list[uuid.UUID],
    ) -> VirtualBox:
        """Replace the full set of mailboxes linked to a Virtual Box.

        Mailbox ids that do not match a mailbox of the tenant are dropped.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
        """
        vbox = await self._get_owned(vbox_id, tenant_id, full=True)
        vbox.mailboxes = await self._load_mailboxes(mailbox_ids, tenant_id)
        await self.db.flush()
        return await self._load_full(vbox_id)

    async def list_mailboxes(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
    ) -> list[Mailbox]:
        """Return the mailboxes linked to a Virtual Box (empty list if none).

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
        """
        vbox = await self._get_owned(vbox_id, tenant_id, full=True)
        return vbox.mailboxes or []

    # ─── Assignment management ────────────────────────────────────────────────

    async def assign_users(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
        user_ids: list[uuid.UUID],
        assigned_by: uuid.UUID,
    ) -> list[VirtualBoxAssignment]:
        """Assign users to a Virtual Box, skipping ones that cannot be assigned.

        Ids that are already assigned, repeated in ``user_ids``, unknown, or
        owned by another tenant are silently skipped.

        Args:
            vbox_id: Id of the Virtual Box.
            tenant_id: Tenant expected to own both the Virtual Box and users.
            user_ids: Candidate user ids.
            assigned_by: Id stored in the ``assigned_by`` field.

        Returns:
            Only the assignments created by this call, in input order.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
        """
        await self._get_owned(vbox_id, tenant_id)

        existing_result = await self.db.execute(
            select(VirtualBoxAssignment.user_id).where(
                VirtualBoxAssignment.virtual_box_id == vbox_id
            )
        )
        existing_user_ids = set(existing_result.scalars().all())

        # dict.fromkeys de-duplicates while keeping the caller's order, so a
        # repeated id cannot create two assignments for the same user.
        candidates = [
            user_id
            for user_id in dict.fromkeys(user_ids)
            if user_id not in existing_user_ids
        ]

        # Validate all candidates in one query instead of one lookup per user.
        valid_user_ids: set = set()
        if candidates:
            valid_result = await self.db.execute(
                select(User.id).where(
                    User.id.in_(candidates),
                    User.tenant_id == tenant_id,
                )
            )
            valid_user_ids = set(valid_result.scalars().all())

        new_assignments = []
        for user_id in candidates:
            if user_id not in valid_user_ids:
                continue
            assignment = VirtualBoxAssignment(
                virtual_box_id=vbox_id,
                user_id=user_id,
                assigned_by=assigned_by,
            )
            self.db.add(assignment)
            new_assignments.append(assignment)

        await self.db.flush()
        return new_assignments

    async def unassign_user(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
        user_id: uuid.UUID,
    ) -> None:
        """Remove the assignment of one user from a Virtual Box.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another
                tenant, or no assignment row matched the user.
        """
        await self._get_owned(vbox_id, tenant_id)
        result = await self.db.execute(
            delete(VirtualBoxAssignment).where(
                VirtualBoxAssignment.virtual_box_id == vbox_id,
                VirtualBoxAssignment.user_id == user_id,
            )
        )
        if result.rowcount == 0:
            raise NotFoundError("assegnazione")

    async def list_assigned_users(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
    ) -> list[dict]:
        """Return the users assigned to a Virtual Box as plain dictionaries.

        Returns:
            One dict per assignment with the keys ``user_id``, ``user_email``,
            ``user_full_name`` and ``assigned_at``.

        Raises:
            NotFoundError: The Virtual Box is missing or owned by another tenant.
        """
        await self._get_owned(vbox_id, tenant_id)
        result = await self.db.execute(
            select(VirtualBoxAssignment, User)
            .join(User, VirtualBoxAssignment.user_id == User.id)
            .where(VirtualBoxAssignment.virtual_box_id == vbox_id)
        )
        return [
            {
                "user_id": assignment.user_id,
                "user_email": user.email,
                "user_full_name": user.full_name,
                "assigned_at": assignment.assigned_at,
            }
            for assignment, user in result.all()
        ]

    async def list_user_vboxes(
        self,
        user_id: uuid.UUID,
        tenant_id: uuid.UUID,
    ) -> list[VirtualBox]:
        """Return the active Virtual Boxes of the tenant assigned to a user."""
        result = await self.db.execute(
            select(VirtualBox)
            .join(
                VirtualBoxAssignment,
                VirtualBox.id == VirtualBoxAssignment.virtual_box_id,
            )
            .where(
                VirtualBoxAssignment.user_id == user_id,
                VirtualBox.tenant_id == tenant_id,
                VirtualBox.is_active == True,  # noqa: E712 (SQL expression)
            )
            .options(
                selectinload(VirtualBox.rules),
                selectinload(VirtualBox.mailboxes),
                selectinload(VirtualBox.assignments),  # needed by _to_response()
            )
        )
        return list(result.scalars().all())

    # ─── Private ─────────────────────────────────────────────────────────────

    async def _get_owned(
        self,
        vbox_id: uuid.UUID,
        tenant_id: uuid.UUID,
        *,
        full: bool = False,
    ) -> VirtualBox:
        """Fetch a Virtual Box and verify it belongs to ``tenant_id``.

        Args:
            vbox_id: Id of the Virtual Box.
            tenant_id: Tenant expected to own it.
            full: When true, load it through ``_load_full`` (relationships
                eagerly loaded); otherwise use a plain ``Session.get``.

        Raises:
            NotFoundError: The row is missing or owned by another tenant.
        """
        if full:
            vbox = await self._load_full(vbox_id)
        else:
            vbox = await self.db.get(VirtualBox, vbox_id)
        if vbox is None or vbox.tenant_id != tenant_id:
            raise NotFoundError("Virtual Box")
        return vbox

    async def _ensure_name_available(
        self,
        tenant_id: uuid.UUID,
        name: str,
        exclude_id: uuid.UUID | None = None,
    ) -> None:
        """Raise ``ConflictError`` if ``name`` is already used within the tenant.

        Args:
            tenant_id: Tenant in which the name must be unique.
            name: Candidate Virtual Box name.
            exclude_id: Id of a Virtual Box to ignore (the one being renamed).
        """
        query = select(VirtualBox.id).where(
            VirtualBox.tenant_id == tenant_id,
            VirtualBox.name == name,
        )
        if exclude_id is not None:
            query = query.where(VirtualBox.id != exclude_id)
        # LIMIT 1 keeps the check safe even if duplicate names already exist.
        existing = await self.db.execute(query.limit(1))
        if existing.scalar_one_or_none() is not None:
            raise ConflictError(f"Virtual Box con nome '{name}' già esistente")

    @staticmethod
    def _build_rule(vbox_id: uuid.UUID, rule_data) -> VirtualBoxRule:
        """Build an unsaved ``VirtualBoxRule`` for ``vbox_id`` from a rule payload."""
        return VirtualBoxRule(
            virtual_box_id=vbox_id,
            field=rule_data.field,
            operator=rule_data.operator,
            value=rule_data.value,
            date_from=rule_data.date_from,
            date_to=rule_data.date_to,
        )

    async def _load_full(self, vbox_id: uuid.UUID) -> VirtualBox | None:
        """Load a Virtual Box by id with rules, assignments and mailboxes.

        No tenant check is performed here. Returns ``None`` when no row matches.
        """
        result = await self.db.execute(
            select(VirtualBox)
            .where(VirtualBox.id == vbox_id)
            .options(
                selectinload(VirtualBox.rules),
                selectinload(VirtualBox.assignments),
                selectinload(VirtualBox.mailboxes),
            )
        )
        return result.scalar_one_or_none()

    async def _load_mailboxes(
        self,
        mailbox_ids: list[uuid.UUID],
        tenant_id: uuid.UUID,
    ) -> list[Mailbox]:
        """Load the mailboxes with the given ids that belong to the tenant.

        Ids that match no mailbox of the tenant are silently omitted; an empty
        ``mailbox_ids`` returns an empty list without querying.
        """
        if not mailbox_ids:
            return []
        result = await self.db.execute(
            select(Mailbox).where(
                Mailbox.id.in_(mailbox_ids),
                Mailbox.tenant_id == tenant_id,
            )
        )
        return list(result.scalars().all())