mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 12:45:42 +02:00
145 lines
4.6 KiB
Python
145 lines
4.6 KiB
Python
"""
|
||
WebSocket Connection Manager.
|
||
|
||
Gestisce le connessioni WebSocket raggruppate per tenant.
|
||
Il worker pubblica eventi su Redis (canale ws:tenant:<tenant_id>);
|
||
un task asyncio in background ascolta Redis e fa forward ai client WS.
|
||
|
||
Architettura fan-out:
|
||
Worker → Redis PUBLISH ws:tenant:<tid> <payload>
|
||
Backend → Redis SUBSCRIBE → forward a tutti i WebSocket del tenant
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import uuid
|
||
from collections import defaultdict
|
||
|
||
from fastapi import WebSocket
|
||
|
||
from app.core.logging import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class ConnectionManager:
|
||
"""
|
||
Gestisce N connessioni WebSocket per M tenant.
|
||
Thread-safe per uso in contesto asyncio.
|
||
"""
|
||
|
||
def __init__(self) -> None:
|
||
# tenant_id (str) → set of WebSocket
|
||
self._connections: dict[str, set[WebSocket]] = defaultdict(set)
|
||
# Lock per modifiche al dizionario
|
||
self._lock = asyncio.Lock()
|
||
|
||
async def connect(self, websocket: WebSocket, tenant_id: uuid.UUID) -> None:
|
||
"""Registra una nuova connessione WS per il tenant."""
|
||
await websocket.accept()
|
||
async with self._lock:
|
||
self._connections[str(tenant_id)].add(websocket)
|
||
logger.info(
|
||
"WebSocket connesso",
|
||
extra={"tenant_id": str(tenant_id), "total": self._count(tenant_id)},
|
||
)
|
||
|
||
async def disconnect(self, websocket: WebSocket, tenant_id: uuid.UUID) -> None:
|
||
"""Rimuove una connessione WS dal tenant."""
|
||
async with self._lock:
|
||
self._connections[str(tenant_id)].discard(websocket)
|
||
if not self._connections[str(tenant_id)]:
|
||
del self._connections[str(tenant_id)]
|
||
logger.info(
|
||
"WebSocket disconnesso",
|
||
extra={"tenant_id": str(tenant_id)},
|
||
)
|
||
|
||
async def broadcast_to_tenant(
|
||
self, tenant_id: uuid.UUID | str, event: dict
|
||
) -> None:
|
||
"""
|
||
Invia un evento JSON a tutti i client connessi del tenant.
|
||
Le connessioni morte vengono rimosse silenziosamente.
|
||
"""
|
||
tid = str(tenant_id)
|
||
async with self._lock:
|
||
connections = set(self._connections.get(tid, set()))
|
||
|
||
if not connections:
|
||
return
|
||
|
||
payload = json.dumps(event, default=str)
|
||
dead = set()
|
||
|
||
for ws in connections:
|
||
try:
|
||
await ws.send_text(payload)
|
||
except Exception:
|
||
dead.add(ws)
|
||
|
||
if dead:
|
||
async with self._lock:
|
||
self._connections[tid] -= dead
|
||
|
||
def _count(self, tenant_id: uuid.UUID) -> int:
|
||
return len(self._connections.get(str(tenant_id), set()))
|
||
|
||
@property
|
||
def total_connections(self) -> int:
|
||
return sum(len(v) for v in self._connections.values())
|
||
|
||
|
||
# Istanza singleton – importata da main.py e dal Redis listener
|
||
manager = ConnectionManager()
|
||
|
||
|
||
# ─── Redis subscriber (background task) ──────────────────────────────────────
|
||
|
||
async def redis_subscriber_loop(redis_url: str) -> None:
|
||
"""
|
||
Task asyncio che si sottoscrive al canale Redis ws:* e
|
||
fa forward degli eventi ai client WebSocket del tenant corretto.
|
||
|
||
Pubblicato dal worker con:
|
||
PUBLISH ws:tenant:<tenant_id> <json_payload>
|
||
"""
|
||
import redis.asyncio as aioredis
|
||
|
||
logger.info("Redis WS subscriber avviato", extra={"url": redis_url})
|
||
|
||
while True:
|
||
try:
|
||
client = aioredis.from_url(redis_url, decode_responses=True)
|
||
pubsub = client.pubsub()
|
||
await pubsub.psubscribe("ws:tenant:*") # pattern subscription
|
||
|
||
async for message in pubsub.listen():
|
||
if message["type"] not in ("pmessage", "message"):
|
||
continue
|
||
|
||
# Estrae tenant_id dal canale: ws:tenant:<uuid>
|
||
channel: str = message.get("channel", "")
|
||
if not channel.startswith("ws:tenant:"):
|
||
continue
|
||
|
||
tenant_id_str = channel.removeprefix("ws:tenant:")
|
||
try:
|
||
tenant_uuid = uuid.UUID(tenant_id_str)
|
||
except ValueError:
|
||
continue
|
||
|
||
try:
|
||
payload = json.loads(message["data"])
|
||
except (json.JSONDecodeError, KeyError):
|
||
continue
|
||
|
||
await manager.broadcast_to_tenant(tenant_uuid, payload)
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info("Redis WS subscriber terminato")
|
||
return
|
||
except Exception as e:
|
||
logger.error(f"Redis WS subscriber errore: {e}. Riconnessione in 5s...")
|
||
await asyncio.sleep(5)
|