Files
GMG-Smart-Quote/backend/app/services/vehicle_lookup.py
T
2026-06-04 15:55:48 +02:00

246 lines
9.2 KiB
Python

from datetime import datetime, timezone, timedelta
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from sqlalchemy.dialects.postgresql import insert as pg_insert
import structlog
from app.core.config import settings
from app.models.vehicle import VehicleRegistry, VehicleVersion, PlateVersionCandidate, ApiUsageLog
from app.integrations.motornet import lookup_plate as motornet_lookup, MotornetUnavailableError
from app.schemas.vehicle import VehicleLookupResponse, VehicleVersionItem
logger = structlog.get_logger()
def _is_cache_valid(registry: VehicleRegistry) -> bool:
    """Return True while the cached registry row has not yet expired.

    A row whose ``expires_at`` is ``None`` is treated as never expiring.
    A naive ``expires_at`` is interpreted as UTC before being compared
    with the current UTC time.
    """
    deadline = registry.expires_at
    if deadline is None:
        return True
    if deadline.tzinfo is None:
        # Attach UTC so the comparison below never mixes naive and aware values.
        deadline = deadline.replace(tzinfo=timezone.utc)
    return deadline > datetime.now(timezone.utc)
async def _log_usage(
    db: AsyncSession,
    plate: str,
    source: str,
    hit_cache: bool,
    remaining_queries: int | None,
) -> None:
    """Stage an ``ApiUsageLog`` row describing one lookup on the session.

    The row is only added to ``db``; this function neither flushes nor
    commits, so the caller is responsible for committing the session.

    Args:
        db: Session the log row is added to.
        plate: Plate value stored on the log row.
        source: Value stored in the row's ``source`` column.
        hit_cache: Value stored in the row's ``hit_cache`` column.
        remaining_queries: Value stored in the row's ``remaining_queries``
            column; may be ``None``.
    """
    entry_fields = {
        "source": source,
        "plate": plate,
        "hit_cache": hit_cache,
        "remaining_queries": remaining_queries,
        # Timestamp is taken at staging time, in UTC.
        "called_at": datetime.now(timezone.utc),
    }
    db.add(ApiUsageLog(**entry_fields))
async def _build_response_from_db(
    registry: VehicleRegistry,
    db: AsyncSession,
) -> VehicleLookupResponse:
    """Assemble a lookup response from the stored registry row and its versions.

    Loads every ``PlateVersionCandidate`` for ``registry.plate`` joined to its
    ``VehicleVersion`` (matched on ``motornet_code``) and maps each pair to a
    ``VehicleVersionItem``. The returned response always has
    ``from_cache=True``; callers that fetched fresh data overwrite the flag.

    Args:
        registry: Registry row providing the plate-level fields.
        db: Session used to query the candidate/version pairs.

    Returns:
        The populated ``VehicleLookupResponse``.
    """
    candidates_query = (
        select(PlateVersionCandidate, VehicleVersion)
        .join(
            VehicleVersion,
            PlateVersionCandidate.motornet_code == VehicleVersion.motornet_code,
        )
        .where(PlateVersionCandidate.plate == registry.plate)
    )
    pairs = (await db.execute(candidates_query)).all()

    # Version attributes come from VehicleVersion; only the selection flag
    # is taken from the per-plate candidate row.
    items = [
        VehicleVersionItem(
            motornet_code=ver.motornet_code,
            version_label=ver.version_label,
            body_type=ver.body_type,
            doors=ver.doors,
            wheelbase=ver.wheelbase,
            list_price=ver.list_price,
            production_start=ver.production_start,
            production_end=ver.production_end,
            commercial_start=ver.commercial_start,
            commercial_end=ver.commercial_end,
            brand_name=ver.brand_name,
            model_description=ver.model_description,
            gamma_description=ver.gamma_description,
            series_description=ver.series_description,
            is_selected=cand.is_selected,
        )
        for cand, ver in pairs
    ]

    return VehicleLookupResponse(
        plate=registry.plate,
        vin=registry.vin,
        vehicle_type=registry.vehicle_type,
        registration_date=registry.registration_date,
        homologation_code=registry.homologation_code,
        engine_code=registry.engine_code,
        last_revision_date=registry.last_revision_date,
        source=registry.source,
        selected_motornet_code=registry.selected_motornet_code,
        versions=items,
        from_cache=True,
    )
