Files
PecHub/worker/app/smtp/receipt_watcher.py
2026-03-18 18:16:44 +01:00

95 lines
3.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Job arq: watch_receipt attende la ricevuta di accettazione per una PEC inviata.
Viene enqueued da send_pec dopo un invio riuscito con un defer di 24 ore.
Se dopo 24h nessuna ricevuta (accettazione o avvenuta_consegna) è arrivata
tramite IMAP sync, imposta lo stato del messaggio a 'anomaly' e pubblica
un evento WebSocket all'admin del tenant.
Flow:
send_pec → invio OK → enqueue watch_receipt (defer 24h)
IMAP sync → ricevuta arriva → aggiorna Message.state a 'accepted'/'delivered'
watch_receipt (dopo 24h) → verifica se state == 'accepted'/'delivered'
→ no → state = 'anomaly' + WS event
"""
import json
import logging
import uuid as uuid_module
from typing import Any
from sqlalchemy import select
from app.database import AsyncSessionLocal
from app.models import Message
logger = logging.getLogger(__name__)
# Stati che indicano ricezione ricevuta (impostati da IMAP sync via pec_parser)
_ACCEPTED_STATES = {"accepted", "delivered"}
async def watch_receipt(ctx: dict[str, Any], message_id: str) -> dict:
"""
Job arq: verifica se il messaggio outbound ha ricevuto accettazione.
Args:
ctx: contesto arq (redis, ecc.)
message_id: UUID del messaggio outbound da monitorare
Returns:
dict con esito del controllo
"""
redis_client = ctx.get("redis")
async with AsyncSessionLocal() as db:
msg = await db.get(Message, uuid_module.UUID(message_id))
if not msg:
logger.warning(f"[watch_receipt] Messaggio {message_id} non trovato")
return {"status": "error", "message": "Messaggio non trovato"}
if msg.direction != "outbound":
return {"status": "skipped", "message": "Non è un messaggio outbound"}
if msg.state in _ACCEPTED_STATES:
# Ricevuta già arrivata tramite IMAP sync: OK
logger.info(
f"[watch_receipt] Messaggio {message_id} ha ricevuto "
f"accettazione (state={msg.state!r})"
)
return {"status": "ok", "state": msg.state}
# Nessuna ricevuta in 24h → anomalia
logger.warning(
f"[watch_receipt] Nessuna accettazione in 24h per {message_id} "
f"(state={msg.state!r}, mailbox={msg.mailbox_id})"
)
prev_state = msg.state
msg.state = "anomaly"
await db.commit()
# Pubblica evento WebSocket al tenant
if redis_client:
event = {
"type": "message:anomaly",
"message_id": message_id,
"mailbox_id": str(msg.mailbox_id),
"subject": msg.subject,
"reason": "Nessuna ricevuta di accettazione entro 24 ore",
"previous_state": prev_state,
}
channel = f"ws:tenant:{msg.tenant_id}"
try:
await redis_client.publish(channel, json.dumps(event, default=str))
logger.debug(f"[watch_receipt] Evento anomalia pubblicato su {channel}")
except Exception as e:
logger.error(f"[watch_receipt] Errore pubblicazione Redis: {e}")
return {
"status": "anomaly",
"message_id": message_id,
"reason": "Nessuna ricevuta di accettazione entro 24 ore",
}