from datetime import datetime, timezone, timedelta from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, delete from sqlalchemy.dialects.postgresql import insert as pg_insert import structlog from app.core.config import settings from app.models.vehicle import VehicleRegistry, VehicleVersion, PlateVersionCandidate, ApiUsageLog from app.integrations.motornet import lookup_plate as motornet_lookup, MotornetUnavailableError from app.schemas.vehicle import VehicleLookupResponse, VehicleVersionItem logger = structlog.get_logger() def _is_cache_valid(registry: VehicleRegistry) -> bool: if registry.expires_at is None: return True now = datetime.now(timezone.utc) expires = registry.expires_at if expires.tzinfo is None: expires = expires.replace(tzinfo=timezone.utc) return expires > now async def _log_usage( db: AsyncSession, plate: str, source: str, hit_cache: bool, remaining_queries: int | None, ) -> None: log = ApiUsageLog( source=source, plate=plate, hit_cache=hit_cache, remaining_queries=remaining_queries, called_at=datetime.now(timezone.utc), ) db.add(log) async def _build_response_from_db( registry: VehicleRegistry, db: AsyncSession, ) -> VehicleLookupResponse: result = await db.execute( select(PlateVersionCandidate, VehicleVersion) .join(VehicleVersion, PlateVersionCandidate.motornet_code == VehicleVersion.motornet_code) .where(PlateVersionCandidate.plate == registry.plate) ) rows = result.all() versions = [] for candidate, version in rows: versions.append( VehicleVersionItem( motornet_code=version.motornet_code, version_label=version.version_label, body_type=version.body_type, doors=version.doors, wheelbase=version.wheelbase, list_price=version.list_price, production_start=version.production_start, production_end=version.production_end, commercial_start=version.commercial_start, commercial_end=version.commercial_end, brand_name=version.brand_name, model_description=version.model_description, gamma_description=version.gamma_description, series_description=version.series_description, is_selected=candidate.is_selected, ) ) return VehicleLookupResponse( plate=registry.plate, vin=registry.vin, vehicle_type=registry.vehicle_type, registration_date=registry.registration_date, homologation_code=registry.homologation_code, engine_code=registry.engine_code, last_revision_date=registry.last_revision_date, source=registry.source, selected_motornet_code=registry.selected_motornet_code, versions=versions, from_cache=True, ) async def lookup(plate: str, db: AsyncSession) -> VehicleLookupResponse: normalized = plate.upper().replace(" ", "") result = await db.execute( select(VehicleRegistry).where(VehicleRegistry.plate == normalized) ) registry = result.scalar_one_or_none() if registry and _is_cache_valid(registry): logger.info("vehicle_lookup_cache_hit", plate=normalized) await _log_usage(db, normalized, registry.source or "cache", True, None) await db.commit() return await _build_response_from_db(registry, db) logger.info("vehicle_lookup_cache_miss", plate=normalized) try: mn_result = await motornet_lookup(normalized) source = "motornet" ttl_days = settings.vehicle_cache_ttl_days expires_at = None if ttl_days > 0: expires_at = datetime.now(timezone.utc) + timedelta(days=ttl_days) stmt = pg_insert(VehicleRegistry).values( plate=normalized, vin=mn_result.vin, vehicle_type=mn_result.vehicle_type, registration_date=mn_result.registration_date, homologation_code=mn_result.homologation_code, engine_code=mn_result.engine_code, last_revision_date=mn_result.last_revision_date, foreign_plate=mn_result.foreign_plate, foreign_registration_date=mn_result.foreign_registration_date, foreign_country=mn_result.foreign_country, is_foreign_registered=mn_result.is_foreign_registered, source=source, raw_response=mn_result.raw_response, fetched_at=datetime.now(timezone.utc), expires_at=expires_at, selected_motornet_code=None, ).on_conflict_do_update( index_elements=["plate"], set_={ "vin": mn_result.vin, "vehicle_type": mn_result.vehicle_type, "registration_date": mn_result.registration_date, "homologation_code": mn_result.homologation_code, "engine_code": mn_result.engine_code, "last_revision_date": mn_result.last_revision_date, "foreign_plate": mn_result.foreign_plate, "foreign_registration_date": mn_result.foreign_registration_date, "foreign_country": mn_result.foreign_country, "is_foreign_registered": mn_result.is_foreign_registered, "source": source, "raw_response": mn_result.raw_response, "fetched_at": datetime.now(timezone.utc), "expires_at": expires_at, }, ) await db.execute(stmt) for mv in mn_result.versions: vstmt = pg_insert(VehicleVersion).values( motornet_code=mv.motornet_code, brand_acronym=mn_result.brand_acronym, brand_name=mn_result.brand_name, model_code=mn_result.model_code, model_description=mn_result.model_description, gamma_code=mn_result.gamma_code, gamma_description=mn_result.gamma_description, series_code=mn_result.series_code, series_description=mn_result.series_description, historical_group_code=mn_result.historical_group_code, historical_group_desc=mn_result.historical_group_desc, cod_desc_model_code=mn_result.cod_desc_model_code, cod_desc_model_desc=mn_result.cod_desc_model_desc, version_label=mv.version_label, body_type=mv.body_type, doors=mv.doors, wheelbase=mv.wheelbase, list_price=mv.list_price, production_start=mv.production_start, production_end=mv.production_end, commercial_start=mv.commercial_start, commercial_end=mv.commercial_end, first_seen_at=datetime.now(timezone.utc), last_seen_at=datetime.now(timezone.utc), seen_count=1, source=source, ).on_conflict_do_update( index_elements=["motornet_code"], set_={ "last_seen_at": datetime.now(timezone.utc), "seen_count": VehicleVersion.seen_count + 1, "brand_name": mn_result.brand_name, "model_description": mn_result.model_description, "gamma_description": mn_result.gamma_description, "series_description": mn_result.series_description, }, ) await db.execute(vstmt) cstmt = pg_insert(PlateVersionCandidate).values( plate=normalized, motornet_code=mv.motornet_code, is_selected=False, ).on_conflict_do_nothing() await db.execute(cstmt) await _log_usage(db, normalized, source, False, mn_result.remaining_queries) await db.commit() refreshed = await db.execute( select(VehicleRegistry).where(VehicleRegistry.plate == normalized) ) registry = refreshed.scalar_one() resp = await _build_response_from_db(registry, db) resp.from_cache = False return resp except MotornetUnavailableError: raise async def select_version(plate: str, motornet_code: str, db: AsyncSession) -> None: normalized = plate.upper().replace(" ", "") await db.execute( pg_insert(PlateVersionCandidate.__table__) .values(plate=normalized, motornet_code=motornet_code, is_selected=True) .on_conflict_do_update( index_elements=["plate", "motornet_code"], set_={"is_selected": True}, ) ) result = await db.execute( select(PlateVersionCandidate).where( PlateVersionCandidate.plate == normalized, PlateVersionCandidate.motornet_code != motornet_code, ) ) others = result.scalars().all() for other in others: other.is_selected = False result = await db.execute( select(VehicleRegistry).where(VehicleRegistry.plate == normalized) ) registry = result.scalar_one_or_none() if registry: registry.selected_motornet_code = motornet_code await db.commit()