mirror of
https://github.com/idrainformatica/PecFlow.git
synced 2026-06-16 20:55:41 +02:00
1227 lines
46 KiB
Python
1227 lines
46 KiB
Python
"""
|
|
Servizio di gestione indicizzazione full-text dei messaggi.
|
|
|
|
Funzionalita':
|
|
- Statistiche sull'indicizzazione (messaggi indicizzati vs totali)
|
|
- Avvio reindex totale o differenziale in background
|
|
- Monitoraggio progresso tramite Redis
|
|
- Cancellazione di un job in corso
|
|
- Avvio rescan allegati: ri-estrazione testo da MinIO + aggiornamento search_vector
|
|
|
|
Stato del job reindex salvato in Redis:
|
|
pechub:reindex:{tenant_id} -> JSON con stato corrente (TTL 24h)
|
|
pechub:reindex:{tenant_id}:cancel -> flag cancellazione (TTL 10min)
|
|
|
|
Stato del job rescan salvato in Redis:
|
|
pechub:rescan:{tenant_id} -> JSON con stato corrente (TTL 24h)
|
|
pechub:rescan:{tenant_id}:cancel -> flag cancellazione (TTL 10min)
|
|
|
|
Il background task usa una sessione DB propria (non quella della request).
|
|
"""
|
|
|
|
import asyncio
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Literal, Optional
|
|
|
|
from sqlalchemy import func, select, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.logging import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# ─── Constants ────────────────────────────────────────────────────────────────

# Redis key prefixes: the job state lives at "<prefix>:<tenant_id>", the cancel
# flag at "<prefix>:<tenant_id>:cancel".
REDIS_KEY_PREFIX = "pechub:reindex"
REDIS_RESCAN_PREFIX = "pechub:rescan"

# Key lifetimes, in seconds.
REDIS_TTL_STATUS = 24 * 60 * 60  # job state: 24 hours
REDIS_TTL_CANCEL = 10 * 60  # cancel flag: 10 minutes

# Batch sizes of the background jobs.
BATCH_SIZE = 500  # messages per reindex batch
RESCAN_BATCH_SIZE = 50  # attachments per rescan batch (much heavier per item)

# Age after which a job still marked "running" is reported as stale.
STALE_THRESHOLD_HOURS = 2

# Upper bounds on extracted text, in characters.
MAX_EXTRACTED_TEXT_LEN = 50_000
MAX_COMBINED_TEXT_LEN = 200_000

# Allowed values of the reindex mode and of the job "status" field.
ReindexMode = Literal["full", "differential"]
JobStatus = Literal["idle", "running", "completed", "failed", "cancelled"]
|
|
|
|
# ─── Content types and extensions accepted by the rescan ────────────────────

# NOTE(review): xlsx, xls, pptx, ppt, odt, ods, odp and rtf (and their MIME
# types) are listed here but have no entry in _EXTRACTORS_SYNC, so they are
# counted as extractable (get_stats, rescan selection) while no extractor is
# resolved for them. Confirm this is intended.
_SUPPORTED_EXTENSIONS = {
    # Word processing
    "pdf", "doc", "docx", "odt", "rtf", "txt", "md",
    # Spreadsheets and presentations
    "xls", "xlsx", "ods", "csv", "ppt", "pptx", "odp",
    # Markup and data
    "xml", "html", "htm", "json",
    # E-mail and signed envelopes
    "eml", "msg", "p7m",
    # Images (OCR)
    "png", "jpg", "jpeg", "tif", "tiff", "bmp", "gif", "webp",
}

_SUPPORTED_CONTENT_TYPES = {
    # PDF and word processing
    "application/pdf",
    "application/msword",
    "application/vnd.ms-word",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.oasis.opendocument.text",
    "application/rtf",
    "text/rtf",
    # Spreadsheets
    "application/vnd.ms-excel",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.oasis.opendocument.spreadsheet",
    "text/csv",
    # Presentations
    "application/vnd.ms-powerpoint",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.oasis.opendocument.presentation",
    # Text and markup
    "text/plain",
    "text/markdown",
    "text/html",
    "text/xml",
    "application/xml",
    # E-mail and signed (PKCS#7) envelopes
    "message/rfc822",
    "application/pkcs7-mime",
    "application/x-pkcs7-mime",
    # Images (OCR)
    "image/png",
    "image/jpeg",
    "image/tiff",
    "image/bmp",
    "image/gif",
    "image/webp",
}
|
|
|
|
|
|
# ─── Helpers Redis ─────────────────────────────────────────────────────────────
|
|
|
|
def _redis_key(tenant_id: uuid.UUID) -> str:
    """Return the Redis key of the reindex job state: "<REDIS_KEY_PREFIX>:<tenant_id>"."""
    return ":".join((REDIS_KEY_PREFIX, str(tenant_id)))
|
|
|
|
|
|
def _redis_cancel_key(tenant_id: uuid.UUID) -> str:
    """Return the Redis key of the reindex cancel flag: "<REDIS_KEY_PREFIX>:<tenant_id>:cancel"."""
    return ":".join((REDIS_KEY_PREFIX, str(tenant_id), "cancel"))
|
|
|
|
|
|
def _redis_rescan_key(tenant_id: uuid.UUID) -> str:
    """Return the Redis key of the rescan job state: "<REDIS_RESCAN_PREFIX>:<tenant_id>"."""
    return ":".join((REDIS_RESCAN_PREFIX, str(tenant_id)))
|
|
|
|
|
|
def _redis_rescan_cancel_key(tenant_id: uuid.UUID) -> str:
    """Return the Redis key of the rescan cancel flag: "<REDIS_RESCAN_PREFIX>:<tenant_id>:cancel"."""
    return ":".join((REDIS_RESCAN_PREFIX, str(tenant_id), "cancel"))
|
|
|
|
|
|
# ─── Estrattori testo allegati ─────────────────────────────────────────────────
|
|
|
|
def _ext(filename: str | None) -> str:
|
|
"""Restituisce l'estensione del file in minuscolo, senza punto."""
|
|
if not filename:
|
|
return ""
|
|
fn = filename.lower()
|
|
if fn.endswith(".p7m"):
|
|
return "p7m"
|
|
idx = fn.rfind(".")
|
|
return fn[idx + 1:] if idx >= 0 else ""
|
|
|
|
|
|
# When pypdf extracts fewer than this many characters (after strip), the PDF is
# treated as image-only and the OCR fallback is tried.
_PDF_OCR_THRESHOLD = 50

