mirror of
https://github.com/malmeloo/FindMy.py.git
synced 2026-04-18 09:54:04 +02:00
294 lines
9.1 KiB
Python
294 lines
9.1 KiB
Python
"""Module providing functionality to look up location reports."""
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import logging
|
|
import struct
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import TYPE_CHECKING, Sequence, overload
|
|
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.asymmetric import ec
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from typing_extensions import override
|
|
|
|
from findmy.accessory import RollingKeyPairSource
|
|
from findmy.keys import KeyPair
|
|
|
|
if TYPE_CHECKING:
|
|
from .account import AsyncAppleAccount
|
|
|
|
logging.getLogger(__name__)
|
|
|
|
|
|
def _decrypt_payload(payload: bytes, key: KeyPair) -> bytes:
|
|
eph_key = ec.EllipticCurvePublicKey.from_encoded_point(
|
|
ec.SECP224R1(),
|
|
payload[5:62],
|
|
)
|
|
shared_key = key.dh_exchange(eph_key)
|
|
symmetric_key = hashlib.sha256(
|
|
shared_key + b"\x00\x00\x00\x01" + payload[5:62],
|
|
).digest()
|
|
|
|
decryption_key = symmetric_key[:16]
|
|
iv = symmetric_key[16:]
|
|
enc_data = payload[62:72]
|
|
tag = payload[72:]
|
|
|
|
decryptor = Cipher(
|
|
algorithms.AES(decryption_key),
|
|
modes.GCM(iv, tag),
|
|
default_backend(),
|
|
).decryptor()
|
|
return decryptor.update(enc_data) + decryptor.finalize()
|
|
|
|
|
|
class LocationReport:
|
|
"""Location report corresponding to a certain `KeyPair`."""
|
|
|
|
def __init__( # noqa: PLR0913
|
|
self,
|
|
key: KeyPair,
|
|
publish_date: datetime,
|
|
timestamp: datetime,
|
|
description: str,
|
|
lat: float,
|
|
lng: float,
|
|
confidence: int,
|
|
status: int,
|
|
) -> None:
|
|
"""Initialize a `KeyReport`. You should probably use `KeyReport.from_payload` instead."""
|
|
self._key = key
|
|
self._publish_date = publish_date
|
|
self._timestamp = timestamp
|
|
self._description = description
|
|
|
|
self._lat = lat
|
|
self._lng = lng
|
|
self._confidence = confidence
|
|
|
|
self._status = status
|
|
|
|
@property
|
|
def key(self) -> KeyPair:
|
|
"""The `KeyPair` corresponding to this location report."""
|
|
return self._key
|
|
|
|
@property
|
|
def published_at(self) -> datetime:
|
|
"""The `datetime` when this report was published by a device."""
|
|
return self._publish_date
|
|
|
|
@property
|
|
def timestamp(self) -> datetime:
|
|
"""The `datetime` when this report was recorded by a device."""
|
|
return self._timestamp
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
"""Description of the location report as published by Apple."""
|
|
return self._description
|
|
|
|
@property
|
|
def latitude(self) -> float:
|
|
"""Latitude of the location of this report."""
|
|
return self._lat
|
|
|
|
@property
|
|
def longitude(self) -> float:
|
|
"""Longitude of the location of this report."""
|
|
return self._lng
|
|
|
|
@property
|
|
def confidence(self) -> int:
|
|
"""Confidence of the location of this report."""
|
|
return self._confidence
|
|
|
|
@property
|
|
def status(self) -> int:
|
|
"""Status byte of the accessory as recorded by a device, as an integer."""
|
|
return self._status
|
|
|
|
@classmethod
|
|
def from_payload(
|
|
cls,
|
|
key: KeyPair,
|
|
publish_date: datetime,
|
|
description: str,
|
|
payload: bytes,
|
|
) -> LocationReport:
|
|
"""
|
|
Create a `KeyReport` from fields and a payload as reported by Apple.
|
|
|
|
Requires a `KeyPair` to decrypt the report's payload.
|
|
"""
|
|
timestamp_int = int.from_bytes(payload[0:4], "big") + (60 * 60 * 24 * 11323)
|
|
timestamp = datetime.fromtimestamp(timestamp_int, tz=timezone.utc).astimezone()
|
|
|
|
data = _decrypt_payload(payload, key)
|
|
latitude = struct.unpack(">i", data[0:4])[0] / 10000000
|
|
longitude = struct.unpack(">i", data[4:8])[0] / 10000000
|
|
confidence = int.from_bytes(data[8:9], "big")
|
|
status = int.from_bytes(data[9:10], "big")
|
|
|
|
return cls(
|
|
key,
|
|
publish_date,
|
|
timestamp,
|
|
description,
|
|
latitude,
|
|
longitude,
|
|
confidence,
|
|
status,
|
|
)
|
|
|
|
def __lt__(self, other: LocationReport) -> bool:
|
|
"""
|
|
Compare against another `KeyReport`.
|
|
|
|
A `KeyReport` is said to be "less than" another `KeyReport` iff its recorded
|
|
timestamp is strictly less than the other report.
|
|
"""
|
|
if isinstance(other, LocationReport):
|
|
return self.timestamp < other.timestamp
|
|
return NotImplemented
|
|
|
|
@override
|
|
def __repr__(self) -> str:
|
|
"""Human-readable string representation of the location report."""
|
|
return (
|
|
f"KeyReport(key={self._key.hashed_adv_key_b64}, timestamp={self._timestamp},"
|
|
f" lat={self._lat}, lng={self._lng})"
|
|
)
|
|
|
|
|
|
class LocationReportsFetcher:
|
|
"""Fetcher class to retrieve location reports."""
|
|
|
|
def __init__(self, account: AsyncAppleAccount) -> None:
|
|
"""
|
|
Initialize the fetcher.
|
|
|
|
:param account: Apple account.
|
|
"""
|
|
self._account: AsyncAppleAccount = account
|
|
|
|
@overload
|
|
async def fetch_reports(
|
|
self,
|
|
date_from: datetime,
|
|
date_to: datetime,
|
|
device: KeyPair,
|
|
) -> list[LocationReport]:
|
|
...
|
|
|
|
@overload
|
|
async def fetch_reports(
|
|
self,
|
|
date_from: datetime,
|
|
date_to: datetime,
|
|
device: Sequence[KeyPair],
|
|
) -> dict[KeyPair, list[LocationReport]]:
|
|
...
|
|
|
|
@overload
|
|
async def fetch_reports(
|
|
self,
|
|
date_from: datetime,
|
|
date_to: datetime,
|
|
device: RollingKeyPairSource,
|
|
) -> list[LocationReport]:
|
|
...
|
|
|
|
async def fetch_reports(
|
|
self,
|
|
date_from: datetime,
|
|
date_to: datetime,
|
|
device: KeyPair | Sequence[KeyPair] | RollingKeyPairSource,
|
|
) -> list[LocationReport] | dict[KeyPair, list[LocationReport]]:
|
|
"""
|
|
Fetch location reports for a certain device.
|
|
|
|
When ``device`` is a single :class:`.KeyPair`, this method will return
|
|
a list of location reports corresponding to that pair.
|
|
When ``device`` is a sequence of :class:`.KeyPair`s, it will return a dictionary
|
|
with the :class:`.KeyPair` as key, and a list of location reports as value.
|
|
"""
|
|
# single KeyPair
|
|
if isinstance(device, KeyPair):
|
|
return await self._fetch_reports(date_from, date_to, [device])
|
|
|
|
# KeyPair generator
|
|
# add 12h margin to the generator
|
|
if isinstance(device, RollingKeyPairSource):
|
|
keys = list(
|
|
device.keys_between(
|
|
date_from - timedelta(hours=12),
|
|
date_to + timedelta(hours=12),
|
|
),
|
|
)
|
|
else:
|
|
keys = device
|
|
|
|
# sequence of KeyPairs (fetch 256 max at a time)
|
|
reports: list[LocationReport] = []
|
|
for key_offset in range(0, len(keys), 256):
|
|
chunk = keys[key_offset : key_offset + 256]
|
|
reports.extend(await self._fetch_reports(date_from, date_to, chunk))
|
|
|
|
if isinstance(device, RollingKeyPairSource):
|
|
return reports
|
|
|
|
res: dict[KeyPair, list[LocationReport]] = {key: [] for key in keys}
|
|
for report in reports:
|
|
res[report.key].append(report)
|
|
return res
|
|
|
|
async def _fetch_reports(
|
|
self,
|
|
date_from: datetime,
|
|
date_to: datetime,
|
|
keys: Sequence[KeyPair],
|
|
) -> list[LocationReport]:
|
|
logging.debug("Fetching reports for %s keys", len(keys))
|
|
|
|
start_date = int(date_from.timestamp() * 1000)
|
|
end_date = int(date_to.timestamp() * 1000)
|
|
ids = [key.hashed_adv_key_b64 for key in keys]
|
|
data = await self._account.fetch_raw_reports(start_date, end_date, ids)
|
|
|
|
id_to_key: dict[str, KeyPair] = {key.hashed_adv_key_b64: key for key in keys}
|
|
reports: list[LocationReport] = []
|
|
for report in data.get("results", []):
|
|
key = id_to_key[report["id"]]
|
|
date_published = datetime.fromtimestamp(
|
|
report.get("datePublished", 0) / 1000,
|
|
tz=timezone.utc,
|
|
).astimezone()
|
|
description = report.get("description", "")
|
|
payload = base64.b64decode(report["payload"])
|
|
|
|
# Fix decryption for new report format via MacOS 14+ and daugher OSes
|
|
# See: https://github.com/MatthewKuKanich/FindMyFlipper/issues/61#issuecomment-2065003410
|
|
if len(payload) == 89:
|
|
payload = payload[0:4] + payload[5:]
|
|
|
|
try:
|
|
loc_report = LocationReport.from_payload(key, date_published, description, payload)
|
|
except ValueError as e:
|
|
logging.warning(
|
|
"Location report was not decodable. Please report this full message"
|
|
" at https://github.com/malmeloo/FindMy.py/issues/. This report does not"
|
|
" reveal your location. Payload length: %d Payload: %s, Original error: %s",
|
|
len(payload),
|
|
payload.hex(),
|
|
e,
|
|
)
|
|
continue
|
|
|
|
reports.append(loc_report)
|
|
|
|
return reports
|