async def _upsert_registry(
    db: AsyncSession,
    plate: str,
    mn_result,
    source: str,
    now: datetime,
    expires_at: datetime | None,
) -> None:
    """Insert or refresh the ``VehicleRegistry`` row for ``plate``."""
    refreshed_fields = {
        "vin": mn_result.vin,
        "vehicle_type": mn_result.vehicle_type,
        "registration_date": mn_result.registration_date,
        "homologation_code": mn_result.homologation_code,
        "engine_code": mn_result.engine_code,
        "last_revision_date": mn_result.last_revision_date,
        "foreign_plate": mn_result.foreign_plate,
        "foreign_registration_date": mn_result.foreign_registration_date,
        "foreign_country": mn_result.foreign_country,
        "is_foreign_registered": mn_result.is_foreign_registered,
        "source": source,
        "raw_response": mn_result.raw_response,
        "fetched_at": now,
        "expires_at": expires_at,
    }
    # selected_motornet_code is set only on insert and is deliberately absent
    # from the conflict update, so an existing row keeps its current value.
    stmt = (
        pg_insert(VehicleRegistry)
        .values(plate=plate, selected_motornet_code=None, **refreshed_fields)
        .on_conflict_do_update(index_elements=["plate"], set_=refreshed_fields)
    )
    await db.execute(stmt)


async def _upsert_versions(
    db: AsyncSession,
    plate: str,
    mn_result,
    source: str,
    now: datetime,
) -> None:
    """Upsert each returned version and link it to ``plate`` as a candidate."""
    for mv in mn_result.versions:
        version_stmt = (
            pg_insert(VehicleVersion)
            .values(
                motornet_code=mv.motornet_code,
                brand_acronym=mn_result.brand_acronym,
                brand_name=mn_result.brand_name,
                model_code=mn_result.model_code,
                model_description=mn_result.model_description,
                gamma_code=mn_result.gamma_code,
                gamma_description=mn_result.gamma_description,
                series_code=mn_result.series_code,
                series_description=mn_result.series_description,
                historical_group_code=mn_result.historical_group_code,
                historical_group_desc=mn_result.historical_group_desc,
                cod_desc_model_code=mn_result.cod_desc_model_code,
                cod_desc_model_desc=mn_result.cod_desc_model_desc,
                version_label=mv.version_label,
                body_type=mv.body_type,
                doors=mv.doors,
                wheelbase=mv.wheelbase,
                list_price=mv.list_price,
                production_start=mv.production_start,
                production_end=mv.production_end,
                commercial_start=mv.commercial_start,
                commercial_end=mv.commercial_end,
                first_seen_at=now,
                last_seen_at=now,
                seen_count=1,
                source=source,
            )
            .on_conflict_do_update(
                index_elements=["motornet_code"],
                # For an already-known version only the sighting bookkeeping
                # and the descriptive names are refreshed.
                set_={
                    "last_seen_at": now,
                    "seen_count": VehicleVersion.seen_count + 1,
                    "brand_name": mn_result.brand_name,
                    "model_description": mn_result.model_description,
                    "gamma_description": mn_result.gamma_description,
                    "series_description": mn_result.series_description,
                },
            )
        )
        await db.execute(version_stmt)

        # An existing candidate row is left untouched so that its current
        # is_selected flag is not reset by a refresh.
        candidate_stmt = (
            pg_insert(PlateVersionCandidate)
            .values(plate=plate, motornet_code=mv.motornet_code, is_selected=False)
            .on_conflict_do_nothing()
        )
        await db.execute(candidate_stmt)


