# Repository web-view header captured together with this file:
#   Files
#   PecHub/backend/app/services/indexing_service.py
#   T
#   2026-03-27 13:54:07 +01:00
#   1227 lines
#   46 KiB
#   Python
"""
Servizio di gestione indicizzazione full-text dei messaggi.
Funzionalita':
- Statistiche sull'indicizzazione (messaggi indicizzati vs totali)
- Avvio reindex totale o differenziale in background
- Monitoraggio progresso tramite Redis
- Cancellazione di un job in corso
- Avvio rescan allegati: ri-estrazione testo da MinIO + aggiornamento search_vector
Stato del job reindex salvato in Redis:
pechub:reindex:{tenant_id} -> JSON con stato corrente (TTL 24h)
pechub:reindex:{tenant_id}:cancel -> flag cancellazione (TTL 10min)
Stato del job rescan salvato in Redis:
pechub:rescan:{tenant_id} -> JSON con stato corrente (TTL 24h)
pechub:rescan:{tenant_id}:cancel -> flag cancellazione (TTL 10min)
Il background task usa una sessione DB propria (non quella della request).
"""
import asyncio
import io
import json
import logging
import re
import uuid
from datetime import datetime, timezone
from typing import Literal, Optional
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.logging import get_logger
logger = get_logger(__name__)
# ─── Costanti ─────────────────────────────────────────────────────────────────
REDIS_KEY_PREFIX = "pechub:reindex"
REDIS_RESCAN_PREFIX = "pechub:rescan"
REDIS_TTL_STATUS = 60 * 60 * 24 # 24 ore
REDIS_TTL_CANCEL = 60 * 10 # 10 minuti
BATCH_SIZE = 500 # messaggi per batch (reindex)
RESCAN_BATCH_SIZE = 50 # allegati per batch (rescan - piu' pesante)
STALE_THRESHOLD_HOURS = 2 # ore prima di segnalare un job come stale
MAX_EXTRACTED_TEXT_LEN = 50_000
MAX_COMBINED_TEXT_LEN = 200_000
ReindexMode = Literal["full", "differential"]
JobStatus = Literal["idle", "running", "completed", "failed", "cancelled"]
# ─── Content-type e estensioni supportate per rescan ─────────────────────────
_SUPPORTED_EXTENSIONS = {
"pdf", "docx", "doc", "xlsx", "xls", "pptx", "ppt",
"odt", "ods", "odp", "rtf", "txt", "csv", "xml",
"html", "htm", "json", "eml", "msg", "p7m",
"md",
# Immagini (OCR)
"png", "jpg", "jpeg", "tiff", "tif", "bmp", "gif", "webp",
}
_SUPPORTED_CONTENT_TYPES = {
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/msword",
"application/vnd.ms-word",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
"application/vnd.ms-powerpoint",
"application/vnd.oasis.opendocument.text",
"application/vnd.oasis.opendocument.spreadsheet",
"application/vnd.oasis.opendocument.presentation",
"application/rtf",
"text/rtf",
"text/plain",
"text/csv",
"text/xml",
"application/xml",
"text/html",
"message/rfc822",
"application/pkcs7-mime",
"application/x-pkcs7-mime",
"text/markdown",
# Immagini (OCR)
"image/png",
"image/jpeg",
"image/tiff",
"image/bmp",
"image/gif",
"image/webp",
}
# ─── Redis key helpers ─────────────────────────────────────────────────────────
def _compose_key(prefix: str, tenant_id: uuid.UUID, *suffix: str) -> str:
    """Join *prefix*, the tenant id and any suffix parts with ':'."""
    return ":".join((prefix, str(tenant_id), *suffix))


def _redis_key(tenant_id: uuid.UUID) -> str:
    """Key holding the JSON state of the tenant's reindex job."""
    return _compose_key(REDIS_KEY_PREFIX, tenant_id)


def _redis_cancel_key(tenant_id: uuid.UUID) -> str:
    """Key of the cancellation flag for the tenant's reindex job."""
    return _compose_key(REDIS_KEY_PREFIX, tenant_id, "cancel")


def _redis_rescan_key(tenant_id: uuid.UUID) -> str:
    """Key holding the JSON state of the tenant's attachment-rescan job."""
    return _compose_key(REDIS_RESCAN_PREFIX, tenant_id)


def _redis_rescan_cancel_key(tenant_id: uuid.UUID) -> str:
    """Key of the cancellation flag for the tenant's attachment-rescan job."""
    return _compose_key(REDIS_RESCAN_PREFIX, tenant_id, "cancel")
# ─── Estrattori testo allegati ─────────────────────────────────────────────────
def _ext(filename: str | None) -> str:
"""Restituisce l'estensione del file in minuscolo, senza punto."""
if not filename:
return ""
fn = filename.lower()
if fn.endswith(".p7m"):
return "p7m"
idx = fn.rfind(".")
return fn[idx + 1:] if idx >= 0 else ""
# Minimum number of non-blank characters the pypdf text layer must yield before
# OCR is skipped. Fewer means the PDF is treated as an image-only scan.
_PDF_OCR_THRESHOLD = 50
# Maximum number of pages rasterised for OCR, to avoid timeouts on long PDFs.
_PDF_OCR_MAX_PAGES = 15
def _extract_pdf(content: bytes) -> str:
    """
    Extract text from a PDF, falling back to OCR for scanned documents.

    pypdf reads the embedded text layer page by page. Unreadable pages are
    skipped, and text read before a parse error is kept. OCR is tried with
    ``_extract_pdf_ocr`` when the layer has fewer than ``_PDF_OCR_THRESHOLD``
    non-blank characters. That covers an image-only scan, pypdf not being
    installed, or a file pypdf cannot parse. The OCR result wins if it is
    longer.

    Returns ``""`` when nothing can be extracted.
    """
    page_texts: list[str] = []
    try:
        import pypdf  # type: ignore[import]
    except ImportError:
        pypdf = None  # type: ignore[assignment]
        logger.warning("pypdf non installato: impossibile estrarre testo da PDF")
    if pypdf is not None:
        try:
            reader = pypdf.PdfReader(io.BytesIO(content))
            for page in reader.pages:
                try:
                    page_text = page.extract_text()
                except Exception:
                    continue  # one broken page must not discard the others
                if page_text:
                    page_texts.append(page_text)
        except Exception as e:
            # pypdf could not parse the file; the poppler-based OCR may still cope.
            logger.debug(f"Errore estrazione PDF: {e}")
    layer_text = " ".join(page_texts)
    layer_len = len(layer_text.strip())
    if layer_len < _PDF_OCR_THRESHOLD:
        logger.debug(
            f"PDF con testo insufficiente ({layer_len} char), "
            "tentativo OCR..."
        )
        ocr_text = _extract_pdf_ocr(content)
        if len(ocr_text.strip()) > layer_len:
            return ocr_text
    return layer_text
