Files
PecHub/worker/app/archival/conservatore_client.py
T
2026-03-19 14:43:36 +01:00

380 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Client conservatore AgID supporta modalità mock e produzione.
Modalità mock (default in sviluppo):
- Simula localmente tutte le chiamate al conservatore
- Restituisce risposte sintetiche plausibili (RdV false)
- Non effettua alcuna chiamata di rete esterna
- Utile per sviluppo, test e demo
Modalità produzione:
- Esegue chiamate HTTP reali all'endpoint AgID del conservatore configurato
- Usa le credenziali cifrate recuperate dalle impostazioni del tenant
- Autenticazione HTTP Basic (standard AgID per versamenti SIP)
Come switchare da mock a produzione:
L'admin del tenant configura la modalità dalla pagina Impostazioni del
frontend → sezione "Archiviazione Sostitutiva".
Le credenziali vengono salvate cifrate nel DB (AES-256-GCM, ADR-002).
Il worker legge la configurazione a runtime dalla tabella tenant_settings.
Interfaccia pubblica (stessa per entrambe le modalità):
client = ConservatoreClient.from_tenant_credentials(creds)
result = await client.upload_versamento(sip_path, sip_bytes)
status = await client.get_versamento_status(versamento_id)
dip = await client.get_dip(versamento_id)
"""
from __future__ import annotations
import hashlib
import uuid
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from app.config import get_settings # noqa: F401 (futuro uso logger)
settings = get_settings()
# ─── DTO risultati ────────────────────────────────────────────────────────────
@dataclass
class VersamentoResult:
"""Risultato dell'upload di un pacchetto SIP al conservatore."""
success: bool
versamento_id: str | None = None
message: str = ""
raw_response: dict[str, Any] = field(default_factory=dict)
@dataclass
class VersamentoStatus:
"""Stato di un versamento in corso o completato."""
versamento_id: str
status: str # pending | processing | accepted | rejected
message: str = ""
rdv_available: bool = False
raw_response: dict[str, Any] = field(default_factory=dict)
@dataclass
class DipResult:
"""Risultato di una richiesta DIP (Dissemination Information Package)."""
success: bool
dip_id: str | None = None
download_url: str | None = None
message: str = ""
# ─── Client base (interfaccia) ────────────────────────────────────────────────
class _BaseConservatoreClient:
"""Interfaccia comune per mock e produzione."""
async def upload_versamento(
self,
sip_path: str,
sip_bytes: bytes,
tenant_id: uuid.UUID,
) -> VersamentoResult:
raise NotImplementedError
async def get_versamento_status(
self, versamento_id: str
) -> VersamentoStatus:
raise NotImplementedError
async def get_dip(self, versamento_id: str) -> DipResult:
raise NotImplementedError
# ─── Implementazione MOCK ─────────────────────────────────────────────────────
class MockConservatoreClient(_BaseConservatoreClient):
"""
Conservatore simulato nessuna chiamata di rete.
Simula tempi realistici e risponde con dati fittizi ma strutturalmente
corretti, in modo che il resto del codice possa essere testato end-to-end
senza dipendenze esterne.
"""
async def upload_versamento(
self,
sip_path: str,
sip_bytes: bytes,
tenant_id: uuid.UUID,
) -> VersamentoResult:
"""Simula upload SIP: genera un versamento_id deterministico."""
checksum = hashlib.sha256(sip_bytes).hexdigest()[:16]
tenant_prefix = str(tenant_id)[:8]
versamento_id = f"MOCK-{tenant_prefix}-{checksum}"
return VersamentoResult(
success=True,
versamento_id=versamento_id,
message="[MOCK] Versamento accettato dal conservatore simulato",
raw_response={
"versamento_id": versamento_id,
"stato": "accepted",
"timestamp": datetime.now(UTC).isoformat(),
"mock": True,
},
)
async def get_versamento_status(
self, versamento_id: str
) -> VersamentoStatus:
"""Simula polling stato: risponde sempre 'accepted' con RdV disponibile."""
return VersamentoStatus(
versamento_id=versamento_id,
status="accepted",
message="[MOCK] Versamento confermato",
rdv_available=True,
raw_response={
"versamento_id": versamento_id,
"stato": "accepted",
"rdv_disponibile": True,
"mock": True,
},
)
async def get_dip(self, versamento_id: str) -> DipResult:
"""Simula richiesta DIP."""
dip_id = f"DIP-{versamento_id}"
return DipResult(
success=True,
dip_id=dip_id,
download_url=f"http://mock-conservatore.local/dip/{dip_id}",
message="[MOCK] DIP disponibile",
)
# ─── Implementazione PRODUZIONE ───────────────────────────────────────────────
class ProductionConservatoreClient(_BaseConservatoreClient):
"""
Client HTTP reale per conservatore AgID.
Autenticazione: HTTP Basic (standard AgID CNIPA).
Formato SIP: UNI SInCRO 11386:2023 (pacchetto ZIP con indice XML).
L'URL endpoint e le credenziali arrivano dalla tabella tenant_settings
(decifrate a runtime dal TenantSettingsService).
"""
def __init__(
self,
endpoint: str,
username: str,
password: str,
conservatore_id: str = "production",
timeout_seconds: int = 120,
) -> None:
self.endpoint = endpoint.rstrip("/")
self.username = username
self.password = password
self.conservatore_id = conservatore_id
self.timeout_seconds = timeout_seconds
async def upload_versamento(
self,
sip_path: str,
sip_bytes: bytes,
tenant_id: uuid.UUID,
) -> VersamentoResult:
"""
POST {endpoint}/versamento
Content-Type: application/octet-stream (o multipart/form-data per alcuni provider)
Authorization: Basic base64(user:pass)
"""
try:
import httpx
except ImportError:
return VersamentoResult(
success=False,
message="httpx non installato nel worker. Aggiungere alla dipendenza.",
)
import base64
auth_str = base64.b64encode(
f"{self.username}:{self.password}".encode()
).decode()
try:
async with httpx.AsyncClient(timeout=self.timeout_seconds) as client:
response = await client.post(
f"{self.endpoint}/versamento",
content=sip_bytes,
headers={
"Authorization": f"Basic {auth_str}",
"Content-Type": "application/octet-stream",
"X-Tenant-ID": str(tenant_id),
"X-SIP-Path": sip_path,
},
)
if response.status_code in (200, 201, 202):
data = response.json() if response.headers.get("content-type", "").startswith("application/json") else {}
_fallback_id = str(uuid.uuid4())[:8]
versamento_id = data.get("versamento_id") or data.get("id") or f"VERS-{_fallback_id}"
return VersamentoResult(
success=True,
versamento_id=str(versamento_id),
message="Versamento accettato dal conservatore",
raw_response=data,
)
else:
return VersamentoResult(
success=False,
message=f"Conservatore ha risposto con errore HTTP {response.status_code}: {response.text[:500]}",
raw_response={"status_code": response.status_code, "body": response.text[:500]},
)
except Exception as e:
return VersamentoResult(
success=False,
message=f"Errore di connessione al conservatore: {e}",
)
async def get_versamento_status(
self, versamento_id: str
) -> VersamentoStatus:
"""GET {endpoint}/versamento/{versamento_id}"""
try:
import httpx
except ImportError:
return VersamentoStatus(
versamento_id=versamento_id,
status="unknown",
message="httpx non installato",
)
import base64
auth_str = base64.b64encode(
f"{self.username}:{self.password}".encode()
).decode()
try:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(
f"{self.endpoint}/versamento/{versamento_id}",
headers={"Authorization": f"Basic {auth_str}"},
)
if response.status_code == 200:
data = response.json()
stato = data.get("stato", data.get("status", "unknown"))
return VersamentoStatus(
versamento_id=versamento_id,
status=str(stato),
message=data.get("message", ""),
rdv_available=data.get("rdv_disponibile", False),
raw_response=data,
)
else:
return VersamentoStatus(
versamento_id=versamento_id,
status="error",
message=f"HTTP {response.status_code}: {response.text[:200]}",
)
except Exception as e:
return VersamentoStatus(
versamento_id=versamento_id,
status="error",
message=f"Errore connessione: {e}",
)
async def get_dip(self, versamento_id: str) -> DipResult:
"""POST {endpoint}/dip con il versamento_id"""
try:
import httpx
except ImportError:
return DipResult(success=False, message="httpx non installato")
import base64
auth_str = base64.b64encode(
f"{self.username}:{self.password}".encode()
).decode()
try:
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(
f"{self.endpoint}/dip",
json={"versamento_id": versamento_id},
headers={
"Authorization": f"Basic {auth_str}",
"Content-Type": "application/json",
},
)
if response.status_code in (200, 201, 202):
data = response.json()
return DipResult(
success=True,
dip_id=str(data.get("dip_id", "")),
download_url=data.get("download_url"),
message="DIP disponibile",
)
else:
return DipResult(
success=False,
message=f"HTTP {response.status_code}: {response.text[:200]}",
)
except Exception as e:
return DipResult(success=False, message=f"Errore connessione: {e}")
# ─── Factory ──────────────────────────────────────────────────────────────────
class ConservatoreClient:
"""
Factory che istanzia il client corretto in base alla modalità.
Utilizzo dal worker:
creds = await tenant_settings_service.get_conservatore_credentials(tenant_id)
client = ConservatoreClient.from_tenant_credentials(creds)
result = await client.upload_versamento(sip_path, sip_bytes, tenant_id)
"""
@staticmethod
def from_tenant_credentials(creds: dict) -> _BaseConservatoreClient:
"""
Crea il client appropriato dalla configurazione tenant.
Args:
creds: dizionario da TenantSettingsService.get_conservatore_credentials()
con chiavi: mode, conservatore_id, endpoint, username, password
"""
mode = creds.get("mode", "mock")
if mode == "production":
endpoint = creds.get("endpoint")
username = creds.get("username")
password = creds.get("password")
if not endpoint:
raise ValueError(
"Modalità produzione attiva ma conservatore_endpoint non configurato. "
"Verificare le impostazioni del tenant."
)
if not username or not password:
raise ValueError(
"Modalità produzione attiva ma credenziali conservatore mancanti. "
"Configurare username e password nelle impostazioni del tenant."
)
return ProductionConservatoreClient(
endpoint=endpoint,
username=username,
password=password,
conservatore_id=creds.get("conservatore_id", "production"),
)
# Default: modalità mock (sicura per sviluppo)
return MockConservatoreClient()