# File: GMG-Smart-Quote/backend/app/api/valuations.py
# Snapshot: 2026-05-06 15:45:04 +02:00 - 608 lines - 21 KiB - Python

import math
from datetime import datetime, timezone
from decimal import Decimal
from typing import Annotated
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy.dialects.postgresql import insert as pg_insert
from app.core.deps import get_db, get_current_user
from app.models.user import User, UserRole
from app.models.valuation import Valuation, ValuationComment, ValuationHistory, ValuationPhoto, ValuationStatus, ValuationPriority
from app.models.vehicle import VehicleRegistry, VehicleVersion, PlateVersionCandidate, ApiUsageLog
from app.models.motornet_valuation import MotornetValuation
from app.schemas.valuation import (
CommentCreate,
CommentResponse,
HistoryResponse,
MotornetValuationResponse,
PaginatedValuations,
PhotoResponse,
ValuationCreate,
ValuationListItem,
ValuationResponse,
VehicleInfo,
)
from app.services import storage
from app.integrations.motornet import valuate_vehicle, MotornetUnavailableError
import structlog
# Module-level structured logger used by the helpers and handlers below.
logger = structlog.get_logger()
# Router on which every endpoint in this module is registered.
router = APIRouter()
# Integer rank associated with each priority; create_valuation copies it onto
# the new row as `priority_rank`.
# NOTE(review): presumably a lower rank means a more urgent request -- confirm.
PRIORITY_RANK = {
    ValuationPriority.contratto: 1,
    ValuationPriority.preventivo: 2,
    ValuationPriority.valutazione: 3,
}
# Role groups used for authorization decisions.
SELLER_ROLES = {UserRole.venditore}
# Evaluator roles bypass per-valuation access checks and see every comment.
EVALUATOR_ROLES = {UserRole.valutatore, UserRole.admin}
# Union of all role groups plus backoffice (not referenced in the visible part
# of this module).
ALL_ALLOWED = SELLER_ROLES | EVALUATOR_ROLES | {UserRole.backoffice}
def _check_access(valuation: Valuation, user: User) -> None:
    """Raise 403 unless ``user`` is allowed to work on ``valuation``.

    Access is granted, in this order, to: evaluator roles, the backoffice
    role, the user who owns the valuation, and any user whose (non-empty)
    group matches the valuation's group.

    Raises:
        HTTPException: 403 when none of the conditions above holds.
    """
    # A single short-circuit expression keeps the evaluation order of the
    # individual checks: role first, then ownership, then group.
    allowed = (
        user.role in EVALUATOR_ROLES
        or user.role == UserRole.backoffice
        or valuation.user_id == user.id
        or (user.group_id and valuation.group_id == user.group_id)
    )
    if not allowed:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Accesso non consentito")
async def _build_vehicle_info(plate: str, motornet_code: str | None, db: AsyncSession) -> VehicleInfo | None:
    """Assemble the ``VehicleInfo`` payload for a plate.

    Args:
        plate: Plate used to look up the ``VehicleRegistry`` row.
        motornet_code: Version code to describe; when falsy, the registry's
            ``selected_motornet_code`` is used instead.
        db: Session used for the lookups.

    Returns:
        A ``VehicleInfo``. When the plate has no registry row, every field
        except ``plate`` is ``None``. Version-derived fields stay ``None``
        when no code is available or no ``VehicleVersion`` matches it.
    """
    registry_lookup = await db.execute(
        select(VehicleRegistry).where(VehicleRegistry.plate == plate)
    )
    registry_row = registry_lookup.scalar_one_or_none()

    # Fields sourced from VehicleVersion; all None until a version is found.
    version_fields = dict.fromkeys(
        (
            "version_label",
            "brand_name",
            "model_description",
            "gamma_description",
            "list_price",
        )
    )

    if not registry_row:
        return VehicleInfo(
            plate=plate,
            vin=None,
            vehicle_type=None,
            registration_date=None,
            **version_fields,
        )

    effective_code = motornet_code or registry_row.selected_motornet_code
    if effective_code:
        version_lookup = await db.execute(
            select(VehicleVersion).where(VehicleVersion.motornet_code == effective_code)
        )
        version_row = version_lookup.scalar_one_or_none()
        if version_row:
            version_fields = {
                field: getattr(version_row, field) for field in version_fields
            }

    return VehicleInfo(
        plate=registry_row.plate,
        vin=registry_row.vin,
        vehicle_type=registry_row.vehicle_type,
        registration_date=registry_row.registration_date,
        **version_fields,
    )