def _extract_pdf_ocr(content: bytes) -> str:
    """
    Run OCR over an image-only PDF.

    pdf2image rasterises at most ``_PDF_OCR_MAX_PAGES`` pages at 200 DPI.
    Tesseract then reads each page with the Italian + English language packs.
    A page whose OCR fails is skipped.

    Returns the stripped, non-empty page texts joined by spaces. Returns ``""``
    when pdf2image/pytesseract are missing or rasterisation fails.
    """
    try:
        from pdf2image import convert_from_bytes  # type: ignore[import]
        import pytesseract  # type: ignore[import]

        rendered_pages = convert_from_bytes(
            content,
            dpi=200,
            last_page=_PDF_OCR_MAX_PAGES,
        )
        recognised: list[str] = []
        for image in rendered_pages:
            try:
                page_text = pytesseract.image_to_string(image, lang="ita+eng")
                if not page_text or not page_text.strip():
                    continue
                recognised.append(page_text.strip())
            except Exception:
                continue
        return " ".join(recognised)
    except ImportError:
        logger.warning(
            "pdf2image o pytesseract non installati: impossibile OCR PDF"
        )
        return ""
    except Exception as e:
        logger.debug(f"Errore OCR PDF: {e}")
        return ""
def _extract_image_ocr(content: bytes) -> str:
    """
    OCR a raster image attachment (PNG, JPEG, TIFF, BMP, GIF, WebP, ...).

    The image is converted to RGB unless it is already RGB or greyscale
    ("L"). Tesseract then reads it with the Italian + English language packs.
    Whitespace runs in the result are collapsed to single spaces.

    Returns ``""`` when Pillow/pytesseract are missing or the image cannot be
    read.
    """
    # NOTE(review): only the frame Pillow loads by default is OCR'd; confirm
    # whether multi-page TIFF scans need every frame.
    try:
        import pytesseract  # type: ignore[import]
        from PIL import Image  # type: ignore[import]

        picture = Image.open(io.BytesIO(content))
        if picture.mode not in ("RGB", "L"):
            picture = picture.convert("RGB")
        recognised = pytesseract.image_to_string(picture, lang="ita+eng")
        words = recognised.split()
        return " ".join(words)
    except ImportError:
        logger.warning(
            "pytesseract o Pillow non installati: impossibile OCR immagine"
        )
        return ""
    except Exception as e:
        logger.debug(f"Errore OCR immagine: {e}")
        return ""
def _extract_docx(content: bytes) -> str:
    """
    Extract text from a DOCX document with python-docx.

    Non-blank paragraphs are taken as-is. They are followed by the stripped
    text of every non-blank table cell; tables are visited after all
    paragraphs. Pieces are joined with single spaces.

    Returns ``""`` when python-docx is missing or the bytes are not a readable
    DOCX.
    """
    try:
        import docx  # type: ignore[import]

        document = docx.Document(io.BytesIO(content))
        pieces = [
            para.text
            for para in document.paragraphs
            if para.text and para.text.strip()
        ]
        pieces.extend(
            cell.text.strip()
            for table in document.tables
            for row in table.rows
            for cell in row.cells
            if cell.text and cell.text.strip()
        )
        return " ".join(pieces)
    except ImportError:
        logger.warning("python-docx non installato: impossibile estrarre testo da DOCX")
        return ""
    except Exception as e:
        logger.debug(f"Errore estrazione DOCX: {e}")
        return ""
# Runs of at least 5 printable ASCII bytes (8-bit text inside an OLE2 binary).
_OLE_ASCII_RUN = re.compile(rb"[\x20-\x7e]{5,}")
# Runs of at least 5 UTF-16LE code units in the printable ASCII or Latin-1
# range (text stored as UTF-16, including accented Italian letters).
_OLE_UTF16_RUN = re.compile(rb"(?:[\x20-\x7e\xa0-\xff]\x00){5,}")
def _extract_doc(content: bytes) -> str:
    """
    Extract text from a legacy Word ``.doc`` (OLE2/binary) file.

    python-docx is tried first, which succeeds for ZIP-based files that are
    merely named ``.doc``. Otherwise the binary gets a "strings"-style scan:

    * runs of at least 5 printable ASCII bytes (8-bit text);
    * runs of at least 5 UTF-16LE characters in the printable ASCII or
      Latin-1 range (text stored as UTF-16, accented letters included).

    Only runs with at least 3 alphabetic characters are kept. The runs are
    joined, whitespace is collapsed, and the result is capped at
    ``MAX_EXTRACTED_TEXT_LEN``. Accented letters in 8-bit text still split
    ASCII runs. No extra library is required.

    Returns ``""`` when nothing meaningful is found.
    """
    # Attempt 1: python-docx (.docx renamed to .doc, or other ZIP-based files).
    result = _extract_docx(content)
    if result.strip():
        return result
    # Attempt 2: binary scan. Precompiled regexes replace a per-byte Python
    # loop that took seconds on multi-MB files while blocking the caller.
    try:
        runs = [m.decode("ascii") for m in _OLE_ASCII_RUN.findall(content)]
        runs.extend(m.decode("utf-16-le") for m in _OLE_UTF16_RUN.findall(content))
        # Keep only runs with at least 3 letters (drops symbol sequences).
        meaningful = [r for r in runs if sum(1 for c in r if c.isalpha()) >= 3]
        if meaningful:
            return " ".join(" ".join(meaningful).split())[:MAX_EXTRACTED_TEXT_LEN]
    except Exception as e:
        logger.debug(f"Errore estrazione DOC OLE2 binario: {e}")
    return ""
