From 5e649156218b9174b52dd3435b2e07af63d116bb Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Tue, 8 Jul 2025 13:19:14 +0200 Subject: [PATCH 1/2] feat!(reports): Migrate to new acsn API Apple no longer returns the `published_at` and `description` fields on location reports, so these have been deprecated. --- findmy/reports/account.py | 23 +++++++++++++++++---- findmy/reports/reports.py | 42 +++++++++++---------------------------- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 64381e1..3c62f80 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -343,7 +343,7 @@ class AsyncAppleAccount(BaseAppleAccount): _ENDPOINT_2FA_TD_SUBMIT = "https://gsa.apple.com/grandslam/GsService2/validate" # reports endpoints - _ENDPOINT_REPORTS_FETCH = "https://gateway.icloud.com/acsnservice/fetch" + _ENDPOINT_REPORTS_FETCH = "https://gateway.icloud.com/findmyservice/v2/fetch" def __init__( self, @@ -599,7 +599,22 @@ class AsyncAppleAccount(BaseAppleAccount): self._login_state_data["dsid"], self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"], ) - data = {"search": [{"startDate": start, "endDate": end, "ids": ids}]} + data = { + "clientContext": { + "clientBundleIdentifier": "com.apple.icloud.searchpartyuseragent", + "policy": "foregroundClient", + }, + "fetch": [ + { + "startDateSecondary": start, + "ownedDeviceIds": [], + "keyType": 1, + "startDate": start, + "endDate": end, + "primaryIds": ids, + }, + ], + } async def _do_request() -> HttpResponse: return await self._http.post( @@ -629,11 +644,11 @@ class AsyncAppleAccount(BaseAppleAccount): resp = r.json() except json.JSONDecodeError: resp = {} - if not r.ok or resp.get("statusCode") != "200": + if not r.ok or resp.get("acsnLocations", {}).get("statusCode") != "200": msg = f"Failed to fetch reports: {resp.get('statusCode')}" raise UnhandledProtocolError(msg) - return resp + return resp["acsnLocations"] @overload async def fetch_reports( diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index 8502d45..2db3d90 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -33,14 +33,10 @@ class LocationReport(HasHashedPublicKey): self, payload: bytes, hashed_adv_key: bytes, - published_at: datetime, - description: str = "", ) -> None: """Initialize a `KeyReport`. You should probably use `KeyReport.from_payload` instead.""" self._payload: bytes = payload self._hashed_adv_key: bytes = hashed_adv_key - self._published_at: datetime = published_at - self._description: str = description self._decrypted_data: tuple[KeyPair, bytes] | None = None @@ -109,16 +105,6 @@ class LocationReport(HasHashedPublicKey): self._decrypted_data = (key, decrypted_payload) - @property - def published_at(self) -> datetime: - """The `datetime` when this report was published by a device.""" - return self._published_at - - @property - def description(self) -> str: - """Description of the location report as published by Apple.""" - return self._description - @property def timestamp(self) -> datetime: """The `datetime` when this report was recorded by a device.""" @@ -357,25 +343,21 @@ class LocationReportsFetcher: id_to_key: dict[bytes, HasHashedPublicKey] = {key.hashed_adv_key_bytes: key for key in keys} reports: dict[HasHashedPublicKey, list[LocationReport]] = defaultdict(list) - for report in data.get("results", []): - payload = base64.b64decode(report["payload"]) - hashed_adv_key = base64.b64decode(report["id"]) - date_published = datetime.fromtimestamp( - report.get("datePublished", 0) / 1000, - tz=timezone.utc, - ).astimezone() - description = report.get("description", "") + for key_reports in data.get("locationPayload", []): + hashed_adv_key_bytes = base64.b64decode(key_reports["id"]) + key = id_to_key[hashed_adv_key_bytes] - loc_report = LocationReport(payload, hashed_adv_key, date_published, description) + for report in key_reports.get("locationInfo", []): + payload = base64.b64decode(report) + loc_report = LocationReport(payload, hashed_adv_key_bytes) - # pre-decrypt if possible - key = id_to_key[hashed_adv_key] - if isinstance(key, KeyPair): - loc_report.decrypt(key) + if loc_report.timestamp < date_from or loc_report.timestamp > date_to: + continue - if loc_report.timestamp < date_from or loc_report.timestamp > date_to: - continue + # pre-decrypt if possible + if isinstance(key, KeyPair): + loc_report.decrypt(key) - reports[key].append(loc_report) + reports[key].append(loc_report) return reports From e74197372f484ab24d83c1c4bcac26cee61672e6 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Thu, 10 Jul 2025 16:19:33 +0200 Subject: [PATCH 2/2] fix(reports): split key queries into devices Apple only returns up to 20 location reports per device. --- findmy/reports/account.py | 21 +++++++++---- findmy/reports/reports.py | 64 +++++++++++++++++++++------------------ 2 files changed, 49 insertions(+), 36 deletions(-) diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 3c62f80..05ee744 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -593,12 +593,19 @@ class AsyncAppleAccount(BaseAppleAccount): return await self._login_mobileme() @require_login_state(LoginState.LOGGED_IN) - async def fetch_raw_reports(self, start: int, end: int, ids: list[str]) -> dict[str, Any]: + async def fetch_raw_reports( + self, + start: datetime, + end: datetime, + devices: list[list[str]], + ) -> dict[str, Any]: """Make a request for location reports, returning raw data.""" auth = ( self._login_state_data["dsid"], self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"], ) + start_ts = int(start.timestamp() * 1000) + end_ts = int(end.timestamp() * 1000) data = { "clientContext": { "clientBundleIdentifier": "com.apple.icloud.searchpartyuseragent", @@ -606,13 +613,15 @@ class AsyncAppleAccount(BaseAppleAccount): }, "fetch": [ { - "startDateSecondary": start, "ownedDeviceIds": [], "keyType": 1, - "startDate": start, - "endDate": end, - "primaryIds": ids, - }, + "startDate": start_ts, + "startDateSecondary": start_ts, + "endDate": end_ts, + # passing all keys as primary seems to work fine + "primaryIds": device_keys, + } + for device_keys in devices ], } diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index 2db3d90..df532fc 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -247,7 +247,7 @@ class LocationReportsFetcher: device: Sequence[HasHashedPublicKey | RollingKeyPairSource], ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ... - async def fetch_reports( + async def fetch_reports( # noqa: C901 self, date_from: datetime, date_to: datetime, @@ -269,46 +269,48 @@ class LocationReportsFetcher: as key, and a list of location reports as value. """ key_devs: dict[HasHashedPublicKey, HasHashedPublicKey | RollingKeyPairSource] = {} + key_batches: list[list[HasHashedPublicKey]] = [] if isinstance(device, HasHashedPublicKey): # single key key_devs = {device: device} + key_batches.append([device]) elif isinstance(device, RollingKeyPairSource): # key generator # add 12h margin to the generator - key_devs = { # noqa: C420 - key: device - for key in device.keys_between( - date_from - timedelta(hours=12), - date_to + timedelta(hours=12), - ) - } - + keys = device.keys_between( + date_from - timedelta(hours=12), + date_to + timedelta(hours=12), + ) + key_devs = dict.fromkeys(keys, device) + key_batches.append(list(keys)) elif isinstance(device, list) and all( isinstance(x, HasHashedPublicKey | RollingKeyPairSource) for x in device ): # multiple key generators # add 12h margin to each generator device = cast("list[HasHashedPublicKey | RollingKeyPairSource]", device) - key_devs = {key: key for key in device if isinstance(key, HasHashedPublicKey)} | { - key: dev - for dev in device - if isinstance(dev, RollingKeyPairSource) - for key in dev.keys_between( - date_from - timedelta(hours=12), - date_to + timedelta(hours=12), - ) - } + for dev in device: + if isinstance(dev, HasHashedPublicKey): + key_devs[dev] = dev + key_batches.append([dev]) + elif isinstance(dev, RollingKeyPairSource): + keys = dev.keys_between( + date_from - timedelta(hours=12), + date_to + timedelta(hours=12), + ) + for key in keys: + key_devs[key] = dev + key_batches.append(list(keys)) else: msg = "Unknown device type: %s" raise ValueError(msg, type(device)) # sequence of keys (fetch 256 max at a time) - key_reports: dict[HasHashedPublicKey, list[LocationReport]] = {} - keys = list(key_devs.keys()) - for key_offset in range(0, len(keys), 256): - chunk_keys = keys[key_offset : key_offset + 256] - chunk_reports = await self._fetch_reports(date_from, date_to, chunk_keys) - key_reports |= chunk_reports + key_reports: dict[HasHashedPublicKey, list[LocationReport]] = await self._fetch_reports( + date_from, + date_to, + key_batches, + ) # combine (key -> list[report]) and (key -> device) into (device -> list[report]) device_reports = defaultdict(list) @@ -328,20 +330,22 @@ class LocationReportsFetcher: self, date_from: datetime, date_to: datetime, - keys: Sequence[HasHashedPublicKey], + device_keys: Sequence[Sequence[HasHashedPublicKey]], ) -> dict[HasHashedPublicKey, list[LocationReport]]: - logger.debug("Fetching reports for %s keys", len(keys)) + logger.debug("Fetching reports for %s device(s)", len(device_keys)) # lock requested time range to the past 7 days, +- 12 hours, then filter the response. # this is due to an Apple backend bug where the time range is not respected. # More info: https://github.com/biemster/FindMy/issues/7 now = datetime.now().astimezone() - start_date = int((now - timedelta(days=7, hours=12)).timestamp() * 1000) - end_date = int((now + timedelta(hours=12)).timestamp() * 1000) - ids = [key.hashed_adv_key_b64 for key in keys] + start_date = now - timedelta(days=7, hours=12) + end_date = now + timedelta(hours=12) + ids = [[key.hashed_adv_key_b64 for key in keys] for keys in device_keys] data = await self._account.fetch_raw_reports(start_date, end_date, ids) - id_to_key: dict[bytes, HasHashedPublicKey] = {key.hashed_adv_key_bytes: key for key in keys} + id_to_key: dict[bytes, HasHashedPublicKey] = { + key.hashed_adv_key_bytes: key for keys in device_keys for key in keys + } reports: dict[HasHashedPublicKey, list[LocationReport]] = defaultdict(list) for key_reports in data.get("locationPayload", []): hashed_adv_key_bytes = base64.b64decode(key_reports["id"])