OCR + reportistica

This commit is contained in:
2026-03-27 13:54:07 +01:00
parent cbeedc2d2f
commit bb2060c1ae
26 changed files with 5503 additions and 237 deletions
+4
View File
@@ -5,6 +5,10 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
gcc \
libpq-dev \
tesseract-ocr \
tesseract-ocr-ita \
tesseract-ocr-eng \
poppler-utils \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
+114
View File
@@ -0,0 +1,114 @@
"""
Router Reports Dashboard e Reportistica (Fase 7).
Endpoint:
GET /reports/summary KPI + serie storica + breakdown caselle
GET /reports/export export CSV o PDF
Permessi:
- Tutti gli utenti autenticati possono accedere.
- Admin e super_admin: vedono tutto il tenant.
- Operator/Supervisor/Readonly: vedono solo le caselle su cui hanno can_read.
"""
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Query
from fastapi.responses import Response
from app.dependencies import CurrentUser, DB
from app.schemas.reports import ReportSummaryResponse
from app.services.report_service import ReportService
router = APIRouter(prefix="/reports", tags=["Reports"])
async def _get_visible_ids(user, db) -> Optional[list[uuid.UUID]]:
"""Restituisce None per admin (nessun filtro), lista per non-admin."""
if user.is_admin:
return None
from app.services.permission_service import PermissionService
svc = PermissionService(db)
return await svc.get_visible_mailboxes(user)
# ─── GET /reports/summary ─────────────────────────────────────────────────────
@router.get("/summary", response_model=ReportSummaryResponse)
async def get_report_summary(
current_user: CurrentUser,
db: DB,
days: int = Query(7, ge=1, le=365, description="Periodo in giorni per la serie storica"),
) -> ReportSummaryResponse:
"""
Restituisce il riepilogo completo per la dashboard:
- KPI (PEC ricevute/inviate oggi, anomalie, tasso consegna, ...)
- Serie storica giornaliera per il grafico a barre
- Distribuzione stati outbound per il grafico a torta
- Statistiche per casella
"""
visible = await _get_visible_ids(current_user, db)
svc = ReportService(db)
return await svc.get_summary(
tenant_id=current_user.tenant_id,
period_days=days,
visible_mailbox_ids=visible,
)
# ─── GET /reports/export ──────────────────────────────────────────────────────
@router.get("/export")
async def export_report(
current_user: CurrentUser,
db: DB,
format: str = Query("csv", pattern="^(csv|pdf)$", description="Formato: csv o pdf"),
date_from: Optional[datetime] = Query(None, description="Data inizio (ISO 8601)"),
date_to: Optional[datetime] = Query(None, description="Data fine (ISO 8601)"),
mailbox_id: Optional[uuid.UUID] = Query(None, description="Filtra per casella specifica"),
) -> Response:
"""
Esporta i dati in formato CSV o PDF.
- CSV: lista completa dei messaggi del periodo con tutti i metadati.
- PDF: riepilogo KPI + tabella caselle (generato con reportlab).
"""
visible = await _get_visible_ids(current_user, db)
svc = ReportService(db)
now_str = datetime.now().strftime("%Y%m%d_%H%M%S")
if format == "csv":
data = await svc.export_csv(
tenant_id=current_user.tenant_id,
date_from=date_from,
date_to=date_to,
mailbox_id=mailbox_id,
visible_mailbox_ids=visible,
)
return Response(
content=data,
media_type="text/csv; charset=utf-8-sig",
headers={
"Content-Disposition": f'attachment; filename="pechub_report_{now_str}.csv"',
"Content-Length": str(len(data)),
},
)
# PDF
data = await svc.export_pdf(
tenant_id=current_user.tenant_id,
date_from=date_from,
date_to=date_to,
visible_mailbox_ids=visible,
)
return Response(
content=data,
media_type="application/pdf",
headers={
"Content-Disposition": f'attachment; filename="pechub_report_{now_str}.pdf"',
"Content-Length": str(len(data)),
},
)
+289 -11
View File
@@ -1,23 +1,57 @@
"""
Router Impostazioni Tenant (Fase 6).
Router Impostazioni Tenant.
Endpoint:
GET /settings legge le impostazioni del tenant corrente (admin)
PUT /settings aggiorna le impostazioni del tenant corrente (admin)
Endpoint esistenti:
GET /settings -> legge le impostazioni del tenant corrente (admin)
PUT /settings -> aggiorna le impostazioni del tenant corrente (admin)
Solo gli admin e super_admin possono accedere.
La sezione "archiviazione" gestisce il toggle mock/produzione per il
conservatore AgID (Fase 6 Archiviazione Sostitutiva).
Endpoint indicizzazione full-text (Fase 8):
GET /settings/indexing/stats -> statistiche copertura indicizzazione
GET /settings/indexing/status -> stato job reindex in corso
POST /settings/indexing/reindex -> avvia reindex (full o differential)
DELETE /settings/indexing/reindex -> cancella job in corso
Solo admin e super_admin possono accedere.
Il super_admin puo' operare su qualsiasi tenant tramite query param ?tenant_id=<uuid>.
"""
from fastapi import APIRouter
import uuid
from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, Query, status
from app.config import get_settings as get_app_settings
from app.dependencies import AdminUser, DB
from app.schemas.tenant_settings import TenantSettingsResponse, TenantSettingsUpdate
from app.schemas.tenant_settings import (
IndexingJobStatus,
IndexingStats,
StartReindexRequest,
StartRescanRequest,
TenantSettingsResponse,
TenantSettingsUpdate,
)
from app.services.indexing_service import IndexingService
from app.services.tenant_settings_service import TenantSettingsService
router = APIRouter(prefix="/settings", tags=["Impostazioni"])
# ─── Helper tenant_id resolution ─────────────────────────────────────────────
def _resolve_tenant_id(
current_user: AdminUser,
tenant_id_param: Optional[uuid.UUID] = None,
) -> uuid.UUID:
"""
Risolve il tenant_id da usare per l'operazione.
- super_admin: puo' passare un tenant_id arbitrario
- admin: usa sempre il proprio tenant_id (tenant_id_param ignorato)
"""
if current_user.role == "super_admin" and tenant_id_param is not None:
return tenant_id_param
return current_user.tenant_id
# ─── Impostazioni generali ────────────────────────────────────────────────────
@router.get(
"",
@@ -25,7 +59,7 @@ router = APIRouter(prefix="/settings", tags=["Impostazioni"])
summary="Legge le impostazioni del tenant",
description=(
"Restituisce la configurazione operativa del tenant: "
"modalità archiviazione (mock/produzione), endpoint e stato credenziali conservatore."
"modalita' archiviazione (mock/produzione), endpoint e stato credenziali conservatore."
),
)
async def get_settings(
@@ -44,7 +78,7 @@ async def get_settings(
description=(
"Aggiorna la configurazione operativa del tenant. "
"Tutti i campi sono opzionali (semantica PATCH). "
"Il passaggio a modalità 'production' richiede un endpoint conservatore configurato."
"Il passaggio a modalita' 'production' richiede un endpoint conservatore configurato."
),
)
async def update_settings(
@@ -57,3 +91,247 @@ async def update_settings(
await db.commit()
await db.refresh(settings)
return TenantSettingsService.to_response(settings)
# ─── Indicizzazione full-text ─────────────────────────────────────────────────
@router.get(
"/indexing/stats",
response_model=IndexingStats,
summary="Statistiche indicizzazione full-text",
description=(
"Restituisce il numero di messaggi totali, indicizzati e non indicizzati "
"per il tenant, con percentuale di copertura. "
"Include anche le statistiche sugli allegati PDF/DOCX con testo estratto. "
"Il super_admin puo' specificare ?tenant_id=<uuid> per un tenant arbitrario."
),
)
async def get_indexing_stats(
current_user: AdminUser,
db: DB,
tenant_id: Optional[uuid.UUID] = Query(
default=None,
description="(solo super_admin) UUID del tenant su cui operare",
),
) -> IndexingStats:
target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
service = IndexingService(db)
stats = await service.get_stats(target_tenant_id)
return IndexingStats(**stats)
@router.get(
"/indexing/status",
response_model=IndexingJobStatus,
summary="Stato job indicizzazione in corso",
description=(
"Restituisce lo stato del job di reindex per il tenant: "
"idle, running (con progresso), completed, failed o cancelled. "
"Se il job e' running da piu' di 2 ore, il flag is_stale e' True."
),
)
async def get_indexing_status(
current_user: AdminUser,
db: DB,
tenant_id: Optional[uuid.UUID] = Query(
default=None,
description="(solo super_admin) UUID del tenant su cui operare",
),
) -> IndexingJobStatus:
target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
app_settings = get_app_settings()
status_data = await IndexingService.get_job_status(
target_tenant_id, app_settings.redis_url
)
return IndexingJobStatus(**status_data)
@router.post(
"/indexing/reindex",
response_model=IndexingJobStatus,
status_code=status.HTTP_202_ACCEPTED,
summary="Avvia job di reindex",
description=(
"Avvia un job di reindex full-text in background. "
"mode='differential' indicizza solo i messaggi con search_vector NULL (piu' veloce). "
"mode='full' riscrive il vettore di tutti i messaggi del tenant. "
"Restituisce 409 se un job e' gia' in corso."
),
)
async def start_reindex(
body: StartReindexRequest,
current_user: AdminUser,
db: DB,
tenant_id: Optional[uuid.UUID] = Query(
default=None,
description="(solo super_admin) UUID del tenant su cui operare",
),
) -> IndexingJobStatus:
target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
app_settings = get_app_settings()
try:
await IndexingService.start_reindex(
tenant_id=target_tenant_id,
mode=body.mode,
started_by_email=current_user.email,
redis_url=app_settings.redis_url,
db_url=app_settings.database_url,
)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=str(exc),
)
# Ritorna lo stato appena creato
status_data = await IndexingService.get_job_status(
target_tenant_id, app_settings.redis_url
)
return IndexingJobStatus(**status_data)
@router.delete(
"/indexing/reindex",
response_model=IndexingJobStatus,
summary="Cancella job di reindex in corso",
description=(
"Invia il segnale di cancellazione al job di reindex in corso. "
"Il job si fermera' alla fine del batch corrente (max qualche secondo). "
"Se non c'e' nessun job in corso, ritorna 404."
),
)
async def cancel_reindex(
current_user: AdminUser,
db: DB,
tenant_id: Optional[uuid.UUID] = Query(
default=None,
description="(solo super_admin) UUID del tenant su cui operare",
),
) -> IndexingJobStatus:
target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
app_settings = get_app_settings()
cancelled = await IndexingService.cancel_reindex(
target_tenant_id, app_settings.redis_url
)
if not cancelled:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nessun job di reindex in corso per questo tenant",
)
status_data = await IndexingService.get_job_status(
target_tenant_id, app_settings.redis_url
)
return IndexingJobStatus(**status_data)
# ─── Scansione allegati ───────────────────────────────────────────────────────
@router.get(
"/indexing/rescan-status",
response_model=IndexingJobStatus,
summary="Stato job scansione allegati in corso",
description=(
"Restituisce lo stato del job di scansione allegati per il tenant: "
"idle, running (con progresso), completed, failed o cancelled."
),
)
async def get_rescan_status(
current_user: AdminUser,
db: DB,
tenant_id: Optional[uuid.UUID] = Query(
default=None,
description="(solo super_admin) UUID del tenant su cui operare",
),
) -> IndexingJobStatus:
target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
app_settings = get_app_settings()
status_data = await IndexingService.get_rescan_status(
target_tenant_id, app_settings.redis_url
)
return IndexingJobStatus(**status_data)
@router.post(
"/indexing/rescan",
response_model=IndexingJobStatus,
status_code=status.HTTP_202_ACCEPTED,
summary="Avvia job di scansione allegati",
description=(
"Avvia un job di scansione allegati in background. "
"force=false (default): estrae il testo solo dagli allegati non ancora elaborati. "
"force=true: ri-estrae il testo da tutti gli allegati del tenant. "
"Al termine di ogni batch aggiorna anche il search_vector dei messaggi interessati. "
"Restituisce 409 se un job di scansione o reindex e' gia' in corso."
),
)
async def start_rescan(
body: StartRescanRequest,
current_user: AdminUser,
db: DB,
tenant_id: Optional[uuid.UUID] = Query(
default=None,
description="(solo super_admin) UUID del tenant su cui operare",
),
) -> IndexingJobStatus:
target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
app_settings = get_app_settings()
try:
await IndexingService.start_rescan(
tenant_id=target_tenant_id,
started_by_email=current_user.email,
redis_url=app_settings.redis_url,
db_url=app_settings.database_url,
force=body.force,
)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=str(exc),
)
status_data = await IndexingService.get_rescan_status(
target_tenant_id, app_settings.redis_url
)
return IndexingJobStatus(**status_data)
@router.delete(
"/indexing/rescan",
response_model=IndexingJobStatus,
summary="Cancella job di scansione allegati in corso",
description=(
"Invia il segnale di cancellazione al job di scansione allegati in corso. "
"Il job si fermera' alla fine del batch corrente. "
"Se non c'e' nessun job in corso, ritorna 404."
),
)
async def cancel_rescan(
current_user: AdminUser,
db: DB,
tenant_id: Optional[uuid.UUID] = Query(
default=None,
description="(solo super_admin) UUID del tenant su cui operare",
),
) -> IndexingJobStatus:
target_tenant_id = _resolve_tenant_id(current_user, tenant_id)
app_settings = get_app_settings()
cancelled = await IndexingService.cancel_rescan(
target_tenant_id, app_settings.redis_url
)
if not cancelled:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nessun job di scansione allegati in corso per questo tenant",
)
status_data = await IndexingService.get_rescan_status(
target_tenant_id, app_settings.redis_url
)
return IndexingJobStatus(**status_data)
+2 -1
View File
@@ -13,7 +13,7 @@ from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_address
from app.api.v1 import auth, labels, mailboxes, messages, notifications, permissions, send, tenants, users, virtual_boxes, ws
from app.api.v1 import auth, labels, mailboxes, messages, notifications, permissions, reports, send, tenants, users, virtual_boxes, ws
from app.api.v1 import settings as settings_router
from app.config import get_settings
from app.core.logging import get_logger, setup_logging
@@ -96,6 +96,7 @@ app.include_router(virtual_boxes.router, prefix=API_PREFIX)
app.include_router(notifications.router, prefix=API_PREFIX)
app.include_router(labels.router, prefix=API_PREFIX)
app.include_router(settings_router.router, prefix=API_PREFIX)
app.include_router(reports.router, prefix=API_PREFIX)
# ─── Health check ─────────────────────────────────────────────────────────────
+75
View File
@@ -0,0 +1,75 @@
"""
Schemi Pydantic per la Dashboard e Reportistica (Fase 7).
"""
from datetime import date, datetime
from typing import Optional
import uuid
from pydantic import BaseModel, Field
class KpiSummary(BaseModel):
"""Contatori KPI principali del tenant."""
# Oggi
received_today: int = Field(0, description="PEC ricevute oggi")
sent_today: int = Field(0, description="PEC inviate oggi (outbound)")
# Ultimi 7 giorni
received_7d: int = Field(0, description="PEC ricevute negli ultimi 7 giorni")
sent_7d: int = Field(0, description="PEC inviate negli ultimi 7 giorni")
# Ultimi 30 giorni
received_30d: int = Field(0, description="PEC ricevute negli ultimi 30 giorni")
sent_30d: int = Field(0, description="PEC inviate negli ultimi 30 giorni")
# Stato
anomalie_attive: int = Field(0, description="Messaggi outbound in stato anomaly")
tasso_consegna: float = Field(0.0, description="Percentuale consegna (0-100)")
caselle_in_errore: int = Field(0, description="Caselle con status=error")
messaggi_non_letti: int = Field(0, description="Messaggi inbound non letti")
# Totali assoluti
totale_messaggi: int = Field(0, description="Totale messaggi nel tenant")
class DailyStat(BaseModel):
"""Statistiche giornaliere per il grafico a barre."""
day: date = Field(..., description="Data (YYYY-MM-DD)")
received: int = Field(0, description="PEC ricevute in quel giorno")
sent: int = Field(0, description="PEC inviate in quel giorno")
class OutboundStateStat(BaseModel):
"""Conteggio messaggi outbound per stato (per il grafico a torta)."""
state: str
count: int
class MailboxStat(BaseModel):
"""Statistiche per singola casella."""
mailbox_id: uuid.UUID
email_address: str
display_name: Optional[str] = None
status: str
received_total: int = 0
sent_total: int = 0
anomalie: int = 0
non_letti: int = 0
last_sync_at: Optional[datetime] = None
class ReportSummaryResponse(BaseModel):
"""Risposta completa dell'endpoint /reports/summary."""
generated_at: datetime
period_days: int = Field(..., description="Numero di giorni del periodo selezionato")
kpi: KpiSummary
daily_stats: list[DailyStat] = Field(default_factory=list)
outbound_states: list[OutboundStateStat] = Field(default_factory=list)
mailbox_stats: list[MailboxStat] = Field(default_factory=list)
+53 -2
View File
@@ -1,12 +1,13 @@
"""
Schema Pydantic per TenantSettings lettura e aggiornamento impostazioni tenant.
Include schemi per il modulo di indicizzazione full-text.
"""
import uuid
from datetime import datetime
from typing import Literal
from typing import Literal, Optional
from pydantic import BaseModel, Field, HttpUrl, field_validator
from pydantic import BaseModel, Field, field_validator
ArchivalMode = Literal["mock", "production"]
@@ -68,3 +69,53 @@ class TenantSettingsUpdate(BaseModel):
if v is not None and v not in ("mock", "production"):
raise ValueError("archival_mode deve essere 'mock' o 'production'")
return v
# ─── Schemi indicizzazione full-text ──────────────────────────────────────────
class IndexingStats(BaseModel):
"""Statistiche di copertura dell'indicizzazione per un tenant."""
total_messages: int
indexed_messages: int
unindexed_messages: int
coverage_pct: float # percentuale messaggi con search_vector != NULL
attachments_total: int # allegati PDF/DOCX totali
attachments_extracted: int # allegati con testo estratto
attachments_pct: float # percentuale allegati con testo estratto
class IndexingJobStatus(BaseModel):
"""Stato di un job di reindex in corso o completato."""
status: str # idle | running | completed | failed | cancelled
mode: Optional[str] = None # full | differential
total: int = 0
processed: int = 0
progress_pct: float = 0.0
started_at: Optional[str] = None # ISO datetime
finished_at: Optional[str] = None # ISO datetime
started_by: Optional[str] = None # email utente che ha avviato il job
elapsed_seconds: Optional[int] = None
is_stale: bool = False # True se running da piu' di STALE_THRESHOLD_HOURS
error: Optional[str] = None
class StartReindexRequest(BaseModel):
"""Body per POST /settings/indexing/reindex."""
mode: Literal["full", "differential"] = "differential"
class StartRescanRequest(BaseModel):
"""Body per POST /settings/indexing/rescan."""
force: bool = Field(
default=False,
description=(
"False (default): estrae solo allegati con extracted_text NULL. "
"True: ri-estrae tutti gli allegati, sovrascrivendo i testi gia' presenti."
),
)
File diff suppressed because it is too large Load Diff
+594
View File
@@ -0,0 +1,594 @@
"""
ReportService calcola KPI, serie storiche e produce export CSV/PDF.
Non richiede migrazioni: lavora sulle tabelle messages e mailboxes esistenti.
"""
import csv
import io
import uuid
from datetime import date, datetime, timedelta, timezone
from typing import AsyncGenerator, Optional
from sqlalchemy import case, func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import get_logger
from app.models.mailbox import Mailbox
from app.models.message import Message
from app.schemas.reports import (
DailyStat,
KpiSummary,
MailboxStat,
OutboundStateStat,
ReportSummaryResponse,
)
logger = get_logger(__name__)
class ReportService:
def __init__(self, db: AsyncSession) -> None:
self.db = db
# ─── KPI principali ──────────────────────────────────────────────────────
async def get_summary(
self,
tenant_id: uuid.UUID,
period_days: int = 7,
visible_mailbox_ids: Optional[list[uuid.UUID]] = None,
) -> ReportSummaryResponse:
"""
Restituisce il riepilogo completo per la dashboard.
visible_mailbox_ids: se None l'utente e admin e vede tutto il tenant,
altrimenti filtra sulle caselle accessibili.
"""
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
d7_start = now - timedelta(days=7)
d30_start = now - timedelta(days=30)
# ── Filtro base tenant + caselle visibili ────────────────────────────
def _base(q):
q = q.where(Message.tenant_id == tenant_id)
if visible_mailbox_ids is not None:
if not visible_mailbox_ids:
# Nessuna casella visibile: ritorna subito valori zero
return None
q = q.where(Message.mailbox_id.in_(visible_mailbox_ids))
return q
async def _count(q) -> int:
q = _base(q)
if q is None:
return 0
r = await self.db.execute(q)
return r.scalar_one() or 0
# PEC ricevute
received_today = await _count(
select(func.count(Message.id)).where(
Message.direction == "inbound",
Message.received_at >= today_start,
)
)
received_7d = await _count(
select(func.count(Message.id)).where(
Message.direction == "inbound",
Message.received_at >= d7_start,
)
)
received_30d = await _count(
select(func.count(Message.id)).where(
Message.direction == "inbound",
Message.received_at >= d30_start,
)
)
# PEC inviate
sent_today = await _count(
select(func.count(Message.id)).where(
Message.direction == "outbound",
Message.parent_message_id.is_(None),
Message.sent_at >= today_start,
)
)
sent_7d = await _count(
select(func.count(Message.id)).where(
Message.direction == "outbound",
Message.parent_message_id.is_(None),
Message.sent_at >= d7_start,
)
)
sent_30d = await _count(
select(func.count(Message.id)).where(
Message.direction == "outbound",
Message.parent_message_id.is_(None),
Message.sent_at >= d30_start,
)
)
# Anomalie (outbound con state=anomaly, senza genitore)
anomalie = await _count(
select(func.count(Message.id)).where(
Message.direction == "outbound",
Message.parent_message_id.is_(None),
Message.state == "anomaly",
)
)
# Tasso consegna: delivered / (delivered + anomaly + failed)
delivered = await _count(
select(func.count(Message.id)).where(
Message.direction == "outbound",
Message.parent_message_id.is_(None),
Message.state == "delivered",
)
)
failed = await _count(
select(func.count(Message.id)).where(
Message.direction == "outbound",
Message.parent_message_id.is_(None),
Message.state.in_(["anomaly", "failed"]),
)
)
total_terminal = delivered + failed
tasso_consegna = round((delivered / total_terminal * 100), 1) if total_terminal > 0 else 0.0
# Non letti
non_letti = await _count(
select(func.count(Message.id)).where(
Message.direction == "inbound",
Message.is_read == False, # noqa: E712
Message.is_trashed == False, # noqa: E712
)
)
# Totale messaggi
totale = await _count(select(func.count(Message.id)))
# Caselle in errore (NON filtrato per visible_mailbox_ids e una info admin)
caselle_errore_r = await self.db.execute(
select(func.count(Mailbox.id)).where(
Mailbox.tenant_id == tenant_id,
Mailbox.status == "error",
)
)
caselle_errore = caselle_errore_r.scalar_one() or 0
kpi = KpiSummary(
received_today=received_today,
sent_today=sent_today,
received_7d=received_7d,
sent_7d=sent_7d,
received_30d=received_30d,
sent_30d=sent_30d,
anomalie_attive=anomalie,
tasso_consegna=tasso_consegna,
caselle_in_errore=caselle_errore,
messaggi_non_letti=non_letti,
totale_messaggi=totale,
)
# ── Serie storica giornaliera ─────────────────────────────────────────
daily_stats = await self._get_daily_stats(tenant_id, period_days, visible_mailbox_ids)
# ── Distribuzione stati outbound ──────────────────────────────────────
outbound_states = await self._get_outbound_states(tenant_id, visible_mailbox_ids)
# ── Statistiche per casella ───────────────────────────────────────────
mailbox_stats = await self._get_mailbox_stats(tenant_id, visible_mailbox_ids)
return ReportSummaryResponse(
generated_at=now,
period_days=period_days,
kpi=kpi,
daily_stats=daily_stats,
outbound_states=outbound_states,
mailbox_stats=mailbox_stats,
)
# ─── Serie storica ────────────────────────────────────────────────────────
async def _get_daily_stats(
self,
tenant_id: uuid.UUID,
days: int,
visible_mailbox_ids: Optional[list[uuid.UUID]],
) -> list[DailyStat]:
"""Conta PEC ricevute e inviate per ciascuno degli ultimi `days` giorni."""
since = datetime.now(timezone.utc) - timedelta(days=days)
def _apply_filters(q):
q = q.where(Message.tenant_id == tenant_id)
if visible_mailbox_ids is not None:
if not visible_mailbox_ids:
return None
q = q.where(Message.mailbox_id.in_(visible_mailbox_ids))
return q
# Aggregazione ricevute per giorno
q_recv = (
select(
func.date_trunc("day", Message.received_at).label("day"),
func.count(Message.id).label("cnt"),
)
.where(
Message.direction == "inbound",
Message.received_at >= since,
)
.group_by(text("day"))
.order_by(text("day"))
)
q_recv = _apply_filters(q_recv)
# Aggregazione inviate per giorno
q_sent = (
select(
func.date_trunc("day", Message.sent_at).label("day"),
func.count(Message.id).label("cnt"),
)
.where(
Message.direction == "outbound",
Message.parent_message_id.is_(None),
Message.sent_at >= since,
)
.group_by(text("day"))
.order_by(text("day"))
)
q_sent = _apply_filters(q_sent)
recv_map: dict[date, int] = {}
sent_map: dict[date, int] = {}
if q_recv is not None:
r = await self.db.execute(q_recv)
for row in r.all():
if row.day:
d = row.day.date() if hasattr(row.day, "date") else row.day
recv_map[d] = row.cnt
if q_sent is not None:
r = await self.db.execute(q_sent)
for row in r.all():
if row.day:
d = row.day.date() if hasattr(row.day, "date") else row.day
sent_map[d] = row.cnt
# Costruisce la serie completa (tutti i giorni, anche quelli a zero)
result: list[DailyStat] = []
for i in range(days, -1, -1):
d = (datetime.now(timezone.utc) - timedelta(days=i)).date()
result.append(DailyStat(
day=d,
received=recv_map.get(d, 0),
sent=sent_map.get(d, 0),
))
return result
# ─── Distribuzione stati outbound ────────────────────────────────────────
async def _get_outbound_states(
self,
tenant_id: uuid.UUID,
visible_mailbox_ids: Optional[list[uuid.UUID]],
) -> list[OutboundStateStat]:
q = (
select(Message.state, func.count(Message.id).label("cnt"))
.where(
Message.tenant_id == tenant_id,
Message.direction == "outbound",
Message.parent_message_id.is_(None),
)
.group_by(Message.state)
)
if visible_mailbox_ids is not None:
if not visible_mailbox_ids:
return []
q = q.where(Message.mailbox_id.in_(visible_mailbox_ids))
r = await self.db.execute(q)
return [OutboundStateStat(state=row.state, count=row.cnt) for row in r.all()]
# ─── Statistiche per casella ──────────────────────────────────────────────
async def _get_mailbox_stats(
self,
tenant_id: uuid.UUID,
visible_mailbox_ids: Optional[list[uuid.UUID]],
) -> list[MailboxStat]:
# Carica le caselle
mb_q = select(Mailbox).where(Mailbox.tenant_id == tenant_id)
if visible_mailbox_ids is not None:
if not visible_mailbox_ids:
return []
mb_q = mb_q.where(Mailbox.id.in_(visible_mailbox_ids))
mb_result = await self.db.execute(mb_q)
mailboxes = mb_result.scalars().all()
if not mailboxes:
return []
mailbox_ids = [m.id for m in mailboxes]
mailbox_map = {m.id: m for m in mailboxes}
# Aggregazione messaggi per casella e direction
agg_q = (
select(
Message.mailbox_id,
Message.direction,
Message.state,
Message.is_read,
func.count(Message.id).label("cnt"),
)
.where(
Message.tenant_id == tenant_id,
Message.mailbox_id.in_(mailbox_ids),
Message.parent_message_id.is_(None),
)
.group_by(Message.mailbox_id, Message.direction, Message.state, Message.is_read)
)
agg_result = await self.db.execute(agg_q)
# Accumula per casella
stats: dict[uuid.UUID, MailboxStat] = {}
for mb in mailboxes:
stats[mb.id] = MailboxStat(
mailbox_id=mb.id,
email_address=mb.email_address,
display_name=mb.display_name,
status=mb.status,
last_sync_at=mb.last_sync_at,
)
for row in agg_result.all():
s = stats.get(row.mailbox_id)
if not s:
continue
if row.direction == "inbound":
s.received_total += row.cnt
if not row.is_read:
s.non_letti += row.cnt
elif row.direction == "outbound":
s.sent_total += row.cnt
if row.state == "anomaly":
s.anomalie += row.cnt
# Ordina per volume decrescente
return sorted(
stats.values(),
key=lambda x: x.received_total + x.sent_total,
reverse=True,
)
# ─── Export CSV ───────────────────────────────────────────────────────────
async def export_csv(
self,
tenant_id: uuid.UUID,
date_from: Optional[datetime],
date_to: Optional[datetime],
mailbox_id: Optional[uuid.UUID],
visible_mailbox_ids: Optional[list[uuid.UUID]],
) -> bytes:
"""Genera un CSV con tutti i messaggi del periodo."""
q = (
select(
Message.id,
Message.direction,
Message.state,
Message.pec_type,
Message.subject,
Message.from_address,
Message.received_at,
Message.sent_at,
Message.size_bytes,
Message.is_read,
Message.has_attachments,
Mailbox.email_address.label("mailbox_email"),
)
.join(Mailbox, Message.mailbox_id == Mailbox.id)
.where(
Message.tenant_id == tenant_id,
Message.parent_message_id.is_(None),
)
.order_by(Message.received_at.desc().nullslast(), Message.created_at.desc())
)
if visible_mailbox_ids is not None:
if not visible_mailbox_ids:
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(CSV_HEADERS)
return buf.getvalue().encode("utf-8-sig")
q = q.where(Message.mailbox_id.in_(visible_mailbox_ids))
if mailbox_id:
q = q.where(Message.mailbox_id == mailbox_id)
if date_from:
q = q.where(
(Message.received_at >= date_from) | (Message.sent_at >= date_from)
)
if date_to:
q = q.where(
(Message.received_at <= date_to) | (Message.sent_at <= date_to)
)
result = await self.db.execute(q)
rows = result.all()
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(CSV_HEADERS)
for r in rows:
ts = r.received_at or r.sent_at
writer.writerow([
str(r.id),
r.mailbox_email or "",
r.direction or "",
r.state or "",
r.pec_type or "",
r.subject or "",
r.from_address or "",
ts.strftime("%Y-%m-%d %H:%M:%S") if ts else "",
r.size_bytes or "",
"Si" if r.is_read else "No",
"Si" if r.has_attachments else "No",
])
return buf.getvalue().encode("utf-8-sig")
# ─── Export PDF ───────────────────────────────────────────────────────────
async def export_pdf(
self,
tenant_id: uuid.UUID,
date_from: Optional[datetime],
date_to: Optional[datetime],
visible_mailbox_ids: Optional[list[uuid.UUID]],
) -> bytes:
"""
Genera un PDF di riepilogo con KPI e tabella caselle.
Usa reportlab (puro Python, nessuna dipendenza di sistema).
"""
try:
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import cm
from reportlab.platypus import (
Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle,
)
except ImportError:
raise RuntimeError(
"reportlab non installato. Aggiungere 'reportlab>=4.2.0' "
"alle dipendenze del backend."
)
summary = await self.get_summary(tenant_id, 30, visible_mailbox_ids)
now_str = summary.generated_at.strftime("%d/%m/%Y %H:%M")
buf = io.BytesIO()
doc = SimpleDocTemplate(
buf,
pagesize=A4,
leftMargin=2 * cm,
rightMargin=2 * cm,
topMargin=2 * cm,
bottomMargin=2 * cm,
)
styles = getSampleStyleSheet()
title_style = ParagraphStyle(
"Title", parent=styles["Title"], fontSize=18, spaceAfter=6,
)
subtitle_style = ParagraphStyle(
"Subtitle", parent=styles["Normal"], fontSize=10, textColor=colors.grey, spaceAfter=20,
)
heading_style = ParagraphStyle(
"Heading2", parent=styles["Heading2"], fontSize=13, spaceBefore=14, spaceAfter=6,
)
story = []
# Intestazione
story.append(Paragraph("PEChub Report Attivita PEC", title_style))
date_range = ""
if date_from:
date_range += f"Dal {date_from.strftime('%d/%m/%Y')} "
if date_to:
date_range += f"Al {date_to.strftime('%d/%m/%Y')} "
story.append(Paragraph(
f"Generato il {now_str} {date_range}",
subtitle_style,
))
# Sezione KPI
story.append(Paragraph("Indicatori Chiave (ultimi 30 giorni)", heading_style))
kpi = summary.kpi
kpi_data = [
["Indicatore", "Valore"],
["PEC ricevute oggi", str(kpi.received_today)],
["PEC inviate oggi", str(kpi.sent_today)],
["PEC ricevute (7 gg)", str(kpi.received_7d)],
["PEC inviate (7 gg)", str(kpi.sent_7d)],
["PEC ricevute (30 gg)", str(kpi.received_30d)],
["PEC inviate (30 gg)", str(kpi.sent_30d)],
["Anomalie attive", str(kpi.anomalie_attive)],
["Tasso di consegna", f"{kpi.tasso_consegna}%"],
["Caselle in errore", str(kpi.caselle_in_errore)],
["Messaggi non letti", str(kpi.messaggi_non_letti)],
]
kpi_table = Table(kpi_data, colWidths=[10 * cm, 5 * cm])
kpi_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1e40af")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("FONTSIZE", (0, 0), (-1, -1), 10),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f0f4ff")]),
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
("ALIGN", (1, 0), (1, -1), "RIGHT"),
("LEFTPADDING", (0, 0), (-1, -1), 8),
("RIGHTPADDING", (0, 0), (-1, -1), 8),
("TOPPADDING", (0, 0), (-1, -1), 5),
("BOTTOMPADDING", (0, 0), (-1, -1), 5),
]))
story.append(kpi_table)
story.append(Spacer(1, 0.5 * cm))
# Sezione caselle
if summary.mailbox_stats:
story.append(Paragraph("Dettaglio per Casella", heading_style))
mb_header = ["Casella", "Stato", "Ricevute", "Inviate", "Anomalie", "Non letti"]
mb_data = [mb_header]
for ms in summary.mailbox_stats:
mb_data.append([
ms.email_address,
ms.status,
str(ms.received_total),
str(ms.sent_total),
str(ms.anomalie),
str(ms.non_letti),
])
mb_table = Table(
mb_data,
colWidths=[6.5 * cm, 2.2 * cm, 2.2 * cm, 2.2 * cm, 2.2 * cm, 2.2 * cm],
)
mb_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1e40af")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("FONTSIZE", (0, 0), (-1, -1), 9),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f0f4ff")]),
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
("ALIGN", (1, 0), (-1, -1), "RIGHT"),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
("TOPPADDING", (0, 0), (-1, -1), 4),
("BOTTOMPADDING", (0, 0), (-1, -1), 4),
]))
story.append(mb_table)
doc.build(story)
return buf.getvalue()
# ─── Costanti ────────────────────────────────────────────────────────────────
CSV_HEADERS = [
"ID",
"Casella",
"Direzione",
"Stato",
"Tipo PEC",
"Oggetto",
"Mittente",
"Data/Ora",
"Dimensione (byte)",
"Letto",
"Allegati",
]
+12
View File
@@ -46,6 +46,15 @@ dependencies = [
# Storage MinIO/S3
"miniopy-async>=1.21.0",
# Estrazione testo allegati (usato anche dal job rescan nel backend)
"pypdf>=4.0.0",
"python-docx>=1.1.0",
# OCR per allegati image-only (immagini dirette e PDF scansionati)
"pytesseract>=0.3.13",
"pdf2image>=1.17.0",
"Pillow>=11.0.0",
# IMAP async (per test connessione nel backend + mailbox service)
"aioimaplib>=2.0.0",
@@ -58,6 +67,9 @@ dependencies = [
# Utilities
"python-multipart>=0.0.9", # upload file
"python-dotenv>=1.0.0",
# Generazione PDF report (puro Python, nessuna dipendenza di sistema)
"reportlab>=4.2.0",
]
[project.optional-dependencies]