# Maximum number of pages rendered for OCR, to bound the time spent on long
# scanned PDFs.
_PDF_OCR_MAX_PAGES = 15
|
|
|
|
|
|
def _extract_pdf(content: bytes) -> str:
    """
    Extract the text layer of a PDF with pypdf.

    Pages whose extraction fails are skipped. When the collected text (after
    strip) is shorter than _PDF_OCR_THRESHOLD characters, typically an
    image-only / scanned PDF, ``_extract_pdf_ocr`` is tried. Its result is
    returned only if it is longer than the text layer.

    Returns "" when pypdf is not installed or the document cannot be read.
    """
    try:
        import pypdf  # type: ignore[import]

        reader = pypdf.PdfReader(io.BytesIO(content))
        page_texts: list[str] = []
        for pdf_page in reader.pages:
            try:
                extracted = pdf_page.extract_text()
                if extracted:
                    page_texts.append(extracted)
            except Exception:
                # One broken page must not abort the whole document.
                continue
        layer_text = " ".join(page_texts)
    except ImportError:
        logger.warning("pypdf non installato: impossibile estrarre testo da PDF")
        return ""
    except Exception as e:
        logger.debug(f"Errore estrazione PDF: {e}")
        return ""

    layer_len = len(layer_text.strip())
    if layer_len >= _PDF_OCR_THRESHOLD:
        return layer_text

    logger.debug(
        f"PDF con testo insufficiente ({layer_len} char), tentativo OCR..."
    )
    ocr_text = _extract_pdf_ocr(content)
    return ocr_text if len(ocr_text.strip()) > layer_len else layer_text
|
|
|
|
|
|
def _extract_pdf_ocr(content: bytes) -> str:
    """
    OCR fallback for image-only (scanned) PDFs, using pdf2image and Tesseract.

    Renders at most the first _PDF_OCR_MAX_PAGES pages at 200 DPI and
    recognises each with the Italian + English language models
    ("ita+eng"). Pages whose OCR fails or yields only whitespace are skipped.
    The stripped text of the remaining pages is joined with single spaces.

    Returns "" when pdf2image/pytesseract are missing or rendering fails.
    """
    try:
        from pdf2image import convert_from_bytes  # type: ignore[import]
        import pytesseract  # type: ignore[import]

        page_images = convert_from_bytes(
            content, dpi=200, last_page=_PDF_OCR_MAX_PAGES
        )

        recognised: list[str] = []
        for image in page_images:
            try:
                page_text = pytesseract.image_to_string(image, lang="ita+eng")
                cleaned = page_text.strip() if page_text else ""
                if cleaned:
                    recognised.append(cleaned)
            except Exception:
                continue
        return " ".join(recognised)
    except ImportError:
        logger.warning(
            "pdf2image o pytesseract non installati: impossibile OCR PDF"
        )
        return ""
    except Exception as e:
        logger.debug(f"Errore OCR PDF: {e}")
        return ""
|
|
|
|
|
|
def _extract_image_ocr(content: bytes) -> str:
    """
    OCR a raster image (PNG, JPEG, TIFF, BMP, GIF, WebP, ...) with Tesseract.

    The image is opened with Pillow and converted to RGB unless it is already
    RGB or greyscale ("L"). It is then recognised with the Italian + English
    language models ("ita+eng"). Whitespace in the result is collapsed to
    single spaces.

    NOTE(review): for multi-frame files (e.g. multi-page TIFF) only the frame
    Pillow opens on is processed; no other frames are visited.

    Returns "" when pytesseract/Pillow are missing or recognition fails.
    """
    try:
        import pytesseract  # type: ignore[import]
        from PIL import Image  # type: ignore[import]

        picture = Image.open(io.BytesIO(content))
        if picture.mode not in {"RGB", "L"}:
            picture = picture.convert("RGB")
        recognised = pytesseract.image_to_string(picture, lang="ita+eng")
        return " ".join(recognised.split())
    except ImportError:
        logger.warning(
            "pytesseract o Pillow non installati: impossibile OCR immagine"
        )
        return ""
    except Exception as e:
        logger.debug(f"Errore OCR immagine: {e}")
        return ""
|
|
|
|
|
|
def _extract_docx(content: bytes) -> str:
    """
    Extract text from a .docx (Office Open XML) document with python-docx.

    Collects the non-blank paragraph texts (unchanged), then the stripped
    text of every non-blank table cell, all joined by single spaces.

    Returns "" when python-docx is missing or the file cannot be parsed.
    """
    try:
        import docx  # type: ignore[import]

        document = docx.Document(io.BytesIO(content))
        chunks = [
            para.text
            for para in document.paragraphs
            if para.text and para.text.strip()
        ]
        chunks.extend(
            cell.text.strip()
            for table in document.tables
            for row in table.rows
            for cell in row.cells
            if cell.text and cell.text.strip()
        )
        return " ".join(chunks)
    except ImportError:
        logger.warning("python-docx non installato: impossibile estrarre testo da DOCX")
        return ""
    except Exception as e:
        logger.debug(f"Errore estrazione DOCX: {e}")
        return ""