def _extract_plain(content: bytes) -> str:
try:
try:
txt = content.decode("utf-8")
except UnicodeDecodeError:
txt = content.decode("latin-1", errors="replace")
if "<" in txt and ">" in txt:
txt = re.sub(r"<[^>]+>", " ", txt)
txt = re.sub(r"&[a-zA-Z]+;", " ", txt)
return " ".join(txt.split())
except Exception as e:
logger.debug(f"Errore estrazione plain: {e}")
return ""
def _extract_eml(content: bytes) -> str:
try:
import email as emaillib
msg = emaillib.message_from_bytes(content)
parts: list[str] = []
subject = msg.get("Subject", "")
if subject:
parts.append(subject)
sender = msg.get("From", "")
if sender:
parts.append(sender)
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain":
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace"))
except Exception:
pass
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace")) # type: ignore[arg-type]
return " ".join(parts)
except Exception as e:
logger.debug(f"Errore estrazione EML: {e}")
return ""
def _unwrap_p7m_asn1(data: bytes) -> bytes | None:
def read_tag_length(buf: bytes, offset: int):
tag = buf[offset]
offset += 1
lb = buf[offset]
offset += 1
if lb & 0x80:
num_bytes = lb & 0x7F
ln = int.from_bytes(buf[offset:offset + num_bytes], "big")
offset += num_bytes
else:
ln = lb
return tag, ln, offset
pos = 0
try:
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x30:
return None
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x06:
return None
pos += ln
tag, ln, pos = read_tag_length(data, pos)
if tag != 0xA0:
return None
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x30:
return None
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x02:
return None
pos += ln
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x31:
return None
pos += ln
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x30:
return None
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x06:
return None
pos += ln
tag, ln, pos = read_tag_length(data, pos)
if tag != 0xA0:
return None
tag, ln, pos = read_tag_length(data, pos)
if tag != 0x04:
return None
return data[pos: pos + ln]
except Exception:
return None
def _extract_p7m(content: bytes, original_filename: str | None = None) -> str:
    """
    Extract text from the document wrapped in a signed ``.p7m`` envelope.

    The payload is unwrapped with ``_unwrap_p7m_asn1``. Its format comes from
    *original_filename* with one trailing ``.p7m`` removed
    (``fattura.xml.p7m`` -> ``xml``). Nested envelopes such as
    ``x.pdf.p7m.p7m`` are unwrapped recursively with the shortened name.

    Without a usable extension the payload is sniffed:

    * ``%PDF`` -> PDF;
    * ``PK`` -> DOCX;
    * otherwise plain text, but only when the payload does not look binary
      (NUL bytes without a UTF-16 BOM). This avoids indexing byte garbage
      from unsupported formats.

    Returns ``""`` when the envelope or its payload cannot be read.
    """
    inner_content = _unwrap_p7m_asn1(content)
    if not inner_content:
        return ""

    inner_name: str | None = None
    if original_filename:
        inner_name = original_filename
        if inner_name.lower().endswith(".p7m"):
            inner_name = inner_name[:-4]
    inner_ext = _ext(inner_name)

    if inner_ext == "p7m":
        # Doubly signed document: keep the name so the innermost type is known.
        return _extract_p7m(inner_content, inner_name)

    extractor = _EXTRACTORS_SYNC.get(inner_ext)
    if extractor:
        return extractor(inner_content)

    if inner_content[:4] == b"%PDF":
        return _extract_pdf(inner_content)
    if inner_content[:2] == b"PK":
        # ZIP container: only DOCX is supported. Other ZIP formats (xlsx,
        # archives) would decode to compressed noise as plain text.
        return _extract_docx(inner_content)
    has_utf16_bom = inner_content.startswith((b"\xff\xfe", b"\xfe\xff"))
    if b"\x00" in inner_content[:4096] and not has_utf16_bom:
        return ""  # binary payload of an unsupported format
    return _extract_plain(inner_content)