async def _build_list_item(valuation: Valuation, db: AsyncSession) -> ValuationListItem:
    """Build the list-view representation of a valuation.

    The brand/model/version labels come from the ``VehicleVersion`` matching
    the valuation's own ``motornet_code`` or, when that is missing, the
    ``selected_motornet_code`` stored on the plate's ``VehicleRegistry`` row.
    The labels stay ``None`` when no code or no matching version exists.

    Args:
        valuation: The valuation row to describe.
        db: Session used for the lookups.

    Returns:
        The populated ``ValuationListItem``.
    """
    brand_name = None
    model_description = None
    version_label = None

    code = valuation.motornet_code
    if not code:
        # The registry is only a fallback source for the code, so it is
        # queried only when the valuation carries no code of its own. This
        # helper runs once per listed row, which makes the saved round-trip
        # worthwhile.
        reg_result = await db.execute(
            select(VehicleRegistry).where(VehicleRegistry.plate == valuation.plate)
        )
        reg = reg_result.scalar_one_or_none()
        if reg:
            code = reg.selected_motornet_code

    if code:
        ver_result = await db.execute(
            select(VehicleVersion).where(VehicleVersion.motornet_code == code)
        )
        ver = ver_result.scalar_one_or_none()
        if ver:
            brand_name = ver.brand_name
            model_description = ver.model_description
            version_label = ver.version_label

    return ValuationListItem(
        id=valuation.id,
        plate=valuation.plate,
        status=valuation.status.value,
        priority=valuation.priority.value,
        priority_rank=valuation.priority_rank,
        mileage=valuation.mileage,
        is_frozen=valuation.is_frozen,
        created_at=valuation.created_at,
        updated_at=valuation.updated_at,
        brand_name=brand_name,
        model_description=model_description,
        version_label=version_label,
        final_value=valuation.final_value,
    )
