mirror of
https://github.com/malmeloo/FindMy.py.git
synced 2026-04-17 21:53:57 +02:00
Merge pull request #144 from malmeloo/feat/migrate-api
feat!(reports): Migrate to new acsn API
This commit is contained in:
@@ -343,7 +343,7 @@ class AsyncAppleAccount(BaseAppleAccount):
|
||||
_ENDPOINT_2FA_TD_SUBMIT = "https://gsa.apple.com/grandslam/GsService2/validate"
|
||||
|
||||
# reports endpoints
|
||||
_ENDPOINT_REPORTS_FETCH = "https://gateway.icloud.com/acsnservice/fetch"
|
||||
_ENDPOINT_REPORTS_FETCH = "https://gateway.icloud.com/findmyservice/v2/fetch"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -593,13 +593,37 @@ class AsyncAppleAccount(BaseAppleAccount):
|
||||
return await self._login_mobileme()
|
||||
|
||||
@require_login_state(LoginState.LOGGED_IN)
|
||||
async def fetch_raw_reports(self, start: int, end: int, ids: list[str]) -> dict[str, Any]:
|
||||
async def fetch_raw_reports(
|
||||
self,
|
||||
start: datetime,
|
||||
end: datetime,
|
||||
devices: list[list[str]],
|
||||
) -> dict[str, Any]:
|
||||
"""Make a request for location reports, returning raw data."""
|
||||
auth = (
|
||||
self._login_state_data["dsid"],
|
||||
self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"],
|
||||
)
|
||||
data = {"search": [{"startDate": start, "endDate": end, "ids": ids}]}
|
||||
start_ts = int(start.timestamp() * 1000)
|
||||
end_ts = int(end.timestamp() * 1000)
|
||||
data = {
|
||||
"clientContext": {
|
||||
"clientBundleIdentifier": "com.apple.icloud.searchpartyuseragent",
|
||||
"policy": "foregroundClient",
|
||||
},
|
||||
"fetch": [
|
||||
{
|
||||
"ownedDeviceIds": [],
|
||||
"keyType": 1,
|
||||
"startDate": start_ts,
|
||||
"startDateSecondary": start_ts,
|
||||
"endDate": end_ts,
|
||||
# passing all keys as primary seems to work fine
|
||||
"primaryIds": device_keys,
|
||||
}
|
||||
for device_keys in devices
|
||||
],
|
||||
}
|
||||
|
||||
async def _do_request() -> HttpResponse:
|
||||
return await self._http.post(
|
||||
@@ -629,11 +653,11 @@ class AsyncAppleAccount(BaseAppleAccount):
|
||||
resp = r.json()
|
||||
except json.JSONDecodeError:
|
||||
resp = {}
|
||||
if not r.ok or resp.get("statusCode") != "200":
|
||||
if not r.ok or resp.get("acsnLocations", {}).get("statusCode") != "200":
|
||||
msg = f"Failed to fetch reports: {resp.get('statusCode')}"
|
||||
raise UnhandledProtocolError(msg)
|
||||
|
||||
return resp
|
||||
return resp["acsnLocations"]
|
||||
|
||||
@overload
|
||||
async def fetch_reports(
|
||||
|
||||
@@ -33,14 +33,10 @@ class LocationReport(HasHashedPublicKey):
|
||||
self,
|
||||
payload: bytes,
|
||||
hashed_adv_key: bytes,
|
||||
published_at: datetime,
|
||||
description: str = "",
|
||||
) -> None:
|
||||
"""Initialize a `KeyReport`. You should probably use `KeyReport.from_payload` instead."""
|
||||
self._payload: bytes = payload
|
||||
self._hashed_adv_key: bytes = hashed_adv_key
|
||||
self._published_at: datetime = published_at
|
||||
self._description: str = description
|
||||
|
||||
self._decrypted_data: tuple[KeyPair, bytes] | None = None
|
||||
|
||||
@@ -109,16 +105,6 @@ class LocationReport(HasHashedPublicKey):
|
||||
|
||||
self._decrypted_data = (key, decrypted_payload)
|
||||
|
||||
@property
|
||||
def published_at(self) -> datetime:
|
||||
"""The `datetime` when this report was published by a device."""
|
||||
return self._published_at
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
"""Description of the location report as published by Apple."""
|
||||
return self._description
|
||||
|
||||
@property
|
||||
def timestamp(self) -> datetime:
|
||||
"""The `datetime` when this report was recorded by a device."""
|
||||
@@ -261,7 +247,7 @@ class LocationReportsFetcher:
|
||||
device: Sequence[HasHashedPublicKey | RollingKeyPairSource],
|
||||
) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ...
|
||||
|
||||
async def fetch_reports(
|
||||
async def fetch_reports( # noqa: C901
|
||||
self,
|
||||
date_from: datetime,
|
||||
date_to: datetime,
|
||||
@@ -283,46 +269,48 @@ class LocationReportsFetcher:
|
||||
as key, and a list of location reports as value.
|
||||
"""
|
||||
key_devs: dict[HasHashedPublicKey, HasHashedPublicKey | RollingKeyPairSource] = {}
|
||||
key_batches: list[list[HasHashedPublicKey]] = []
|
||||
if isinstance(device, HasHashedPublicKey):
|
||||
# single key
|
||||
key_devs = {device: device}
|
||||
key_batches.append([device])
|
||||
elif isinstance(device, RollingKeyPairSource):
|
||||
# key generator
|
||||
# add 12h margin to the generator
|
||||
key_devs = { # noqa: C420
|
||||
key: device
|
||||
for key in device.keys_between(
|
||||
date_from - timedelta(hours=12),
|
||||
date_to + timedelta(hours=12),
|
||||
)
|
||||
}
|
||||
|
||||
keys = device.keys_between(
|
||||
date_from - timedelta(hours=12),
|
||||
date_to + timedelta(hours=12),
|
||||
)
|
||||
key_devs = dict.fromkeys(keys, device)
|
||||
key_batches.append(list(keys))
|
||||
elif isinstance(device, list) and all(
|
||||
isinstance(x, HasHashedPublicKey | RollingKeyPairSource) for x in device
|
||||
):
|
||||
# multiple key generators
|
||||
# add 12h margin to each generator
|
||||
device = cast("list[HasHashedPublicKey | RollingKeyPairSource]", device)
|
||||
key_devs = {key: key for key in device if isinstance(key, HasHashedPublicKey)} | {
|
||||
key: dev
|
||||
for dev in device
|
||||
if isinstance(dev, RollingKeyPairSource)
|
||||
for key in dev.keys_between(
|
||||
date_from - timedelta(hours=12),
|
||||
date_to + timedelta(hours=12),
|
||||
)
|
||||
}
|
||||
for dev in device:
|
||||
if isinstance(dev, HasHashedPublicKey):
|
||||
key_devs[dev] = dev
|
||||
key_batches.append([dev])
|
||||
elif isinstance(dev, RollingKeyPairSource):
|
||||
keys = dev.keys_between(
|
||||
date_from - timedelta(hours=12),
|
||||
date_to + timedelta(hours=12),
|
||||
)
|
||||
for key in keys:
|
||||
key_devs[key] = dev
|
||||
key_batches.append(list(keys))
|
||||
else:
|
||||
msg = "Unknown device type: %s"
|
||||
raise ValueError(msg, type(device))
|
||||
|
||||
# sequence of keys (fetch 256 max at a time)
|
||||
key_reports: dict[HasHashedPublicKey, list[LocationReport]] = {}
|
||||
keys = list(key_devs.keys())
|
||||
for key_offset in range(0, len(keys), 256):
|
||||
chunk_keys = keys[key_offset : key_offset + 256]
|
||||
chunk_reports = await self._fetch_reports(date_from, date_to, chunk_keys)
|
||||
key_reports |= chunk_reports
|
||||
key_reports: dict[HasHashedPublicKey, list[LocationReport]] = await self._fetch_reports(
|
||||
date_from,
|
||||
date_to,
|
||||
key_batches,
|
||||
)
|
||||
|
||||
# combine (key -> list[report]) and (key -> device) into (device -> list[report])
|
||||
device_reports = defaultdict(list)
|
||||
@@ -342,40 +330,38 @@ class LocationReportsFetcher:
|
||||
self,
|
||||
date_from: datetime,
|
||||
date_to: datetime,
|
||||
keys: Sequence[HasHashedPublicKey],
|
||||
device_keys: Sequence[Sequence[HasHashedPublicKey]],
|
||||
) -> dict[HasHashedPublicKey, list[LocationReport]]:
|
||||
logger.debug("Fetching reports for %s keys", len(keys))
|
||||
logger.debug("Fetching reports for %s device(s)", len(device_keys))
|
||||
|
||||
# lock requested time range to the past 7 days, +- 12 hours, then filter the response.
|
||||
# this is due to an Apple backend bug where the time range is not respected.
|
||||
# More info: https://github.com/biemster/FindMy/issues/7
|
||||
now = datetime.now().astimezone()
|
||||
start_date = int((now - timedelta(days=7, hours=12)).timestamp() * 1000)
|
||||
end_date = int((now + timedelta(hours=12)).timestamp() * 1000)
|
||||
ids = [key.hashed_adv_key_b64 for key in keys]
|
||||
start_date = now - timedelta(days=7, hours=12)
|
||||
end_date = now + timedelta(hours=12)
|
||||
ids = [[key.hashed_adv_key_b64 for key in keys] for keys in device_keys]
|
||||
data = await self._account.fetch_raw_reports(start_date, end_date, ids)
|
||||
|
||||
id_to_key: dict[bytes, HasHashedPublicKey] = {key.hashed_adv_key_bytes: key for key in keys}
|
||||
id_to_key: dict[bytes, HasHashedPublicKey] = {
|
||||
key.hashed_adv_key_bytes: key for keys in device_keys for key in keys
|
||||
}
|
||||
reports: dict[HasHashedPublicKey, list[LocationReport]] = defaultdict(list)
|
||||
for report in data.get("results", []):
|
||||
payload = base64.b64decode(report["payload"])
|
||||
hashed_adv_key = base64.b64decode(report["id"])
|
||||
date_published = datetime.fromtimestamp(
|
||||
report.get("datePublished", 0) / 1000,
|
||||
tz=timezone.utc,
|
||||
).astimezone()
|
||||
description = report.get("description", "")
|
||||
for key_reports in data.get("locationPayload", []):
|
||||
hashed_adv_key_bytes = base64.b64decode(key_reports["id"])
|
||||
key = id_to_key[hashed_adv_key_bytes]
|
||||
|
||||
loc_report = LocationReport(payload, hashed_adv_key, date_published, description)
|
||||
for report in key_reports.get("locationInfo", []):
|
||||
payload = base64.b64decode(report)
|
||||
loc_report = LocationReport(payload, hashed_adv_key_bytes)
|
||||
|
||||
# pre-decrypt if possible
|
||||
key = id_to_key[hashed_adv_key]
|
||||
if isinstance(key, KeyPair):
|
||||
loc_report.decrypt(key)
|
||||
if loc_report.timestamp < date_from or loc_report.timestamp > date_to:
|
||||
continue
|
||||
|
||||
if loc_report.timestamp < date_from or loc_report.timestamp > date_to:
|
||||
continue
|
||||
# pre-decrypt if possible
|
||||
if isinstance(key, KeyPair):
|
||||
loc_report.decrypt(key)
|
||||
|
||||
reports[key].append(loc_report)
|
||||
reports[key].append(loc_report)
|
||||
|
||||
return reports
|
||||
|
||||
Reference in New Issue
Block a user