GapFill Flowee
This commit is contained in:
@@ -0,0 +1,358 @@
|
||||
"""
|
||||
Job arq: run_conservation – conservazione sostitutiva giornaliera.
|
||||
|
||||
Eseguito ogni giorno alle 18:00 (ora Italia, 16:00 UTC) tramite arq cron.
|
||||
|
||||
Flusso di esecuzione:
|
||||
1. Trova tutti i tenant che hanno messaggi con is_pending_conservation=TRUE
|
||||
e is_conserved=FALSE
|
||||
2. Per ogni tenant, legge la configurazione conservatore da tenant_settings
|
||||
(decifra credenziali se in modalità produzione)
|
||||
3. Per ogni messaggio da conservare:
|
||||
a. Carica il messaggio con i suoi allegati dal DB
|
||||
b. Carica le ricevute associate (accettazione, avvenuta_consegna)
|
||||
c. Scarica EML principale da MinIO
|
||||
d. Scarica ogni allegato da MinIO
|
||||
e. Scarica EML di ogni ricevuta da MinIO (se disponibile)
|
||||
f. Costruisce pacchetto BagIt SIP completo
|
||||
g. Invia al conservatore
|
||||
h. Su successo: imposta is_conserved=TRUE, conserved_at=NOW(),
|
||||
is_pending_conservation=FALSE
|
||||
i. Su errore: logga l'errore e lascia is_pending_conservation=TRUE
|
||||
(verrà ritentato al prossimo run giornaliero)
|
||||
|
||||
Gestione messaggi senza EML in MinIO:
|
||||
Se raw_eml_path è NULL o il download fallisce, viene generato un EML
|
||||
sintetico dai metadati del messaggio. La conservazione procede comunque.
|
||||
|
||||
Idempotenza:
|
||||
Il job è idempotente: se eseguito più volte, salta i messaggi già conservati
|
||||
(is_conserved=TRUE) perché il filtro WHERE esclude già quei record.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from app.archival.conservatore_client import (
|
||||
ConservatoreClient,
|
||||
build_bagit_sip_complete,
|
||||
)
|
||||
from app.database import AsyncSessionLocal
|
||||
from app.models import Attachment, Message, TenantSettings
|
||||
from app.security import decrypt_credential
|
||||
from app.storage.minio_client import download_attachment as minio_download
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ─── Costanti ─────────────────────────────────────────────────────────────────
|
||||
|
||||
# Tipi di messaggio considerati "ricevute" da includere nel SIP
|
||||
RECEIPT_PEC_TYPES = {"accettazione", "avvenuta_consegna"}
|
||||
|
||||
# Massimo messaggi processati per run (evita timeout su run iniziali con backlog)
|
||||
MAX_MESSAGES_PER_RUN = 200
|
||||
|
||||
|
||||
# ─── Helper: download EML da MinIO ────────────────────────────────────────────
|
||||
|
||||
async def _download_eml(raw_eml_path: str | None, msg_id: str, subject: str | None,
|
||||
from_address: str | None, to_addresses: list[str] | None,
|
||||
received_at: datetime | None) -> bytes:
|
||||
"""
|
||||
Tenta di scaricare l'EML da MinIO. Se non disponibile, genera un EML sintetico.
|
||||
"""
|
||||
if raw_eml_path:
|
||||
try:
|
||||
data = await minio_download(raw_eml_path)
|
||||
logger.debug(f"[conservation] EML scaricato da MinIO: {raw_eml_path} ({len(data)} bytes)")
|
||||
return data
|
||||
except Exception as e:
|
||||
logger.warning(f"[conservation] Download EML fallito ({raw_eml_path}): {e} – uso EML sintetico")
|
||||
|
||||
# EML sintetico dai metadati
|
||||
recv_str = received_at.isoformat() if received_at else datetime.now(UTC).isoformat()
|
||||
to_str = ", ".join(to_addresses) if to_addresses else "destinatario@pec.it"
|
||||
synthetic_eml = (
|
||||
f"From: {from_address or 'mittente@pec.it'}\r\n"
|
||||
f"To: {to_str}\r\n"
|
||||
f"Subject: {subject or 'Messaggio PEC'}\r\n"
|
||||
f"Date: {recv_str}\r\n"
|
||||
f"Message-ID: <{msg_id}@pechub.synthetic>\r\n"
|
||||
f"Content-Type: text/plain; charset=UTF-8\r\n"
|
||||
f"MIME-Version: 1.0\r\n"
|
||||
f"\r\n"
|
||||
f"[EML sintetico generato da PecHub – EML originale non disponibile]\r\n"
|
||||
f"ID messaggio PecHub: {msg_id}\r\n"
|
||||
f"Data conservazione: {datetime.now(UTC).isoformat()}\r\n"
|
||||
)
|
||||
logger.warning(f"[conservation] EML sintetico generato per messaggio {msg_id}")
|
||||
return synthetic_eml.encode("utf-8")
|
||||
|
||||
|
||||
# ─── Helper: credenziali conservatore dal DB ──────────────────────────────────
|
||||
|
||||
async def _get_conservatore_creds(db, tenant_id: uuid.UUID) -> dict:
|
||||
"""
|
||||
Legge le impostazioni conservatore del tenant dal DB e decifra le credenziali.
|
||||
Restituisce un dict compatibile con ConservatoreClient.from_tenant_credentials().
|
||||
"""
|
||||
result = await db.execute(
|
||||
select(TenantSettings).where(TenantSettings.tenant_id == tenant_id)
|
||||
)
|
||||
settings = result.scalar_one_or_none()
|
||||
|
||||
if settings is None:
|
||||
# Nessuna configurazione: usa mock
|
||||
return {"mode": "mock", "conservatore_id": "mock"}
|
||||
|
||||
username = None
|
||||
password = None
|
||||
|
||||
try:
|
||||
if settings.conservatore_username_enc:
|
||||
username = decrypt_credential(settings.conservatore_username_enc)
|
||||
if settings.conservatore_password_enc:
|
||||
password = decrypt_credential(settings.conservatore_password_enc)
|
||||
except ValueError as e:
|
||||
logger.error(f"[conservation] Decifratura credenziali fallita per tenant {tenant_id}: {e}")
|
||||
|
||||
return {
|
||||
"mode": settings.archival_mode,
|
||||
"conservatore_id": settings.conservatore_id,
|
||||
"endpoint": settings.conservatore_endpoint,
|
||||
"tenant_slug": settings.conservatore_tenant_slug,
|
||||
"username": username,
|
||||
"password": password,
|
||||
}
|
||||
|
||||
|
||||
# ─── Job principale ───────────────────────────────────────────────────────────
|
||||
|
||||
async def run_conservation(ctx: dict[str, Any]) -> dict:
|
||||
"""
|
||||
Job arq: esegue il ciclo di conservazione sostitutiva per tutti i tenant.
|
||||
|
||||
Schedulato tramite cron alle 16:00 UTC (18:00 ora Italia).
|
||||
|
||||
Returns:
|
||||
dict con statistiche del run: tenant processati, messaggi conservati,
|
||||
messaggi falliti.
|
||||
"""
|
||||
logger.info("[conservation] Avvio ciclo conservazione sostitutiva")
|
||||
|
||||
stats = {
|
||||
"tenant_count": 0,
|
||||
"messages_processed": 0,
|
||||
"messages_conserved": 0,
|
||||
"messages_failed": 0,
|
||||
"started_at": datetime.now(UTC).isoformat(),
|
||||
}
|
||||
|
||||
async with AsyncSessionLocal() as db:
|
||||
# ── 1. Trova tutti i tenant con messaggi da conservare ────────────────
|
||||
result = await db.execute(
|
||||
select(Message.tenant_id)
|
||||
.where(
|
||||
Message.is_pending_conservation == True, # noqa: E712
|
||||
Message.is_conserved == False, # noqa: E712
|
||||
)
|
||||
.distinct()
|
||||
)
|
||||
tenant_ids = [row[0] for row in result.fetchall()]
|
||||
|
||||
if not tenant_ids:
|
||||
logger.info("[conservation] Nessun messaggio da conservare")
|
||||
stats["finished_at"] = datetime.now(UTC).isoformat()
|
||||
return stats
|
||||
|
||||
logger.info(f"[conservation] Tenant con messaggi pendenti: {len(tenant_ids)}")
|
||||
stats["tenant_count"] = len(tenant_ids)
|
||||
|
||||
# ── 2. Processa ogni tenant ───────────────────────────────────────────
|
||||
for tenant_id in tenant_ids:
|
||||
logger.info(f"[conservation] Elaborazione tenant {tenant_id}")
|
||||
|
||||
# Leggi credenziali conservatore
|
||||
try:
|
||||
creds = await _get_conservatore_creds(db, tenant_id)
|
||||
client = ConservatoreClient.from_tenant_credentials(creds)
|
||||
conservatore_mode = creds.get("mode", "mock")
|
||||
logger.info(
|
||||
f"[conservation] Tenant {tenant_id}: "
|
||||
f"modalita={conservatore_mode}, "
|
||||
f"conservatore={creds.get('conservatore_id', 'mock')}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[conservation] Impossibile inizializzare client conservatore "
|
||||
f"per tenant {tenant_id}: {e} – tenant saltato"
|
||||
)
|
||||
continue
|
||||
|
||||
# Carica messaggi da conservare con allegati (eager load)
|
||||
msgs_result = await db.execute(
|
||||
select(Message)
|
||||
.where(
|
||||
Message.tenant_id == tenant_id,
|
||||
Message.is_pending_conservation == True, # noqa: E712
|
||||
Message.is_conserved == False, # noqa: E712
|
||||
)
|
||||
.options(selectinload(Message.attachments))
|
||||
.order_by(Message.received_at.asc().nullsfirst())
|
||||
.limit(MAX_MESSAGES_PER_RUN)
|
||||
)
|
||||
messages = msgs_result.scalars().all()
|
||||
|
||||
logger.info(
|
||||
f"[conservation] Tenant {tenant_id}: "
|
||||
f"{len(messages)} messaggi da conservare"
|
||||
)
|
||||
|
||||
for msg in messages:
|
||||
stats["messages_processed"] += 1
|
||||
msg_id = str(msg.id)
|
||||
subject = msg.subject or f"PEC {msg_id[:8]}"
|
||||
|
||||
try:
|
||||
# ── a. Scarica EML principale ─────────────────────────────
|
||||
eml_bytes = await _download_eml(
|
||||
raw_eml_path=msg.raw_eml_path,
|
||||
msg_id=msg_id,
|
||||
subject=msg.subject,
|
||||
from_address=msg.from_address,
|
||||
to_addresses=msg.to_addresses,
|
||||
received_at=msg.received_at,
|
||||
)
|
||||
|
||||
# ── b. Scarica allegati ───────────────────────────────────
|
||||
attachment_files: list[tuple[str, bytes]] = []
|
||||
for att in msg.attachments:
|
||||
try:
|
||||
att_bytes = await minio_download(att.storage_path)
|
||||
# Usa il filename originale, sanitizzato per path sicuri
|
||||
safe_name = att.filename.replace("/", "_").replace("\\", "_")
|
||||
attachment_files.append((safe_name, att_bytes))
|
||||
logger.debug(
|
||||
f"[conservation] Allegato scaricato: "
|
||||
f"{att.filename} ({len(att_bytes)} bytes)"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[conservation] Allegato non disponibile "
|
||||
f"({att.filename}): {e} – saltato"
|
||||
)
|
||||
|
||||
# ── c. Carica e scarica ricevute PEC ──────────────────────
|
||||
receipts_result = await db.execute(
|
||||
select(Message).where(
|
||||
Message.parent_message_id == msg.id,
|
||||
Message.pec_type.in_(list(RECEIPT_PEC_TYPES)),
|
||||
)
|
||||
)
|
||||
receipt_messages = receipts_result.scalars().all()
|
||||
|
||||
receipt_files: list[tuple[str, bytes]] = []
|
||||
for receipt in receipt_messages:
|
||||
if not receipt.raw_eml_path:
|
||||
logger.debug(
|
||||
f"[conservation] Ricevuta {receipt.id} "
|
||||
f"({receipt.pec_type}) senza EML – saltata"
|
||||
)
|
||||
continue
|
||||
try:
|
||||
rec_bytes = await minio_download(receipt.raw_eml_path)
|
||||
receipt_files.append((str(receipt.id), rec_bytes))
|
||||
logger.debug(
|
||||
f"[conservation] Ricevuta scaricata: "
|
||||
f"{receipt.pec_type} ({len(rec_bytes)} bytes)"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"[conservation] Download ricevuta {receipt.id} fallito: "
|
||||
f"{e} – saltata"
|
||||
)
|
||||
|
||||
# ── d. Costruisci SIP BagIt completo ──────────────────────
|
||||
sip_bytes = build_bagit_sip_complete(
|
||||
eml_bytes=eml_bytes,
|
||||
message_id=msg_id,
|
||||
subject=msg.subject,
|
||||
from_address=msg.from_address,
|
||||
to_addresses=msg.to_addresses,
|
||||
received_at=msg.received_at.isoformat() if msg.received_at else None,
|
||||
attachments=attachment_files if attachment_files else None,
|
||||
receipts=receipt_files if receipt_files else None,
|
||||
)
|
||||
|
||||
n_att = len(attachment_files)
|
||||
n_rec = len(receipt_files)
|
||||
sip_size_kb = len(sip_bytes) // 1024
|
||||
logger.info(
|
||||
f"[conservation] SIP costruito per {msg_id[:8]}... "
|
||||
f"({sip_size_kb} KB, {n_att} allegati, {n_rec} ricevute)"
|
||||
)
|
||||
|
||||
# ── e. Upload al conservatore ─────────────────────────────
|
||||
sip_filename = f"pechub-pec-{msg_id}.zip"
|
||||
upload_result = await client.upload_versamento(
|
||||
sip_path=sip_filename,
|
||||
sip_bytes=sip_bytes,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
|
||||
if not upload_result.success:
|
||||
raise RuntimeError(
|
||||
f"Upload conservatore fallito: {upload_result.message}"
|
||||
)
|
||||
|
||||
versamento_id = upload_result.versamento_id
|
||||
logger.info(
|
||||
f"[conservation] Messaggio {msg_id[:8]}... conservato: "
|
||||
f"versamento_id={versamento_id}"
|
||||
)
|
||||
|
||||
# ── f. Aggiorna DB: messaggio conservato ──────────────────
|
||||
await db.execute(
|
||||
update(Message)
|
||||
.where(Message.id == msg.id)
|
||||
.values(
|
||||
is_conserved=True,
|
||||
conserved_at=datetime.now(UTC),
|
||||
is_pending_conservation=False,
|
||||
)
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
stats["messages_conserved"] += 1
|
||||
logger.info(
|
||||
f"[conservation] DB aggiornato: "
|
||||
f"is_conserved=TRUE, is_pending_conservation=FALSE "
|
||||
f"per messaggio {msg_id[:8]}..."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
await db.rollback()
|
||||
stats["messages_failed"] += 1
|
||||
logger.error(
|
||||
f"[conservation] ERRORE conservazione messaggio {msg_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
# Il messaggio resta con is_pending_conservation=TRUE
|
||||
# e verrà ritentato al prossimo run giornaliero
|
||||
|
||||
stats["finished_at"] = datetime.now(UTC).isoformat()
|
||||
|
||||
logger.info(
|
||||
f"[conservation] Ciclo completato: "
|
||||
f"{stats['messages_conserved']} conservati, "
|
||||
f"{stats['messages_failed']} falliti "
|
||||
f"su {stats['messages_processed']} processati"
|
||||
)
|
||||
|
||||
return stats
|
||||
Reference in New Issue
Block a user