async def _try_motornet_valuation(
    valuation: Valuation,
    db: AsyncSession,
) -> MotornetValuation | None:
    """Fetch a Motornet quotation for ``valuation`` and persist it.

    The call is skipped (returning ``None``) when the valuation has no
    ``motornet_code`` or the plate's registry row has no registration date.
    A ``MotornetUnavailableError`` from the integration is logged and also
    yields ``None``. On success an ``ApiUsageLog`` row and a
    ``MotornetValuation`` row are added and committed.

    Args:
        valuation: Valuation supplying the code, plate and mileage.
        db: Session used for the lookup and for persisting the result.

    Returns:
        The refreshed ``MotornetValuation`` row, or ``None`` when skipped or
        when Motornet was unavailable.
    """
    code = valuation.motornet_code
    if not code:
        logger.info("motornet_valuation_skip_no_code", valuation_id=valuation.id)
        return None

    registry_lookup = await db.execute(
        select(VehicleRegistry).where(VehicleRegistry.plate == valuation.plate)
    )
    registry_row = registry_lookup.scalar_one_or_none()
    registered_on = registry_row.registration_date if registry_row else None
    if not registered_on:
        logger.info("motornet_valuation_skip_no_date", valuation_id=valuation.id, plate=valuation.plate)
        return None

    year, month = registered_on.year, registered_on.month
    try:
        payload = await valuate_vehicle(
            motornet_code=code,
            anno=year,
            mese=month,
            km=valuation.mileage,
            targa=valuation.plate,
        )
    except MotornetUnavailableError as exc:
        logger.warning("motornet_valuation_failed", valuation_id=valuation.id, error=str(exc))
        return None

    details = payload.get("valutazione") or {}
    remaining: int | None = payload.get("valutazioniDisponibili")

    # Record the call together with the remaining quota reported by the API.
    db.add(
        ApiUsageLog(
            source="motornet_valuation",
            plate=valuation.plate,
            hit_cache=False,
            remaining_queries=remaining,
            called_at=datetime.now(timezone.utc),
        )
    )

    def _nested(key: str, field: str) -> str | None:
        # Some attributes arrive as nested objects; anything else maps to None.
        node = details.get(key)
        if node and isinstance(node, dict):
            return node.get(field)
        return None

    def _as_decimal(raw) -> Decimal | None:
        # Best-effort conversion: missing or unparsable values become None.
        if raw is None:
            return None
        try:
            return Decimal(str(raw))
        except Exception:
            return None

    fuel_description = _nested("alimentazione", "descrizione")
    brand = _nested("marca", "nome")

    # Model column -> key in the Motornet payload, for Decimal-typed values.
    decimal_columns = {
        "quotazione_blu": "quotazioneEurotaxBlu",
        "quotazione_blu_km": "quotazioneEurotaxBluKm",
        "quotazione_giallo": "quotazioneEurotaxGiallo",
        "quotazione_giallo_km": "quotazioneEurotaxGialloKm",
        "quotazione_blu_totale": "quotazioneEurotaxBluTotale",
        "quotazione_giallo_totale": "quotazioneEurotaxGialloTotale",
        "variazione_km": "variazioneKm",
        "prezzo_listino": "prezzoListino",
        "prezzo_accessori": "prezzoAccessori",
        "totale_riparazioni_carrozzeria": "totaleRiparazioniCarrozzeria",
        "totale_riparazioni_meccanica": "totaleRiparazioniMeccanica",
    }
    decimal_values = {
        column: _as_decimal(details.get(source_key))
        for column, source_key in decimal_columns.items()
    }

    record = MotornetValuation(
        valuation_id=valuation.id,
        plate=valuation.plate,
        motornet_code=code,
        fetched_at=datetime.now(timezone.utc),
        registration_year=year,
        registration_month=month,
        mileage=valuation.mileage,
        motornet_id=details.get("id"),
        ediz_dati=details.get("edizDati"),
        brand_name=brand,
        model_description=details.get("modello"),
        allestimento=details.get("allestimento"),
        alimentazione=fuel_description,
        immagine_url=details.get("immagine"),
        xml_url=details.get("xml"),
        percorrenza_media_km=details.get("percorrenzaMediaKm"),
        **decimal_values,
        raw_response=payload,
    )
    db.add(record)
    await db.commit()
    await db.refresh(record)
    return record
def _build_motornet_response(mn: MotornetValuation) -> MotornetValuationResponse:
    """Copy a ``MotornetValuation`` row into its response schema.

    Every response field is read from the attribute of the same name on
    ``mn``; no value is transformed.
    """
    copied_fields = (
        "id",
        "valuation_id",
        "plate",
        "motornet_code",
        "fetched_at",
        "registration_year",
        "registration_month",
        "mileage",
        "motornet_id",
        "ediz_dati",
        "brand_name",
        "model_description",
        "allestimento",
        "alimentazione",
        "immagine_url",
        "xml_url",
        "percorrenza_media_km",
        "quotazione_blu",
        "quotazione_blu_km",
        "quotazione_giallo",
        "quotazione_giallo_km",
        "quotazione_blu_totale",
        "quotazione_giallo_totale",
        "variazione_km",
        "prezzo_listino",
        "prezzo_accessori",
        "totale_riparazioni_carrozzeria",
        "totale_riparazioni_meccanica",
        "raw_response",
    )
    payload = {field: getattr(mn, field) for field in copied_fields}
    return MotornetValuationResponse(**payload)
