Files
2026-03-27 13:54:07 +01:00

719 lines
25 KiB
Python

"""
Indicizzazione full-text dei messaggi PEC.
Responsabilita':
1. Scarica gli allegati da MinIO
2. Estrae il testo in base al formato del file
3. Aggiorna la colonna extracted_text in attachments
4. Aggiorna la colonna search_vector in messages includendo il testo degli allegati
Formati supportati:
- PDF (.pdf) tramite pypdf
- Word (.docx, .doc) tramite python-docx
- Excel (.xlsx, .xls) tramite openpyxl
- PowerPoint(.pptx, .ppt) tramite python-pptx
- LibreOffice (.odt, .ods, .odp) tramite odfpy
- RTF (.rtf) tramite striprtf
- Testo (.txt, .csv, .xml, .html, .htm) testo grezzo
- Email (.eml, .msg) tramite stdlib email
- Firmati (.p7m) unwrap CMS poi estrae in base all'estensione interna
Viene chiamato alla fine di _save_message in sync.py, in modo non bloccante:
un'eccezione qui non interrompe la sincronizzazione del messaggio.
"""
import io
import logging
import re
import uuid
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
# Dimensione massima del testo estratto per allegato (caratteri)
MAX_EXTRACTED_TEXT_LEN = 50_000
# Dimensione massima del testo aggregato degli allegati per il search_vector
MAX_COMBINED_TEXT_LEN = 200_000
# ─── Rilevamento tipo file ────────────────────────────────────────────────────
def _ext(filename: str | None) -> str:
"""Restituisce l'estensione del file in minuscolo, senza punto."""
if not filename:
return ""
fn = filename.lower()
# Gestione doppia estensione es. documento.pdf.p7m
if fn.endswith(".p7m"):
return "p7m"
idx = fn.rfind(".")
return fn[idx + 1:] if idx >= 0 else ""
def _is_extractable(content_type: str | None, filename: str | None) -> bool:
"""Ritorna True se il formato e' supportato dall'estrattore."""
ct = (content_type or "").lower()
e = _ext(filename)
return e in _EXTRACTORS or ct in _CONTENT_TYPE_MAP
def _resolve_extractor(content_type: str | None, filename: str | None):
"""Ritorna la funzione estrattore appropriata, o None."""
e = _ext(filename)
if e in _EXTRACTORS:
return _EXTRACTORS[e]
ct = (content_type or "").lower()
if ct in _CONTENT_TYPE_MAP:
return _EXTRACTORS.get(_CONTENT_TYPE_MAP[ct])
return None
# ─── Estrattori ───────────────────────────────────────────────────────────────
# Soglia minima di caratteri estratti da pypdf prima di ricorrere all'OCR.
# Un PDF di testo reale produce migliaia di caratteri; una scansione ne produce
# zero o pochissimi (artefatti). 50 char e' un valore conservativo sicuro.
_PDF_OCR_THRESHOLD = 50
# Numero massimo di pagine su cui eseguire OCR per evitare timeout su PDF lunghi.
_PDF_OCR_MAX_PAGES = 15
def _extract_pdf(content: bytes) -> str:
"""
Estrae testo da PDF tramite pypdf.
Se il testo estratto e' inferiore a _PDF_OCR_THRESHOLD caratteri (PDF
image-only / scansione), attiva il fallback OCR via Tesseract.
"""
try:
import pypdf # type: ignore[import]
reader = pypdf.PdfReader(io.BytesIO(content))
parts: list[str] = []
for page in reader.pages:
try:
t = page.extract_text()
if t:
parts.append(t)
except Exception:
continue
text = " ".join(parts)
except ImportError:
logger.warning("pypdf non installato: impossibile estrarre testo da PDF")
return ""
except Exception as e:
logger.debug(f"Errore estrazione PDF: {e}")
return ""
# Se il testo e' troppo corto, il PDF e' probabilmente una scansione
if len(text.strip()) < _PDF_OCR_THRESHOLD:
logger.debug(
f"PDF con testo insufficiente ({len(text.strip())} char), "
"tentativo OCR..."
)
ocr_text = _extract_pdf_ocr(content)
if len(ocr_text.strip()) > len(text.strip()):
return ocr_text
return text
def _extract_pdf_ocr(content: bytes) -> str:
"""
OCR su PDF image-only tramite pdf2image + Tesseract.
Converte le pagine del PDF in immagini PIL a 200 DPI (buon compromesso
qualita'/velocita' su CPU) e applica Tesseract con lingua italiana + inglese.
Processa al massimo _PDF_OCR_MAX_PAGES pagine per evitare timeout.
"""
try:
from pdf2image import convert_from_bytes # type: ignore[import]
import pytesseract # type: ignore[import]
pages = convert_from_bytes(
content,
dpi=200,
last_page=_PDF_OCR_MAX_PAGES,
)
parts: list[str] = []
for page_img in pages:
try:
t = pytesseract.image_to_string(page_img, lang="ita+eng")
if t and t.strip():
parts.append(t.strip())
except Exception:
continue
return " ".join(parts)
except ImportError:
logger.warning(
"pdf2image o pytesseract non installati: impossibile OCR PDF"
)
return ""
except Exception as e:
logger.debug(f"Errore OCR PDF: {e}")
return ""
def _extract_image_ocr(content: bytes) -> str:
"""
Estrae testo da un file immagine (PNG, JPEG, TIFF, BMP, ecc.) tramite OCR.
Usa Tesseract con lingua italiana + inglese per massima copertura
su documenti italiani.
"""
try:
import pytesseract # type: ignore[import]
from PIL import Image # type: ignore[import]
img = Image.open(io.BytesIO(content))
# Converti in RGB se necessario (TIFF multi-frame, palette, ecc.)
if img.mode not in ("RGB", "L"):
img = img.convert("RGB")
text = pytesseract.image_to_string(img, lang="ita+eng")
return " ".join(text.split())
except ImportError:
logger.warning(
"pytesseract o Pillow non installati: impossibile OCR immagine"
)
return ""
except Exception as e:
logger.debug(f"Errore OCR immagine: {e}")
return ""
def _extract_docx(content: bytes) -> str:
"""Estrae testo da DOCX/DOC tramite python-docx."""
try:
import docx # type: ignore[import]
doc = docx.Document(io.BytesIO(content))
parts = [p.text for p in doc.paragraphs if p.text and p.text.strip()]
# Include anche le tabelle
for table in doc.tables:
for row in table.rows:
for cell in row.cells:
if cell.text and cell.text.strip():
parts.append(cell.text.strip())
return " ".join(parts)
except ImportError:
logger.warning("python-docx non installato: impossibile estrarre testo da DOCX")
return ""
except Exception as e:
logger.debug(f"Errore estrazione DOCX: {e}")
return ""
def _extract_xlsx(content: bytes) -> str:
"""Estrae testo da XLSX/XLS tramite openpyxl."""
try:
import openpyxl # type: ignore[import]
wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
parts: list[str] = []
for ws in wb.worksheets:
for row in ws.iter_rows():
for cell in row:
if cell.value is not None:
v = str(cell.value).strip()
if v:
parts.append(v)
return " ".join(parts)
except ImportError:
logger.warning("openpyxl non installato: impossibile estrarre testo da XLSX")
return ""
except Exception as e:
logger.debug(f"Errore estrazione XLSX: {e}")
return ""
def _extract_pptx(content: bytes) -> str:
"""Estrae testo da PPTX/PPT tramite python-pptx."""
try:
from pptx import Presentation # type: ignore[import]
prs = Presentation(io.BytesIO(content))
parts: list[str] = []
for slide in prs.slides:
for shape in slide.shapes:
if shape.has_text_frame:
for para in shape.text_frame.paragraphs:
t = para.text.strip()
if t:
parts.append(t)
return " ".join(parts)
except ImportError:
logger.warning("python-pptx non installato: impossibile estrarre testo da PPTX")
return ""
except Exception as e:
logger.debug(f"Errore estrazione PPTX: {e}")
return ""
def _extract_odt(content: bytes) -> str:
"""Estrae testo da ODT/ODS/ODP tramite odfpy."""
try:
from odf import opendocument, teletype # type: ignore[import]
from odf.text import P # type: ignore[import]
doc = opendocument.load(io.BytesIO(content))
parts: list[str] = []
for el in doc.body.getElementsByType(P):
t = teletype.extractText(el).strip()
if t:
parts.append(t)
return " ".join(parts)
except ImportError:
logger.warning("odfpy non installato: impossibile estrarre testo da ODT")
return ""
except Exception as e:
logger.debug(f"Errore estrazione ODT: {e}")
return ""
def _extract_rtf(content: bytes) -> str:
"""Estrae testo da RTF tramite striprtf."""
try:
from striprtf.striprtf import rtf_to_text # type: ignore[import]
raw = content.decode("latin-1", errors="replace")
return rtf_to_text(raw)
except ImportError:
logger.warning("striprtf non installato: impossibile estrarre testo da RTF")
return ""
except Exception as e:
logger.debug(f"Errore estrazione RTF: {e}")
return ""
def _extract_plain(content: bytes) -> str:
"""Estrae testo da file di testo puro (txt, csv, xml, html, ecc.)."""
try:
# Prova UTF-8 prima, poi latin-1 come fallback
try:
text = content.decode("utf-8")
except UnicodeDecodeError:
text = content.decode("latin-1", errors="replace")
# Per XML/HTML: rimuove i tag
if "<" in text and ">" in text:
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"&[a-zA-Z]+;", " ", text)
return " ".join(text.split())
except Exception as e:
logger.debug(f"Errore estrazione testo plain: {e}")
return ""
def _extract_eml(content: bytes) -> str:
"""Estrae testo da un file EML allegato."""
try:
import email as emaillib
msg = emaillib.message_from_bytes(content)
parts: list[str] = []
subject = msg.get("Subject", "")
if subject:
parts.append(subject)
sender = msg.get("From", "")
if sender:
parts.append(sender)
# Estrae body
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace"))
except Exception:
pass
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace")) # type: ignore[arg-type]
return " ".join(parts)
except Exception as e:
logger.debug(f"Errore estrazione EML: {e}")
return ""
def _extract_p7m(content: bytes, original_filename: str | None = None) -> str:
"""
Estrae testo da un documento con firma digitale CAdES (.p7m).
Prova a fare l'unwrap del CMS envelope tramite la libreria cryptography
(gia' presente nel worker). Se l'unwrap ha successo, determina il formato
del documento interno dall'estensione del nome originale (es. fattura.pdf.p7m
-> PDF) e applica l'estrattore appropriato.
"""
inner_content: bytes | None = None
# Metodo 1: cryptography (CMS/PKCS7)
try:
from cryptography.hazmat.primitives.serialization import pkcs7 # type: ignore[import]
# load_pem_pkcs7_certificates / load_der_pkcs7_certificates non espongono il payload
# Usiamo il modulo backend direttamente
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
from cryptography.x509 import load_der_x509_certificate # noqa: F401
# Prova parsing DER diretto della struttura CMS ContentInfo
# La struttura ASN.1 di SignedData contiene encapContentInfo -> eContent
from cryptography.hazmat.bindings._rust import ( # type: ignore[import]
x509 as rust_x509,
)
_ = rust_x509 # solo per verificare import
except Exception:
pass
# Metodo piu' semplice: parsing ASN.1 manuale per estrarre eContent
# La struttura DER di CMS SignedData:
# SEQUENCE {
# OID (signedData)
# [0] EXPLICIT SEQUENCE {
# INTEGER (version)
# SET (digestAlgorithms)
# SEQUENCE (encapContentInfo) {
# OID (contentType = data)
# [0] EXPLICIT OCTET STRING (eContent) <- questo e' il contenuto originale
# }
# ...
# }
# }
try:
inner_content = _unwrap_p7m_asn1(content)
except Exception as e:
logger.debug(f"Unwrap P7M ASN1 fallito: {e}")
if not inner_content:
logger.debug("Impossibile estrarre contenuto dal file .p7m")
return ""
# Determina l'estensione interna dal nome file originale
# es. "fattura.pdf.p7m" -> inner ext = "pdf"
inner_ext = ""
if original_filename:
fn = original_filename.lower()
if fn.endswith(".p7m"):
fn = fn[:-4] # rimuove .p7m
idx = fn.rfind(".")
if idx >= 0:
inner_ext = fn[idx + 1:]
extractor = _EXTRACTORS.get(inner_ext)
if extractor:
logger.debug(f"P7M: estrazione interna come {inner_ext!r}")
return extractor(inner_content)
# Fallback: prova a riconoscere il formato dall'header del contenuto
if inner_content[:4] == b"%PDF":
return _extract_pdf(inner_content)
if inner_content[:2] in (b"PK",): # ZIP-based (docx, xlsx, pptx, odt)
# Prova nell'ordine piu' comune
for fn in (_extract_docx, _extract_xlsx, _extract_pptx, _extract_odt):
result = fn(inner_content)
if result.strip():
return result
# Ultimo tentativo: plain text
return _extract_plain(inner_content)
def _unwrap_p7m_asn1(data: bytes) -> bytes | None:
"""
Parsing ASN.1 DER minimale per estrarre eContent da una struttura CMS SignedData.
Non verifica la firma: serve solo per l'estrazione del testo.
"""
pos = 0
length = len(data)
def read_tag_length(buf: bytes, offset: int) -> tuple[int, int, int]:
"""Ritorna (tag, length, new_offset)."""
tag = buf[offset]
offset += 1
lb = buf[offset]
offset += 1
if lb & 0x80:
num_bytes = lb & 0x7F
ln = int.from_bytes(buf[offset:offset + num_bytes], "big")
offset += num_bytes
else:
ln = lb
return tag, ln, offset
# outer SEQUENCE
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x30:
return None
# OID (contentType = signedData)
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x06:
return None
pos += ln # skip OID value
# [0] EXPLICIT
tag, ln, pos = read_tag_length(data, pos)
if tag != 0xA0:
return None
# SEQUENCE (SignedData)
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x30:
return None
# INTEGER (version)
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x02:
return None
pos += ln
# SET (digestAlgorithms)
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x31:
return None
pos += ln
# SEQUENCE (encapContentInfo)
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x30:
return None
# OID (contentType dentro encapContentInfo)
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x06:
return None
pos += ln
# [0] EXPLICIT (eContent, opzionale)
tag, ln, pos = read_tag_length(data, pos)
if tag != 0xA0:
return None
# OCTET STRING con il contenuto originale
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x04:
return None
return data[pos: pos + ln]
# ─── Mapping formato -> estrattore ────────────────────────────────────────────
_EXTRACTORS: dict[str, object] = {
# Documenti Office
"pdf": _extract_pdf,
"docx": _extract_docx,
"doc": _extract_docx,
"xlsx": _extract_xlsx,
"xls": _extract_xlsx,
"pptx": _extract_pptx,
"ppt": _extract_pptx,
# LibreOffice
"odt": _extract_odt,
"ods": _extract_odt,
"odp": _extract_odt,
# Testo
"txt": _extract_plain,
"csv": _extract_plain,
"xml": _extract_plain,
"html": _extract_plain,
"htm": _extract_plain,
"json": _extract_plain,
# RTF
"rtf": _extract_rtf,
# Email
"eml": _extract_eml,
"msg": _extract_eml,
# Firma digitale CAdES
"p7m": _extract_p7m,
# Immagini (OCR)
"png": _extract_image_ocr,
"jpg": _extract_image_ocr,
"jpeg": _extract_image_ocr,
"tiff": _extract_image_ocr,
"tif": _extract_image_ocr,
"bmp": _extract_image_ocr,
"gif": _extract_image_ocr,
"webp": _extract_image_ocr,
}
# Mapping content-type -> estensione normalizzata (per fallback quando il filename manca)
_CONTENT_TYPE_MAP: dict[str, str] = {
"application/pdf": "pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
"application/msword": "doc",
"application/vnd.ms-word": "doc",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
"application/vnd.ms-excel": "xls",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": "pptx",
"application/vnd.ms-powerpoint": "ppt",
"application/vnd.oasis.opendocument.text": "odt",
"application/vnd.oasis.opendocument.spreadsheet": "ods",
"application/vnd.oasis.opendocument.presentation": "odp",
"application/rtf": "rtf",
"text/rtf": "rtf",
"text/plain": "txt",
"text/csv": "csv",
"text/xml": "xml",
"application/xml": "xml",
"text/html": "html",
"message/rfc822": "eml",
"application/pkcs7-mime": "p7m",
"application/x-pkcs7-mime": "p7m",
# Immagini (OCR)
"image/png": "png",
"image/jpeg": "jpeg",
"image/jpg": "jpeg",
"image/tiff": "tiff",
"image/bmp": "bmp",
"image/gif": "gif",
"image/webp": "webp",
}
# ─── Job principale ───────────────────────────────────────────────────────────
async def index_message(
message_id: uuid.UUID,
db: AsyncSession,
) -> None:
"""
Indicizza un messaggio per la ricerca full-text.
Non solleva eccezioni: tutti gli errori vengono loggati ma non propagati,
per non interrompere il flusso di sincronizzazione.
"""
try:
await _do_index_message(message_id, db)
except Exception as e:
logger.error(
f"Errore indicizzazione messaggio {message_id}: {e}",
exc_info=True,
)
async def _do_index_message(
message_id: uuid.UUID,
db: AsyncSession,
) -> None:
"""Logica interna di indicizzazione (puo' sollevare eccezioni)."""
from app.config import get_settings
from app.models import Attachment, Message
settings = get_settings()
# ── Carica il messaggio ───────────────────────────────────────────────────
msg_result = await db.execute(
select(Message).where(Message.id == message_id)
)
message = msg_result.scalar_one_or_none()
if not message:
logger.warning(f"index_message: messaggio {message_id} non trovato in DB")
return
# ── Carica gli allegati ───────────────────────────────────────────────────
att_result = await db.execute(
select(Attachment).where(Attachment.message_id == message_id)
)
attachments = list(att_result.scalars().all())
if not attachments:
logger.debug(f"Messaggio {message_id}: nessun allegato, skip indicizzazione allegati")
return
# ── Crea client MinIO ─────────────────────────────────────────────────────
try:
from miniopy_async import Minio # type: ignore[import]
minio = Minio(
endpoint=settings.minio_endpoint,
access_key=settings.minio_access_key,
secret_key=settings.minio_secret_key,
secure=settings.minio_use_ssl,
)
except Exception as e:
logger.warning(f"Impossibile creare client MinIO per indicizzazione {message_id}: {e}")
return
bucket = settings.minio_bucket
attachment_texts: list[str] = []
indexed_count = 0
for att in attachments:
# Se gia' indicizzato, usa il testo cached
if att.extracted_text is not None:
attachment_texts.append(att.extracted_text)
continue
# Controlla se il formato e' supportato
extractor = _resolve_extractor(att.content_type, att.filename)
if extractor is None:
logger.debug(
f"Formato non supportato per indicizzazione: "
f"{att.filename!r} ({att.content_type!r})"
)
continue
# Scarica da MinIO
try:
response = await minio.get_object(bucket, att.storage_path)
content = await response.content.read()
response.close()
except Exception as e:
logger.warning(
f"Impossibile scaricare allegato {att.id} "
f"({att.filename!r}) da MinIO: {e}"
)
continue
# Estrai testo - per p7m passa anche il filename originale
try:
e = _ext(att.filename)
if e == "p7m":
extracted = _extract_p7m(content, att.filename)
else:
extracted = extractor(content) # type: ignore[operator]
except Exception as ex:
logger.debug(f"Errore estrazione {att.filename!r}: {ex}")
continue
if not extracted or not extracted.strip():
logger.debug(f"Nessun testo estratto da {att.filename!r}")
continue
# Limita la dimensione e salva sull'ORM object (col. mappata)
att.extracted_text = extracted[:MAX_EXTRACTED_TEXT_LEN]
attachment_texts.append(att.extracted_text)
indexed_count += 1
logger.debug(
f"Testo estratto da {att.filename!r}: "
f"{len(att.extracted_text)} caratteri"
)
# ── Aggiorna search_vector includendo il testo degli allegati ─────────────
if attachment_texts:
combined = " ".join(attachment_texts)[:MAX_COMBINED_TEXT_LEN]
await db.execute(
text("""
UPDATE messages
SET search_vector =
setweight(to_tsvector('italian', coalesce(subject, '')), 'A') ||
setweight(to_tsvector('simple', coalesce(from_address, '')), 'B') ||
setweight(to_tsvector('simple',
coalesce(array_to_string(to_addresses, ' '), '')), 'B') ||
setweight(to_tsvector('italian', coalesce(body_text, '')), 'C') ||
setweight(to_tsvector('italian', :att_text), 'D')
WHERE id = :message_id
"""),
{"att_text": combined, "message_id": str(message_id)},
)
await db.flush()
logger.info(
f"Indicizzazione completata: messaggio {message_id}, "
f"{indexed_count} allegati indicizzati su {len(attachments)} totali"
)
else:
logger.debug(
f"Messaggio {message_id}: nessun allegato con testo estraibile"
)