246 lines
9.2 KiB
Python
246 lines
9.2 KiB
Python
from datetime import datetime, timezone, timedelta
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, delete
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
import structlog
|
|
|
|
from app.core.config import settings
|
|
from app.models.vehicle import VehicleRegistry, VehicleVersion, PlateVersionCandidate, ApiUsageLog
|
|
from app.integrations.motornet import lookup_plate as motornet_lookup, MotornetUnavailableError
|
|
from app.schemas.vehicle import VehicleLookupResponse, VehicleVersionItem
|
|
|
|
logger = structlog.get_logger()
|
|
|
|
|
|
def _is_cache_valid(registry: VehicleRegistry) -> bool:
    """Tell whether a cached registry row may still be served.

    A row whose ``expires_at`` is ``None`` never expires. A naive
    ``expires_at`` is treated as UTC before it is compared with the current
    UTC time.

    Args:
        registry: The registry row whose ``expires_at`` is inspected.

    Returns:
        ``True`` when the row has no expiry or the expiry lies strictly in
        the future, ``False`` otherwise.
    """
    deadline = registry.expires_at
    if deadline is None:
        return True

    # Naive timestamps cannot be compared with an aware "now"; pin them to UTC.
    if deadline.tzinfo is None:
        deadline = deadline.replace(tzinfo=timezone.utc)

    return datetime.now(timezone.utc) < deadline
|
|
|
|
|
|
async def _log_usage(
    db: AsyncSession,
    plate: str,
    source: str,
    hit_cache: bool,
    remaining_queries: int | None,
) -> None:
    """Stage one ``ApiUsageLog`` row on the session for a lookup.

    The row is only added to ``db``; nothing is flushed or committed here,
    so the caller decides when it is persisted.

    Args:
        db: Session the log row is added to.
        plate: Plate that was looked up.
        source: Label of the data source recorded on the row.
        hit_cache: Whether the lookup was answered from the cache.
        remaining_queries: Remaining-queries value to record, or ``None``.
    """
    db.add(
        ApiUsageLog(
            source=source,
            plate=plate,
            hit_cache=hit_cache,
            remaining_queries=remaining_queries,
            called_at=datetime.now(timezone.utc),
        )
    )
|
|
|
|
|
|
async def _build_response_from_db(
    registry: VehicleRegistry,
    db: AsyncSession,
) -> VehicleLookupResponse:
    """Assemble a lookup response from rows already stored in the database.

    Loads every ``PlateVersionCandidate`` of the registry's plate joined to
    its ``VehicleVersion`` (matched on ``motornet_code``) and combines them
    with the registry's own columns.

    Args:
        registry: Registry row supplying the plate-level fields.
        db: Session used to run the candidate/version query.

    Returns:
        A ``VehicleLookupResponse`` with ``from_cache`` set to ``True``.
    """
    candidates_query = (
        select(PlateVersionCandidate, VehicleVersion)
        .join(
            VehicleVersion,
            PlateVersionCandidate.motornet_code == VehicleVersion.motornet_code,
        )
        .where(PlateVersionCandidate.plate == registry.plate)
    )
    pairs = (await db.execute(candidates_query)).all()

    # One item per candidate: catalogue data comes from the version row, the
    # selection flag from the candidate row.
    version_items = [
        VehicleVersionItem(
            motornet_code=ver.motornet_code,
            version_label=ver.version_label,
            body_type=ver.body_type,
            doors=ver.doors,
            wheelbase=ver.wheelbase,
            list_price=ver.list_price,
            production_start=ver.production_start,
            production_end=ver.production_end,
            commercial_start=ver.commercial_start,
            commercial_end=ver.commercial_end,
            brand_name=ver.brand_name,
            model_description=ver.model_description,
            gamma_description=ver.gamma_description,
            series_description=ver.series_description,
            is_selected=cand.is_selected,
        )
        for cand, ver in pairs
    ]

    return VehicleLookupResponse(
        plate=registry.plate,
        vin=registry.vin,
        vehicle_type=registry.vehicle_type,
        registration_date=registry.registration_date,
        homologation_code=registry.homologation_code,
        engine_code=registry.engine_code,
        last_revision_date=registry.last_revision_date,
        source=registry.source,
        selected_motornet_code=registry.selected_motornet_code,
        versions=version_items,
        from_cache=True,
    )