# Maps a lower-case file extension to its synchronous extractor: a function
# that takes the raw attachment bytes and returns the extracted text.
_EXTRACTORS_SYNC: dict = {
    "pdf": _extract_pdf,
    "docx": _extract_docx,
    "doc": _extract_doc,  # OLE2 string-scan fallback for legacy .doc
    # Text-like formats
    "txt": _extract_plain,
    "csv": _extract_plain,
    "xml": _extract_plain,
    "html": _extract_plain,
    "htm": _extract_plain,
    "json": _extract_plain,
    "md": _extract_plain,  # Markdown is plain text
    "eml": _extract_eml,
    # Outlook .msg items are OLE2 compound files (the same container as legacy
    # .doc), not RFC 822 text. The EML parser would treat the binary as a
    # single-part message body, while the OLE2 string scan recovers readable text.
    "msg": _extract_doc,
    "p7m": _extract_p7m,
    # Raster images (OCR)
    **dict.fromkeys(
        ("png", "jpg", "jpeg", "tiff", "tif", "bmp", "gif", "webp"),
        _extract_image_ocr,
    ),
}
# Bare MIME type -> key in _EXTRACTORS_SYNC. Used when the filename extension
# does not identify a supported format. Built once at import time.
_CONTENT_TYPE_TO_EXT = {
    "application/pdf": "pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/msword": "doc",
    "application/vnd.ms-word": "doc",
    "text/plain": "txt",
    "text/csv": "csv",
    "text/xml": "xml",
    "application/xml": "xml",
    "text/html": "html",
    "text/markdown": "md",
    "message/rfc822": "eml",
    "application/pkcs7-mime": "p7m",
    "application/x-pkcs7-mime": "p7m",
    # Raster images (OCR)
    "image/png": "png",
    "image/jpeg": "jpeg",
    "image/tiff": "tiff",
    "image/bmp": "bmp",
    "image/gif": "gif",
    "image/webp": "webp",
}
def _resolve_extractor(content_type: str | None, filename: str | None):
    """
    Return the extractor function for an attachment, or ``None`` if unsupported.

    The filename extension takes precedence. Otherwise the MIME type is used.
    Its parameters (``"application/pdf; name=x.pdf"``), surrounding whitespace
    and letter case are ignored.
    """
    by_extension = _EXTRACTORS_SYNC.get(_ext(filename))
    if by_extension is not None:
        return by_extension
    mime = (content_type or "").split(";", 1)[0].strip().lower()
    mapped = _CONTENT_TYPE_TO_EXT.get(mime)
    return _EXTRACTORS_SYNC.get(mapped) if mapped else None
def _is_extractable(content_type: str | None, filename: str | None) -> bool:
    """
    Tell whether an attachment is in a format eligible for text extraction.

    True when the filename extension is in ``_SUPPORTED_EXTENSIONS``, or when
    the MIME type is in ``_SUPPORTED_CONTENT_TYPES``. MIME parameters such as
    ``; charset=utf-8``, surrounding whitespace and case are ignored.
    """
    if _ext(filename) in _SUPPORTED_EXTENSIONS:
        return True
    mime = (content_type or "").split(";", 1)[0].strip().lower()
    return mime in _SUPPORTED_CONTENT_TYPES
# ─── Service ───────────────────────────────────────────────────────────────────
# Strong references to the background jobs spawned by this process. The event
# loop keeps only weak references to tasks, so a task nobody references can be
# garbage-collected before it finishes.
_BACKGROUND_TASKS: set[asyncio.Task] = set()
# Rebuilds messages.search_vector for the message ids bound to :ids.
# Weights:
#   A - subject
#   B - sender and recipients
#   C - body
#   D - concatenated attachment text
# The attachment text is capped at :max_attachment_chars (MAX_COMBINED_TEXT_LEN)
# to keep the tsvector within PostgreSQL's size limit. One oversized message
# would otherwise fail the whole batch.
_REBUILD_SEARCH_VECTOR_SQL = text(
    """
    UPDATE messages
    SET search_vector =
        setweight(to_tsvector('italian', coalesce(subject, '')), 'A') ||
        setweight(to_tsvector('simple', coalesce(from_address, '')), 'B') ||
        setweight(to_tsvector('simple',
            coalesce(array_to_string(to_addresses, ' '), '')), 'B') ||
        setweight(to_tsvector('italian', coalesce(body_text, '')), 'C') ||
        setweight(to_tsvector('italian', left(coalesce((
            SELECT string_agg(a.extracted_text, ' ')
            FROM attachments a
            WHERE a.message_id = messages.id
              AND a.extracted_text IS NOT NULL
        ), ''), :max_attachment_chars)), 'D')
    WHERE id = ANY(:ids)
    """
)


