mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
117 lines
3.8 KiB
Python
117 lines
3.8 KiB
Python
"""
|
|
Service per la gestione dei template messaggi (Feature 1).
|
|
"""
|
|
|
|
import uuid
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.exceptions import ConflictError, NotFoundError
|
|
from app.models.template import MessageTemplate
|
|
from app.schemas.template import TemplateCreate, TemplateUpdate
|
|
|
|
|
|
class TemplateService:
    """Tenant-scoped CRUD operations on ``MessageTemplate`` rows.

    Every query issued by this service filters on ``tenant_id``, so a
    template is only reachable through the tenant id it was stored with.
    """

    # Escape character for LIKE patterns built from user-supplied search terms.
    _LIKE_ESCAPE = "/"

    # Optional payload fields that ``update_template`` copies when not None.
    _UPDATABLE_FIELDS = ("description", "subject", "body_text", "body_html")

    def __init__(self, db: AsyncSession):
        """Keep the async session used for all database access."""
        self.db = db

    @classmethod
    def _escape_like(cls, term: str) -> str:
        """Return *term* with LIKE metacharacters escaped.

        The escape character itself is doubled first, then ``%`` and ``_``
        are prefixed with it, so the result matches *term* literally when
        used with ``escape=cls._LIKE_ESCAPE``.
        """
        esc = cls._LIKE_ESCAPE
        return (
            term.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )

    async def _name_in_use(
        self,
        tenant_id: uuid.UUID,
        name: str,
        exclude_id: uuid.UUID | None = None,
    ) -> bool:
        """Tell whether the tenant already has a template called *name*.

        Args:
            tenant_id: Tenant whose templates are inspected.
            name: Template name to look for (exact match).
            exclude_id: Optional template id to ignore, used when renaming
                so a template does not conflict with itself.

        Returns:
            True if at least one matching row exists.
        """
        stmt = select(MessageTemplate.id).where(
            MessageTemplate.tenant_id == tenant_id,
            MessageTemplate.name == name,
        )
        if exclude_id is not None:
            stmt = stmt.where(MessageTemplate.id != exclude_id)
        # LIMIT 1 + first(): existence is all that matters, and this cannot
        # fail when more than one row happens to match.
        found = await self.db.execute(stmt.limit(1))
        return found.first() is not None

    async def list_templates(
        self,
        tenant_id: uuid.UUID,
        q: str | None = None,
    ) -> tuple[list[MessageTemplate], int]:
        """List a tenant's templates ordered by name, with the total count.

        Args:
            tenant_id: Tenant whose templates are listed.
            q: Optional case-insensitive substring to search in the template
                name. ``%`` and ``_`` inside *q* are matched literally.

        Returns:
            A ``(items, total)`` tuple: the matching templates sorted by
            name and the number of matching rows.
        """
        filtered = select(MessageTemplate).where(
            MessageTemplate.tenant_id == tenant_id
        )
        if q:
            pattern = f"%{self._escape_like(q)}%"
            filtered = filtered.where(
                MessageTemplate.name.ilike(pattern, escape=self._LIKE_ESCAPE)
            )

        # Count on the unordered query: sorting is irrelevant for a count.
        count_q = select(func.count()).select_from(filtered.subquery())
        total = (await self.db.execute(count_q)).scalar_one()

        ordered = filtered.order_by(MessageTemplate.name)
        items = list((await self.db.execute(ordered)).scalars().all())
        return items, total

    async def get_template(
        self, tenant_id: uuid.UUID, template_id: uuid.UUID
    ) -> MessageTemplate:
        """Fetch one template by id within a tenant.

        Raises:
            NotFoundError: If no template with *template_id* exists for
                *tenant_id*.
        """
        result = await self.db.execute(
            select(MessageTemplate).where(
                MessageTemplate.id == template_id,
                MessageTemplate.tenant_id == tenant_id,
            )
        )
        template = result.scalar_one_or_none()
        if template is None:
            raise NotFoundError(f"Template {template_id} non trovato")
        return template

    async def create_template(
        self,
        tenant_id: uuid.UUID,
        data: TemplateCreate,
        created_by: uuid.UUID | None = None,
    ) -> MessageTemplate:
        """Create a template for a tenant and commit it.

        Args:
            tenant_id: Tenant that will own the template.
            data: Payload providing name, description, subject, body_text
                and body_html.
            created_by: Optional id stored in the ``created_by`` column.

        Returns:
            The persisted template, refreshed from the database.

        Raises:
            ConflictError: If the tenant already has a template with the
                same name.
        """
        # NOTE(review): the name check and the commit are separate steps;
        # confirm a database unique constraint backs this up under
        # concurrent requests.
        if await self._name_in_use(tenant_id, data.name):
            raise ConflictError(f"Template '{data.name}' gia' esistente")

        template = MessageTemplate(
            tenant_id=tenant_id,
            name=data.name,
            description=data.description,
            subject=data.subject,
            body_text=data.body_text,
            body_html=data.body_html,
            created_by=created_by,
        )
        self.db.add(template)
        await self.db.commit()
        await self.db.refresh(template)
        return template

    async def update_template(
        self,
        tenant_id: uuid.UUID,
        template_id: uuid.UUID,
        data: TemplateUpdate,
    ) -> MessageTemplate:
        """Apply a partial update to a template and commit it.

        Only payload fields whose value is not ``None`` are written, so this
        method cannot reset a field to ``None``.

        Returns:
            The updated template, refreshed from the database.

        Raises:
            NotFoundError: If the template does not exist for the tenant.
            ConflictError: If the new name is already used by another
                template of the same tenant.
        """
        template = await self.get_template(tenant_id, template_id)

        if data.name is not None:
            # Exclude the template itself so keeping its name is allowed.
            if await self._name_in_use(
                tenant_id, data.name, exclude_id=template_id
            ):
                raise ConflictError(f"Template '{data.name}' gia' esistente")
            template.name = data.name

        for field in self._UPDATABLE_FIELDS:
            value = getattr(data, field)
            if value is not None:
                setattr(template, field, value)

        await self.db.commit()
        await self.db.refresh(template)
        return template

    async def delete_template(
        self, tenant_id: uuid.UUID, template_id: uuid.UUID
    ) -> None:
        """Delete a tenant's template and commit.

        Raises:
            NotFoundError: If the template does not exist for the tenant.
        """
        template = await self.get_template(tenant_id, template_id)
        await self.db.delete(template)
        await self.db.commit()