|
|
|
|
|
|
async def lookup(plate: str, db: AsyncSession) -> VehicleLookupResponse:
    """Resolve a plate to vehicle data, serving from the local cache when valid.

    The plate is upper-cased and stripped of spaces. When a registry row
    exists and ``_is_cache_valid`` accepts it, a cache-hit usage row is
    committed and the response is built from the database. Otherwise
    Motornet is queried; the registry row, every returned version and the
    plate/version candidates are upserted; a usage row is committed; and the
    response is rebuilt from the stored rows with ``from_cache`` set to
    ``False``.

    Args:
        plate: Plate as supplied by the caller (any case, spaces allowed).
        db: Session used for all reads and writes; it is committed here.

    Returns:
        The lookup response for the normalized plate.

    Raises:
        MotornetUnavailableError: Propagated unchanged from the Motornet
            lookup call.
    """
    normalized = plate.upper().replace(" ", "")
    registry_query = select(VehicleRegistry).where(VehicleRegistry.plate == normalized)

    registry = (await db.execute(registry_query)).scalar_one_or_none()

    if registry and _is_cache_valid(registry):
        logger.info("vehicle_lookup_cache_hit", plate=normalized)
        await _log_usage(db, normalized, registry.source or "cache", True, None)
        await db.commit()
        return await _build_response_from_db(registry, db)

    logger.info("vehicle_lookup_cache_miss", plate=normalized)

    mn_result = await motornet_lookup(normalized)
    source = "motornet"

    # One timestamp for the whole refresh so every row written by this call
    # carries the same fetch/seen time.
    now = datetime.now(timezone.utc)

    # A non-positive TTL means the entry never expires (expires_at stays NULL).
    ttl_days = settings.vehicle_cache_ttl_days
    expires_at = now + timedelta(days=ttl_days) if ttl_days > 0 else None

    # Columns written on insert and overwritten on conflict.
    # selected_motornet_code is set only on insert, so an existing selection
    # is left untouched by a refresh.
    registry_fields = {
        "vin": mn_result.vin,
        "vehicle_type": mn_result.vehicle_type,
        "registration_date": mn_result.registration_date,
        "homologation_code": mn_result.homologation_code,
        "engine_code": mn_result.engine_code,
        "last_revision_date": mn_result.last_revision_date,
        "foreign_plate": mn_result.foreign_plate,
        "foreign_registration_date": mn_result.foreign_registration_date,
        "foreign_country": mn_result.foreign_country,
        "is_foreign_registered": mn_result.is_foreign_registered,
        "source": source,
        "raw_response": mn_result.raw_response,
        "fetched_at": now,
        "expires_at": expires_at,
    }
    registry_upsert = (
        pg_insert(VehicleRegistry)
        .values(plate=normalized, selected_motornet_code=None, **registry_fields)
        .on_conflict_do_update(index_elements=["plate"], set_=registry_fields)
    )
    await db.execute(registry_upsert)

    for mv in mn_result.versions:
        version_upsert = (
            pg_insert(VehicleVersion)
            .values(
                motornet_code=mv.motornet_code,
                brand_acronym=mn_result.brand_acronym,
                brand_name=mn_result.brand_name,
                model_code=mn_result.model_code,
                model_description=mn_result.model_description,
                gamma_code=mn_result.gamma_code,
                gamma_description=mn_result.gamma_description,
                series_code=mn_result.series_code,
                series_description=mn_result.series_description,
                historical_group_code=mn_result.historical_group_code,
                historical_group_desc=mn_result.historical_group_desc,
                cod_desc_model_code=mn_result.cod_desc_model_code,
                cod_desc_model_desc=mn_result.cod_desc_model_desc,
                version_label=mv.version_label,
                body_type=mv.body_type,
                doors=mv.doors,
                wheelbase=mv.wheelbase,
                list_price=mv.list_price,
                production_start=mv.production_start,
                production_end=mv.production_end,
                commercial_start=mv.commercial_start,
                commercial_end=mv.commercial_end,
                first_seen_at=now,
                last_seen_at=now,
                seen_count=1,
                source=source,
            )
            .on_conflict_do_update(
                index_elements=["motornet_code"],
                # An already-known version only has its sighting counters and
                # descriptive labels refreshed.
                set_={
                    "last_seen_at": now,
                    "seen_count": VehicleVersion.seen_count + 1,
                    "brand_name": mn_result.brand_name,
                    "model_description": mn_result.model_description,
                    "gamma_description": mn_result.gamma_description,
                    "series_description": mn_result.series_description,
                },
            )
        )
        await db.execute(version_upsert)

        # Link the plate to the version; an existing link (and its
        # is_selected flag) is left as it is.
        candidate_insert = (
            pg_insert(PlateVersionCandidate)
            .values(
                plate=normalized,
                motornet_code=mv.motornet_code,
                is_selected=False,
            )
            .on_conflict_do_nothing()
        )
        await db.execute(candidate_insert)

    await _log_usage(db, normalized, source, False, mn_result.remaining_queries)
    await db.commit()

    # The upsert above is a Core statement, so a registry object loaded by
    # the first query is not updated in the session. populate_existing forces
    # its attributes to be overwritten from the row just written, instead of
    # returning the stale pre-refresh values.
    refreshed = await db.execute(
        registry_query.execution_options(populate_existing=True)
    )
    registry = refreshed.scalar_one()

    resp = await _build_response_from_db(registry, db)
    resp.from_cache = False
    return resp
|
|
|
|
|
|
async def select_version(plate: str, motornet_code: str, db: AsyncSession) -> None:
    """Mark one version as the selected one for a plate and commit.

    The plate is upper-cased and stripped of spaces. The matching
    plate/version candidate is upserted with ``is_selected`` set to ``True``,
    every other candidate of the same plate has the flag cleared, and, when
    a registry row exists for the plate, its ``selected_motornet_code`` is
    set to the chosen code.

    Args:
        plate: Plate as supplied by the caller (any case, spaces allowed).
        motornet_code: Code of the version to select.
        db: Session used for the writes; it is committed here.
    """
    normalized = plate.upper().replace(" ", "")

    # Create the candidate if it is missing, otherwise just flip its flag on.
    mark_selected = (
        pg_insert(PlateVersionCandidate.__table__)
        .values(plate=normalized, motornet_code=motornet_code, is_selected=True)
        .on_conflict_do_update(
            index_elements=["plate", "motornet_code"],
            set_={"is_selected": True},
        )
    )
    await db.execute(mark_selected)

    # Every other candidate of this plate loses the selection.
    siblings_query = select(PlateVersionCandidate).where(
        PlateVersionCandidate.plate == normalized,
        PlateVersionCandidate.motornet_code != motornet_code,
    )
    for sibling in (await db.execute(siblings_query)).scalars().all():
        sibling.is_selected = False

    registry_query = select(VehicleRegistry).where(
        VehicleRegistry.plate == normalized
    )
    registry = (await db.execute(registry_query)).scalar_one_or_none()
    if registry:
        registry.selected_motornet_code = motornet_code

    await db.commit()
|