From b97b094ccae0ca0c48a3900d1320a724d988d441 Mon Sep 17 00:00:00 2001 From: Mike A Date: Tue, 5 Mar 2024 23:55:40 +0100 Subject: [PATCH] chore(ani): Refactoring to improve readability --- findmy/reports/account.py | 154 +++++++++++++-------------- findmy/reports/anisette.py | 211 ++++++++++++++++++++++++++++++------- findmy/util/crypto.py | 37 ++++++- pyproject.toml | 1 + 4 files changed, 276 insertions(+), 127 deletions(-) diff --git a/findmy/reports/account.py b/findmy/reports/account.py index bf6ae08..1efb967 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -3,8 +3,6 @@ from __future__ import annotations import asyncio import base64 -import hashlib -import hmac import json import logging import plistlib @@ -26,12 +24,10 @@ from typing import ( import bs4 import srp._pysrp as srp -from cryptography.hazmat.primitives import hashes, padding -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from typing_extensions import override from findmy.errors import InvalidCredentialsError, InvalidStateError, UnhandledProtocolError +from findmy.util import crypto from findmy.util.closable import Closable from findmy.util.http import HttpSession, decode_plist @@ -96,32 +92,6 @@ def require_login_state(*states: LoginState) -> Callable[[_F], _F]: return decorator -def _encrypt_password(password: str, salt: bytes, iterations: int) -> bytes: - p = hashlib.sha256(password.encode("utf-8")).digest() - kdf = PBKDF2HMAC( - algorithm=hashes.SHA256(), - length=32, - salt=salt, - iterations=iterations, - ) - return kdf.derive(p) - - -def _decrypt_cbc(session_key: bytes, data: bytes) -> bytes: - extra_data_key = hmac.new(session_key, b"extra data key:", hashlib.sha256).digest() - extra_data_iv = hmac.new(session_key, b"extra data iv:", hashlib.sha256).digest() - # Get only the first 16 bytes of the iv - extra_data_iv = extra_data_iv[:16] - - # Decrypt with AES CBC - cipher = Cipher(algorithms.AES(extra_data_key), modes.CBC(extra_data_iv)) - decryptor = cipher.decryptor() - data = decryptor.update(data) + decryptor.finalize() - # Remove PKCS#7 padding - padder = padding.PKCS7(128).unpadder() - return padder.update(data) + padder.finalize() - - def _extract_phone_numbers(html: str) -> list[dict]: soup = bs4.BeautifulSoup(html, features="html.parser") data_elem = soup.find("script", {"class": "boot_args"}) @@ -273,7 +243,11 @@ class BaseAppleAccount(Closable, ABC): raise NotImplementedError @abstractmethod - def get_anisette_headers(self, serial: str = "0") -> MaybeCoro[dict[str, str]]: + def get_anisette_headers( + self, + with_client_info: bool = False, + serial: str = "0", + ) -> MaybeCoro[dict[str, str]]: """ Retrieve a complete dictionary of Anisette headers. @@ -285,6 +259,20 @@ class BaseAppleAccount(Closable, ABC): class AsyncAppleAccount(BaseAppleAccount): """An async implementation of `BaseAppleAccount`.""" + # auth endpoints + _ENDPOINT_GSA = "https://gsa.apple.com/grandslam/GsService2" + _ENDPOINT_LOGIN_MOBILEME = "https://setup.icloud.com/setup/iosbuddy/loginDelegates" + + # 2fa auth endpoints + _ENDPOINT_2FA_METHODS = "https://gsa.apple.com/auth" + _ENDPOINT_2FA_SMS_REQUEST = "https://gsa.apple.com/auth/verify/phone" + _ENDPOINT_2FA_SMS_SUBMIT = "https://gsa.apple.com/auth/verify/phone/securitycode" + _ENDPOINT_2FA_TD_REQUEST = "https://gsa.apple.com/auth/verify/trusteddevice" + _ENDPOINT_2FA_TD_SUBMIT = "https://gsa.apple.com/grandslam/GsService2/validate" + + # reports endpoints + _ENDPOINT_REPORTS_FETCH = "https://gateway.icloud.com/acsnservice/fetch" + def __init__( self, anisette: BaseAnisetteProvider, @@ -438,7 +426,7 @@ class AsyncAppleAccount(BaseAppleAccount): methods.append(AsyncTrustedDeviceSecondFactor(self)) # sms - auth_page = await self._sms_2fa_request("GET", "https://gsa.apple.com/auth") + auth_page = await self._sms_2fa_request("GET", self._ENDPOINT_2FA_METHODS) try: phone_numbers = _extract_phone_numbers(auth_page) methods.extend( @@ -462,7 +450,7 @@ class AsyncAppleAccount(BaseAppleAccount): await self._sms_2fa_request( "PUT", - "https://gsa.apple.com/auth/verify/phone", + self._ENDPOINT_2FA_SMS_REQUEST, data, ) @@ -478,7 +466,7 @@ class AsyncAppleAccount(BaseAppleAccount): await self._sms_2fa_request( "POST", - "https://gsa.apple.com/auth/verify/phone/securitycode", + self._ENDPOINT_2FA_SMS_SUBMIT, data, ) @@ -501,7 +489,7 @@ class AsyncAppleAccount(BaseAppleAccount): } await self._sms_2fa_request( "GET", - "https://gsa.apple.com/auth/verify/trusteddevice", + self._ENDPOINT_2FA_TD_REQUEST, headers=headers, ) @@ -516,7 +504,7 @@ class AsyncAppleAccount(BaseAppleAccount): } await self._sms_2fa_request( "GET", - "https://gsa.apple.com/grandslam/GsService2/validate", + self._ENDPOINT_2FA_TD_SUBMIT, headers=headers, ) @@ -537,8 +525,9 @@ class AsyncAppleAccount(BaseAppleAccount): self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"], ) data = {"search": [{"startDate": start, "endDate": end, "ids": ids}]} + r = await self._http.post( - "https://gateway.icloud.com/acsnservice/fetch", + self._ENDPOINT_REPORTS_FETCH, auth=auth, headers=await self.get_anisette_headers(), json=data, @@ -616,7 +605,7 @@ class AsyncAppleAccount(BaseAppleAccount): logging.debug("Attempting password challenge") - usr.p = _encrypt_password(self._password, r["s"], r["i"]) + usr.p = crypto.encrypt_password(self._password, r["s"], r["i"]) m1 = usr.process_challenge(r["s"], r["B"]) if m1 is None: msg = "Failed to process challenge" @@ -637,8 +626,12 @@ class AsyncAppleAccount(BaseAppleAccount): logging.debug("Decrypting SPD data in response") - spd = _decrypt_cbc(usr.get_session_key() or b"", r["spd"]) - spd = decode_plist(spd) + spd = decode_plist( + crypto.decrypt_spd_aes_cbc( + usr.get_session_key() or b"", + r["spd"], + ), + ) logging.debug("Received account information") self._account_info = cast( @@ -655,27 +648,23 @@ class AsyncAppleAccount(BaseAppleAccount): if au in ("secondaryAuth", "trustedDeviceSecondaryAuth"): logging.info("Detected 2FA requirement: %s", au) - self._account_info.update( - { - "trusted_device_2fa": au == "trustedDeviceSecondaryAuth", - }, - ) + self._account_info["trusted_device_2fa"] = au == "trustedDeviceSecondaryAuth" return self._set_login_state( LoginState.REQUIRE_2FA, {"adsid": spd["adsid"], "idms_token": spd["GsIdmsToken"]}, ) - if au is not None: - msg = f"Unknown auth value: {au}" - raise UnhandledProtocolError(msg) + if au is None: + logging.info("GSA authentication successful") - logging.info("GSA authentication successful") + idms_pet = spd.get("t", {}).get("com.apple.gs.idms.pet", {}).get("token", "") + return self._set_login_state( + LoginState.AUTHENTICATED, + {"idms_pet": idms_pet, "adsid": spd["adsid"]}, + ) - idms_pet = spd.get("t", {}).get("com.apple.gs.idms.pet", {}).get("token", "") - return self._set_login_state( - LoginState.AUTHENTICATED, - {"idms_pet": idms_pet, "adsid": spd["adsid"]}, - ) + msg = f"Unknown auth value: {au}" + raise UnhandledProtocolError(msg) @require_login_state(LoginState.AUTHENTICATED) async def _login_mobileme(self) -> LoginState: @@ -698,7 +687,7 @@ class AsyncAppleAccount(BaseAppleAccount): headers.update(await self.get_anisette_headers()) resp = await self._http.post( - "https://setup.icloud.com/setup/iosbuddy/loginDelegates", + self._ENDPOINT_LOGIN_MOBILEME, auth=(self._username or "", self._login_state_data["idms_pet"]), data=data, headers=headers, @@ -734,13 +723,9 @@ class AsyncAppleAccount(BaseAppleAccount): "User-Agent": "Xcode", "Accept-Language": "en-us", "X-Apple-Identity-Token": identity_token, - "X-Apple-App-Info": "com.apple.gs.xcode.auth", - "X-Xcode-Version": "11.2 (11B41)", - "X-Mme-Client-Info": " " - " ", }, ) - headers.update(await self.get_anisette_headers()) + headers.update(await self.get_anisette_headers(with_client_info=True)) r = await self._http.request( method, @@ -754,34 +739,29 @@ class AsyncAppleAccount(BaseAppleAccount): return r.text() - async def _gsa_request(self, params: dict[str, Any]) -> dict[Any, Any]: - request_data = { - "cpd": { - "bootstrap": True, - "icscrec": True, - "pbe": False, - "prkgen": True, - "svct": "iCloud", - }, - } - request_data["cpd"].update(await self.get_anisette_headers()) - request_data.update(params) - + async def _gsa_request(self, parameters: dict[str, Any]) -> dict[str, Any]: body = { - "Header": {"Version": "1.0.1"}, - "Request": request_data, + "Header": { + "Version": "1.0.1", + }, + "Request": { + "cpd": await self._anisette.get_cpd( + self._uid, + self._devid, + ), + **parameters, + }, } headers = { "Content-Type": "text/x-xml-plist", "Accept": "*/*", "User-Agent": "akd/1.0 CFNetwork/978.0.7 Darwin/18.7.0", - "X-MMe-Client-Info": " " - "", + "X-MMe-Client-Info": self._anisette.client, } resp = await self._http.post( - "https://gsa.apple.com/grandslam/GsService2", + self._ENDPOINT_GSA, headers=headers, data=plistlib.dumps(body), ) @@ -791,9 +771,13 @@ class AsyncAppleAccount(BaseAppleAccount): return resp.plist()["Response"] @override - async def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: + async def get_anisette_headers( + self, + with_client_info: bool = False, + serial: str = "0", + ) -> dict[str, str]: """See `BaseAppleAccount.get_anisette_headers`.""" - return await self._anisette.get_headers(self._uid, self._devid, serial) + return await self._anisette.get_headers(self._uid, self._devid, serial, with_client_info) class AppleAccount(BaseAppleAccount): @@ -932,7 +916,11 @@ class AppleAccount(BaseAppleAccount): return self._evt_loop.run_until_complete(coro) @override - def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: + def get_anisette_headers( + self, + with_client_info: bool = False, + serial: str = "0", + ) -> dict[str, str]: """See `AsyncAppleAccount.get_anisette_headers`.""" - coro = self._asyncacc.get_anisette_headers(serial) + coro = self._asyncacc.get_anisette_headers(with_client_info, serial) return self._evt_loop.run_until_complete(coro) diff --git a/findmy/reports/anisette.py b/findmy/reports/anisette.py index bdf4b2e..3bfe4e9 100644 --- a/findmy/reports/anisette.py +++ b/findmy/reports/anisette.py @@ -13,48 +13,148 @@ from findmy.util.closable import Closable from findmy.util.http import HttpSession -def _gen_meta_headers( - user_id: str, - device_id: str, - serial: str = "0", -) -> dict[str, str]: - now = datetime.now(tz=timezone.utc) - locale_str = locale.getdefaultlocale()[0] or "en_US" - - return { - "X-Apple-I-Client-Time": now.replace(microsecond=0).isoformat() + "Z", - "X-Apple-I-TimeZone": str(now.astimezone().tzinfo), - "loc": locale_str, - "X-Apple-Locale": locale_str, - "X-Apple-I-MD-RINFO": "17106176", - "X-Apple-I-MD-LU": base64.b64encode(str(user_id).upper().encode()).decode(), - "X-Mme-Device-Id": str(device_id).upper(), - "X-Apple-I-SRL-NO": serial, - } - - class BaseAnisetteProvider(Closable, ABC): - """Abstract base class for Anisette providers.""" + """ + Abstract base class for Anisette providers. + Generously derived from https://github.com/nythepegasus/grandslam/blob/main/src/grandslam/gsa.py#L41. + """ + + @property @abstractmethod - async def _get_base_headers(self) -> dict[str, str]: + def otp(self) -> str: + """ + A seemingly random base64 string containing 28 bytes. + + TODO: Figure out how to generate this. + """ raise NotImplementedError + @property + @abstractmethod + def machine(self) -> str: + """ + A base64 encoded string of 60 'random' bytes. + + We're not sure how this is generated, we have to rely on the server. + TODO: Figure out how to generate this. + """ + raise NotImplementedError + + @property + def timestamp(self) -> str: + """Current timestamp in ISO 8601 format.""" + return datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat() + "Z" + + @property + def timezone(self) -> str: + """Abbreviation of the timezone of the device.""" + return str(datetime.now().astimezone().tzinfo) + + @property + def locale(self) -> str: + """Locale of the device (e.g. en_US).""" + return locale.getdefaultlocale()[0] or "en_US" + + @property + def router(self) -> str: + """ + A number, either 17106176 or 50660608. + + It doesn't seem to matter which one we use. + - 17106176 is used by Sideloadly and Provision (android) based servers. + - 50660608 is used by Windows iCloud based servers. + """ + return "17106176" + + @property + def client(self) -> str: + """ + Client string. + + The format is as follows: + <%MODEL%> <%OS%;%MAJOR%.%MINOR%(%SPMAJOR%,%SPMINOR%);%BUILD%> + <%AUTHKIT_BUNDLE_ID%/%AUTHKIT_VERSION% (%APP_BUNDLE_ID%/%APP_VERSION%)> + + Where: + MODEL: The model of the device (e.g. MacBookPro15,1 or 'PC' + OS: The OS of the device (e.g. Mac OS X or Windows) + MAJOR: The major version of the OS (e.g. 10) + MINOR: The minor version of the OS (e.g. 15) + SPMAJOR: The major version of the service pack (e.g. 0) (Windows only) + SPMINOR: The minor version of the service pack (e.g. 0) (Windows only) + BUILD: The build number of the OS (e.g. 19C57) + AUTHKIT_BUNDLE_ID: The bundle ID of the AuthKit framework (e.g. com.apple.AuthKit) + AUTHKIT_VERSION: The version of the AuthKit framework (e.g. 1) + APP_BUNDLE_ID: The bundle ID of the app (e.g. com.apple.dt.Xcode) + APP_VERSION: The version of the app (e.g. 3594.4.19) + """ + return ( + " " + "" + ) + async def get_headers( self, user_id: str, device_id: str, serial: str = "0", + with_client_info: bool = False, ) -> dict[str, str]: """ - Retrieve a complete dictionary of Anisette headers. + Generate a complete dictionary of Anisette headers. Consider using `BaseAppleAccount.get_anisette_headers` instead. """ - base_headers = await self._get_base_headers() - base_headers.update(_gen_meta_headers(user_id, device_id, serial)) + headers = { + # Current Time + "X-Apple-I-Client-Time": self.timestamp, + "X-Apple-I-TimeZone": self.timezone, + # Locale + "loc": self.locale, + "X-Apple-Locale": self.locale, + # 'One Time Password' + "X-Apple-I-MD": self.otp, + # 'Local User ID' + "X-Apple-I-MD-LU": base64.b64encode(str(user_id).encode()).decode(), + # 'Machine ID' + "X-Apple-I-MD-M": self.machine, + # 'Routing Info', some implementations convert this to an integer + "X-Apple-I-MD-RINFO": self.router, + # 'Device Unique Identifier' + "X-Mme-Device-Id": str(device_id).upper(), + # 'Device Serial Number' + "X-Apple-I-SRL-NO": serial, + } - return base_headers + if with_client_info: + headers["X-Mme-Client-Info"] = self.client + headers["X-Apple-App-Info"] = "com.apple.gs.xcode.auth" + headers["X-Xcode-Version"] = "11.2 (11B41)" + + return headers + + async def get_cpd( + self, + user_id: str, + device_id: str, + serial: str = "0", + ) -> dict[str, str]: + """ + Generate a complete dictionary of CPD data. + + Intended for internal use. + """ + cpd = { + "bootstrap": True, + "icscrec": True, + "pbe": False, + "prkgen": True, + "svct": "iCloud", + } + cpd.update(await self.get_headers(user_id, device_id, serial)) + + return cpd class RemoteAnisetteProvider(BaseAnisetteProvider): @@ -68,17 +168,42 @@ class RemoteAnisetteProvider(BaseAnisetteProvider): self._http = HttpSession() - logging.info("Using remote anisette server: %s", self._server_url) + self._anisette_data: dict[str, str] | None = None + + @property + @override + def otp(self) -> str: + """See `BaseAnisetteProvider.otp`_.""" + otp = (self._anisette_data or {}).get("X-Apple-I-MD") + if otp is None: + logging.warning("X-Apple-I-MD header not found! Returning fallback...") + return otp or "" + + @property + @override + def machine(self) -> str: + """See `BaseAnisetteProvider.machine`_.""" + machine = (self._anisette_data or {}).get("X-Apple-I-MD-M") + if machine is None: + logging.warning("X-Apple-I-MD-M header not found! Returning fallback...") + return machine or "" @override - async def _get_base_headers(self) -> dict[str, str]: - r = await self._http.get(self._server_url) - headers = r.json() + async def get_headers( + self, + user_id: str, + device_id: str, + serial: str = "0", + with_client_info: bool = False, + ) -> dict[str, str]: + """See `BaseAnisetteProvider.get_headers`_.""" + if self._anisette_data is None: + logging.info("Fetching anisette data from %s", self._server_url) - return { - "X-Apple-I-MD": headers["X-Apple-I-MD"], - "X-Apple-I-MD-M": headers["X-Apple-I-MD-M"], - } + r = await self._http.get(self._server_url) + self._anisette_data = r.json() + + return await super().get_headers(user_id, device_id, serial, with_client_info) @override async def close(self) -> None: @@ -91,14 +216,18 @@ class RemoteAnisetteProvider(BaseAnisetteProvider): class LocalAnisetteProvider(BaseAnisetteProvider): """Anisette provider. Generates headers without a remote server using pyprovision.""" - def __init__(self) -> None: - """Initialize the provider.""" - super().__init__() - + @property @override - async def _get_base_headers(self) -> dict[str, str]: - return NotImplemented + def otp(self) -> str: + """See `BaseAnisetteProvider.otp`_.""" + raise NotImplementedError + + @property + @override + def machine(self) -> str: + """See `BaseAnisetteProvider.machine`_.""" + raise NotImplementedError @override async def close(self) -> None: - """See `AnisetteProvider.close`.""" + """See `BaseAnisetteProvider.close`_.""" diff --git a/findmy/util/crypto.py b/findmy/util/crypto.py index e14c206..810e92b 100644 --- a/findmy/util/crypto.py +++ b/findmy/util/crypto.py @@ -1,13 +1,44 @@ """Pure-python NIST P-224 Elliptic Curve cryptography. Used for some Apple algorithms.""" -from cryptography.hazmat.primitives import hashes +import hashlib +import hmac + +from cryptography.hazmat.primitives import hashes, padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.x963kdf import X963KDF -ECPoint = tuple[float, float] - P224_N = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFF16A2E0B8F03E13DD29455C5C2A3D +def encrypt_password(password: str, salt: bytes, iterations: int) -> bytes: + """Encrypt password using PBKDF2-HMAC.""" + p = hashlib.sha256(password.encode("utf-8")).digest() + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=iterations, + ) + return kdf.derive(p) + + +def decrypt_spd_aes_cbc(session_key: bytes, data: bytes) -> bytes: + """Decrypt SPD data using SRP session key.""" + extra_data_key = hmac.new(session_key, b"extra data key:", hashlib.sha256).digest() + extra_data_iv = hmac.new(session_key, b"extra data iv:", hashlib.sha256).digest() + # Get only the first 16 bytes of the iv + extra_data_iv = extra_data_iv[:16] + + # Decrypt with AES CBC + cipher = Cipher(algorithms.AES(extra_data_key), modes.CBC(extra_data_iv)) + decryptor = cipher.decryptor() + data = decryptor.update(data) + decryptor.finalize() + # Remove PKCS#7 padding + padder = padding.PKCS7(128).unpadder() + return padder.update(data) + padder.finalize() + + def x963_kdf(value: bytes, si: bytes, length: int) -> bytes: """Single pass of X9.63 KDF with SHA1.""" return X963KDF( diff --git a/pyproject.toml b/pyproject.toml index c241125..d0dfd7d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ ignore = [ "D105", # docstrings in magic methods "PLR2004", # "magic" values >.> + "FBT", # boolean "traps" ] line-length = 100