|
|
|
|
|
|
def _extract_doc(content: bytes) -> str:
    """
    Extract text from a legacy Microsoft Word .doc file (OLE2 container).

    1. First try ``_extract_docx``. It handles .docx (ZIP-based) files that
       were renamed to .doc.
    2. Otherwise scan the raw bytes like the Unix ``strings`` tool:
       - keep every run of at least 5 consecutive printable ASCII bytes
         (0x20-0x7E) that contains at least 3 letters;
       - join the runs with spaces;
       - collapse whitespace;
       - truncate to MAX_EXTRACTED_TEXT_LEN characters.

    No extra library is needed. Only printable ASCII is kept:
    - non-ASCII bytes (e.g. accented letters stored as single cp1252/Latin-1
      bytes, such as the Italian "à") end a run and are lost;
    - text stored as UTF-16, where ASCII characters alternate with zero
      bytes, never forms a run.

    Returns "" when nothing meaningful is found.
    """
    # Attempt 1: python-docx (renamed .docx / ZIP-based files).
    result = _extract_docx(content)
    if result.strip():
        return result

    # Attempt 2: strings-style scan of the OLE2 binary. A single regex pass
    # finds exactly the maximal printable runs a byte-by-byte loop would,
    # but runs in C, which matters for multi-megabyte documents.
    try:
        runs = (
            run.decode("ascii", errors="ignore")
            for run in re.findall(rb"[\x20-\x7e]{5,}", content)
        )
        # Keep only runs with at least 3 letters (drops symbol/digit noise).
        meaningful = [run for run in runs if sum(c.isalpha() for c in run) >= 3]
        if meaningful:
            text = " ".join(meaningful)
            return " ".join(text.split())[:MAX_EXTRACTED_TEXT_LEN]
    except Exception as e:
        logger.debug(f"Errore estrazione DOC OLE2 binario: {e}")

    return ""
|
|
|
|
|
|
def _extract_plain(content: bytes) -> str:
    """
    Decode a text-like attachment (txt, csv, xml, html, json, md) for indexing.

    Decoding order:
      - UTF-16 when the data starts with a UTF-16 byte-order mark;
      - UTF-8, with a leading UTF-8 BOM removed;
      - Latin-1 as a last resort (it never fails).

    When the text looks like markup (it contains both "<" and ">"):
      - <script>/<style> blocks and <!-- comments --> are dropped;
      - the remaining tags become spaces;
      - HTML/XML character references ("&agrave;", "&#233;", "&amp;", ...)
        are decoded rather than discarded, so accented words are not
        truncated ("citt&agrave;" -> "città").

    Whitespace is always collapsed to single spaces.

    Returns "" if anything unexpected goes wrong.
    """
    import html

    try:
        if content[:2] in (b"\xff\xfe", b"\xfe\xff"):
            # Windows "Unicode" text exports carry a UTF-16 BOM.
            txt = content.decode("utf-16", errors="replace")
        else:
            try:
                txt = content.decode("utf-8-sig")
            except UnicodeDecodeError:
                txt = content.decode("latin-1", errors="replace")
        if "<" in txt and ">" in txt:
            txt = re.sub(r"(?is)<(script|style)\b[^>]*>.*?</\1\s*>", " ", txt)
            txt = re.sub(r"(?s)<!--.*?-->", " ", txt)
            txt = re.sub(r"<[^>]+>", " ", txt)
            txt = html.unescape(txt)
        return " ".join(txt.split())
    except Exception as e:
        logger.debug(f"Errore estrazione plain: {e}")
        return ""
|
|
|
|
|
|
def _extract_eml(content: bytes) -> str:
|
|
try:
|
|
import email as emaillib
|
|
msg = emaillib.message_from_bytes(content)
|
|
parts: list[str] = []
|
|
subject = msg.get("Subject", "")
|
|
if subject:
|
|
parts.append(subject)
|
|
sender = msg.get("From", "")
|
|
if sender:
|
|
parts.append(sender)
|
|
if msg.is_multipart():
|
|
for part in msg.walk():
|
|
ct = part.get_content_type()
|
|
if ct == "text/plain":
|
|
try:
|
|
payload = part.get_payload(decode=True)
|
|
if payload:
|
|
charset = part.get_content_charset() or "utf-8"
|
|
parts.append(payload.decode(charset, errors="replace"))
|
|
except Exception:
|
|
pass
|
|
else:
|
|
payload = msg.get_payload(decode=True)
|
|
if payload:
|
|
charset = msg.get_content_charset() or "utf-8"
|
|
parts.append(payload.decode(charset, errors="replace")) # type: ignore[arg-type]
|
|
return " ".join(parts)
|
|
except Exception as e:
|
|
logger.debug(f"Errore estrazione EML: {e}")
|
|
return ""
|
|
|
|
|
|
def _unwrap_p7m_asn1(data: bytes) -> bytes | None:
|
|
def read_tag_length(buf: bytes, offset: int):
|
|
tag = buf[offset]
|
|
offset += 1
|
|
lb = buf[offset]
|
|
offset += 1
|
|
if lb & 0x80:
|
|
num_bytes = lb & 0x7F
|
|
ln = int.from_bytes(buf[offset:offset + num_bytes], "big")
|
|
offset += num_bytes
|
|
else:
|
|
ln = lb
|
|
return tag, ln, offset
|
|
|
|
pos = 0
|
|
try:
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x30:
|
|
return None
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x06:
|
|
return None
|
|
pos += ln
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0xA0:
|
|
return None
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x30:
|
|
return None
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x02:
|
|
return None
|
|
pos += ln
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x31:
|
|
return None
|
|
pos += ln
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x30:
|
|
return None
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x06:
|
|
return None
|
|
pos += ln
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0xA0:
|
|
return None
|
|
tag, ln, pos = read_tag_length(data, pos)
|
|
if tag != 0x04:
|
|
return None
|
|
return data[pos: pos + ln]
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _extract_p7m(content: bytes, original_filename: str | None = None) -> str:
    """
    Extract text from the document wrapped in a CAdES/PKCS#7 (.p7m) envelope.

    The inner document type is chosen by:
      1. the extension left after removing one ".p7m" suffix from
         *original_filename* ("fattura.pdf.p7m" -> "pdf"). For nested
         envelopes ("x.pdf.p7m.p7m") the remaining name is passed down, so
         the innermost extension is still honoured;
      2. otherwise the magic bytes of the inner content:
         - PDF;
         - a nested CMS envelope (a leading 0x30 byte, tried and ignored if
           it yields nothing);
         - OLE2 (legacy Office);
         - ZIP (tried as .docx, then as plain text);
         - common image formats, via OCR;
      3. otherwise plain-text decoding.

    Returns "" when the envelope cannot be unwrapped.
    """
    inner_content = _unwrap_p7m_asn1(content)
    if not inner_content:
        return ""

    inner_name = ""
    inner_ext = ""
    if original_filename:
        inner_name = original_filename.lower()
        if inner_name.endswith(".p7m"):
            inner_name = inner_name[:-4]
        dot = inner_name.rfind(".")
        if dot >= 0:
            inner_ext = inner_name[dot + 1:]

    if inner_ext == "p7m":
        # Nested signature: keep the remaining name for the next level.
        return _extract_p7m(inner_content, inner_name)

    extractor = _EXTRACTORS_SYNC.get(inner_ext)
    if extractor:
        return extractor(inner_content)

    if inner_content[:4] == b"%PDF":
        return _extract_pdf(inner_content)
    if inner_content[:1] == b"\x30":
        # Possibly a nested envelope without filename information. Each
        # unwrap returns a strictly smaller slice, so recursion terminates.
        nested = _extract_p7m(inner_content)
        if nested.strip():
            return nested
    if inner_content[:8] == b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1":
        # OLE2 compound file (legacy .doc and other Office formats).
        return _extract_doc(inner_content)
    if inner_content[:2] == b"PK":
        for fn_try in (_extract_docx, _extract_plain):
            result = fn_try(inner_content)
            if result.strip():
                return result
    if inner_content.startswith(
        (b"\x89PNG\r\n\x1a\n", b"\xff\xd8\xff", b"GIF87a", b"GIF89a", b"II*\x00", b"MM\x00*")
    ):
        return _extract_image_ocr(inner_content)

    return _extract_plain(inner_content)