class IndexingService:
    """
    Full-text indexing operations for a tenant.

    ``get_stats`` uses the session passed to the constructor. The background
    jobs (reindex and attachment rescan) open their own engine and session
    from ``db_url``, and keep their progress and cancel flags in Redis.
    """

    def __init__(self, db: AsyncSession) -> None:
        self.db = db

    # ── Shared helpers ────────────────────────────────────────────────────────
    @staticmethod
    def _supported_attachment_filter():
        """
        SQL condition that selects attachments in a supported format.

        Matches when ``content_type`` is in ``_SUPPORTED_CONTENT_TYPES``, or
        when the filename ends with one of ``_SUPPORTED_EXTENSIONS``
        (case-insensitive ``ILIKE``).
        """
        from sqlalchemy import or_

        from app.models.message import Attachment

        return or_(
            Attachment.content_type.in_(list(_SUPPORTED_CONTENT_TYPES)),
            *[Attachment.filename.ilike(f"%.{e}") for e in _SUPPORTED_EXTENSIONS],
        )

    @staticmethod
    def _is_running(raw: str | None) -> bool:
        """
        Tell whether *raw*, a stored job-state JSON, describes a running job.

        Missing or unreadable payloads count as not running.
        """
        if not raw:
            return False
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            return False
        return isinstance(data, dict) and data.get("status") == "running"

    @staticmethod
    def _new_state(
        mode: str,
        started_by: str | None = None,
        started_at: str | None = None,
    ) -> dict:
        """Build a fresh "running" job state with zeroed counters."""
        return {
            "status": "running",
            "mode": mode,
            "total": 0,
            "processed": 0,
            "started_at": started_at,
            "finished_at": None,
            "started_by": started_by,
            "error": None,
        }

    @staticmethod
    def _finish(state: dict, status: str, processed: int) -> None:
        """Mark *state* as ended with *status* and its final processed count."""
        state.update({
            "status": status,
            "processed": processed,
            "finished_at": datetime.now(timezone.utc).isoformat(),
        })

    @staticmethod
    def _spawn(coro, name: str) -> asyncio.Task:
        """Schedule *coro* as a named task and keep it referenced until it ends."""
        task = asyncio.create_task(coro, name=name)
        _BACKGROUND_TASKS.add(task)
        task.add_done_callback(_BACKGROUND_TASKS.discard)
        return task

    @staticmethod
    async def _rebuild_search_vectors(db: AsyncSession, message_ids: list[str]) -> None:
        """Recompute ``search_vector`` for *message_ids*; the caller commits."""
        await db.execute(
            _REBUILD_SEARCH_VECTOR_SQL,
            {"ids": message_ids, "max_attachment_chars": MAX_COMBINED_TEXT_LEN},
        )

    # ── Statistics ────────────────────────────────────────────────────────────
    async def get_stats(self, tenant_id: uuid.UUID) -> dict:
        """
        Return the indexing coverage statistics of *tenant_id*.

        Messages: only top-level messages (``parent_message_id IS NULL``) are
        counted. A message is "indexed" when ``search_vector`` is not NULL.

        Attachments: only those in a supported format are counted. The
        "extracted" count (``extracted_text`` not NULL) applies the same
        filter, so ``attachments_pct`` cannot exceed 100.

        Percentages are rounded to one decimal, and are 100.0 when there is
        nothing to cover.
        """
        from app.models.message import Attachment, Message

        async def count(column, *conditions) -> int:
            result = await self.db.execute(
                select(func.count(column)).where(*conditions)
            )
            return result.scalar_one()

        def pct(part: int, whole: int) -> float:
            return round(part / whole * 100, 1) if whole > 0 else 100.0

        top_level = (
            Message.tenant_id == tenant_id,
            Message.parent_message_id.is_(None),
        )
        total_messages = await count(Message.id, *top_level)
        indexed_messages = await count(
            Message.id, *top_level, Message.search_vector.isnot(None)
        )

        supported = (
            Attachment.tenant_id == tenant_id,
            IndexingService._supported_attachment_filter(),
        )
        attachments_total = await count(Attachment.id, *supported)
        attachments_extracted = await count(
            Attachment.id, *supported, Attachment.extracted_text.isnot(None)
        )

        return {
            "total_messages": total_messages,
            "indexed_messages": indexed_messages,
            "unindexed_messages": total_messages - indexed_messages,
            "coverage_pct": pct(indexed_messages, total_messages),
            "attachments_total": attachments_total,
            "attachments_extracted": attachments_extracted,
            "attachments_pct": pct(attachments_extracted, attachments_total),
        }

    # ── Job status ────────────────────────────────────────────────────────────
    @staticmethod
    async def get_job_status(tenant_id: uuid.UUID, redis_url: str) -> dict:
        """Read the reindex job status of *tenant_id* from Redis."""
        return await IndexingService._read_job_state(
            _redis_key(tenant_id), redis_url
        )

    @staticmethod
    async def get_rescan_status(tenant_id: uuid.UUID, redis_url: str) -> dict:
        """Read the attachment-rescan job status of *tenant_id* from Redis."""
        return await IndexingService._read_job_state(
            _redis_rescan_key(tenant_id), redis_url
        )

    @staticmethod
    def _idle_state() -> dict:
        """Status payload reported when no readable job state exists."""
        return {
            "status": "idle",
            "mode": None,
            "total": 0,
            "processed": 0,
            "progress_pct": 0.0,
            "started_at": None,
            "finished_at": None,
            "started_by": None,
            "elapsed_seconds": None,
            "is_stale": False,
            "error": None,
        }

    @staticmethod
    async def _read_job_state(redis_key_str: str, redis_url: str) -> dict:
        """
        Read a job state from Redis and enrich it for the API.

        Adds the following fields:

        * ``progress_pct`` (0.0 while the total is unknown);
        * ``elapsed_seconds``, measured up to ``finished_at`` or now;
        * ``is_stale``, true for a job still "running" after
          ``STALE_THRESHOLD_HOURS``.

        A missing or corrupted payload yields the full idle state, so callers
        always receive every key.
        """
        import redis.asyncio as aioredis

        client = aioredis.from_url(redis_url, decode_responses=True)
        try:
            raw = await client.get(redis_key_str)
        finally:
            await client.aclose()
        if not raw:
            return IndexingService._idle_state()
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = None
        if not isinstance(data, dict):
            return IndexingService._idle_state()

        is_stale = False
        elapsed_seconds = None
        if data.get("started_at"):
            try:
                started = datetime.fromisoformat(data["started_at"])
                finished_str = data.get("finished_at")
                ref_time = (
                    datetime.fromisoformat(finished_str)
                    if finished_str
                    else datetime.now(timezone.utc)
                )
                elapsed_seconds = int((ref_time - started).total_seconds())
                if data.get("status") == "running":
                    is_stale = elapsed_seconds / 3600 >= STALE_THRESHOLD_HOURS
            except Exception:
                # Unparseable timestamps: keep the timing fields computed so far.
                pass

        total = data.get("total", 0)
        processed = data.get("processed", 0)
        progress_pct = round(processed / total * 100, 1) if total > 0 else 0.0
        return {
            "status": data.get("status", "idle"),
            "mode": data.get("mode"),
            "total": total,
            "processed": processed,
            "progress_pct": progress_pct,
            "started_at": data.get("started_at"),
            "finished_at": data.get("finished_at"),
            "started_by": data.get("started_by"),
            "elapsed_seconds": elapsed_seconds,
            "is_stale": is_stale,
            "error": data.get("error"),
        }

    # ── Job start ─────────────────────────────────────────────────────────────
    @staticmethod
    async def _prepare_start(
        redis_url: str,
        own_state_key: str,
        own_busy_msg: str,
        other_state_key: str,
        other_busy_msg: str,
        own_cancel_key: str,
    ) -> None:
        """
        Refuse to start a job while it, or the conflicting job, is running.

        Raises ``ValueError(own_busy_msg)`` or ``ValueError(other_busy_msg)``.
        When the start is allowed, deletes any leftover cancel flag of the job.
        A flag set near the end of a previous run lives for
        ``REDIS_TTL_CANCEL`` and would otherwise cancel the new job at its
        first batch.
        """
        import redis.asyncio as aioredis

        client = aioredis.from_url(redis_url, decode_responses=True)
        try:
            if IndexingService._is_running(await client.get(own_state_key)):
                raise ValueError(own_busy_msg)
            if IndexingService._is_running(await client.get(other_state_key)):
                raise ValueError(other_busy_msg)
            await client.delete(own_cancel_key)
        finally:
            await client.aclose()

    @staticmethod
    async def start_reindex(
        tenant_id: uuid.UUID,
        mode: ReindexMode,
        started_by_email: str,
        redis_url: str,
        db_url: str,
    ) -> None:
        """
        Start a reindex of *tenant_id* in the background and return immediately.

        Raises ValueError if a reindex or an attachment rescan is already
        running for the tenant.
        """
        await IndexingService._prepare_start(
            redis_url,
            _redis_key(tenant_id),
            "Un job di reindex e' gia' in corso per questo tenant",
            _redis_rescan_key(tenant_id),
            "Un job di scansione allegati e' in corso. "
            "Attendi il termine prima di avviare il reindex.",
            _redis_cancel_key(tenant_id),
        )
        await IndexingService._set_state(
            _redis_key(tenant_id),
            redis_url,
            IndexingService._new_state(
                mode, started_by_email, datetime.now(timezone.utc).isoformat()
            ),
        )
        IndexingService._spawn(
            IndexingService._run_reindex_bg(tenant_id, mode, redis_url, db_url),
            name=f"reindex-{tenant_id}",
        )

    @staticmethod
    async def start_rescan(
        tenant_id: uuid.UUID,
        started_by_email: str,
        redis_url: str,
        db_url: str,
        force: bool = False,
    ) -> None:
        """
        Start the attachment rescan of *tenant_id* in the background.

        force=False processes only attachments with ``extracted_text IS NULL``.
        force=True re-extracts every supported attachment.

        Raises ValueError if a rescan or a reindex is already running for the
        tenant.
        """
        await IndexingService._prepare_start(
            redis_url,
            _redis_rescan_key(tenant_id),
            "Un job di scansione allegati e' gia' in corso per questo tenant",
            _redis_key(tenant_id),
            "Un job di reindex e' in corso. "
            "Attendi il termine prima di avviare la scansione allegati.",
            _redis_rescan_cancel_key(tenant_id),
        )
        mode_label = "force" if force else "differential"
        await IndexingService._set_state(
            _redis_rescan_key(tenant_id),
            redis_url,
            IndexingService._new_state(
                mode_label, started_by_email, datetime.now(timezone.utc).isoformat()
            ),
        )
        IndexingService._spawn(
            IndexingService._run_rescan_bg(tenant_id, force, redis_url, db_url),
            name=f"rescan-{tenant_id}",
        )

    # ── Job cancellation ──────────────────────────────────────────────────────
    @staticmethod
    async def _request_cancel(state_key: str, cancel_key: str, redis_url: str) -> bool:
        """Set *cancel_key* if the job stored at *state_key* is running."""
        import redis.asyncio as aioredis

        client = aioredis.from_url(redis_url, decode_responses=True)
        try:
            if not IndexingService._is_running(await client.get(state_key)):
                return False
            await client.setex(cancel_key, REDIS_TTL_CANCEL, "1")
            return True
        finally:
            await client.aclose()

    @staticmethod
    async def cancel_reindex(tenant_id: uuid.UUID, redis_url: str) -> bool:
        """
        Ask the running reindex of *tenant_id* to stop at its next batch.

        Returns True if a running job was found.
        """
        return await IndexingService._request_cancel(
            _redis_key(tenant_id), _redis_cancel_key(tenant_id), redis_url
        )

    @staticmethod
    async def cancel_rescan(tenant_id: uuid.UUID, redis_url: str) -> bool:
        """
        Ask the running attachment rescan of *tenant_id* to stop at its next batch.

        Returns True if a running job was found.
        """
        return await IndexingService._request_cancel(
            _redis_rescan_key(tenant_id), _redis_rescan_cancel_key(tenant_id), redis_url
        )

    # ── Redis helpers ─────────────────────────────────────────────────────────
    @staticmethod
    async def _set_state(redis_key_str: str, redis_url: str, state: dict) -> None:
        """Store *state* as JSON at *redis_key_str* with ``REDIS_TTL_STATUS``."""
        import redis.asyncio as aioredis

        client = aioredis.from_url(redis_url, decode_responses=True)
        try:
            await client.setex(
                redis_key_str,
                REDIS_TTL_STATUS,
                json.dumps(state, default=str),
            )
        finally:
            await client.aclose()

    @staticmethod
    async def _check_cancel_flag(cancel_key: str, redis_url: str) -> bool:
        """Tell whether the cancel flag at *cancel_key* is set."""
        import redis.asyncio as aioredis

        client = aioredis.from_url(redis_url, decode_responses=True)
        try:
            flag = await client.get(cancel_key)
            return flag == "1"
        finally:
            await client.aclose()

    @staticmethod
    async def _clear_cancel_flag(cancel_key: str, redis_url: str) -> None:
        """Delete the cancel flag once a job has honoured it."""
        import redis.asyncio as aioredis

        client = aioredis.from_url(redis_url, decode_responses=True)
        try:
            await client.delete(cancel_key)
        finally:
            await client.aclose()

    @staticmethod
    async def _publish_total(
        redis_key_str: str, redis_url: str, state: dict, total: int
    ) -> dict:
        """
        Store the job's *total* in Redis and return the state to keep updating.

        The state written by ``start_*`` (carrying ``started_at`` and
        ``started_by``) is preferred when readable; otherwise *state* is used.
        ``processed`` is reset to 0.
        """
        import redis.asyncio as aioredis

        client = aioredis.from_url(redis_url, decode_responses=True)
        try:
            raw = await client.get(redis_key_str)
            if raw:
                try:
                    stored = json.loads(raw)
                except json.JSONDecodeError:
                    stored = None
                if isinstance(stored, dict):
                    state = stored
            state["total"] = total
            state["processed"] = 0
            await client.setex(
                redis_key_str,
                REDIS_TTL_STATUS,
                json.dumps(state, default=str),
            )
        finally:
            await client.aclose()
        return state

    @staticmethod
    async def _save_final_state(
        redis_key_str: str, redis_url: str, state: dict, log: logging.Logger
    ) -> None:
        """
        Best-effort write of a terminal state from an error handler.

        A Redis failure here is logged instead of escaping the background task.
        """
        try:
            await IndexingService._set_state(redis_key_str, redis_url, state)
        except Exception:
            log.error(
                f"Impossibile salvare lo stato del job su Redis ({redis_key_str})",
                exc_info=True,
            )

    # Backward-compatible aliases used by the reindex job.
    @staticmethod
    async def _set_job_state(tenant_id: uuid.UUID, redis_url: str, state: dict) -> None:
        await IndexingService._set_state(_redis_key(tenant_id), redis_url, state)

    @staticmethod
    async def _check_cancel(tenant_id: uuid.UUID, redis_url: str) -> bool:
        return await IndexingService._check_cancel_flag(
            _redis_cancel_key(tenant_id), redis_url
        )

    # ── Reindex job ───────────────────────────────────────────────────────────
    @staticmethod
    async def _run_reindex_bg(
        tenant_id: uuid.UUID,
        mode: ReindexMode,
        redis_url: str,
        db_url: str,
    ) -> None:
        """
        Background coroutine of the reindex job, with its own engine and session.

        Steps:

        1. Select the tenant's top-level messages. In "differential" mode
           only those whose ``search_vector`` is NULL are selected.
        2. Rebuild their search vectors ``BATCH_SIZE`` at a time, committing
           once per batch and publishing progress to Redis after each batch.

        The cancel flag is checked before every batch, and a consumed flag is
        deleted. Any error marks the job "failed" with the error message.
        """
        from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
        from sqlalchemy.orm import sessionmaker

        log = logging.getLogger(__name__)
        log.info(f"Avvio reindex {mode} per tenant {tenant_id}")
        state_key = _redis_key(tenant_id)
        engine = create_async_engine(db_url, echo=False)
        AsyncSessionFactory = sessionmaker(  # type: ignore[call-overload]
            engine, class_=AsyncSession, expire_on_commit=False
        )
        state = IndexingService._new_state(mode)
        try:
            async with AsyncSessionFactory() as db:
                from app.models.message import Message

                scope = [
                    Message.tenant_id == tenant_id,
                    Message.parent_message_id.is_(None),
                ]
                if mode == "differential":
                    scope.append(Message.search_vector.is_(None))

                total: int = (
                    await db.execute(select(func.count(Message.id)).where(*scope))
                ).scalar_one()
                state = await IndexingService._publish_total(
                    state_key, redis_url, state, total
                )
                log.info(f"Reindex {mode}: {total} messaggi da processare")
                if total == 0:
                    IndexingService._finish(state, "completed", 0)
                    await IndexingService._set_job_state(tenant_id, redis_url, state)
                    return

                ids_result = await db.execute(select(Message.id).where(*scope))
                all_ids = [str(row[0]) for row in ids_result.fetchall()]

                processed = 0
                for batch_start in range(0, len(all_ids), BATCH_SIZE):
                    if await IndexingService._check_cancel(tenant_id, redis_url):
                        await IndexingService._clear_cancel_flag(
                            _redis_cancel_key(tenant_id), redis_url
                        )
                        log.info(f"Reindex {mode} annullato al batch {batch_start}")
                        IndexingService._finish(state, "cancelled", processed)
                        await IndexingService._set_job_state(tenant_id, redis_url, state)
                        return

                    batch_ids = all_ids[batch_start:batch_start + BATCH_SIZE]
                    await IndexingService._rebuild_search_vectors(db, batch_ids)
                    await db.commit()
                    processed += len(batch_ids)
                    state["processed"] = processed
                    await IndexingService._set_job_state(tenant_id, redis_url, state)
                    await asyncio.sleep(0.05)  # let other tasks run between batches

                IndexingService._finish(state, "completed", processed)
                await IndexingService._set_job_state(tenant_id, redis_url, state)
                log.info(f"Reindex {mode} completato: {processed}/{total} messaggi")
        except Exception as exc:
            log.error(f"Errore reindex {mode} tenant {tenant_id}: {exc}", exc_info=True)
            state.update({
                "status": "failed",
                "finished_at": datetime.now(timezone.utc).isoformat(),
                "error": str(exc),
            })
            await IndexingService._save_final_state(state_key, redis_url, state, log)
        finally:
            await engine.dispose()

    # ── Attachment rescan job ─────────────────────────────────────────────────
    @staticmethod
    async def _download_attachment(
        minio, bucket: str, att, log: logging.Logger
    ) -> bytes | None:
        """
        Fetch the bytes of *att* from MinIO.

        The HTTP response is always closed. A failure is logged and yields None.
        """
        try:
            response = await minio.get_object(bucket, att.storage_path)
            try:
                return await response.content.read()
            finally:
                response.close()
        except Exception as e:
            log.warning(
                f"Impossibile scaricare allegato {att.id} "
                f"({att.filename!r}) da MinIO: {e}"
            )
            return None

    @staticmethod
    async def _extract_attachment_text(
        att, extractor, content: bytes, log: logging.Logger
    ) -> str | None:
        """
        Extract the text of *att* and return it ready to store, or None.

        Extractors are blocking and CPU-bound (PDF parsing, OCR), so they run
        in a worker thread instead of stalling the event loop. NUL characters
        are removed because PostgreSQL text columns reject them; one such
        attachment would otherwise fail the batch commit and the whole job.
        The result is capped at ``MAX_EXTRACTED_TEXT_LEN``.
        """
        try:
            if _ext(att.filename) == "p7m":
                extracted = await asyncio.to_thread(_extract_p7m, content, att.filename)
            else:
                extracted = await asyncio.to_thread(extractor, content)
        except Exception as ex:
            log.debug(f"Errore estrazione {att.filename!r}: {ex}")
            return None
        cleaned = (extracted or "").replace("\x00", "")
        if not cleaned.strip():
            return None
        return cleaned[:MAX_EXTRACTED_TEXT_LEN]

    @staticmethod
    async def _run_rescan_bg(
        tenant_id: uuid.UUID,
        force: bool,
        redis_url: str,
        db_url: str,
    ) -> None:
        """
        Background coroutine of the attachment rescan, with its own engine and session.

        Steps:

        1. Select the tenant's attachments in a supported format. Unless
           *force* is set, only those with ``extracted_text IS NULL``.
        2. Process them ``RESCAN_BATCH_SIZE`` at a time. For each attachment:
           download it from MinIO, extract its text off the event loop, and
           store ``extracted_text``.
        3. After each batch, rebuild ``search_vector`` for the affected
           messages, commit, and publish progress to Redis.

        The cancel flag is checked before every batch, and a consumed flag is
        deleted. Any error marks the job "failed" with the error message.
        """
        from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
        from sqlalchemy.orm import sessionmaker

        log = logging.getLogger(__name__)
        mode_str = "force" if force else "differential"
        log.info(f"Avvio rescan allegati {mode_str} per tenant {tenant_id}")
        state_key = _redis_rescan_key(tenant_id)
        cancel_key = _redis_rescan_cancel_key(tenant_id)
        engine = create_async_engine(db_url, echo=False)
        AsyncSessionFactory = sessionmaker(  # type: ignore[call-overload]
            engine, class_=AsyncSession, expire_on_commit=False
        )
        state = IndexingService._new_state(mode_str)
        try:
            async with AsyncSessionFactory() as db:
                from app.config import get_settings
                from app.models.message import Attachment

                settings = get_settings()

                # ── 1. Attachments to process ─────────────────────────────────
                base_filter = [
                    Attachment.tenant_id == tenant_id,
                    IndexingService._supported_attachment_filter(),
                ]
                if not force:
                    base_filter.append(Attachment.extracted_text.is_(None))
                total: int = (
                    await db.execute(
                        select(func.count(Attachment.id)).where(*base_filter)
                    )
                ).scalar_one()
                state = await IndexingService._publish_total(
                    state_key, redis_url, state, total
                )
                log.info(f"Rescan {mode_str}: {total} allegati da processare")
                if total == 0:
                    IndexingService._finish(state, "completed", 0)
                    await IndexingService._set_state(state_key, redis_url, state)
                    return

                ids_result = await db.execute(select(Attachment.id).where(*base_filter))
                all_att_ids = [row[0] for row in ids_result.fetchall()]

                # ── 2. MinIO client ───────────────────────────────────────────
                try:
                    from miniopy_async import Minio  # type: ignore[import]

                    minio = Minio(
                        endpoint=settings.minio_endpoint,
                        access_key=settings.minio_access_key,
                        secret_key=settings.minio_secret_key,
                        secure=settings.minio_use_ssl,
                    )
                    bucket = settings.minio_bucket
                except Exception as e:
                    raise RuntimeError(f"Impossibile creare client MinIO: {e}") from e

                # ── 3. Batches ────────────────────────────────────────────────
                processed = 0
                for batch_start in range(0, len(all_att_ids), RESCAN_BATCH_SIZE):
                    if await IndexingService._check_cancel_flag(cancel_key, redis_url):
                        await IndexingService._clear_cancel_flag(cancel_key, redis_url)
                        log.info(f"Rescan {mode_str} annullato al batch {batch_start}")
                        IndexingService._finish(state, "cancelled", processed)
                        await IndexingService._set_state(state_key, redis_url, state)
                        return

                    batch_ids = all_att_ids[batch_start:batch_start + RESCAN_BATCH_SIZE]
                    att_result = await db.execute(
                        select(Attachment).where(Attachment.id.in_(batch_ids))
                    )
                    affected_message_ids: set[str] = set()
                    for att in att_result.scalars().all():
                        extractor = _resolve_extractor(att.content_type, att.filename)
                        if extractor is None:
                            continue
                        content = await IndexingService._download_attachment(
                            minio, bucket, att, log
                        )
                        if content is None:
                            continue
                        extracted = await IndexingService._extract_attachment_text(
                            att, extractor, content, log
                        )
                        if extracted is None:
                            continue
                        att.extracted_text = extracted
                        affected_message_ids.add(str(att.message_id))
                        log.debug(
                            f"Testo estratto da {att.filename!r}: "
                            f"{len(att.extracted_text)} caratteri"
                        )
                    await db.flush()
                    if affected_message_ids:
                        await IndexingService._rebuild_search_vectors(
                            db, list(affected_message_ids)
                        )
                    await db.commit()
                    processed += len(batch_ids)
                    state["processed"] = processed
                    await IndexingService._set_state(state_key, redis_url, state)
                    await asyncio.sleep(0.1)  # let other tasks run between batches

                # ── 4. Done ───────────────────────────────────────────────────
                IndexingService._finish(state, "completed", processed)
                await IndexingService._set_state(state_key, redis_url, state)
                log.info(f"Rescan {mode_str} completato: {processed}/{total} allegati")
        except Exception as exc:
            log.error(f"Errore rescan tenant {tenant_id}: {exc}", exc_info=True)
            state.update({
                "status": "failed",
                "finished_at": datetime.now(timezone.utc).isoformat(),
                "error": str(exc),
            })
            await IndexingService._save_final_state(state_key, redis_url, state, log)
        finally:
            await engine.dispose()