Files
PecHub/worker/app/jobs/apply_routing_rules.py
2026-03-27 20:59:06 +01:00

235 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
arq job: apply_routing_rules - applies the automatic routing rules.
It is enqueued by sync.py after each inbound message has been saved.
Logic:
1. Load the tenant's active rules, ordered by priority
2. For each rule, evaluate its conditions (AND)
3. On match: execute the actions (apply_label, mark_read, mark_starred, notify_webhook)
4. If stop_processing=True, stop the chain
"""
import logging
import re
import uuid as uuid_module
from typing import Any
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal
from app.models import Message
logger = logging.getLogger(__name__)
# ─── Job principale ───────────────────────────────────────────────────────────
async def apply_routing_rules(ctx: dict[str, Any], message_id: str) -> dict:
    """
    Evaluate the tenant's automatic routing rules against one message.

    Only messages with ``direction == "inbound"`` and
    ``pec_type == "posta_certificata"`` are processed. Active rules are
    evaluated in ascending priority order; a rule matches when all of its
    conditions hold (rules without conditions are skipped). The actions of
    every matching rule are executed and, when the rule has
    ``stop_processing`` set, no further rules are evaluated.

    Args:
        ctx: arq context (not used by this job).
        message_id: UUID of the message to process, as a string.

    Returns:
        On success, a dict with ``status`` ("ok"), ``message_id``,
        ``matched_rules`` (number of rules that matched) and
        ``actions_applied`` (types of the actions that reported success).
        When nothing is processed, a dict with ``status`` ("skipped") and
        ``reason``.
    """
    # A malformed id can never succeed on retry, so report it as skipped
    # instead of letting the job crash.
    try:
        msg_uuid = uuid_module.UUID(message_id)
    except (ValueError, TypeError, AttributeError):
        logger.warning(f"[routing_rules] message_id non valido: {message_id!r}")
        return {"status": "skipped", "reason": "invalid_message_id"}

    async with AsyncSessionLocal() as db:
        # Load the message
        msg = await db.get(Message, msg_uuid)
        if not msg:
            logger.warning(f"[routing_rules] Messaggio {message_id} non trovato")
            return {"status": "skipped", "reason": "message_not_found"}

        # Only inbound messages of type posta_certificata are routed
        if msg.direction != "inbound" or msg.pec_type != "posta_certificata":
            return {"status": "skipped", "reason": "not_inbound_pec"}

        # Conditions and actions are aggregated in separate correlated
        # sub-selects. Joining both child tables in one GROUP BY would build a
        # Cartesian product and repeat every action once per condition
        # (e.g. the same webhook fired several times).
        rules_result = await db.execute(
            text("""
                SELECT r.id, r.name, r.priority, r.stop_processing,
                       COALESCE(
                           (SELECT json_agg(
                                       json_build_object('field', c.field, 'operator', c.operator, 'value', c.value)
                                       ORDER BY c.id
                                   )
                              FROM routing_rule_conditions c
                             WHERE c.rule_id = r.id),
                           '[]'::json
                       ) AS conditions,
                       COALESCE(
                           (SELECT json_agg(
                                       json_build_object('action_type', a.action_type, 'action_value', a.action_value)
                                       ORDER BY a.id
                                   )
                              FROM routing_rule_actions a
                             WHERE a.rule_id = r.id),
                           '[]'::json
                       ) AS actions
                  FROM routing_rules r
                 WHERE r.tenant_id = :tenant_id
                   AND r.is_active = true
                 ORDER BY r.priority ASC, r.id
            """),
            {"tenant_id": str(msg.tenant_id)},
        )
        rules = rules_result.mappings().all()

        matched_count = 0
        actions_applied: list[str] = []
        for rule in rules:
            conditions = rule["conditions"]
            # A rule without conditions would match every message: skip it.
            if not conditions:
                continue
            # All conditions must hold (AND)
            if not _evaluate_conditions(msg, conditions):
                continue

            matched_count += 1
            logger.info(
                f"[routing_rules] Regola '{rule['name']}' (priority={rule['priority']}) "
                f"match per messaggio {message_id}"
            )

            # Execute the rule's actions in order
            for action in rule["actions"]:
                applied = await _apply_action(
                    db, msg, action["action_type"], action["action_value"]
                )
                if applied:
                    actions_applied.append(action["action_type"])

            if rule["stop_processing"]:
                break

        # Nothing can have changed unless at least one rule matched.
        if matched_count > 0:
            await db.commit()

        return {
            "status": "ok",
            "message_id": message_id,
            "matched_rules": matched_count,
            "actions_applied": actions_applied,
        }
# ─── Valutazione condizioni ────────────────────────────────────────────────────
def _get_field_value(msg: Message, field: str) -> str:
if field == "from_address":
return (msg.from_address or "").lower()
elif field == "to_address":
return " ".join(msg.to_addresses or []).lower()
elif field == "subject":
return (msg.subject or "").lower()
elif field == "mailbox_id":
return str(msg.mailbox_id)
elif field == "pec_type":
return msg.pec_type or ""
return ""
def _evaluate_condition(field_value: str, operator: str, value: str) -> bool:
v = value.lower()
fv = field_value.lower()
if operator == "contains":
return v in fv
elif operator == "not_contains":
return v not in fv
elif operator == "equals":
return fv == v
elif operator == "starts_with":
return fv.startswith(v)
elif operator == "ends_with":
return fv.endswith(v)
elif operator == "regex":
try:
return bool(re.search(value, field_value, re.IGNORECASE))
except re.error:
return False
return False
def _evaluate_conditions(msg: Message, conditions: list[dict]) -> bool:
    """
    Return True only when every condition holds for *msg* (logical AND).

    Evaluation stops at the first failing condition. An empty list yields
    True.
    """
    return all(
        _evaluate_condition(
            _get_field_value(msg, condition["field"]),
            condition["operator"],
            condition["value"],
        )
        for condition in conditions
    )
# ─── Esecuzione azioni ─────────────────────────────────────────────────────────
async def _apply_action(
    db: AsyncSession,
    msg: Message,
    action_type: str,
    action_value: str | None,
) -> bool:
    """
    Execute a single rule action against *msg*.

    Supported action types:
      * ``apply_label``    - attach the label whose UUID is *action_value*.
      * ``mark_read``      - set ``msg.is_read``.
      * ``mark_starred``   - set ``msg.is_starred``.
      * ``notify_webhook`` - POST a notification to the URL in *action_value*.

    Any exception raised while executing the action is logged and swallowed,
    so one failing action does not prevent the remaining ones from running.

    Returns:
        True if the action was applied; False for an unknown action type, a
        missing *action_value*, a flag that was already set, or a failure.
    """
    try:
        if action_type == "apply_label" and action_value:
            label_id = uuid_module.UUID(action_value)
            # Run the label statements inside a SAVEPOINT: if one of them
            # fails, only the savepoint is rolled back. Without it the error
            # swallowed below would leave the surrounding PostgreSQL
            # transaction aborted, breaking later actions and the commit.
            async with db.begin_nested():
                return await _action_apply_label(db, msg, label_id)
        elif action_type == "mark_read":
            # NOTE(review): True is reported only when the flag actually
            # changes - confirm against the original indentation.
            if not msg.is_read:
                msg.is_read = True
                return True
        elif action_type == "mark_starred":
            if not msg.is_starred:
                msg.is_starred = True
                return True
        elif action_type == "notify_webhook" and action_value:
            await _action_notify_webhook(msg, action_value)
            return True
    except Exception as e:
        # Deliberate best-effort boundary: log and report "not applied".
        logger.warning(f"[routing_rules] Errore azione {action_type}: {e}")
    return False
async def _action_apply_label(
    db: AsyncSession, msg: Message, label_id: uuid_module.UUID
) -> bool:
    """
    Attach the label *label_id* to *msg*.

    Returns False when no label with that id exists for the message's tenant.
    Otherwise inserts the (message, label) pair - a conflicting existing row
    is left untouched - and returns True.
    """
    label_key = str(label_id)
    lookup_params = {"lid": label_key, "tid": str(msg.tenant_id)}

    # The label must exist and belong to the same tenant as the message.
    lookup = await db.execute(
        text("SELECT id FROM labels WHERE id = :lid AND tenant_id = :tid"),
        lookup_params,
    )
    label_row = lookup.fetchone()
    if not label_row:
        return False

    # ON CONFLICT DO NOTHING keeps the insert idempotent.
    insert_params = {"msg_id": str(msg.id), "label_id": label_key}
    await db.execute(
        text("""
            INSERT INTO message_labels (message_id, label_id)
            VALUES (:msg_id, :label_id)
            ON CONFLICT DO NOTHING
        """),
        insert_params,
    )
    logger.debug(f"[routing_rules] Etichetta {label_id} applicata a {msg.id}")
    return True
async def _action_notify_webhook(msg: Message, url: str) -> None:
    """
    Send a best-effort webhook notification for *msg*.

    POSTs a JSON payload (event name, message id, subject, sender and PEC
    type) to *url* with a 5 second total timeout. Failures - transport
    errors, timeouts, or an HTTP status of 400 and above - are logged as
    warnings and never raised to the caller.

    Args:
        msg: the message the notification is about.
        url: destination of the POST request.
    """
    import aiohttp

    payload = {
        "event": "routing_rule_match",
        "message_id": str(msg.id),
        "subject": msg.subject,
        "from_address": msg.from_address,
        "pec_type": msg.pec_type,
    }
    # NOTE(review): the worker POSTs to whatever host `url` names. If the URL
    # is user-configurable this is a potential SSRF vector - consider
    # validating it against an allow-list before sending.
    timeout = aiohttp.ClientTimeout(total=5)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            # `async with` releases the response/connection; the status is
            # checked so an error reply is not mistaken for a delivery.
            async with session.post(url, json=payload) as response:
                if response.status >= 400:
                    logger.warning(
                        f"[routing_rules] Webhook {url} fallito: HTTP {response.status}"
                    )
    except Exception as e:
        # Deliberately broad: a webhook failure must never break routing.
        logger.warning(f"[routing_rules] Webhook {url} fallito: {e}")