|
|
|
|
|
|
# Lowercase extension -> synchronous text extractor (bytes -> str).
# Used by _resolve_extractor and, for the inner document of .p7m envelopes,
# by _extract_p7m. Insertion order matches the historical literal.
# NOTE(review): .msg (Outlook) files are mapped to the RFC 822 parser; if they
# are OLE2 containers this likely yields little text. Confirm.
_EXTRACTORS_SYNC: dict = {
    "pdf": _extract_pdf,
    "docx": _extract_docx,
    "doc": _extract_doc,  # python-docx first, then an OLE2 "strings" scan
    # Plain-text formats (Markdown included).
    **dict.fromkeys(
        ("txt", "csv", "xml", "html", "htm", "json", "md"),
        _extract_plain,
    ),
    **dict.fromkeys(("eml", "msg"), _extract_eml),
    "p7m": _extract_p7m,
    # Images (OCR)
    **dict.fromkeys(
        ("png", "jpg", "jpeg", "tiff", "tif", "bmp", "gif", "webp"),
        _extract_image_ocr,
    ),
}
|
|
|
|
|
|
# MIME type (lowercase, without parameters) -> key of _EXTRACTORS_SYNC.
_CONTENT_TYPE_TO_EXT: dict[str, str] = {
    "application/pdf": "pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/msword": "doc",
    "application/vnd.ms-word": "doc",
    "text/plain": "txt",
    "text/csv": "csv",
    "text/xml": "xml",
    "application/xml": "xml",
    "text/html": "html",
    "text/markdown": "md",
    "message/rfc822": "eml",
    "application/pkcs7-mime": "p7m",
    "application/x-pkcs7-mime": "p7m",
    # Images (OCR)
    "image/png": "png",
    "image/jpeg": "jpeg",
    "image/tiff": "tiff",
    "image/bmp": "bmp",
    "image/gif": "gif",
    "image/webp": "webp",
}


def _resolve_extractor(content_type: str | None, filename: str | None):
    """
    Return the text-extractor callable for an attachment, or None.

    The file extension (via ``_ext``) takes precedence. Otherwise the MIME
    type is mapped to an extension key of ``_EXTRACTORS_SYNC``. MIME
    parameters ("; charset=...", "; name=...") and surrounding whitespace are
    ignored and matching is case-insensitive, so "Text/Plain; charset=UTF-8"
    resolves like "text/plain".
    """
    by_extension = _EXTRACTORS_SYNC.get(_ext(filename))
    if by_extension is not None:
        return by_extension
    mime = (content_type or "").split(";", 1)[0].strip().lower()
    mapped = _CONTENT_TYPE_TO_EXT.get(mime)
    return _EXTRACTORS_SYNC.get(mapped) if mapped else None
|
|
|
|
|
|
def _is_extractable(content_type: str | None, filename: str | None) -> bool:
    """
    Tell whether an attachment is in one of the supported formats.

    True when the file extension (via ``_ext``) is in _SUPPORTED_EXTENSIONS,
    or when the MIME type is in _SUPPORTED_CONTENT_TYPES. The MIME type is
    compared lowercased, with parameters such as "; charset=utf-8" and
    surrounding whitespace ignored.
    """
    if _ext(filename) in _SUPPORTED_EXTENSIONS:
        return True
    mime = (content_type or "").split(";", 1)[0].strip().lower()
    return mime in _SUPPORTED_CONTENT_TYPES
