""" Servizio Audit Log – registrazione e consultazione degli eventi di sistema. Uso tipico nei router/servizi: from app.services.audit_service import log_audit await log_audit( db=db, tenant_id=current_user.tenant_id, user_id=current_user.id, action="user.created", resource_type="user", resource_id=new_user.id, outcome="success", ip_address=get_real_ip(request), user_agent=request.headers.get("user-agent"), payload={"email": new_user.email}, ) """ import csv import io import math import uuid from datetime import datetime from typing import Optional from fastapi import Request from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import aliased from app.core.pagination import PaginatedResponse, PaginationParams from app.models.audit_log import AuditLog from app.models.user import User from app.schemas.audit_log import AuditLogResponse # ─── Helper IP reale (legge X-Real-IP o X-Forwarded-For da reverse proxy) ──── def get_real_ip(request: Request) -> Optional[str]: """ Restituisce l'IP reale del client, leggendo gli header del reverse proxy. Priorita': X-Real-IP > primo IP di X-Forwarded-For > request.client.host """ x_real_ip = request.headers.get("x-real-ip") if x_real_ip: return x_real_ip.strip() x_forwarded_for = request.headers.get("x-forwarded-for", "") if x_forwarded_for: # Prende il primo IP della catena (quello del client originale) first_ip = x_forwarded_for.split(",")[0].strip() if first_ip: return first_ip if request.client: return request.client.host return None # ─── Helper standalone (da chiamare ovunque senza istanziare la classe) ─────── async def log_audit( db: AsyncSession, action: str, *, tenant_id: Optional[uuid.UUID] = None, user_id: Optional[uuid.UUID] = None, resource_type: Optional[str] = None, resource_id: Optional[uuid.UUID] = None, outcome: str = "success", ip_address: Optional[str] = None, user_agent: Optional[str] = None, payload: Optional[dict] = None, ) -> None: """ Inserisce un record di audit log nella sessione corrente. Non fa commit: il commit avviene con la transazione del chiamante. Non solleva eccezioni: gli errori sono loggati ma non propagati per evitare di bloccare l'operazione principale. """ try: entry = AuditLog( tenant_id=tenant_id, user_id=user_id, action=action, resource_type=resource_type, resource_id=resource_id, ip_address=ip_address, user_agent=user_agent, payload=payload or {}, outcome=outcome, ) db.add(entry) except Exception: # Mai bloccare l'operazione principale per un errore di audit import logging logging.getLogger(__name__).warning( "Impossibile registrare evento audit: action=%s", action, exc_info=True ) # ─── Servizio per query (usato dal router) ──────────────────────────────────── class AuditService: def __init__(self, db: AsyncSession) -> None: self.db = db def _build_query( self, *, tenant_id: Optional[uuid.UUID], action: Optional[str] = None, user_id: Optional[uuid.UUID] = None, outcome: Optional[str] = None, date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, resource_type: Optional[str] = None, ): """Costruisce la query base con JOIN utente e filtri.""" UserAlias = aliased(User) stmt = ( select( AuditLog, UserAlias.email.label("user_email"), UserAlias.full_name.label("user_full_name"), ) .outerjoin(UserAlias, UserAlias.id == AuditLog.user_id) ) filters = [] if tenant_id is not None: filters.append(AuditLog.tenant_id == tenant_id) if action: if action.endswith("*"): filters.append(AuditLog.action.like(action[:-1] + "%")) else: filters.append(AuditLog.action == action) if user_id: filters.append(AuditLog.user_id == user_id) if outcome: filters.append(AuditLog.outcome == outcome) if date_from: filters.append(AuditLog.occurred_at >= date_from) if date_to: filters.append(AuditLog.occurred_at <= date_to) if resource_type: filters.append(AuditLog.resource_type == resource_type) if filters: stmt = stmt.where(and_(*filters)) return stmt async def list( self, *, tenant_id: Optional[uuid.UUID], page: int = 1, page_size: int = 25, action: Optional[str] = None, user_id: Optional[uuid.UUID] = None, outcome: Optional[str] = None, date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, resource_type: Optional[str] = None, ) -> PaginatedResponse[AuditLogResponse]: """ Restituisce la lista paginata degli eventi audit con dati utente. Se tenant_id e' None (super_admin), restituisce eventi di tutti i tenant. """ base_stmt = self._build_query( tenant_id=tenant_id, action=action, user_id=user_id, outcome=outcome, date_from=date_from, date_to=date_to, resource_type=resource_type, ) # Count totale count_q = select(func.count()).select_from(base_stmt.subquery()) total = (await self.db.execute(count_q)).scalar_one() # Dati paginati offset = (page - 1) * page_size items_q = base_stmt.order_by(AuditLog.occurred_at.desc()).offset(offset).limit(page_size) rows = (await self.db.execute(items_q)).all() items = [] for row in rows: entry: AuditLog = row[0] email: Optional[str] = row[1] full_name: Optional[str] = row[2] resp = AuditLogResponse.model_validate(entry) resp.user_email = email resp.user_full_name = full_name items.append(resp) pages = math.ceil(total / page_size) if page_size > 0 else 0 return PaginatedResponse[AuditLogResponse]( items=items, total=total, page=page, page_size=page_size, pages=pages, ) async def export_csv( self, *, tenant_id: Optional[uuid.UUID], action: Optional[str] = None, user_id: Optional[uuid.UUID] = None, outcome: Optional[str] = None, date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, resource_type: Optional[str] = None, limit: int = 10000, ) -> str: """ Restituisce i log in formato CSV (stringa). Massimo `limit` righe per prevenire export eccessivi. """ base_stmt = self._build_query( tenant_id=tenant_id, action=action, user_id=user_id, outcome=outcome, date_from=date_from, date_to=date_to, resource_type=resource_type, ) items_q = base_stmt.order_by(AuditLog.occurred_at.desc()).limit(limit) rows = (await self.db.execute(items_q)).all() output = io.StringIO() writer = csv.writer(output) writer.writerow([ "ID", "Data/Ora", "Azione", "Esito", "Utente", "Email utente", "IP", "Tipo risorsa", "ID risorsa", ]) for row in rows: entry: AuditLog = row[0] email: Optional[str] = row[1] full_name: Optional[str] = row[2] writer.writerow([ entry.id, entry.occurred_at.strftime("%d/%m/%Y %H:%M:%S"), entry.action, entry.outcome, full_name or "", email or "", str(entry.ip_address) if entry.ip_address else "", entry.resource_type or "", str(entry.resource_id) if entry.resource_id else "", ]) return output.getvalue() async def export_pdf_bytes( self, *, tenant_id: Optional[uuid.UUID], action: Optional[str] = None, user_id: Optional[uuid.UUID] = None, outcome: Optional[str] = None, date_from: Optional[datetime] = None, date_to: Optional[datetime] = None, resource_type: Optional[str] = None, limit: int = 5000, ) -> bytes: """ Genera un PDF con i log di audit usando reportlab. Restituisce i byte del PDF. """ from reportlab.lib import colors from reportlab.lib.pagesizes import A4, landscape from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.units import cm from reportlab.platypus import ( SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, ) base_stmt = self._build_query( tenant_id=tenant_id, action=action, user_id=user_id, outcome=outcome, date_from=date_from, date_to=date_to, resource_type=resource_type, ) items_q = base_stmt.order_by(AuditLog.occurred_at.desc()).limit(limit) rows = (await self.db.execute(items_q)).all() buf = io.BytesIO() doc = SimpleDocTemplate( buf, pagesize=landscape(A4), rightMargin=1 * cm, leftMargin=1 * cm, topMargin=1.5 * cm, bottomMargin=1.5 * cm, ) styles = getSampleStyleSheet() story = [] # Titolo title_text = "Audit Log" if date_from or date_to: parts = [] if date_from: parts.append(f"dal {date_from.strftime('%d/%m/%Y')}") if date_to: parts.append(f"al {date_to.strftime('%d/%m/%Y')}") title_text += " – " + " ".join(parts) story.append(Paragraph(title_text, styles["Heading1"])) story.append(Spacer(1, 0.3 * cm)) story.append(Paragraph( f"Esportato il {datetime.now().strftime('%d/%m/%Y %H:%M')} — {len(rows)} record", styles["Normal"], )) story.append(Spacer(1, 0.5 * cm)) # Tabella headers = ["Data/Ora", "Azione", "Esito", "Utente", "IP", "Risorsa"] table_data = [headers] for row in rows: entry: AuditLog = row[0] email: Optional[str] = row[1] full_name: Optional[str] = row[2] utente = full_name or email or (str(entry.user_id)[:8] if entry.user_id else "—") risorsa = entry.resource_type or "—" table_data.append([ entry.occurred_at.strftime("%d/%m/%Y %H:%M:%S"), entry.action, "OK" if entry.outcome == "success" else "FAIL", utente, str(entry.ip_address) if entry.ip_address else "—", risorsa, ]) col_widths = [3.8 * cm, 5.0 * cm, 1.8 * cm, 4.5 * cm, 3.5 * cm, 3.0 * cm] table = Table(table_data, colWidths=col_widths, repeatRows=1) table.setStyle(TableStyle([ ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1e3a5f")), ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), ("FONTSIZE", (0, 0), (-1, 0), 8), ("FONTSIZE", (0, 1), (-1, -1), 7), ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#f5f8ff")]), ("GRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#cccccc")), ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), ("TOPPADDING", (0, 0), (-1, -1), 3), ("BOTTOMPADDING", (0, 0), (-1, -1), 3), ])) story.append(table) doc.build(story) return buf.getvalue()