@router.post("", response_model=ValuationResponse, status_code=status.HTTP_201_CREATED)
async def create_valuation(
    body: ValuationCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create a valuation request and try to attach a Motornet quotation.

    Steps:
      1. Reject callers whose role is neither a seller nor an evaluator role.
      2. Make sure a ``VehicleRegistry`` row exists for the normalised plate
         (insert with ON CONFLICT DO NOTHING).
      3. Insert the valuation (status ``inviata``, frozen) plus the initial
         status-history entry and commit.
      4. Attempt a Motornet valuation; a ``None`` result from that helper does
         not affect the creation.

    Args:
        body: Request payload. An unrecognised ``priority`` silently falls
            back to ``ValuationPriority.valutazione``.
        db: Request-scoped session.
        current_user: Authenticated caller; becomes the valuation owner.

    Returns:
        The created valuation with vehicle info, the initial history entry
        and, when available, the Motornet quotation.

    Raises:
        HTTPException: 403 when the caller's role may not create valuations.
    """
    # Use the module's role sets instead of a hand-written copy so the
    # authorization rules live in one place.
    if current_user.role not in SELLER_ROLES | EVALUATOR_ROLES:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permessi insufficienti")

    try:
        priority_enum = ValuationPriority(body.priority)
    except ValueError:
        priority_enum = ValuationPriority.valutazione
    priority_rank = PRIORITY_RANK[priority_enum]

    normalized_plate = body.plate.upper().strip()
    # One timestamp for the whole creation event, so created_at, updated_at
    # and the initial history entry agree exactly.
    now = datetime.now(timezone.utc)

    reg_stmt = pg_insert(VehicleRegistry).values(
        plate=normalized_plate,
        source="manual",
        fetched_at=now,
    ).on_conflict_do_nothing(index_elements=["plate"])
    await db.execute(reg_stmt)

    valuation = Valuation(
        # Same normalised value as the registry row above.
        plate=normalized_plate,
        user_id=current_user.id,
        group_id=current_user.group_id,
        motornet_code=body.motornet_code,
        mileage=body.mileage,
        retake_regime=body.retake_regime,
        expected_return_date=body.expected_return_date,
        interest_fuel=body.interest_fuel,
        priority=priority_enum,
        priority_rank=priority_rank,
        notes=body.notes,
        status=ValuationStatus.inviata,
        is_frozen=True,
        created_at=now,
        updated_at=now,
    )
    db.add(valuation)
    # Flush so valuation.id is available for the history entry.
    await db.flush()

    history_entry = ValuationHistory(
        valuation_id=valuation.id,
        changed_by=current_user.id,
        field_name="status",
        old_value=None,
        new_value=ValuationStatus.inviata.value,
        changed_at=now,
    )
    db.add(history_entry)
    await db.commit()
    await db.refresh(valuation)

    # NOTE(review): `valuation` and `history_entry` attributes are read below
    # after further commits; this presumably relies on the session being
    # created with expire_on_commit=False -- confirm against get_db.
    mn_record = await _try_motornet_valuation(valuation, db)
    vehicle = await _build_vehicle_info(valuation.plate, valuation.motornet_code, db)
    motornet_valuations = [_build_motornet_response(mn_record)] if mn_record else []

    return ValuationResponse(
        id=valuation.id,
        plate=valuation.plate,
        user_id=valuation.user_id,
        group_id=valuation.group_id,
        motornet_code=valuation.motornet_code,
        mileage=valuation.mileage,
        retake_regime=valuation.retake_regime.value if valuation.retake_regime else None,
        expected_return_date=valuation.expected_return_date,
        interest_fuel=valuation.interest_fuel,
        priority=valuation.priority.value,
        notes=valuation.notes,
        status=valuation.status.value,
        final_value=valuation.final_value,
        evaluator_notes=valuation.evaluator_notes,
        is_frozen=valuation.is_frozen,
        priority_rank=valuation.priority_rank,
        created_at=valuation.created_at,
        updated_at=valuation.updated_at,
        vehicle=vehicle,
        photos=[],
        comments=[],
        history=[
            HistoryResponse(
                id=history_entry.id,
                field_name=history_entry.field_name,
                old_value=history_entry.old_value,
                new_value=history_entry.new_value,
                changed_by=history_entry.changed_by,
                changed_by_name=current_user.full_name,
                changed_at=history_entry.changed_at,
            )
        ],
        motornet_valuations=motornet_valuations,
    )
@router.get("", response_model=PaginatedValuations)
async def list_valuations(
    plate: str | None = Query(None),
    status_filter: str | None = Query(None, alias="status"),
    priority: str | None = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a page of valuations visible to the caller, newest first.

    Sellers (``venditore``) see their own valuations plus, when they belong
    to a group, the valuations of that group. Other roles are not restricted
    by this handler.

    Args:
        plate: Optional case-insensitive substring filter on the plate.
        status_filter: Optional exact status filter (query parameter ``status``).
        priority: Optional exact priority filter.
        page: 1-based page number.
        page_size: Items per page (1-100).
        db: Request-scoped session.
        current_user: Authenticated caller.

    Returns:
        ``PaginatedValuations`` with the items, the total match count and the
        page count (at least 1).
    """
    query = select(Valuation)

    if current_user.role == UserRole.venditore:
        owned = Valuation.user_id == current_user.id
        if current_user.group_id:
            # Mirror _check_access: a seller may open their own valuations
            # even when filed under a different (or no) group, so the list
            # must include them as well as the group's.
            query = query.where((Valuation.group_id == current_user.group_id) | owned)
        else:
            query = query.where(owned)

    if plate:
        # Escape LIKE metacharacters so user input is matched literally
        # instead of acting as a wildcard.
        literal = (
            plate.upper()
            .replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        query = query.where(Valuation.plate.ilike(f"%{literal}%", escape="\\"))
    if status_filter:
        query = query.where(Valuation.status == status_filter)
    if priority:
        query = query.where(Valuation.priority == priority)

    # Count before ordering/paging so `total` covers every match.
    count_q = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_q)).scalar_one()

    # The id tiebreaker keeps OFFSET/LIMIT pages stable when several rows
    # share the same created_at.
    query = query.order_by(Valuation.created_at.desc(), Valuation.id.desc())
    query = query.offset((page - 1) * page_size).limit(page_size)
    valuations = (await db.execute(query)).scalars().all()

    items = [await _build_list_item(v, db) for v in valuations]

    return PaginatedValuations(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
        pages=max(1, math.ceil(total / page_size)),
    )
