""" Router Impostazioni Tenant. Endpoint esistenti: GET /settings -> legge le impostazioni del tenant corrente (admin) PUT /settings -> aggiorna le impostazioni del tenant corrente (admin) Endpoint indicizzazione full-text (Fase 8): GET /settings/indexing/stats -> statistiche copertura indicizzazione GET /settings/indexing/status -> stato job reindex in corso POST /settings/indexing/reindex -> avvia reindex (full o differential) DELETE /settings/indexing/reindex -> cancella job in corso Solo admin e super_admin possono accedere. Il super_admin puo' operare su qualsiasi tenant tramite query param ?tenant_id=. """ import time import uuid from typing import Annotated, Optional from fastapi import APIRouter, HTTPException, Query, status from app.config import get_settings as get_app_settings from app.dependencies import AdminUser, DB from app.schemas.tenant_settings import ( ConservationStats, ConservationStatsResponse, ConservationTenantBreakdown, ConservationTriggerResult, ConservatoreTestResult, IndexingJobStatus, IndexingStats, StartReindexRequest, StartRescanRequest, TenantSettingsResponse, TenantSettingsUpdate, ) from app.services.indexing_service import IndexingService from app.services.tenant_settings_service import TenantSettingsService router = APIRouter(prefix="/settings", tags=["Impostazioni"]) # ─── Helper tenant_id resolution ───────────────────────────────────────────── def _resolve_tenant_id( current_user: AdminUser, tenant_id_param: Optional[uuid.UUID] = None, ) -> uuid.UUID: """ Risolve il tenant_id da usare per l'operazione. - super_admin: puo' passare un tenant_id arbitrario - admin: usa sempre il proprio tenant_id (tenant_id_param ignorato) """ if current_user.role == "super_admin" and tenant_id_param is not None: return tenant_id_param return current_user.tenant_id # ─── Impostazioni generali ──────────────────────────────────────────────────── @router.get( "", response_model=TenantSettingsResponse, summary="Legge le impostazioni del tenant", description=( "Restituisce la configurazione operativa del tenant: " "modalita' archiviazione (mock/produzione), endpoint e stato credenziali conservatore. " "Il super_admin puo' specificare ?tenant_id= per leggere le impostazioni di un tenant arbitrario." ), ) async def get_settings( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> TenantSettingsResponse: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) service = TenantSettingsService(db) settings = await service.get_or_create(target_tenant_id) return TenantSettingsService.to_response(settings) @router.put( "", response_model=TenantSettingsResponse, summary="Aggiorna le impostazioni del tenant", description=( "Aggiorna la configurazione operativa del tenant. " "Tutti i campi sono opzionali (semantica PATCH). " "Il passaggio a modalita' 'production' richiede un endpoint conservatore configurato. " "Il super_admin puo' specificare ?tenant_id= per aggiornare le impostazioni di un tenant arbitrario." ), ) async def update_settings( body: TenantSettingsUpdate, current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> TenantSettingsResponse: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) service = TenantSettingsService(db) settings = await service.update(target_tenant_id, body) await db.commit() await db.refresh(settings) return TenantSettingsService.to_response(settings) # ─── Test connessione conservatore ─────────────────────────────────────────── @router.post( "/test-conservatore", response_model=ConservatoreTestResult, summary="Testa la connessione al conservatore configurato", description=( "Verifica che le credenziali salvate per il conservatore siano valide " "effettuando una chiamata di autenticazione reale. " "Supporta Aeterna (JWT) e conservatori generici (HTTP Basic). " "Non modifica alcun dato, non invia pacchetti." ), ) async def test_conservatore_connection( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> ConservatoreTestResult: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) service = TenantSettingsService(db) creds = await service.get_conservatore_credentials(target_tenant_id) if creds.get("mode") != "production": return ConservatoreTestResult( success=False, message="La modalita' di archiviazione e' impostata su 'mock'. " "Configura l'endpoint e le credenziali, poi imposta la modalita' su 'produzione'.", ) endpoint = creds.get("endpoint") username = creds.get("username") password = creds.get("password") tenant_slug = creds.get("tenant_slug") conservatore_id = creds.get("conservatore_id", "") if not endpoint or not username or not password: return ConservatoreTestResult( success=False, message="Credenziali o endpoint non configurati. " "Compila tutti i campi obbligatori prima di testare la connessione.", ) # Rileva Aeterna da conservatore_id o URL is_aeterna = ( (conservatore_id or "").lower() == "aeterna" or "aeterna" in (endpoint or "").lower() or "idrainformatica" in (endpoint or "").lower() ) t_start = time.monotonic() if is_aeterna: # Test Aeterna: POST /api/v1/auth/login + GET /api/v1/auth/me if not tenant_slug: return ConservatoreTestResult( success=False, message="Provider Aeterna richiede il campo 'Tenant Slug'. Configuralo nelle impostazioni.", ) try: import httpx async with httpx.AsyncClient(timeout=15) as client: resp_login = await client.post( f"{endpoint.rstrip('/')}/api/v1/auth/login", json={ "email": username, "password": password, "tenant_slug": tenant_slug, }, ) latency_ms = int((time.monotonic() - t_start) * 1000) if resp_login.status_code != 200: return ConservatoreTestResult( success=False, message=f"Login Aeterna fallito (HTTP {resp_login.status_code}): {resp_login.text[:200]}", latency_ms=latency_ms, ) login_data = resp_login.json() token = login_data.get("access_token") async with httpx.AsyncClient(timeout=10) as client: resp_me = await client.get( f"{endpoint.rstrip('/')}/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"}, ) latency_ms = int((time.monotonic() - t_start) * 1000) me = resp_me.json() if resp_me.status_code == 200 else {} return ConservatoreTestResult( success=resp_me.status_code == 200, message=( f"Connessione ad Aeterna riuscita (utente: {me.get('email', '?')})" if resp_me.status_code == 200 else f"Login riuscito ma /me ha risposto HTTP {resp_me.status_code}" ), latency_ms=latency_ms, provider_info={ "platform": "Aeterna", "tenant_slug": tenant_slug, "user_email": me.get("email"), "permissions_count": len(me.get("permissions", [])), } if me else None, ) except Exception as e: return ConservatoreTestResult( success=False, message=f"Errore connessione ad Aeterna: {e}", latency_ms=int((time.monotonic() - t_start) * 1000), ) else: # Test generico: HTTP Basic HEAD o GET sull'endpoint import base64 import httpx auth_str = base64.b64encode(f"{username}:{password}".encode()).decode() try: async with httpx.AsyncClient(timeout=10) as client: response = await client.get( endpoint, headers={"Authorization": f"Basic {auth_str}"}, ) latency_ms = int((time.monotonic() - t_start) * 1000) if response.status_code < 500: return ConservatoreTestResult( success=True, message=f"Endpoint raggiungibile (HTTP {response.status_code})", latency_ms=latency_ms, ) else: return ConservatoreTestResult( success=False, message=f"Endpoint ha risposto con errore HTTP {response.status_code}", latency_ms=latency_ms, ) except Exception as e: return ConservatoreTestResult( success=False, message=f"Errore connessione: {e}", latency_ms=int((time.monotonic() - t_start) * 1000), ) # ─── Indicizzazione full-text ───────────────────────────────────────────────── @router.get( "/indexing/stats", response_model=IndexingStats, summary="Statistiche indicizzazione full-text", description=( "Restituisce il numero di messaggi totali, indicizzati e non indicizzati " "per il tenant, con percentuale di copertura. " "Include anche le statistiche sugli allegati PDF/DOCX con testo estratto. " "Il super_admin puo' specificare ?tenant_id= per un tenant arbitrario." ), ) async def get_indexing_stats( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> IndexingStats: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) service = IndexingService(db) stats = await service.get_stats(target_tenant_id) return IndexingStats(**stats) @router.get( "/indexing/status", response_model=IndexingJobStatus, summary="Stato job indicizzazione in corso", description=( "Restituisce lo stato del job di reindex per il tenant: " "idle, running (con progresso), completed, failed o cancelled. " "Se il job e' running da piu' di 2 ore, il flag is_stale e' True." ), ) async def get_indexing_status( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> IndexingJobStatus: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) app_settings = get_app_settings() status_data = await IndexingService.get_job_status( target_tenant_id, app_settings.redis_url ) return IndexingJobStatus(**status_data) @router.post( "/indexing/reindex", response_model=IndexingJobStatus, status_code=status.HTTP_202_ACCEPTED, summary="Avvia job di reindex", description=( "Avvia un job di reindex full-text in background. " "mode='differential' indicizza solo i messaggi con search_vector NULL (piu' veloce). " "mode='full' riscrive il vettore di tutti i messaggi del tenant. " "Restituisce 409 se un job e' gia' in corso." ), ) async def start_reindex( body: StartReindexRequest, current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> IndexingJobStatus: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) app_settings = get_app_settings() try: await IndexingService.start_reindex( tenant_id=target_tenant_id, mode=body.mode, started_by_email=current_user.email, redis_url=app_settings.redis_url, db_url=app_settings.database_url, ) except ValueError as exc: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=str(exc), ) # Ritorna lo stato appena creato status_data = await IndexingService.get_job_status( target_tenant_id, app_settings.redis_url ) return IndexingJobStatus(**status_data) @router.delete( "/indexing/reindex", response_model=IndexingJobStatus, summary="Cancella job di reindex in corso", description=( "Invia il segnale di cancellazione al job di reindex in corso. " "Il job si fermera' alla fine del batch corrente (max qualche secondo). " "Se non c'e' nessun job in corso, ritorna 404." ), ) async def cancel_reindex( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> IndexingJobStatus: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) app_settings = get_app_settings() cancelled = await IndexingService.cancel_reindex( target_tenant_id, app_settings.redis_url ) if not cancelled: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Nessun job di reindex in corso per questo tenant", ) status_data = await IndexingService.get_job_status( target_tenant_id, app_settings.redis_url ) return IndexingJobStatus(**status_data) # ─── Scansione allegati ─────────────────────────────────────────────────────── @router.get( "/indexing/rescan-status", response_model=IndexingJobStatus, summary="Stato job scansione allegati in corso", description=( "Restituisce lo stato del job di scansione allegati per il tenant: " "idle, running (con progresso), completed, failed o cancelled." ), ) async def get_rescan_status( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> IndexingJobStatus: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) app_settings = get_app_settings() status_data = await IndexingService.get_rescan_status( target_tenant_id, app_settings.redis_url ) return IndexingJobStatus(**status_data) # ─── Conservazione sostitutiva – statistiche e trigger ─────────────────────── def _build_conservation_stats( total: int, conserved: int, pending: int, last_conserved_at, ) -> ConservationStats: not_queued = max(0, total - conserved - pending) coverage_pct = round((conserved / total * 100), 1) if total > 0 else 0.0 last_dt = last_conserved_at.isoformat() if last_conserved_at else None return ConservationStats( total_messages=total, conserved=conserved, pending=pending, not_queued=not_queued, coverage_pct=coverage_pct, last_conserved_at=last_dt, ) @router.get( "/conservation/stats", response_model=ConservationStatsResponse, summary="Statistiche conservazione sostitutiva", description=( "Restituisce il numero di messaggi totali, conservati, in coda e non accodati. " "admin: statistiche del proprio tenant. " "super_admin con ?tenant_id=: statistiche del tenant specificato. " "super_admin senza tenant_id: statistiche aggregate su tutti i tenant + breakdown per-tenant." ), ) async def get_conservation_stats( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare; assente = tutti i tenant", ), ) -> ConservationStatsResponse: from sqlalchemy import func, select from app.models.message import Message from app.models.tenant import Tenant # Se non e' super_admin, oppure e' super_admin con tenant_id specificato -> singolo tenant if current_user.role != "super_admin" or tenant_id is not None: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) row = await db.execute( select( func.count().label("total"), func.count().filter(Message.is_conserved == True).label("conserved"), # noqa: E712 func.count().filter( Message.is_pending_conservation == True, # noqa: E712 Message.is_conserved == False, # noqa: E712 ).label("pending"), func.max(Message.conserved_at).label("last_conserved_at"), ).where(Message.tenant_id == target_tenant_id) ) r = row.one() stats = _build_conservation_stats( total=r.total or 0, conserved=r.conserved or 0, pending=r.pending or 0, last_conserved_at=r.last_conserved_at, ) return ConservationStatsResponse(stats=stats, per_tenant=None) # super_admin senza tenant_id: aggregato + per-tenant rows = await db.execute( select( Message.tenant_id, func.count().label("total"), func.count().filter(Message.is_conserved == True).label("conserved"), # noqa: E712 func.count().filter( Message.is_pending_conservation == True, # noqa: E712 Message.is_conserved == False, # noqa: E712 ).label("pending"), func.max(Message.conserved_at).label("last_conserved_at"), ).group_by(Message.tenant_id) ) tenant_rows = rows.all() # Carica nomi tenant all_tenant_ids = [r.tenant_id for r in tenant_rows] tenant_names: dict[uuid.UUID, tuple[str, str]] = {} if all_tenant_ids: t_rows = await db.execute( select(Tenant.id, Tenant.name, Tenant.slug).where(Tenant.id.in_(all_tenant_ids)) ) for t in t_rows.all(): tenant_names[t.id] = (t.name, t.slug) per_tenant: list[ConservationTenantBreakdown] = [] agg_total = agg_conserved = agg_pending = 0 agg_last: object = None for r in tenant_rows: agg_total += r.total or 0 agg_conserved += r.conserved or 0 agg_pending += r.pending or 0 if r.last_conserved_at and (agg_last is None or r.last_conserved_at > agg_last): agg_last = r.last_conserved_at name, slug = tenant_names.get(r.tenant_id, ("(sconosciuto)", "")) per_tenant.append(ConservationTenantBreakdown( tenant_id=str(r.tenant_id), tenant_name=name, tenant_slug=slug, stats=_build_conservation_stats( total=r.total or 0, conserved=r.conserved or 0, pending=r.pending or 0, last_conserved_at=r.last_conserved_at, ), )) aggregate = _build_conservation_stats( total=agg_total, conserved=agg_conserved, pending=agg_pending, last_conserved_at=agg_last, ) return ConservationStatsResponse(stats=aggregate, per_tenant=per_tenant) @router.post( "/conservation/trigger", response_model=ConservationTriggerResult, status_code=status.HTTP_202_ACCEPTED, summary="Avvia manualmente il job di conservazione sostitutiva", description=( "Accoda immediatamente il job run_conservation nel worker arq. " "Il job elabora tutti i tenant con messaggi pendenti di conservazione. " "Utile per eseguire il ciclo di conservazione senza attendere il cron giornaliero. " "Solo admin e super_admin possono avviarlo." ), ) async def trigger_conservation( current_user: AdminUser, db: DB, ) -> ConservationTriggerResult: app_settings = get_app_settings() try: import urllib.parse from arq.connections import RedisSettings, create_pool parsed = urllib.parse.urlparse(app_settings.redis_url) redis_settings = RedisSettings( host=parsed.hostname or "localhost", port=parsed.port or 6379, database=int(parsed.path.lstrip("/") or "0"), password=parsed.password or None, ) redis = await create_pool(redis_settings) job = await redis.enqueue_job("run_conservation") await redis.aclose() job_id = job.job_id if job else None return ConservationTriggerResult( queued=True, message="Job di conservazione accodato con successo. Verra' eseguito dal worker entro pochi secondi.", job_id=job_id, ) except Exception as exc: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Impossibile accodare il job: {exc}", ) @router.post( "/indexing/rescan", response_model=IndexingJobStatus, status_code=status.HTTP_202_ACCEPTED, summary="Avvia job di scansione allegati", description=( "Avvia un job di scansione allegati in background. " "force=false (default): estrae il testo solo dagli allegati non ancora elaborati. " "force=true: ri-estrae il testo da tutti gli allegati del tenant. " "Al termine di ogni batch aggiorna anche il search_vector dei messaggi interessati. " "Restituisce 409 se un job di scansione o reindex e' gia' in corso." ), ) async def start_rescan( body: StartRescanRequest, current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> IndexingJobStatus: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) app_settings = get_app_settings() try: await IndexingService.start_rescan( tenant_id=target_tenant_id, started_by_email=current_user.email, redis_url=app_settings.redis_url, db_url=app_settings.database_url, force=body.force, ) except ValueError as exc: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=str(exc), ) status_data = await IndexingService.get_rescan_status( target_tenant_id, app_settings.redis_url ) return IndexingJobStatus(**status_data) @router.delete( "/indexing/rescan", response_model=IndexingJobStatus, summary="Cancella job di scansione allegati in corso", description=( "Invia il segnale di cancellazione al job di scansione allegati in corso. " "Il job si fermera' alla fine del batch corrente. " "Se non c'e' nessun job in corso, ritorna 404." ), ) async def cancel_rescan( current_user: AdminUser, db: DB, tenant_id: Optional[uuid.UUID] = Query( default=None, description="(solo super_admin) UUID del tenant su cui operare", ), ) -> IndexingJobStatus: target_tenant_id = _resolve_tenant_id(current_user, tenant_id) app_settings = get_app_settings() cancelled = await IndexingService.cancel_rescan( target_tenant_id, app_settings.redis_url ) if not cancelled: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Nessun job di scansione allegati in corso per questo tenant", ) status_data = await IndexingService.get_rescan_status( target_tenant_id, app_settings.redis_url ) return IndexingJobStatus(**status_data)