Files
PecHub/backend/app/services/routing_rule_service.py
T
2026-06-18 14:10:26 +02:00

548 lines
20 KiB
Python

"""
Service per la gestione delle regole di smistamento automatico (Feature 2).
Il metodo evaluate_rules() viene chiamato dal worker dopo ogni messaggio inbound.
"""
import re
import uuid
from datetime import datetime, timedelta, timezone
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.exceptions import NotFoundError
from app.models.label import Label, MessageLabel
from app.models.message import Message
from app.models.routing_rule import RoutingRule, RoutingRuleAction, RoutingRuleCondition
from app.schemas.routing_rule import RoutingRuleCreate, RoutingRuleUpdate
class RoutingRuleService:
def __init__(self, db: AsyncSession):
self.db = db
# ─── CRUD ─────────────────────────────────────────────────────────────────
async def list_rules(
self,
tenant_id: uuid.UUID,
) -> tuple[list[RoutingRule], int]:
query = (
select(RoutingRule)
.where(RoutingRule.tenant_id == tenant_id)
.options(selectinload(RoutingRule.conditions), selectinload(RoutingRule.actions))
.order_by(RoutingRule.priority)
)
count_q = select(func.count()).select_from(
select(RoutingRule).where(RoutingRule.tenant_id == tenant_id).subquery()
)
total = (await self.db.execute(count_q)).scalar_one()
items = list((await self.db.execute(query)).scalars().all())
return items, total
async def get_rule(
self, tenant_id: uuid.UUID, rule_id: uuid.UUID
) -> RoutingRule:
result = await self.db.execute(
select(RoutingRule)
.where(RoutingRule.id == rule_id, RoutingRule.tenant_id == tenant_id)
.options(selectinload(RoutingRule.conditions), selectinload(RoutingRule.actions))
)
rule = result.scalar_one_or_none()
if not rule:
raise NotFoundError(f"Regola {rule_id} non trovata")
return rule
async def create_rule(
self,
tenant_id: uuid.UUID,
data: RoutingRuleCreate,
created_by: uuid.UUID | None = None,
) -> RoutingRule:
rule = RoutingRule(
tenant_id=tenant_id,
name=data.name,
description=data.description,
is_active=data.is_active,
priority=data.priority,
stop_processing=data.stop_processing,
created_by=created_by,
)
self.db.add(rule)
await self.db.flush()
for cond in data.conditions:
self.db.add(RoutingRuleCondition(
rule_id=rule.id,
field=cond.field,
operator=cond.operator,
value=cond.value,
))
for action in data.actions:
self.db.add(RoutingRuleAction(
rule_id=rule.id,
action_type=action.action_type,
action_value=action.action_value,
))
await self.db.commit()
return await self.get_rule(tenant_id, rule.id)
async def update_rule(
self,
tenant_id: uuid.UUID,
rule_id: uuid.UUID,
data: RoutingRuleUpdate,
) -> RoutingRule:
rule = await self.get_rule(tenant_id, rule_id)
if data.name is not None:
rule.name = data.name
if data.description is not None:
rule.description = data.description
if data.is_active is not None:
rule.is_active = data.is_active
if data.priority is not None:
rule.priority = data.priority
if data.stop_processing is not None:
rule.stop_processing = data.stop_processing
# Se condizioni o azioni vengono aggiornate, le sostituisce completamente
if data.conditions is not None:
await self.db.execute(
delete(RoutingRuleCondition).where(RoutingRuleCondition.rule_id == rule_id)
)
for cond in data.conditions:
self.db.add(RoutingRuleCondition(
rule_id=rule_id,
field=cond.field,
operator=cond.operator,
value=cond.value,
))
if data.actions is not None:
await self.db.execute(
delete(RoutingRuleAction).where(RoutingRuleAction.rule_id == rule_id)
)
for action in data.actions:
self.db.add(RoutingRuleAction(
rule_id=rule_id,
action_type=action.action_type,
action_value=action.action_value,
))
await self.db.commit()
return await self.get_rule(tenant_id, rule_id)
async def delete_rule(
self, tenant_id: uuid.UUID, rule_id: uuid.UUID
) -> None:
rule = await self.get_rule(tenant_id, rule_id)
await self.db.delete(rule)
await self.db.commit()
# ─── Motore di valutazione ────────────────────────────────────────────────
async def evaluate_and_apply(
self,
message: Message,
) -> int:
"""
Valuta le regole attive del tenant e applica le azioni su message.
Returns:
Numero di regole che hanno prodotto match.
"""
# Carica regole attive ordinate per priority
result = await self.db.execute(
select(RoutingRule)
.where(
RoutingRule.tenant_id == message.tenant_id,
RoutingRule.is_active == True, # noqa: E712
)
.options(selectinload(RoutingRule.conditions), selectinload(RoutingRule.actions))
.order_by(RoutingRule.priority)
)
rules = list(result.scalars().all())
matched_count = 0
for rule in rules:
if await self._matches(message, rule.conditions):
matched_count += 1
await self._apply_actions(message, rule.actions)
if rule.stop_processing:
break
if matched_count > 0:
await self.db.flush()
return matched_count
async def _matches(
self,
message: Message,
conditions: list[RoutingRuleCondition],
) -> bool:
"""Restituisce True se tutte le condizioni (AND) sono soddisfatte."""
if not conditions:
# Una regola senza condizioni non fa mai match (comportamento sicuro)
return False
for cond in conditions:
# has_label è una condizione speciale: verifica presenza di MessageLabel
if cond.field == "has_label":
if not await self._condition_has_label(message, cond.operator, cond.value):
return False
else:
field_value = self._get_field_value(message, cond.field)
if not self._evaluate_condition(field_value, cond.operator, cond.value):
return False
return True
async def _condition_has_label(
self, message: Message, operator: str, value: str
) -> bool:
"""
Verifica se il messaggio ha (o non ha) una specifica etichetta.
operator 'equals' / 'contains' → True se il messaggio HA la label con UUID=value
operator 'not_contains' → True se il messaggio NON HA la label
"""
try:
label_id = uuid.UUID(value)
except (ValueError, AttributeError):
return False
existing = await self.db.execute(
select(MessageLabel).where(
MessageLabel.message_id == message.id,
MessageLabel.label_id == label_id,
)
)
has_it = existing.scalar_one_or_none() is not None
if operator in ("equals", "contains", "starts_with", "ends_with"):
return has_it
elif operator == "not_contains":
return not has_it
return has_it
def _get_field_value(self, message: Message, field: str) -> str:
"""Estrae il valore del campo dal messaggio come stringa per il confronto."""
if field == "from_address":
return (message.from_address or "").lower()
elif field == "to_address":
return " ".join(message.to_addresses or []).lower()
elif field == "subject":
return (message.subject or "").lower()
elif field == "mailbox_id":
return str(message.mailbox_id)
elif field == "pec_type":
return message.pec_type or ""
# Rischio e Riservatezza (N3)
elif field == "risk_level":
return message.risk_level or ""
elif field == "confidentiality":
return message.confidentiality or ""
# Campi aggiuntivi
elif field == "has_attachments":
return "true" if message.has_attachments else "false"
elif field == "direction":
return message.direction or ""
elif field == "protocol_type":
return message.protocol_type or ""
elif field == "body_contains":
# Restituisce il corpo del messaggio per confronto con contains/regex
return (message.body_text or "").lower()
return ""
def _evaluate_condition(
self, field_value: str, operator: str, value: str
) -> bool:
v = value.lower()
fv = field_value.lower()
if operator == "contains":
return v in fv
elif operator == "not_contains":
return v not in fv
elif operator == "equals":
return fv == v
elif operator == "starts_with":
return fv.startswith(v)
elif operator == "ends_with":
return fv.endswith(v)
elif operator == "regex":
try:
return bool(re.search(value, field_value, re.IGNORECASE))
except re.error:
return False
return False
async def _apply_actions(
self,
message: Message,
actions: list[RoutingRuleAction],
) -> None:
"""Esegue le azioni sulla regola che ha fatto match."""
for action in actions:
try:
if action.action_type == "apply_label" and action.action_value:
await self._action_apply_label(message, uuid.UUID(action.action_value))
elif action.action_type == "apply_taxonomy" and action.action_value:
# Applica un nodo tassonomico: identico a apply_label
await self._action_apply_label(message, uuid.UUID(action.action_value))
elif action.action_type == "assign_vbox" and action.action_value:
await self._action_assign_vbox(message, uuid.UUID(action.action_value))
elif action.action_type == "mark_read":
message.is_read = True
elif action.action_type == "mark_starred":
message.is_starred = True
elif action.action_type == "archive":
message.is_archived = True
message.archived_at = datetime.now(timezone.utc)
elif action.action_type == "mark_for_conservation":
if not message.is_pending_conservation:
message.is_pending_conservation = True
message.pending_conservation_at = datetime.now(timezone.utc)
elif action.action_type == "set_deadline" and action.action_value:
await self._action_set_deadline(message, action.action_value)
elif action.action_type == "add_to_fascicolo" and action.action_value:
await self._action_add_to_fascicolo(message, uuid.UUID(action.action_value))
elif action.action_type == "notify_webhook" and action.action_value:
await self._action_notify_webhook(message, action.action_value)
elif action.action_type == "notify_channel" and action.action_value:
await self._action_notify_channel(message, uuid.UUID(action.action_value))
# Rischio e Riservatezza (N3)
elif action.action_type == "set_risk_level" and action.action_value:
valid_levels = {"low", "medium", "high", "critical"}
if action.action_value in valid_levels:
message.risk_level = action.action_value
elif action.action_type == "set_confidentiality" and action.action_value:
valid_levels = {"public", "internal", "confidential", "secret"}
if action.action_value in valid_levels:
message.confidentiality = action.action_value
except Exception:
# Le azioni non devono interrompere il flusso principale
pass
# ─── Implementazioni azioni ───────────────────────────────────────────────
async def _action_apply_label(
self, message: Message, label_id: uuid.UUID
) -> None:
"""Applica un'etichetta al messaggio (se non gia' presente)."""
# Verifica che la label appartenga al tenant
label = await self.db.execute(
select(Label).where(
Label.id == label_id,
Label.tenant_id == message.tenant_id,
)
)
if not label.scalar_one_or_none():
return
# Verifica che non sia gia' applicata
existing = await self.db.execute(
select(MessageLabel).where(
MessageLabel.message_id == message.id,
MessageLabel.label_id == label_id,
)
)
if not existing.scalar_one_or_none():
self.db.add(MessageLabel(message_id=message.id, label_id=label_id))
async def _action_assign_vbox(
self, message: Message, vbox_id: uuid.UUID
) -> None:
"""
Assegna il messaggio alla Virtual Box trovando la Label associata.
Le Virtual Box sono filtri read-only: per "assegnare" un messaggio a una VBox
si applica la Label il cui nome corrisponde al campo 'label' della VBox.
Se la VBox non ha un campo 'label' o non esiste una Label con quel nome,
l'azione viene saltata silenziosamente.
"""
from app.models.virtual_box import VirtualBox
vbox_result = await self.db.execute(
select(VirtualBox).where(
VirtualBox.id == vbox_id,
VirtualBox.tenant_id == message.tenant_id,
VirtualBox.is_active == True, # noqa: E712
)
)
vbox = vbox_result.scalar_one_or_none()
if not vbox or not vbox.label:
return
# Cerca la Label con lo stesso nome del campo 'label' della VBox
label_result = await self.db.execute(
select(Label).where(
Label.name == vbox.label,
Label.tenant_id == message.tenant_id,
)
)
label = label_result.scalar_one_or_none()
if label:
await self._action_apply_label(message, label.id)
async def _action_set_deadline(
self, message: Message, value: str
) -> None:
"""
Imposta una scadenza relativa al messaggio.
Formati accettati per action_value:
- Numero intero: "30" → +30 giorni
- Suffisso giorni: "+30d" → +30 giorni
- Suffisso settimane:"+4w" → +28 giorni
- Suffisso mesi: "+2m" → +60 giorni
- Suffisso anni: "+1y" → +365 giorni
- JSON: {"days": 30, "note": "Risposta entro 30 giorni"}
Non sovrascrive una scadenza gia' impostata manualmente.
"""
import json as _json
value = value.strip()
days: int | None = None
note: str | None = None
# Prova JSON
try:
parsed = _json.loads(value)
if isinstance(parsed, dict):
days = int(parsed.get("days", 0)) or None
note = parsed.get("note")
except Exception:
pass
# Prova formato stringa: "30", "+30d", "+4w", "+2m", "+1y"
if days is None:
m = re.match(r"^\+?(\d+)([dDwWmMyY]?)$", value)
if m:
n = int(m.group(1))
unit = m.group(2).lower() if m.group(2) else "d"
if unit == "w":
days = n * 7
elif unit == "m":
days = n * 30
elif unit == "y":
days = n * 365
else:
days = n
if days and days > 0:
base = message.received_at or datetime.now(timezone.utc)
message.deadline_at = base + timedelta(days=days)
if note and not message.deadline_note:
message.deadline_note = note
async def _action_add_to_fascicolo(
self, message: Message, fascicolo_id: uuid.UUID
) -> None:
"""
Aggiunge il messaggio a un fascicolo esistente (se non gia' presente).
Verifica che il fascicolo appartenga al tenant e non sia gia' archiviato.
"""
from app.models.fascicolo import Fascicolo, FascicoloMessage
# Verifica che il fascicolo esista, appartenga al tenant e non sia archiviato
fascicolo_result = await self.db.execute(
select(Fascicolo).where(
Fascicolo.id == fascicolo_id,
Fascicolo.tenant_id == message.tenant_id,
Fascicolo.stato != "archiviato",
)
)
if not fascicolo_result.scalar_one_or_none():
return
# Verifica che il messaggio non sia gia' nel fascicolo
existing = await self.db.execute(
select(FascicoloMessage).where(
FascicoloMessage.fascicolo_id == fascicolo_id,
FascicoloMessage.message_id == message.id,
)
)
if not existing.scalar_one_or_none():
self.db.add(FascicoloMessage(
fascicolo_id=fascicolo_id,
message_id=message.id,
))
async def _action_notify_channel(
self, message: Message, channel_id: uuid.UUID
) -> None:
"""
Invia una notifica tramite un canale configurato (webhook/email/telegram/whatsapp).
Crea un NotificationLog con status 'pending' che il worker processerà
tramite il meccanismo di retry esistente.
"""
from app.models.notification import NotificationChannel, NotificationLog
channel_result = await self.db.execute(
select(NotificationChannel).where(
NotificationChannel.id == channel_id,
NotificationChannel.tenant_id == message.tenant_id,
NotificationChannel.is_active == True, # noqa: E712
)
)
channel = channel_result.scalar_one_or_none()
if not channel:
return
# Crea il log per l'invio asincrono da parte del worker
self.db.add(NotificationLog(
tenant_id=message.tenant_id,
channel_id=channel_id,
event_type="routing_rule_match",
event_payload={
"message_id": str(message.id),
"subject": message.subject,
"from_address": message.from_address,
"pec_type": message.pec_type,
"mailbox_id": str(message.mailbox_id),
"direction": message.direction,
},
status="pending",
max_attempts=3,
))
async def _action_notify_webhook(self, message: Message, url: str) -> None:
"""Invia una notifica webhook per il messaggio."""
import aiohttp
import json
payload = {
"event": "routing_rule_match",
"message_id": str(message.id),
"subject": message.subject,
"from_address": message.from_address,
"pec_type": message.pec_type,
}
try:
async with aiohttp.ClientSession() as session:
await session.post(
url,
json=payload,
timeout=aiohttp.ClientTimeout(total=5),
)
except Exception:
pass