Files
PecHub/backend/app/api/v1/settings.py
T
2026-06-18 11:53:18 +02:00

676 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
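"""
Tenant settings router.

General endpoints:
GET    /settings                        -> read the current tenant's settings (admin)
PUT    /settings                        -> update the current tenant's settings (admin)
POST   /settings/test-conservatore      -> test the connection to the configured conservatore

Full-text indexing endpoints (Phase 8):
GET    /settings/indexing/stats         -> indexing coverage statistics
GET    /settings/indexing/status        -> status of the running reindex job
POST   /settings/indexing/reindex       -> start a reindex (full or differential)
DELETE /settings/indexing/reindex       -> cancel the running reindex job
GET    /settings/indexing/rescan-status -> status of the running attachment-scan job
POST   /settings/indexing/rescan        -> start an attachment scan
DELETE /settings/indexing/rescan        -> cancel the running attachment-scan job

Substitutive conservation endpoints:
GET    /settings/conservation/stats     -> conservation statistics (aggregate + per-tenant
                                           breakdown for a super_admin without tenant_id)
POST   /settings/conservation/trigger   -> immediately enqueue the run_conservation job

Only admin and super_admin users can access these endpoints.
Where an endpoint accepts it, the super_admin can operate on any tenant via the
query parameter ?tenant_id=<uuid>.
"""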
import time
import uuid
from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, Query, status
from app.config import get_settings as get_app_settings
from app.dependencies import AdminUser, DB
from app.schemas.tenant_settings import (
ConservationStats,
ConservationStatsResponse,
ConservationTenantBreakdown,
ConservationTriggerResult,
ConservatoreTestResult,
IndexingJobStatus,
IndexingStats,
StartReindexRequest,
StartRescanRequest,
TenantSettingsResponse,
TenantSettingsUpdate,
)
from app.services.indexing_service import IndexingService
from app.services.tenant_settings_service import TenantSettingsService
# Router for the tenant-settings endpoints: every route declared below is
# mounted under the "/settings" prefix and tagged "Impostazioni".
router = APIRouter(prefix="/settings", tags=["Impostazioni"])
# ─── Helper tenant_id resolution ─────────────────────────────────────────────
def _resolve_tenant_id(
    current_user: AdminUser,
    tenant_id_param: Optional[uuid.UUID] = None,
) -> uuid.UUID:
    """
    Pick the tenant an operation should act on.

    Only a ``super_admin`` may redirect an operation to another tenant, and only
    by passing ``tenant_id_param`` explicitly. A plain admin, or a super_admin
    who passes nothing, always gets ``current_user.tenant_id``. For a plain
    admin, ``tenant_id_param`` is silently ignored.
    """
    if current_user.role != "super_admin" or tenant_id_param is None:
        # No override allowed or requested: stay on the caller's own tenant.
        return current_user.tenant_id
    return tenant_id_param
# ─── Impostazioni generali ────────────────────────────────────────────────────
@router.get(
    "",
    response_model=TenantSettingsResponse,
    summary="Legge le impostazioni del tenant",
    description=(
        "Restituisce la configurazione operativa del tenant: "
        "modalita' archiviazione (mock/produzione), endpoint e stato credenziali conservatore. "
        "Il super_admin puo' specificare ?tenant_id=<uuid> per leggere le impostazioni di un tenant arbitrario."
    ),
)
async def get_settings(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> TenantSettingsResponse:
    """
    Fetch the target tenant's settings and serialise them.

    The settings row is obtained through ``TenantSettingsService.get_or_create``.
    The result is converted with ``TenantSettingsService.to_response``.
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    tenant_settings = await TenantSettingsService(db).get_or_create(effective_tenant)
    return TenantSettingsService.to_response(tenant_settings)
@router.put(
    "",
    response_model=TenantSettingsResponse,
    summary="Aggiorna le impostazioni del tenant",
    description=(
        "Aggiorna la configurazione operativa del tenant. "
        "Tutti i campi sono opzionali (semantica PATCH). "
        "Il passaggio a modalita' 'production' richiede un endpoint conservatore configurato. "
        "Il super_admin puo' specificare ?tenant_id=<uuid> per aggiornare le impostazioni di un tenant arbitrario."
    ),
)
async def update_settings(
    body: TenantSettingsUpdate,
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> TenantSettingsResponse:
    """
    Apply a partial update to the target tenant's settings and return the result.

    The change is committed on ``db``. The object is then refreshed, so the
    response reflects what was persisted (including any DB-side values, if
    the model has them).
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    updated = await TenantSettingsService(db).update(effective_tenant, body)

    await db.commit()
    await db.refresh(updated)

    return TenantSettingsService.to_response(updated)
# ─── Test connessione conservatore ───────────────────────────────────────────
@router.post(
    "/test-conservatore",
    response_model=ConservatoreTestResult,
    summary="Testa la connessione al conservatore configurato",
    description=(
        "Verifica che le credenziali salvate per il conservatore siano valide "
        "effettuando una chiamata di autenticazione reale. "
        "Supporta Aeterna (JWT) e conservatori generici (HTTP Basic). "
        "Non modifica alcun dato, non invia pacchetti."
    ),
)
async def test_conservatore_connection(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> ConservatoreTestResult:
    """
    Check the stored conservatore credentials with a live authentication call.

    Failures are never raised. They are returned as ``success=False`` with a
    human-readable message.

    - Mode other than ``production``, or missing endpoint/username/password:
      the check fails immediately, without any network call.
    - Aeterna (recognised by ``conservatore_id`` or by the endpoint host):
      JWT login, then ``GET /api/v1/auth/me``. Requires ``tenant_slug``.
    - Any other provider: a ``GET`` on the endpoint with HTTP Basic auth.
    """
    target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
    service = TenantSettingsService(db)
    creds = await service.get_conservatore_credentials(target_tenant_id)

    if creds.get("mode") != "production":
        return ConservatoreTestResult(
            success=False,
            message="La modalita' di archiviazione e' impostata su 'mock'. "
            "Configura l'endpoint e le credenziali, poi imposta la modalita' su 'produzione'.",
        )

    endpoint = creds.get("endpoint")
    username = creds.get("username")
    password = creds.get("password")
    tenant_slug = creds.get("tenant_slug")
    conservatore_id = creds.get("conservatore_id", "")

    if not endpoint or not username or not password:
        return ConservatoreTestResult(
            success=False,
            message="Credenziali o endpoint non configurati. "
            "Compila tutti i campi obbligatori prima di testare la connessione.",
        )

    # Aeterna is recognised either by the configured provider id or by its hostnames.
    endpoint_lower = endpoint.lower()
    is_aeterna = (
        (conservatore_id or "").lower() == "aeterna"
        or "aeterna" in endpoint_lower
        or "idrainformatica" in endpoint_lower
    )

    t_start = time.monotonic()
    if not is_aeterna:
        return await _test_basic_auth_conservatore(endpoint, username, password, t_start)

    if not tenant_slug:
        return ConservatoreTestResult(
            success=False,
            message="Provider Aeterna richiede il campo 'Tenant Slug'. Configuralo nelle impostazioni.",
        )
    return await _test_aeterna_conservatore(endpoint, username, password, tenant_slug, t_start)


def _elapsed_ms(t_start: float) -> int:
    """Return whole milliseconds elapsed since ``t_start`` (a ``time.monotonic()`` reading)."""
    return int((time.monotonic() - t_start) * 1000)


async def _test_aeterna_conservatore(
    endpoint: str,
    username: str,
    password: str,
    tenant_slug: str,
    t_start: float,
) -> ConservatoreTestResult:
    """
    Test an Aeterna conservatore: JWT login, then ``GET /api/v1/auth/me``.

    Both requests share one HTTP client (login timeout 15 s, ``/me`` timeout
    10 s). Any transport, decoding or shape error is reported as a failed
    result rather than raised.
    """
    import httpx

    base_url = endpoint.rstrip("/")
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp_login = await client.post(
                f"{base_url}/api/v1/auth/login",
                json={
                    "email": username,
                    "password": password,
                    "tenant_slug": tenant_slug,
                },
            )
            if resp_login.status_code != 200:
                return ConservatoreTestResult(
                    success=False,
                    message=f"Login Aeterna fallito (HTTP {resp_login.status_code}): {resp_login.text[:200]}",
                    latency_ms=_elapsed_ms(t_start),
                )

            token = resp_login.json().get("access_token")
            if not token:
                # Without a token the /me call would send "Bearer None" and fail obscurely.
                return ConservatoreTestResult(
                    success=False,
                    message="Login Aeterna riuscito ma la risposta non contiene 'access_token'.",
                    latency_ms=_elapsed_ms(t_start),
                )

            resp_me = await client.get(
                f"{base_url}/api/v1/auth/me",
                headers={"Authorization": f"Bearer {token}"},
                timeout=10,
            )

        latency_ms = _elapsed_ms(t_start)
        me_ok = resp_me.status_code == 200
        me = resp_me.json() if me_ok else {}
        return ConservatoreTestResult(
            success=me_ok,
            message=(
                f"Connessione ad Aeterna riuscita (utente: {me.get('email', '?')})"
                if me_ok
                else f"Login riuscito ma /me ha risposto HTTP {resp_me.status_code}"
            ),
            latency_ms=latency_ms,
            provider_info={
                "platform": "Aeterna",
                "tenant_slug": tenant_slug,
                "user_email": me.get("email"),
                "permissions_count": len(me.get("permissions") or []),
            } if me else None,
        )
    except Exception as e:
        # Deliberately broad: this is a diagnostic endpoint, every failure becomes a message.
        return ConservatoreTestResult(
            success=False,
            message=f"Errore connessione ad Aeterna: {e}",
            latency_ms=_elapsed_ms(t_start),
        )


async def _test_basic_auth_conservatore(
    endpoint: str,
    username: str,
    password: str,
    t_start: float,
) -> ConservatoreTestResult:
    """
    Test a generic conservatore with an HTTP Basic ``GET`` on its endpoint.

    HTTP 401/403 means the endpoint answered but rejected the credentials, so
    those count as failures. Any other status below 500 counts as reachable.
    5xx statuses and transport errors are failures.
    """
    import base64

    import httpx

    auth_str = base64.b64encode(f"{username}:{password}".encode()).decode()
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.get(
                endpoint,
                headers={"Authorization": f"Basic {auth_str}"},
            )
    except Exception as e:
        # Deliberately broad: report any connection problem instead of raising.
        return ConservatoreTestResult(
            success=False,
            message=f"Errore connessione: {e}",
            latency_ms=_elapsed_ms(t_start),
        )

    latency_ms = _elapsed_ms(t_start)
    code = response.status_code
    if code in (401, 403):
        return ConservatoreTestResult(
            success=False,
            message=f"Endpoint raggiungibile ma credenziali rifiutate (HTTP {code})",
            latency_ms=latency_ms,
        )
    if code >= 500:
        return ConservatoreTestResult(
            success=False,
            message=f"Endpoint ha risposto con errore HTTP {code}",
            latency_ms=latency_ms,
        )
    return ConservatoreTestResult(
        success=True,
        message=f"Endpoint raggiungibile (HTTP {code})",
        latency_ms=latency_ms,
    )
# ─── Indicizzazione full-text ─────────────────────────────────────────────────
@router.get(
    "/indexing/stats",
    response_model=IndexingStats,
    summary="Statistiche indicizzazione full-text",
    description=(
        "Restituisce il numero di messaggi totali, indicizzati e non indicizzati "
        "per il tenant, con percentuale di copertura. "
        "Include anche le statistiche sugli allegati PDF/DOCX con testo estratto. "
        "Il super_admin puo' specificare ?tenant_id=<uuid> per un tenant arbitrario."
    ),
)
async def get_indexing_stats(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> IndexingStats:
    """Return the full-text indexing coverage figures for the target tenant."""
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    raw_stats = await IndexingService(db).get_stats(effective_tenant)
    return IndexingStats(**raw_stats)
@router.get(
    "/indexing/status",
    response_model=IndexingJobStatus,
    summary="Stato job indicizzazione in corso",
    description=(
        "Restituisce lo stato del job di reindex per il tenant: "
        "idle, running (con progresso), completed, failed o cancelled. "
        "Se il job e' running da piu' di 2 ore, il flag is_stale e' True."
    ),
)
async def get_indexing_status(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> IndexingJobStatus:
    """
    Report the reindex job state for the target tenant.

    The state is read through ``IndexingService.get_job_status``, using the
    application's configured Redis URL. ``db`` is not used here.
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    redis_url = get_app_settings().redis_url
    job_state = await IndexingService.get_job_status(effective_tenant, redis_url)
    return IndexingJobStatus(**job_state)
@router.post(
    "/indexing/reindex",
    response_model=IndexingJobStatus,
    status_code=status.HTTP_202_ACCEPTED,
    summary="Avvia job di reindex",
    description=(
        "Avvia un job di reindex full-text in background. "
        "mode='differential' indicizza solo i messaggi con search_vector NULL (piu' veloce). "
        "mode='full' riscrive il vettore di tutti i messaggi del tenant. "
        "Restituisce 409 se un job e' gia' in corso."
    ),
)
async def start_reindex(
    body: StartReindexRequest,
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> IndexingJobStatus:
    """
    Launch a background reindex for the target tenant and echo its new status.

    Any ``ValueError`` from ``IndexingService.start_reindex`` is mapped to
    HTTP 409. Per the endpoint description, that means a job is already running.
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    cfg = get_app_settings()

    try:
        await IndexingService.start_reindex(
            tenant_id=effective_tenant,
            mode=body.mode,
            started_by_email=current_user.email,
            redis_url=cfg.redis_url,
            db_url=cfg.database_url,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=str(exc),
        )

    fresh_state = await IndexingService.get_job_status(effective_tenant, cfg.redis_url)
    return IndexingJobStatus(**fresh_state)
@router.delete(
    "/indexing/reindex",
    response_model=IndexingJobStatus,
    summary="Cancella job di reindex in corso",
    description=(
        "Invia il segnale di cancellazione al job di reindex in corso. "
        "Il job si fermera' alla fine del batch corrente (max qualche secondo). "
        "Se non c'e' nessun job in corso, ritorna 404."
    ),
)
async def cancel_reindex(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> IndexingJobStatus:
    """
    Ask the target tenant's running reindex job to stop.

    Raises HTTP 404 when ``IndexingService.cancel_reindex`` reports that
    nothing was cancelled. Otherwise returns the job status read right after
    the request.
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    redis_url = get_app_settings().redis_url

    was_cancelled = await IndexingService.cancel_reindex(effective_tenant, redis_url)
    if was_cancelled:
        job_state = await IndexingService.get_job_status(effective_tenant, redis_url)
        return IndexingJobStatus(**job_state)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Nessun job di reindex in corso per questo tenant",
    )
# ─── Scansione allegati ───────────────────────────────────────────────────────
@router.get(
    "/indexing/rescan-status",
    response_model=IndexingJobStatus,
    summary="Stato job scansione allegati in corso",
    description=(
        "Restituisce lo stato del job di scansione allegati per il tenant: "
        "idle, running (con progresso), completed, failed o cancelled."
    ),
)
async def get_rescan_status(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> IndexingJobStatus:
    """
    Report the attachment-scan job state for the target tenant.

    The state comes from ``IndexingService.get_rescan_status`` with the
    configured Redis URL. ``db`` is not used here.
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    redis_url = get_app_settings().redis_url
    job_state = await IndexingService.get_rescan_status(effective_tenant, redis_url)
    return IndexingJobStatus(**job_state)
# ─── Conservazione sostitutiva statistiche e trigger ───────────────────────
def _build_conservation_stats(
    total: int,
    conserved: int,
    pending: int,
    last_conserved_at,
) -> ConservationStats:
    """
    Assemble a ``ConservationStats`` from raw message counts.

    ``not_queued`` is whatever is neither conserved nor pending, clamped at 0.
    ``coverage_pct`` is ``conserved / total`` as a percentage rounded to one
    decimal, or ``0.0`` when ``total`` is not positive. ``last_conserved_at``
    is emitted via ``isoformat()`` when truthy, otherwise ``None``.
    """
    if total > 0:
        coverage = round((conserved / total * 100), 1)
    else:
        coverage = 0.0

    return ConservationStats(
        total_messages=total,
        conserved=conserved,
        pending=pending,
        not_queued=max(0, total - conserved - pending),
        coverage_pct=coverage,
        last_conserved_at=last_conserved_at.isoformat() if last_conserved_at else None,
    )
@router.get(
    "/conservation/stats",
    response_model=ConservationStatsResponse,
    summary="Statistiche conservazione sostitutiva",
    description=(
        "Restituisce il numero di messaggi totali, conservati, in coda e non accodati. "
        "admin: statistiche del proprio tenant. "
        "super_admin con ?tenant_id=<uuid>: statistiche del tenant specificato. "
        "super_admin senza tenant_id: statistiche aggregate su tutti i tenant + breakdown per-tenant."
    ),
)
async def get_conservation_stats(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare; assente = tutti i tenant",
    ),
) -> ConservationStatsResponse:
    """
    Return conservation counters for one tenant, or for all tenants.

    - Admin, or super_admin with ``tenant_id``: returns a single-tenant
      ``stats`` with ``per_tenant=None``.
    - super_admin without ``tenant_id``: returns the aggregate over every tenant
      that has messages, plus a per-tenant breakdown sorted by tenant name.
      Tenants missing from the ``Tenant`` table are labelled "(sconosciuto)".
    """
    from sqlalchemy import select

    from app.models.message import Message
    from app.models.tenant import Tenant

    # Single-tenant view: any plain admin, or a super_admin who picked a tenant.
    if current_user.role != "super_admin" or tenant_id is not None:
        target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
        result = await db.execute(
            select(*_conservation_count_columns()).where(
                Message.tenant_id == target_tenant_id
            )
        )
        return ConservationStatsResponse(
            stats=_conservation_stats_from_row(result.one()),
            per_tenant=None,
        )

    # super_admin without tenant_id: one grouped query, aggregated in Python.
    rows = await db.execute(
        select(Message.tenant_id, *_conservation_count_columns()).group_by(
            Message.tenant_id
        )
    )
    tenant_rows = rows.all()

    tenant_names: dict[uuid.UUID, tuple[str, str]] = {}
    tenant_ids = [r.tenant_id for r in tenant_rows]
    if tenant_ids:
        t_rows = await db.execute(
            select(Tenant.id, Tenant.name, Tenant.slug).where(Tenant.id.in_(tenant_ids))
        )
        tenant_names = {t.id: (t.name, t.slug) for t in t_rows.all()}

    per_tenant: list[ConservationTenantBreakdown] = []
    for r in tenant_rows:
        name, slug = tenant_names.get(r.tenant_id, ("(sconosciuto)", ""))
        per_tenant.append(
            ConservationTenantBreakdown(
                tenant_id=str(r.tenant_id),
                tenant_name=name,
                tenant_slug=slug,
                stats=_conservation_stats_from_row(r),
            )
        )
    # GROUP BY gives no ordering guarantee; sort so the breakdown is stable across calls.
    per_tenant.sort(
        key=lambda b: ((b.tenant_name or "").casefold(), b.tenant_slug or "")
    )

    aggregate = _build_conservation_stats(
        total=sum(r.total or 0 for r in tenant_rows),
        conserved=sum(r.conserved or 0 for r in tenant_rows),
        pending=sum(r.pending or 0 for r in tenant_rows),
        last_conserved_at=max(
            (r.last_conserved_at for r in tenant_rows if r.last_conserved_at),
            default=None,
        ),
    )
    return ConservationStatsResponse(stats=aggregate, per_tenant=per_tenant)


def _conservation_count_columns() -> tuple:
    """
    Build the aggregate columns shared by the single-tenant and per-tenant queries.

    The columns are labelled as follows:
    - ``total``: all messages.
    - ``conserved``: ``is_conserved`` true.
    - ``pending``: ``is_pending_conservation`` true and not yet conserved.
    - ``last_conserved_at``: maximum ``Message.conserved_at``.
    """
    from sqlalchemy import func

    from app.models.message import Message

    return (
        func.count().label("total"),
        func.count().filter(Message.is_conserved == True).label("conserved"),  # noqa: E712
        func.count().filter(
            Message.is_pending_conservation == True,  # noqa: E712
            Message.is_conserved == False,  # noqa: E712
        ).label("pending"),
        func.max(Message.conserved_at).label("last_conserved_at"),
    )


def _conservation_stats_from_row(row) -> ConservationStats:
    """Turn a row selected with ``_conservation_count_columns`` into ``ConservationStats`` (NULL counts -> 0)."""
    return _build_conservation_stats(
        total=row.total or 0,
        conserved=row.conserved or 0,
        pending=row.pending or 0,
        last_conserved_at=row.last_conserved_at,
    )
@router.post(
    "/conservation/trigger",
    response_model=ConservationTriggerResult,
    status_code=status.HTTP_202_ACCEPTED,
    summary="Avvia manualmente il job di conservazione sostitutiva",
    description=(
        "Accoda immediatamente il job run_conservation nel worker arq. "
        "Il job elabora tutti i tenant con messaggi pendenti di conservazione. "
        "Utile per eseguire il ciclo di conservazione senza attendere il cron giornaliero. "
        "Solo admin e super_admin possono avviarlo."
    ),
)
async def trigger_conservation(
    current_user: AdminUser,
    db: DB,
) -> ConservationTriggerResult:
    """
    Enqueue the arq ``run_conservation`` job right away.

    The job is not tenant-scoped (per the endpoint description it processes
    every tenant with pending messages), so there is no ``tenant_id`` parameter.
    Any failure to build the Redis settings, connect, or enqueue is reported
    as HTTP 503.
    """
    app_settings = get_app_settings()
    try:
        import urllib.parse

        from arq.connections import RedisSettings, create_pool

        parsed = urllib.parse.urlparse(app_settings.redis_url)
        redis_settings = RedisSettings(
            host=parsed.hostname or "localhost",
            port=parsed.port or 6379,
            database=int(parsed.path.lstrip("/") or "0"),
            password=parsed.password or None,
            # rediss:// URLs require TLS; without this the connection is attempted in plaintext.
            ssl=parsed.scheme == "rediss",
        )
        redis = await create_pool(redis_settings)
        try:
            job = await redis.enqueue_job("run_conservation")
        finally:
            # Release the pool even when enqueueing fails, so connections do not leak.
            await redis.aclose()
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Impossibile accodare il job: {exc}",
        ) from exc

    if job is None:
        # arq's enqueue_job returns None when a job with the same id already exists.
        return ConservationTriggerResult(
            queued=False,
            message="Job di conservazione non accodato: un job con lo stesso identificativo e' gia' presente.",
            job_id=None,
        )
    return ConservationTriggerResult(
        queued=True,
        message="Job di conservazione accodato con successo. Verra' eseguito dal worker entro pochi secondi.",
        job_id=job.job_id,
    )
@router.post(
    "/indexing/rescan",
    response_model=IndexingJobStatus,
    status_code=status.HTTP_202_ACCEPTED,
    summary="Avvia job di scansione allegati",
    description=(
        "Avvia un job di scansione allegati in background. "
        "force=false (default): estrae il testo solo dagli allegati non ancora elaborati. "
        "force=true: ri-estrae il testo da tutti gli allegati del tenant. "
        "Al termine di ogni batch aggiorna anche il search_vector dei messaggi interessati. "
        "Restituisce 409 se un job di scansione o reindex e' gia' in corso."
    ),
)
async def start_rescan(
    body: StartRescanRequest,
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> IndexingJobStatus:
    """
    Launch a background attachment scan for the target tenant and echo its status.

    Any ``ValueError`` from ``IndexingService.start_rescan`` becomes HTTP 409.
    Per the endpoint description, that means another scan or reindex is running.
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    cfg = get_app_settings()

    try:
        await IndexingService.start_rescan(
            tenant_id=effective_tenant,
            started_by_email=current_user.email,
            redis_url=cfg.redis_url,
            db_url=cfg.database_url,
            force=body.force,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=str(exc),
        )

    fresh_state = await IndexingService.get_rescan_status(effective_tenant, cfg.redis_url)
    return IndexingJobStatus(**fresh_state)
@router.delete(
    "/indexing/rescan",
    response_model=IndexingJobStatus,
    summary="Cancella job di scansione allegati in corso",
    description=(
        "Invia il segnale di cancellazione al job di scansione allegati in corso. "
        "Il job si fermera' alla fine del batch corrente. "
        "Se non c'e' nessun job in corso, ritorna 404."
    ),
)
async def cancel_rescan(
    current_user: AdminUser,
    db: DB,
    tenant_id: Optional[uuid.UUID] = Query(
        default=None,
        description="(solo super_admin) UUID del tenant su cui operare",
    ),
) -> IndexingJobStatus:
    """
    Ask the target tenant's running attachment-scan job to stop.

    Raises HTTP 404 when ``IndexingService.cancel_rescan`` reports that
    nothing was cancelled. Otherwise returns the scan status read right after
    the request.
    """
    effective_tenant = _resolve_tenant_id(current_user, tenant_id)
    redis_url = get_app_settings().redis_url

    was_cancelled = await IndexingService.cancel_rescan(effective_tenant, redis_url)
    if was_cancelled:
        job_state = await IndexingService.get_rescan_status(effective_tenant, redis_url)
        return IndexingJobStatus(**job_state)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Nessun job di scansione allegati in corso per questo tenant",
    )