async def lookup(plate: str, db: AsyncSession) -> VehicleLookupResponse:
    """Resolve a plate to vehicle data, using the database as a cache.

    The plate is upper-cased and stripped of spaces. If a registry row exists
    and is still valid, the response is built from the database. Otherwise the
    Motornet integration is queried, the registry row, versions and plate
    candidates are upserted, and the response is built from the stored rows.
    Every call stages one usage-log row and commits the session.

    Args:
        plate: Plate as supplied by the caller.
        db: Session used for all reads and writes; committed by this function.

    Returns:
        The lookup response; ``from_cache`` is True on a cache hit and False
        when the data was just fetched from Motornet.

    Raises:
        MotornetUnavailableError: Propagated unchanged from the Motornet
            integration on a cache miss.
    """
    normalized = plate.upper().replace(" ", "")
    registry_query = select(VehicleRegistry).where(VehicleRegistry.plate == normalized)

    registry = (await db.execute(registry_query)).scalar_one_or_none()
    if registry and _is_cache_valid(registry):
        logger.info("vehicle_lookup_cache_hit", plate=normalized)
        # Build the response BEFORE committing: commit may expire the ORM
        # instance, and reading expired attributes afterwards would require
        # implicit IO, which AsyncSession does not support.
        response = await _build_response_from_db(registry, db)
        await _log_usage(db, normalized, registry.source or "cache", True, None)
        await db.commit()
        return response

    logger.info("vehicle_lookup_cache_miss", plate=normalized)
    mn_result = await motornet_lookup(normalized)
    source = "motornet"

    # One timestamp for the whole write so every row of this lookup agrees.
    now = datetime.now(timezone.utc)
    ttl_days = settings.vehicle_cache_ttl_days
    # A non-positive TTL stores no expiry, which the validity check treats as
    # "never expires".
    expires_at = now + timedelta(days=ttl_days) if ttl_days > 0 else None

    await _upsert_registry(db, normalized, mn_result, source, now, expires_at)
    await _upsert_versions(db, normalized, mn_result, source, now)
    await _log_usage(db, normalized, source, False, mn_result.remaining_queries)
    await db.commit()

    # The upsert bypassed the ORM identity map, so an expired-cache instance
    # loaded above may still hold the old column values; force a reload.
    refreshed = await db.execute(
        registry_query.execution_options(populate_existing=True)
    )
    registry = refreshed.scalar_one()
    resp = await _build_response_from_db(registry, db)
    resp.from_cache = False
    return resp
async def select_version(plate: str, motornet_code: str, db: AsyncSession) -> None:
    """Mark one version as the selected candidate for a plate and commit.

    The plate is upper-cased and stripped of spaces. The chosen
    (plate, motornet_code) candidate is upserted with ``is_selected=True``,
    every other candidate of the same plate is set to ``is_selected=False``,
    and, when a registry row exists for the plate, its
    ``selected_motornet_code`` is updated. The session is committed at the end.

    Args:
        plate: Plate as supplied by the caller.
        motornet_code: Code of the version to mark as selected.
        db: Session used for all statements; committed by this function.
    """
    normalized = plate.upper().replace(" ", "")

    # Create the candidate if it is missing, otherwise just flip its flag.
    mark_selected = (
        pg_insert(PlateVersionCandidate.__table__)
        .values(plate=normalized, motornet_code=motornet_code, is_selected=True)
        .on_conflict_do_update(
            index_elements=["plate", "motornet_code"],
            set_={"is_selected": True},
        )
    )
    await db.execute(mark_selected)

    # Deselect every other candidate of the same plate.
    siblings_query = select(PlateVersionCandidate).where(
        PlateVersionCandidate.plate == normalized,
        PlateVersionCandidate.motornet_code != motornet_code,
    )
    siblings = (await db.execute(siblings_query)).scalars().all()
    for sibling in siblings:
        sibling.is_selected = False

    registry_query = select(VehicleRegistry).where(
        VehicleRegistry.plate == normalized
    )
    registry = (await db.execute(registry_query)).scalar_one_or_none()
    if registry:
        registry.selected_motornet_code = motornet_code

    await db.commit()