# Mirror of https://github.com/idrainformatica/PecFlow.git
# Synced 2026-06-16 20:55:41 +02:00
# 595 lines, 22 KiB, Python
"""
|
||
ReportService – calcola KPI, serie storiche e produce export CSV/PDF.
|
||
|
||
Non richiede migrazioni: lavora sulle tabelle messages e mailboxes esistenti.
|
||
"""
|
||
|
||
import csv
|
||
import io
|
||
import uuid
|
||
from datetime import date, datetime, timedelta, timezone
|
||
from typing import AsyncGenerator, Optional
|
||
|
||
from sqlalchemy import case, func, select, text
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.logging import get_logger
|
||
from app.models.mailbox import Mailbox
|
||
from app.models.message import Message
|
||
from app.schemas.reports import (
|
||
DailyStat,
|
||
KpiSummary,
|
||
MailboxStat,
|
||
OutboundStateStat,
|
||
ReportSummaryResponse,
|
||
)
|
||
|
||
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
|
||
|
||
|
||
class ReportService:
    """Compute dashboard KPIs and time series and build CSV/PDF exports.

    Every query runs through the async SQLAlchemy session passed to the
    constructor and reads the ``Message`` and ``Mailbox`` models only.
    """

    def __init__(self, db: AsyncSession) -> None:
        """Store the async session used by every query of this service."""
        self.db = db

    # ─── Shared helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _scope_messages(
        q,
        tenant_id: uuid.UUID,
        visible_mailbox_ids: Optional[list[uuid.UUID]],
    ):
        """Restrict a ``Message`` query to one tenant and its visible mailboxes.

        Args:
            q: A select statement over ``Message``.
            tenant_id: Tenant whose messages are kept.
            visible_mailbox_ids: ``None`` means no mailbox restriction; a
                non-empty list keeps only those mailboxes.

        Returns:
            The filtered statement, or ``None`` when ``visible_mailbox_ids``
            is an empty list (nothing is visible, so callers skip the query).
        """
        if visible_mailbox_ids is None:
            return q.where(Message.tenant_id == tenant_id)
        if not visible_mailbox_ids:
            return None
        return q.where(
            Message.tenant_id == tenant_id,
            Message.mailbox_id.in_(visible_mailbox_ids),
        )

    @staticmethod
    def _render_csv(rows) -> bytes:
        """Serialize message rows to CSV bytes (UTF-8 with BOM).

        The header row is always written, so an empty ``rows`` yields a
        header-only file. Each row must expose the columns selected by
        ``export_csv``.
        """
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerow(CSV_HEADERS)
        for r in rows:
            # Same timestamp that export_csv filters on: received_at, else sent_at.
            ts = r.received_at or r.sent_at
            # NOTE(review): subject / from_address are written verbatim; values
            # starting with "=", "+", "-" or "@" may be read as formulas by
            # spreadsheet applications – confirm whether escaping is wanted.
            writer.writerow([
                str(r.id),
                r.mailbox_email or "",
                r.direction or "",
                r.state or "",
                r.pec_type or "",
                r.subject or "",
                r.from_address or "",
                ts.strftime("%Y-%m-%d %H:%M:%S") if ts else "",
                r.size_bytes or "",
                "Si" if r.is_read else "No",
                "Si" if r.has_attachments else "No",
            ])
        return buf.getvalue().encode("utf-8-sig")

    # ─── Main KPIs ───────────────────────────────────────────────────────────

    async def get_summary(
        self,
        tenant_id: uuid.UUID,
        period_days: int = 7,
        visible_mailbox_ids: Optional[list[uuid.UUID]] = None,
    ) -> ReportSummaryResponse:
        """Return the complete summary used by the dashboard.

        Args:
            tenant_id: Tenant the report is computed for.
            period_days: Length of the daily series; it does not affect the
                KPI windows, which are fixed (today, last 7 days, last 30 days).
            visible_mailbox_ids: ``None`` means the caller sees the whole
                tenant; otherwise message figures are limited to these
                mailboxes (an empty list yields zero counts).

        Returns:
            A ``ReportSummaryResponse`` with KPIs, daily series, outbound
            state distribution and per-mailbox statistics.
        """
        now = datetime.now(timezone.utc)
        today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        d7_start = now - timedelta(days=7)
        d30_start = now - timedelta(days=30)

        async def _count(*conditions) -> int:
            """Count messages matching ``conditions`` within the caller's scope."""
            q = select(func.count(Message.id))
            if conditions:
                q = q.where(*conditions)
            q = self._scope_messages(q, tenant_id, visible_mailbox_ids)
            if q is None:
                # No visible mailbox: the count is zero without hitting the DB.
                return 0
            result = await self.db.execute(q)
            return result.scalar_one() or 0

        inbound = Message.direction == "inbound"
        # "Root" outbound messages: outbound rows without a parent message.
        outbound_root = (
            Message.direction == "outbound",
            Message.parent_message_id.is_(None),
        )

        # Received messages
        received_today = await _count(inbound, Message.received_at >= today_start)
        received_7d = await _count(inbound, Message.received_at >= d7_start)
        received_30d = await _count(inbound, Message.received_at >= d30_start)

        # Sent messages
        sent_today = await _count(*outbound_root, Message.sent_at >= today_start)
        sent_7d = await _count(*outbound_root, Message.sent_at >= d7_start)
        sent_30d = await _count(*outbound_root, Message.sent_at >= d30_start)

        # Anomalies: root outbound messages in state "anomaly" (not time-bounded).
        anomalie = await _count(*outbound_root, Message.state == "anomaly")

        # Delivery rate: delivered / (delivered + anomaly + failed).
        delivered = await _count(*outbound_root, Message.state == "delivered")
        not_delivered = await _count(
            *outbound_root,
            Message.state.in_(["anomaly", "failed"]),
        )
        total_terminal = delivered + not_delivered
        if total_terminal > 0:
            tasso_consegna = round(delivered / total_terminal * 100, 1)
        else:
            tasso_consegna = 0.0

        # Unread inbound messages that are not in the trash.
        non_letti = await _count(
            inbound,
            Message.is_read == False,  # noqa: E712
            Message.is_trashed == False,  # noqa: E712
        )

        # Total messages in scope.
        totale = await _count()

        # Mailboxes in error: deliberately NOT filtered by visible_mailbox_ids
        # (the original code marks this as admin-level information).
        caselle_errore_r = await self.db.execute(
            select(func.count(Mailbox.id)).where(
                Mailbox.tenant_id == tenant_id,
                Mailbox.status == "error",
            )
        )
        caselle_errore = caselle_errore_r.scalar_one() or 0

        kpi = KpiSummary(
            received_today=received_today,
            sent_today=sent_today,
            received_7d=received_7d,
            sent_7d=sent_7d,
            received_30d=received_30d,
            sent_30d=sent_30d,
            anomalie_attive=anomalie,
            tasso_consegna=tasso_consegna,
            caselle_in_errore=caselle_errore,
            messaggi_non_letti=non_letti,
            totale_messaggi=totale,
        )

        # Daily time series
        daily_stats = await self._get_daily_stats(
            tenant_id, period_days, visible_mailbox_ids
        )
        # Outbound state distribution
        outbound_states = await self._get_outbound_states(
            tenant_id, visible_mailbox_ids
        )
        # Per-mailbox statistics
        mailbox_stats = await self._get_mailbox_stats(tenant_id, visible_mailbox_ids)

        return ReportSummaryResponse(
            generated_at=now,
            period_days=period_days,
            kpi=kpi,
            daily_stats=daily_stats,
            outbound_states=outbound_states,
            mailbox_stats=mailbox_stats,
        )

    # ─── Time series ─────────────────────────────────────────────────────────

    async def _get_daily_stats(
        self,
        tenant_id: uuid.UUID,
        days: int,
        visible_mailbox_ids: Optional[list[uuid.UUID]],
    ) -> list[DailyStat]:
        """Count received and sent messages per day for the last ``days`` days.

        The series runs from ``today - days`` to today inclusive (UTC dates),
        so it holds ``days + 1`` entries; days without messages are reported
        with zero counts.
        """
        # Read the clock once so the query window and the generated series
        # cannot disagree if the call straddles midnight.
        now = datetime.now(timezone.utc)
        today = now.date()
        # Start at midnight of the oldest day; starting at "now - days" would
        # count only part of that day and under-report the first bucket.
        since = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(
            days=days
        )

        async def _counts_by_day(ts_column, *conditions) -> dict[date, int]:
            """Return ``{day: count}`` for messages whose ``ts_column`` >= since."""
            q = (
                select(
                    func.date_trunc("day", ts_column).label("day"),
                    func.count(Message.id).label("cnt"),
                )
                .where(*conditions, ts_column >= since)
                # Group/order by the "day" label of the truncated timestamp.
                .group_by(text("day"))
                .order_by(text("day"))
            )
            q = self._scope_messages(q, tenant_id, visible_mailbox_ids)
            if q is None:
                return {}
            result = await self.db.execute(q)
            counts: dict[date, int] = {}
            for row in result.all():
                if row.day:
                    # The driver may hand back a datetime or already a date.
                    key = row.day.date() if hasattr(row.day, "date") else row.day
                    counts[key] = row.cnt
            return counts

        # NOTE(review): date_trunc runs in the database while the series keys
        # are UTC dates – presumably the DB session time zone is UTC; confirm.
        recv_map = await _counts_by_day(
            Message.received_at,
            Message.direction == "inbound",
        )
        sent_map = await _counts_by_day(
            Message.sent_at,
            Message.direction == "outbound",
            Message.parent_message_id.is_(None),
        )

        # Build the full series, oldest day first, including zero days.
        series: list[DailyStat] = []
        for offset in range(days, -1, -1):
            d = today - timedelta(days=offset)
            series.append(
                DailyStat(
                    day=d,
                    received=recv_map.get(d, 0),
                    sent=sent_map.get(d, 0),
                )
            )
        return series

    # ─── Outbound state distribution ─────────────────────────────────────────

    async def _get_outbound_states(
        self,
        tenant_id: uuid.UUID,
        visible_mailbox_ids: Optional[list[uuid.UUID]],
    ) -> list[OutboundStateStat]:
        """Return the number of root outbound messages for each state.

        An empty ``visible_mailbox_ids`` list yields an empty result without
        querying the database.
        """
        q = self._scope_messages(
            select(Message.state, func.count(Message.id).label("cnt"))
            .where(
                Message.direction == "outbound",
                Message.parent_message_id.is_(None),
            )
            .group_by(Message.state),
            tenant_id,
            visible_mailbox_ids,
        )
        if q is None:
            return []

        result = await self.db.execute(q)
        return [
            OutboundStateStat(state=row.state, count=row.cnt) for row in result.all()
        ]

    # ─── Per-mailbox statistics ──────────────────────────────────────────────

    async def _get_mailbox_stats(
        self,
        tenant_id: uuid.UUID,
        visible_mailbox_ids: Optional[list[uuid.UUID]],
    ) -> list[MailboxStat]:
        """Return per-mailbox totals, sorted by message volume (descending).

        For every mailbox of the tenant (limited to ``visible_mailbox_ids``
        when given) it accumulates received, unread, sent and anomaly counts
        over messages without a parent message.
        """
        # Load the mailboxes in scope.
        if visible_mailbox_ids is not None and not visible_mailbox_ids:
            return []
        mb_q = select(Mailbox).where(Mailbox.tenant_id == tenant_id)
        if visible_mailbox_ids is not None:
            mb_q = mb_q.where(Mailbox.id.in_(visible_mailbox_ids))
        mailboxes = (await self.db.execute(mb_q)).scalars().all()
        if not mailboxes:
            return []

        # One aggregate row per (mailbox, direction, state, is_read).
        agg_q = (
            select(
                Message.mailbox_id,
                Message.direction,
                Message.state,
                Message.is_read,
                func.count(Message.id).label("cnt"),
            )
            .where(
                Message.tenant_id == tenant_id,
                Message.mailbox_id.in_([mb.id for mb in mailboxes]),
                Message.parent_message_id.is_(None),
            )
            .group_by(
                Message.mailbox_id,
                Message.direction,
                Message.state,
                Message.is_read,
            )
        )
        agg_result = await self.db.execute(agg_q)

        # One accumulator per mailbox; the counters are incremented below, so
        # MailboxStat is expected to default them (presumably to 0 – confirm).
        stats: dict[uuid.UUID, MailboxStat] = {
            mb.id: MailboxStat(
                mailbox_id=mb.id,
                email_address=mb.email_address,
                display_name=mb.display_name,
                status=mb.status,
                last_sync_at=mb.last_sync_at,
            )
            for mb in mailboxes
        }

        for row in agg_result.all():
            entry = stats.get(row.mailbox_id)
            if entry is None:
                continue
            if row.direction == "inbound":
                entry.received_total += row.cnt
                if not row.is_read:
                    entry.non_letti += row.cnt
            elif row.direction == "outbound":
                entry.sent_total += row.cnt
                if row.state == "anomaly":
                    entry.anomalie += row.cnt

        # Busiest mailboxes first.
        return sorted(
            stats.values(),
            key=lambda x: x.received_total + x.sent_total,
            reverse=True,
        )

    # ─── CSV export ──────────────────────────────────────────────────────────

    async def export_csv(
        self,
        tenant_id: uuid.UUID,
        date_from: Optional[datetime],
        date_to: Optional[datetime],
        mailbox_id: Optional[uuid.UUID],
        visible_mailbox_ids: Optional[list[uuid.UUID]],
    ) -> bytes:
        """Build a CSV with the tenant's root messages for the given period.

        Args:
            tenant_id: Tenant whose messages are exported.
            date_from: Optional inclusive lower bound on the message timestamp.
            date_to: Optional inclusive upper bound on the message timestamp.
            mailbox_id: Optional single mailbox to restrict the export to.
            visible_mailbox_ids: ``None`` for no restriction; an empty list
                produces a header-only file.

        Returns:
            The CSV content encoded as UTF-8 with BOM.
        """
        if visible_mailbox_ids is not None and not visible_mailbox_ids:
            return self._render_csv([])

        q = (
            select(
                Message.id,
                Message.direction,
                Message.state,
                Message.pec_type,
                Message.subject,
                Message.from_address,
                Message.received_at,
                Message.sent_at,
                Message.size_bytes,
                Message.is_read,
                Message.has_attachments,
                Mailbox.email_address.label("mailbox_email"),
            )
            .join(Mailbox, Message.mailbox_id == Mailbox.id)
            .where(
                Message.tenant_id == tenant_id,
                Message.parent_message_id.is_(None),
            )
            .order_by(Message.received_at.desc().nullslast(), Message.created_at.desc())
        )

        if visible_mailbox_ids is not None:
            q = q.where(Message.mailbox_id.in_(visible_mailbox_ids))
        if mailbox_id:
            q = q.where(Message.mailbox_id == mailbox_id)

        # Filter on the timestamp the CSV actually prints (received_at, else
        # sent_at). Testing each bound with "received OR sent" let a row match
        # the lower bound on one column and the upper bound on the other while
        # neither timestamp was inside the requested range.
        effective_ts = func.coalesce(Message.received_at, Message.sent_at)
        if date_from:
            q = q.where(effective_ts >= date_from)
        if date_to:
            q = q.where(effective_ts <= date_to)

        result = await self.db.execute(q)
        return self._render_csv(result.all())

    # ─── PDF export ──────────────────────────────────────────────────────────

    async def export_pdf(
        self,
        tenant_id: uuid.UUID,
        date_from: Optional[datetime],
        date_to: Optional[datetime],
        visible_mailbox_ids: Optional[list[uuid.UUID]],
    ) -> bytes:
        """Build a summary PDF with the KPI table and the per-mailbox table.

        The figures come from ``get_summary(tenant_id, 30, ...)``;
        ``date_from`` / ``date_to`` are only printed in the subtitle and do
        not filter the data.

        Raises:
            RuntimeError: If ``reportlab`` is not installed.
        """
        try:
            from reportlab.lib import colors
            from reportlab.lib.pagesizes import A4
            from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
            from reportlab.lib.units import cm
            from reportlab.platypus import (
                Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle,
            )
        except ImportError as exc:
            # Chain the ImportError so the missing module stays in the traceback.
            raise RuntimeError(
                "reportlab non installato. Aggiungere 'reportlab>=4.2.0' "
                "alle dipendenze del backend."
            ) from exc

        def _grid_style(font_size: int, h_pad: int, v_pad: int) -> "TableStyle":
            """Shared look of both tables: blue header, zebra rows, light grid."""
            return TableStyle([
                ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1e40af")),
                ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
                ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
                ("FONTSIZE", (0, 0), (-1, -1), font_size),
                ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f0f4ff")]),
                ("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
                # Right-align every column after the first one.
                ("ALIGN", (1, 0), (-1, -1), "RIGHT"),
                ("LEFTPADDING", (0, 0), (-1, -1), h_pad),
                ("RIGHTPADDING", (0, 0), (-1, -1), h_pad),
                ("TOPPADDING", (0, 0), (-1, -1), v_pad),
                ("BOTTOMPADDING", (0, 0), (-1, -1), v_pad),
            ])

        summary = await self.get_summary(tenant_id, 30, visible_mailbox_ids)
        now_str = summary.generated_at.strftime("%d/%m/%Y %H:%M")

        buf = io.BytesIO()
        doc = SimpleDocTemplate(
            buf,
            pagesize=A4,
            leftMargin=2 * cm,
            rightMargin=2 * cm,
            topMargin=2 * cm,
            bottomMargin=2 * cm,
        )

        styles = getSampleStyleSheet()
        title_style = ParagraphStyle(
            "Title", parent=styles["Title"], fontSize=18, spaceAfter=6,
        )
        subtitle_style = ParagraphStyle(
            "Subtitle", parent=styles["Normal"], fontSize=10, textColor=colors.grey, spaceAfter=20,
        )
        heading_style = ParagraphStyle(
            "Heading2", parent=styles["Heading2"], fontSize=13, spaceBefore=14, spaceAfter=6,
        )

        story = []

        # Header
        story.append(Paragraph("PEChub – Report Attivita PEC", title_style))
        date_range = ""
        if date_from:
            date_range += f"Dal {date_from.strftime('%d/%m/%Y')} "
        if date_to:
            date_range += f"Al {date_to.strftime('%d/%m/%Y')} "
        story.append(Paragraph(
            f"Generato il {now_str} {date_range}",
            subtitle_style,
        ))

        # KPI section
        story.append(Paragraph("Indicatori Chiave (ultimi 30 giorni)", heading_style))
        kpi = summary.kpi
        kpi_data = [
            ["Indicatore", "Valore"],
            ["PEC ricevute oggi", str(kpi.received_today)],
            ["PEC inviate oggi", str(kpi.sent_today)],
            ["PEC ricevute (7 gg)", str(kpi.received_7d)],
            ["PEC inviate (7 gg)", str(kpi.sent_7d)],
            ["PEC ricevute (30 gg)", str(kpi.received_30d)],
            ["PEC inviate (30 gg)", str(kpi.sent_30d)],
            ["Anomalie attive", str(kpi.anomalie_attive)],
            ["Tasso di consegna", f"{kpi.tasso_consegna}%"],
            ["Caselle in errore", str(kpi.caselle_in_errore)],
            ["Messaggi non letti", str(kpi.messaggi_non_letti)],
        ]
        kpi_table = Table(kpi_data, colWidths=[10 * cm, 5 * cm])
        kpi_table.setStyle(_grid_style(font_size=10, h_pad=8, v_pad=5))
        story.append(kpi_table)
        story.append(Spacer(1, 0.5 * cm))

        # Mailbox section (omitted when there are no mailbox statistics)
        if summary.mailbox_stats:
            story.append(Paragraph("Dettaglio per Casella", heading_style))
            mb_data = [
                ["Casella", "Stato", "Ricevute", "Inviate", "Anomalie", "Non letti"]
            ]
            for ms in summary.mailbox_stats:
                mb_data.append([
                    ms.email_address,
                    ms.status,
                    str(ms.received_total),
                    str(ms.sent_total),
                    str(ms.anomalie),
                    str(ms.non_letti),
                ])
            mb_table = Table(
                mb_data,
                colWidths=[6.5 * cm, 2.2 * cm, 2.2 * cm, 2.2 * cm, 2.2 * cm, 2.2 * cm],
            )
            mb_table.setStyle(_grid_style(font_size=9, h_pad=6, v_pad=4))
            story.append(mb_table)

        # doc.build is a synchronous call and runs inline in this coroutine.
        doc.build(story)
        return buf.getvalue()
|
||
|
||
|
||
# ─── Constants ───────────────────────────────────────────────────────────────

# Column titles of the CSV export, in the same order as the values written for
# each message row by ReportService.export_csv.
CSV_HEADERS = [
    "ID",
    "Casella",
    "Direzione",
    "Stato",
    "Tipo PEC",
    "Oggetto",
    "Mittente",
    "Data/Ora",
    "Dimensione (byte)",
    "Letto",
    "Allegati",
]
|