@router.get("/{valuation_id}", response_model=ValuationResponse)
async def get_valuation(
    valuation_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one valuation with its photos, comments, history and quotations.

    Comments not flagged ``visible_to_all`` are only included for evaluator
    roles. Each photo carries a presigned URL obtained from the storage
    service. Related collections are returned in ascending time order.

    Raises:
        HTTPException: 404 when the valuation does not exist; 403 (via
            ``_check_access``) when the caller may not see it.
    """
    stmt = (
        select(Valuation)
        .options(
            selectinload(Valuation.photos),
            selectinload(Valuation.comments),
            selectinload(Valuation.history),
            selectinload(Valuation.motornet_valuations),
        )
        .where(Valuation.id == valuation_id)
    )
    valuation = (await db.execute(stmt)).scalar_one_or_none()
    if not valuation:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Valutazione non trovata")
    _check_access(valuation, current_user)

    vehicle = await _build_vehicle_info(valuation.plate, valuation.motornet_code, db)

    # Resolve every referenced user's display name with a single query.
    referenced_ids = {
        valuation.user_id,
        *(comment.user_id for comment in valuation.comments),
        *(entry.changed_by for entry in valuation.history),
        *(photo.uploaded_by for photo in valuation.photos),
    }
    user_rows = await db.execute(select(User).where(User.id.in_(referenced_ids)))
    full_names = {row.id: row.full_name for row in user_rows.scalars().all()}

    sees_restricted = current_user.role in EVALUATOR_ROLES
    comments = [
        CommentResponse(
            id=comment.id,
            valuation_id=comment.valuation_id,
            user_id=comment.user_id,
            user_full_name=full_names.get(comment.user_id),
            content=comment.content,
            visible_to_all=comment.visible_to_all,
            created_at=comment.created_at,
        )
        for comment in sorted(valuation.comments, key=lambda item: item.created_at)
        if comment.visible_to_all or sees_restricted
    ]

    photos = [
        PhotoResponse(
            id=photo.id,
            valuation_id=photo.valuation_id,
            filename=photo.filename,
            storage_path=photo.storage_path,
            uploaded_by=photo.uploaded_by,
            uploaded_at=photo.uploaded_at,
            url=storage.get_presigned_url(photo.storage_path),
        )
        for photo in sorted(valuation.photos, key=lambda item: item.uploaded_at)
    ]

    history = [
        HistoryResponse(
            id=entry.id,
            field_name=entry.field_name,
            old_value=entry.old_value,
            new_value=entry.new_value,
            changed_by=entry.changed_by,
            changed_by_name=full_names.get(entry.changed_by),
            changed_at=entry.changed_at,
        )
        for entry in sorted(valuation.history, key=lambda item: item.changed_at)
    ]

    motornet_responses = [
        _build_motornet_response(quotation)
        for quotation in sorted(valuation.motornet_valuations, key=lambda item: item.fetched_at)
    ]

    regime = valuation.retake_regime.value if valuation.retake_regime else None
    return ValuationResponse(
        id=valuation.id,
        plate=valuation.plate,
        user_id=valuation.user_id,
        group_id=valuation.group_id,
        motornet_code=valuation.motornet_code,
        mileage=valuation.mileage,
        retake_regime=regime,
        expected_return_date=valuation.expected_return_date,
        interest_fuel=valuation.interest_fuel,
        priority=valuation.priority.value,
        notes=valuation.notes,
        status=valuation.status.value,
        final_value=valuation.final_value,
        evaluator_notes=valuation.evaluator_notes,
        is_frozen=valuation.is_frozen,
        priority_rank=valuation.priority_rank,
        created_at=valuation.created_at,
        updated_at=valuation.updated_at,
        vehicle=vehicle,
        photos=photos,
        comments=comments,
        history=history,
        motornet_valuations=motornet_responses,
    )
@router.post("/{valuation_id}/photos", response_model=PhotoResponse, status_code=status.HTTP_201_CREATED)
async def upload_photo(
    valuation_id: int,
    file: Annotated[UploadFile, File(...)],
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Attach an uploaded photo to a valuation.

    The file is handed to the storage service first; the resulting storage
    path is then recorded in a ``ValuationPhoto`` row. A missing client
    filename is stored as ``"foto"``.

    Returns:
        The stored photo, including a presigned URL for it.

    Raises:
        HTTPException: 404 when the valuation does not exist; 403 (via
            ``_check_access``) when the caller may not access it.
    """
    lookup = await db.execute(select(Valuation).where(Valuation.id == valuation_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Valutazione non trovata")
    _check_access(target, current_user)

    stored_at = await storage.upload_photo(file, valuation_id)
    record = ValuationPhoto(
        valuation_id=valuation_id,
        filename=file.filename or "foto",
        storage_path=stored_at,
        uploaded_by=current_user.id,
        uploaded_at=datetime.now(timezone.utc),
    )
    db.add(record)
    await db.commit()
    await db.refresh(record)

    presigned = storage.get_presigned_url(stored_at)
    return PhotoResponse(
        id=record.id,
        valuation_id=record.valuation_id,
        filename=record.filename,
        storage_path=record.storage_path,
        uploaded_by=record.uploaded_by,
        uploaded_at=record.uploaded_at,
        url=presigned,
    )
@router.post("/{valuation_id}/comments", response_model=CommentResponse, status_code=status.HTTP_201_CREATED)
async def add_comment(
    valuation_id: int,
    body: CommentCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Add a comment authored by the caller to a valuation.

    Returns:
        The stored comment, with the caller's full name as author name.

    Raises:
        HTTPException: 404 when the valuation does not exist; 403 (via
            ``_check_access``) when the caller may not access it.
    """
    lookup = await db.execute(select(Valuation).where(Valuation.id == valuation_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Valutazione non trovata")
    _check_access(target, current_user)

    new_comment = ValuationComment(
        valuation_id=valuation_id,
        user_id=current_user.id,
        content=body.content,
        visible_to_all=body.visible_to_all,
        created_at=datetime.now(timezone.utc),
    )
    db.add(new_comment)
    await db.commit()
    await db.refresh(new_comment)

    return CommentResponse(
        id=new_comment.id,
        valuation_id=new_comment.valuation_id,
        user_id=new_comment.user_id,
        user_full_name=current_user.full_name,
        content=new_comment.content,
        visible_to_all=new_comment.visible_to_all,
        created_at=new_comment.created_at,
    )