|
|
|
|
|
|
# ─── Servizio ──────────────────────────────────────────────────────────────────
|
|
|
|
class IndexingService:
|
|
"""Gestisce le operazioni di indicizzazione full-text per un tenant."""
|
|
|
|
def __init__(self, db: AsyncSession) -> None:
|
|
self.db = db
|
|
|
|
# ── Statistiche ──────────────────────────────────────────────────────────
|
|
|
|
async def get_stats(self, tenant_id: uuid.UUID) -> dict:
|
|
"""
|
|
Restituisce le statistiche di copertura dell'indicizzazione per il tenant.
|
|
"""
|
|
from app.models.message import Attachment, Message
|
|
|
|
total_q = await self.db.execute(
|
|
select(func.count(Message.id)).where(
|
|
Message.tenant_id == tenant_id,
|
|
Message.parent_message_id.is_(None),
|
|
)
|
|
)
|
|
total_messages: int = total_q.scalar_one()
|
|
|
|
indexed_q = await self.db.execute(
|
|
select(func.count(Message.id)).where(
|
|
Message.tenant_id == tenant_id,
|
|
Message.parent_message_id.is_(None),
|
|
Message.search_vector.isnot(None),
|
|
)
|
|
)
|
|
indexed_messages: int = indexed_q.scalar_one()
|
|
|
|
_supported_content_types_list = list(_SUPPORTED_CONTENT_TYPES)
|
|
_supported_extensions_like = [f"%.{e}" for e in _SUPPORTED_EXTENSIONS]
|
|
|
|
from sqlalchemy import or_
|
|
ext_conditions = [Attachment.filename.ilike(ext) for ext in _supported_extensions_like]
|
|
|
|
att_total_q = await self.db.execute(
|
|
select(func.count(Attachment.id)).where(
|
|
Attachment.tenant_id == tenant_id,
|
|
or_(
|
|
Attachment.content_type.in_(_supported_content_types_list),
|
|
*ext_conditions,
|
|
),
|
|
)
|
|
)
|
|
attachments_total: int = att_total_q.scalar_one()
|
|
|
|
att_extracted_q = await self.db.execute(
|
|
select(func.count(Attachment.id)).where(
|
|
Attachment.tenant_id == tenant_id,
|
|
Attachment.extracted_text.isnot(None),
|
|
)
|
|
)
|
|
attachments_extracted: int = att_extracted_q.scalar_one()
|
|
|
|
unindexed_messages = total_messages - indexed_messages
|
|
coverage_pct = (
|
|
round(indexed_messages / total_messages * 100, 1)
|
|
if total_messages > 0
|
|
else 100.0
|
|
)
|
|
attachments_pct = (
|
|
round(attachments_extracted / attachments_total * 100, 1)
|
|
if attachments_total > 0
|
|
else 100.0
|
|
)
|
|
|
|
return {
|
|
"total_messages": total_messages,
|
|
"indexed_messages": indexed_messages,
|
|
"unindexed_messages": unindexed_messages,
|
|
"coverage_pct": coverage_pct,
|
|
"attachments_total": attachments_total,
|
|
"attachments_extracted": attachments_extracted,
|
|
"attachments_pct": attachments_pct,
|
|
}
|
|
|
|
# ── Stato job reindex ─────────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def get_job_status(tenant_id: uuid.UUID, redis_url: str) -> dict:
|
|
"""Legge lo stato del job di reindex da Redis."""
|
|
return await IndexingService._read_job_state(
|
|
_redis_key(tenant_id), redis_url
|
|
)
|
|
|
|
# ── Stato job rescan ──────────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def get_rescan_status(tenant_id: uuid.UUID, redis_url: str) -> dict:
|
|
"""Legge lo stato del job di rescan allegati da Redis."""
|
|
return await IndexingService._read_job_state(
|
|
_redis_rescan_key(tenant_id), redis_url
|
|
)
|
|
|
|
@staticmethod
|
|
async def _read_job_state(redis_key_str: str, redis_url: str) -> dict:
|
|
"""Helper generico: legge uno stato job da Redis."""
|
|
import redis.asyncio as aioredis
|
|
|
|
client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
raw = await client.get(redis_key_str)
|
|
finally:
|
|
await client.aclose()
|
|
|
|
if not raw:
|
|
return {
|
|
"status": "idle",
|
|
"mode": None,
|
|
"total": 0,
|
|
"processed": 0,
|
|
"progress_pct": 0.0,
|
|
"started_at": None,
|
|
"finished_at": None,
|
|
"started_by": None,
|
|
"elapsed_seconds": None,
|
|
"is_stale": False,
|
|
"error": None,
|
|
}
|
|
|
|
try:
|
|
data: dict = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
return {"status": "idle"}
|
|
|
|
is_stale = False
|
|
elapsed_seconds = None
|
|
if data.get("started_at"):
|
|
try:
|
|
started = datetime.fromisoformat(data["started_at"])
|
|
finished_str = data.get("finished_at")
|
|
ref_time = (
|
|
datetime.fromisoformat(finished_str)
|
|
if finished_str
|
|
else datetime.now(timezone.utc)
|
|
)
|
|
elapsed_seconds = int((ref_time - started).total_seconds())
|
|
if data.get("status") == "running":
|
|
elapsed_hours = elapsed_seconds / 3600
|
|
is_stale = elapsed_hours >= STALE_THRESHOLD_HOURS
|
|
except Exception:
|
|
pass
|
|
|
|
total = data.get("total", 0)
|
|
processed = data.get("processed", 0)
|
|
progress_pct = round(processed / total * 100, 1) if total > 0 else 0.0
|
|
|
|
return {
|
|
"status": data.get("status", "idle"),
|
|
"mode": data.get("mode"),
|
|
"total": total,
|
|
"processed": processed,
|
|
"progress_pct": progress_pct,
|
|
"started_at": data.get("started_at"),
|
|
"finished_at": data.get("finished_at"),
|
|
"started_by": data.get("started_by"),
|
|
"elapsed_seconds": elapsed_seconds,
|
|
"is_stale": is_stale,
|
|
"error": data.get("error"),
|
|
}
|
|
|
|
# ── Avvio reindex ─────────────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def start_reindex(
|
|
tenant_id: uuid.UUID,
|
|
mode: ReindexMode,
|
|
started_by_email: str,
|
|
redis_url: str,
|
|
db_url: str,
|
|
) -> None:
|
|
import redis.asyncio as aioredis
|
|
|
|
client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
raw = await client.get(_redis_key(tenant_id))
|
|
if raw:
|
|
data = json.loads(raw)
|
|
if data.get("status") == "running":
|
|
raise ValueError("Un job di reindex e' gia' in corso per questo tenant")
|
|
# Controlla anche se il rescan e' in corso
|
|
raw_rescan = await client.get(_redis_rescan_key(tenant_id))
|
|
if raw_rescan:
|
|
data_rescan = json.loads(raw_rescan)
|
|
if data_rescan.get("status") == "running":
|
|
raise ValueError(
|
|
"Un job di scansione allegati e' in corso. "
|
|
"Attendi il termine prima di avviare il reindex."
|
|
)
|
|
finally:
|
|
await client.aclose()
|
|
|
|
await IndexingService._set_state(
|
|
_redis_key(tenant_id),
|
|
redis_url,
|
|
{
|
|
"status": "running",
|
|
"mode": mode,
|
|
"total": 0,
|
|
"processed": 0,
|
|
"started_at": datetime.now(timezone.utc).isoformat(),
|
|
"finished_at": None,
|
|
"started_by": started_by_email,
|
|
"error": None,
|
|
},
|
|
)
|
|
|
|
asyncio.create_task(
|
|
IndexingService._run_reindex_bg(tenant_id, mode, redis_url, db_url),
|
|
name=f"reindex-{tenant_id}",
|
|
)
|
|
|
|
# ── Avvio rescan allegati ─────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def start_rescan(
|
|
tenant_id: uuid.UUID,
|
|
started_by_email: str,
|
|
redis_url: str,
|
|
db_url: str,
|
|
force: bool = False,
|
|
) -> None:
|
|
"""
|
|
Avvia il job di rescan allegati in background.
|
|
|
|
force=False: processa solo allegati con extracted_text IS NULL
|
|
force=True: processa tutti gli allegati (ri-estrae anche quelli gia' estratti)
|
|
"""
|
|
import redis.asyncio as aioredis
|
|
|
|
client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
raw = await client.get(_redis_rescan_key(tenant_id))
|
|
if raw:
|
|
data = json.loads(raw)
|
|
if data.get("status") == "running":
|
|
raise ValueError("Un job di scansione allegati e' gia' in corso per questo tenant")
|
|
# Controlla anche se il reindex e' in corso
|
|
raw_reindex = await client.get(_redis_key(tenant_id))
|
|
if raw_reindex:
|
|
data_reindex = json.loads(raw_reindex)
|
|
if data_reindex.get("status") == "running":
|
|
raise ValueError(
|
|
"Un job di reindex e' in corso. "
|
|
"Attendi il termine prima di avviare la scansione allegati."
|
|
)
|
|
finally:
|
|
await client.aclose()
|
|
|
|
mode_label = "force" if force else "differential"
|
|
|
|
await IndexingService._set_state(
|
|
_redis_rescan_key(tenant_id),
|
|
redis_url,
|
|
{
|
|
"status": "running",
|
|
"mode": mode_label,
|
|
"total": 0,
|
|
"processed": 0,
|
|
"started_at": datetime.now(timezone.utc).isoformat(),
|
|
"finished_at": None,
|
|
"started_by": started_by_email,
|
|
"error": None,
|
|
},
|
|
)
|
|
|
|
asyncio.create_task(
|
|
IndexingService._run_rescan_bg(tenant_id, force, redis_url, db_url),
|
|
name=f"rescan-{tenant_id}",
|
|
)
|
|
|
|
# ── Cancellazione reindex ─────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def cancel_reindex(tenant_id: uuid.UUID, redis_url: str) -> bool:
|
|
import redis.asyncio as aioredis
|
|
|
|
client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
raw = await client.get(_redis_key(tenant_id))
|
|
if raw:
|
|
data = json.loads(raw)
|
|
if data.get("status") == "running":
|
|
await client.setex(
|
|
_redis_cancel_key(tenant_id),
|
|
REDIS_TTL_CANCEL,
|
|
"1",
|
|
)
|
|
return True
|
|
finally:
|
|
await client.aclose()
|
|
return False
|
|
|
|
# ── Cancellazione rescan ──────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def cancel_rescan(tenant_id: uuid.UUID, redis_url: str) -> bool:
|
|
import redis.asyncio as aioredis
|
|
|
|
client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
raw = await client.get(_redis_rescan_key(tenant_id))
|
|
if raw:
|
|
data = json.loads(raw)
|
|
if data.get("status") == "running":
|
|
await client.setex(
|
|
_redis_rescan_cancel_key(tenant_id),
|
|
REDIS_TTL_CANCEL,
|
|
"1",
|
|
)
|
|
return True
|
|
finally:
|
|
await client.aclose()
|
|
return False
|
|
|
|
# ── Helpers Redis ─────────────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def _set_state(redis_key_str: str, redis_url: str, state: dict) -> None:
|
|
import redis.asyncio as aioredis
|
|
|
|
client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
await client.setex(
|
|
redis_key_str,
|
|
REDIS_TTL_STATUS,
|
|
json.dumps(state, default=str),
|
|
)
|
|
finally:
|
|
await client.aclose()
|
|
|
|
@staticmethod
|
|
async def _check_cancel_flag(cancel_key: str, redis_url: str) -> bool:
|
|
import redis.asyncio as aioredis
|
|
|
|
client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
flag = await client.get(cancel_key)
|
|
return flag == "1"
|
|
finally:
|
|
await client.aclose()
|
|
|
|
# Alias per retrocompatibilita' con il codice esistente
|
|
@staticmethod
|
|
async def _set_job_state(tenant_id: uuid.UUID, redis_url: str, state: dict) -> None:
|
|
await IndexingService._set_state(_redis_key(tenant_id), redis_url, state)
|
|
|
|
@staticmethod
|
|
async def _check_cancel(tenant_id: uuid.UUID, redis_url: str) -> bool:
|
|
return await IndexingService._check_cancel_flag(
|
|
_redis_cancel_key(tenant_id), redis_url
|
|
)
|
|
|
|
# ── Logica interna reindex ─────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def _run_reindex_bg(
|
|
tenant_id: uuid.UUID,
|
|
mode: ReindexMode,
|
|
redis_url: str,
|
|
db_url: str,
|
|
) -> None:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
log = logging.getLogger(__name__)
|
|
log.info(f"Avvio reindex {mode} per tenant {tenant_id}")
|
|
|
|
engine = create_async_engine(db_url, echo=False)
|
|
AsyncSessionFactory = sessionmaker( # type: ignore[call-overload]
|
|
engine, class_=AsyncSession, expire_on_commit=False
|
|
)
|
|
|
|
state: dict = {
|
|
"status": "running",
|
|
"mode": mode,
|
|
"total": 0,
|
|
"processed": 0,
|
|
"started_at": None,
|
|
"finished_at": None,
|
|
"started_by": None,
|
|
"error": None,
|
|
}
|
|
|
|
try:
|
|
async with AsyncSessionFactory() as db:
|
|
from app.models.message import Message
|
|
|
|
count_q = select(func.count(Message.id)).where(
|
|
Message.tenant_id == tenant_id,
|
|
Message.parent_message_id.is_(None),
|
|
)
|
|
if mode == "differential":
|
|
count_q = count_q.where(Message.search_vector.is_(None))
|
|
|
|
total: int = (await db.execute(count_q)).scalar_one()
|
|
|
|
import redis.asyncio as aioredis
|
|
redis_client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
raw = await redis_client.get(_redis_key(tenant_id))
|
|
if raw:
|
|
state = json.loads(raw)
|
|
state["total"] = total
|
|
state["processed"] = 0
|
|
await redis_client.setex(
|
|
_redis_key(tenant_id),
|
|
REDIS_TTL_STATUS,
|
|
json.dumps(state, default=str),
|
|
)
|
|
finally:
|
|
await redis_client.aclose()
|
|
|
|
log.info(f"Reindex {mode}: {total} messaggi da processare")
|
|
|
|
if total == 0:
|
|
state.update({
|
|
"status": "completed",
|
|
"processed": 0,
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
await IndexingService._set_job_state(tenant_id, redis_url, state)
|
|
return
|
|
|
|
ids_q = select(Message.id).where(
|
|
Message.tenant_id == tenant_id,
|
|
Message.parent_message_id.is_(None),
|
|
)
|
|
if mode == "differential":
|
|
ids_q = ids_q.where(Message.search_vector.is_(None))
|
|
|
|
ids_result = await db.execute(ids_q)
|
|
all_ids = [str(row[0]) for row in ids_result.fetchall()]
|
|
|
|
processed = 0
|
|
for batch_start in range(0, len(all_ids), BATCH_SIZE):
|
|
if await IndexingService._check_cancel(tenant_id, redis_url):
|
|
log.info(f"Reindex {mode} annullato al batch {batch_start}")
|
|
state.update({
|
|
"status": "cancelled",
|
|
"processed": processed,
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
await IndexingService._set_job_state(tenant_id, redis_url, state)
|
|
return
|
|
|
|
batch_ids = all_ids[batch_start: batch_start + BATCH_SIZE]
|
|
|
|
await db.execute(
|
|
text("""
|
|
UPDATE messages
|
|
SET search_vector =
|
|
setweight(to_tsvector('italian', coalesce(subject, '')), 'A') ||
|
|
setweight(to_tsvector('simple', coalesce(from_address, '')), 'B') ||
|
|
setweight(to_tsvector('simple',
|
|
coalesce(array_to_string(to_addresses, ' '), '')), 'B') ||
|
|
setweight(to_tsvector('italian', coalesce(body_text, '')), 'C') ||
|
|
setweight(to_tsvector('italian', coalesce((
|
|
SELECT string_agg(a.extracted_text, ' ')
|
|
FROM attachments a
|
|
WHERE a.message_id = messages.id
|
|
AND a.extracted_text IS NOT NULL
|
|
), '')), 'D')
|
|
WHERE id = ANY(:ids)
|
|
"""),
|
|
{"ids": batch_ids},
|
|
)
|
|
await db.commit()
|
|
|
|
processed += len(batch_ids)
|
|
state["processed"] = processed
|
|
await IndexingService._set_job_state(tenant_id, redis_url, state)
|
|
|
|
await asyncio.sleep(0.05)
|
|
|
|
state.update({
|
|
"status": "completed",
|
|
"processed": processed,
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
await IndexingService._set_job_state(tenant_id, redis_url, state)
|
|
log.info(f"Reindex {mode} completato: {processed}/{total} messaggi")
|
|
|
|
except Exception as exc:
|
|
log.error(f"Errore reindex {mode} tenant {tenant_id}: {exc}", exc_info=True)
|
|
state.update({
|
|
"status": "failed",
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
"error": str(exc),
|
|
})
|
|
await IndexingService._set_job_state(tenant_id, redis_url, state)
|
|
finally:
|
|
await engine.dispose()
|
|
|
|
# ── Logica interna rescan allegati ────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
async def _run_rescan_bg(
|
|
tenant_id: uuid.UUID,
|
|
force: bool,
|
|
redis_url: str,
|
|
db_url: str,
|
|
) -> None:
|
|
"""
|
|
Coroutine di rescan allegati eseguita in background.
|
|
|
|
Algoritmo:
|
|
1. Trova gli allegati del tenant con formato supportato
|
|
(solo quelli con extracted_text IS NULL se force=False)
|
|
2. Per ogni batch: scarica da MinIO, estrae testo, aggiorna extracted_text
|
|
3. Dopo ogni batch: ricostruisce search_vector per i messaggi interessati
|
|
4. Aggiorna progresso in Redis dopo ogni batch
|
|
5. Controlla flag di cancellazione tra un batch e l'altro
|
|
"""
|
|
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
log = logging.getLogger(__name__)
|
|
mode_str = "force" if force else "differential"
|
|
log.info(f"Avvio rescan allegati {mode_str} per tenant {tenant_id}")
|
|
|
|
engine = create_async_engine(db_url, echo=False)
|
|
AsyncSessionFactory = sessionmaker( # type: ignore[call-overload]
|
|
engine, class_=AsyncSession, expire_on_commit=False
|
|
)
|
|
|
|
state: dict = {
|
|
"status": "running",
|
|
"mode": mode_str,
|
|
"total": 0,
|
|
"processed": 0,
|
|
"started_at": None,
|
|
"finished_at": None,
|
|
"started_by": None,
|
|
"error": None,
|
|
}
|
|
|
|
try:
|
|
async with AsyncSessionFactory() as db:
|
|
from app.config import get_settings
|
|
from app.models.message import Attachment
|
|
|
|
settings = get_settings()
|
|
|
|
# ── 1. Conta allegati da processare ───────────────────────────
|
|
from sqlalchemy import or_
|
|
|
|
ext_conditions = [
|
|
Attachment.filename.ilike(f"%.{e}") for e in _SUPPORTED_EXTENSIONS
|
|
]
|
|
base_filter = [
|
|
Attachment.tenant_id == tenant_id,
|
|
or_(
|
|
Attachment.content_type.in_(list(_SUPPORTED_CONTENT_TYPES)),
|
|
*ext_conditions,
|
|
),
|
|
]
|
|
if not force:
|
|
base_filter.append(Attachment.extracted_text.is_(None))
|
|
|
|
count_q = await db.execute(
|
|
select(func.count(Attachment.id)).where(*base_filter)
|
|
)
|
|
total: int = count_q.scalar_one()
|
|
|
|
# Aggiorna totale in Redis
|
|
import redis.asyncio as aioredis
|
|
redis_client = aioredis.from_url(redis_url, decode_responses=True)
|
|
try:
|
|
raw = await redis_client.get(_redis_rescan_key(tenant_id))
|
|
if raw:
|
|
state = json.loads(raw)
|
|
state["total"] = total
|
|
state["processed"] = 0
|
|
await redis_client.setex(
|
|
_redis_rescan_key(tenant_id),
|
|
REDIS_TTL_STATUS,
|
|
json.dumps(state, default=str),
|
|
)
|
|
finally:
|
|
await redis_client.aclose()
|
|
|
|
log.info(f"Rescan {mode_str}: {total} allegati da processare")
|
|
|
|
if total == 0:
|
|
state.update({
|
|
"status": "completed",
|
|
"processed": 0,
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
await IndexingService._set_state(
|
|
_redis_rescan_key(tenant_id), redis_url, state
|
|
)
|
|
return
|
|
|
|
# ── 2. Recupera IDs allegati da processare ────────────────────
|
|
ids_q = select(Attachment.id).where(*base_filter)
|
|
ids_result = await db.execute(ids_q)
|
|
all_att_ids = [row[0] for row in ids_result.fetchall()]
|
|
|
|
# ── 3. Crea client MinIO ───────────────────────────────────────
|
|
try:
|
|
from miniopy_async import Minio # type: ignore[import]
|
|
minio = Minio(
|
|
endpoint=settings.minio_endpoint,
|
|
access_key=settings.minio_access_key,
|
|
secret_key=settings.minio_secret_key,
|
|
secure=settings.minio_use_ssl,
|
|
)
|
|
bucket = settings.minio_bucket
|
|
except Exception as e:
|
|
raise RuntimeError(f"Impossibile creare client MinIO: {e}") from e
|
|
|
|
# ── 4. Processa in batch ───────────────────────────────────────
|
|
processed = 0
|
|
for batch_start in range(0, len(all_att_ids), RESCAN_BATCH_SIZE):
|
|
# Controlla cancellazione
|
|
if await IndexingService._check_cancel_flag(
|
|
_redis_rescan_cancel_key(tenant_id), redis_url
|
|
):
|
|
log.info(f"Rescan {mode_str} annullato al batch {batch_start}")
|
|
state.update({
|
|
"status": "cancelled",
|
|
"processed": processed,
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
await IndexingService._set_state(
|
|
_redis_rescan_key(tenant_id), redis_url, state
|
|
)
|
|
return
|
|
|
|
batch_ids = all_att_ids[batch_start: batch_start + RESCAN_BATCH_SIZE]
|
|
|
|
# Carica allegati del batch
|
|
att_result = await db.execute(
|
|
select(Attachment).where(Attachment.id.in_(batch_ids))
|
|
)
|
|
attachments = list(att_result.scalars().all())
|
|
|
|
affected_message_ids: set[str] = set()
|
|
|
|
for att in attachments:
|
|
extractor = _resolve_extractor(att.content_type, att.filename)
|
|
if extractor is None:
|
|
continue
|
|
|
|
try:
|
|
response = await minio.get_object(bucket, att.storage_path)
|
|
content = await response.content.read()
|
|
response.close()
|
|
except Exception as e:
|
|
log.warning(
|
|
f"Impossibile scaricare allegato {att.id} "
|
|
f"({att.filename!r}) da MinIO: {e}"
|
|
)
|
|
continue
|
|
|
|
try:
|
|
e_name = _ext(att.filename)
|
|
if e_name == "p7m":
|
|
extracted = _extract_p7m(content, att.filename)
|
|
else:
|
|
extracted = extractor(content) # type: ignore[operator]
|
|
except Exception as ex:
|
|
log.debug(f"Errore estrazione {att.filename!r}: {ex}")
|
|
continue
|
|
|
|
if not extracted or not extracted.strip():
|
|
continue
|
|
|
|
att.extracted_text = extracted[:MAX_EXTRACTED_TEXT_LEN]
|
|
affected_message_ids.add(str(att.message_id))
|
|
log.debug(
|
|
f"Testo estratto da {att.filename!r}: "
|
|
f"{len(att.extracted_text)} caratteri"
|
|
)
|
|
|
|
await db.flush()
|
|
|
|
# Ricostruisce search_vector per i messaggi interessati
|
|
if affected_message_ids:
|
|
await db.execute(
|
|
text("""
|
|
UPDATE messages
|
|
SET search_vector =
|
|
setweight(to_tsvector('italian', coalesce(subject, '')), 'A') ||
|
|
setweight(to_tsvector('simple', coalesce(from_address, '')), 'B') ||
|
|
setweight(to_tsvector('simple',
|
|
coalesce(array_to_string(to_addresses, ' '), '')), 'B') ||
|
|
setweight(to_tsvector('italian', coalesce(body_text, '')), 'C') ||
|
|
setweight(to_tsvector('italian', coalesce((
|
|
SELECT string_agg(a.extracted_text, ' ')
|
|
FROM attachments a
|
|
WHERE a.message_id = messages.id
|
|
AND a.extracted_text IS NOT NULL
|
|
), '')), 'D')
|
|
WHERE id = ANY(:ids)
|
|
"""),
|
|
{"ids": list(affected_message_ids)},
|
|
)
|
|
|
|
await db.commit()
|
|
|
|
processed += len(batch_ids)
|
|
state["processed"] = processed
|
|
await IndexingService._set_state(
|
|
_redis_rescan_key(tenant_id), redis_url, state
|
|
)
|
|
|
|
await asyncio.sleep(0.1)
|
|
|
|
# ── 5. Completato ──────────────────────────────────────────────
|
|
state.update({
|
|
"status": "completed",
|
|
"processed": processed,
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
})
|
|
await IndexingService._set_state(
|
|
_redis_rescan_key(tenant_id), redis_url, state
|
|
)
|
|
log.info(f"Rescan {mode_str} completato: {processed}/{total} allegati")
|
|
|
|
except Exception as exc:
|
|
log.error(f"Errore rescan tenant {tenant_id}: {exc}", exc_info=True)
|
|
state.update({
|
|
"status": "failed",
|
|
"finished_at": datetime.now(timezone.utc).isoformat(),
|
|
"error": str(exc),
|
|
})
|
|
await IndexingService._set_state(
|
|
_redis_rescan_key(tenant_id), redis_url, state
|
|
)
|
|
finally:
|
|
await engine.dispose()
|