diff --git a/examples/fetch_reports.py b/examples/fetch_reports.py index 24d79d4..27ec73c 100644 --- a/examples/fetch_reports.py +++ b/examples/fetch_reports.py @@ -37,7 +37,7 @@ async def login(account: AppleAccount): # Just take the first one to keep things simple method = methods[0] await method.request() - code = input('Code: ') + code = input("Code: ") # This automatically finishes the post-2FA login flow await method.submit(code) diff --git a/findmy/__init__.py b/findmy/__init__.py index 888fb44..8f64ec7 100644 --- a/findmy/__init__.py +++ b/findmy/__init__.py @@ -1,10 +1,4 @@ from .account import AppleAccount, LoginState, SmsSecondFactor from .anisette import RemoteAnisetteProvider -__all__ = ( - AppleAccount, - LoginState, - SmsSecondFactor, - - RemoteAnisetteProvider -) +__all__ = (AppleAccount, LoginState, SmsSecondFactor, RemoteAnisetteProvider) diff --git a/findmy/account.py b/findmy/account.py index aced878..e926d8f 100644 --- a/findmy/account.py +++ b/findmy/account.py @@ -108,7 +108,11 @@ def _extract_phone_numbers(html: str) -> list[dict]: raise RuntimeError("Could not find HTML element containing phone numbers") data = json.loads(data_elem.text) - return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", []) + return ( + data.get("direct", {}) + .get("phoneNumberVerification", {}) + .get("trustedPhoneNumbers", []) + ) def _require_login_state(*states: LoginState): @@ -163,7 +167,9 @@ class SmsSecondFactor(SecondFactorMethod): class AppleAccount: - def __init__(self, anisette: AnisetteProvider, user_id: str = None, device_id: str = None): + def __init__( + self, anisette: AnisetteProvider, user_id: str = None, device_id: str = None + ): self._anisette: AnisetteProvider = anisette self._uid: str = user_id or str(uuid.uuid4()) self._devid: str = device_id or str(uuid.uuid4()) @@ -178,7 +184,9 @@ class AppleAccount: self._http = HttpSession() - def _set_login_state(self, state: LoginState, data: Optional[dict] = None) -> LoginState: + def _set_login_state( + self, state: LoginState, data: Optional[dict] = None + ) -> LoginState: # clear account info if downgrading state (e.g. LOGGED_IN -> LOGGED_OUT) if state < self._login_state: logging.debug("Clearing cached account information") @@ -195,25 +203,38 @@ class AppleAccount: return self._login_state @property - @_require_login_state(LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA) + @_require_login_state( + LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA + ) def account_name(self): return self._account_info["account_name"] if self._account_info else None @property - @_require_login_state(LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA) + @_require_login_state( + LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA + ) def first_name(self): return self._account_info["first_name"] if self._account_info else None @property - @_require_login_state(LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA) + @_require_login_state( + LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA + ) def last_name(self): return self._account_info["last_name"] if self._account_info else None def export(self) -> dict: return { "ids": {"uid": self._uid, "devid": self._devid}, - "account": {"username": self._username, "password": self._password, "info": self._account_info}, - "login_state": {"state": self._login_state.value, "data": self._login_state_data}, + "account": { + "username": self._username, + "password": self._password, + "info": self._account_info, + }, + "login_state": { + "state": self._login_state.value, + "data": self._login_state_data, + }, } def restore(self, data: dict) -> None: @@ -253,7 +274,10 @@ class AppleAccount: try: phone_numbers = _extract_phone_numbers(auth_page) methods.extend( - SmsSecondFactor(self, number.get("id"), number.get("numberWithDialCode")) for number in phone_numbers + SmsSecondFactor( + self, number.get("id"), number.get("numberWithDialCode") + ) + for number in phone_numbers ) except RuntimeError: logging.warning("Unable to extract phone numbers from login page") @@ -264,13 +288,21 @@ class AppleAccount: async def sms_2fa_request(self, phone_number_id): data = {"phoneNumber": {"id": phone_number_id}, "mode": "sms"} - await self._sms_2fa_request("PUT", "https://gsa.apple.com/auth/verify/phone", data) + await self._sms_2fa_request( + "PUT", "https://gsa.apple.com/auth/verify/phone", data + ) @_require_login_state(LoginState.REQUIRE_2FA) async def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: - data = {"phoneNumber": {"id": phone_number_id}, "securityCode": {"code": str(code)}, "mode": "sms"} + data = { + "phoneNumber": {"id": phone_number_id}, + "securityCode": {"code": str(code)}, + "mode": "sms", + } - await self._sms_2fa_request("POST", "https://gsa.apple.com/auth/verify/phone/securitycode", data) + await self._sms_2fa_request( + "POST", "https://gsa.apple.com/auth/verify/phone/securitycode", data + ) # REQUIRE_2FA -> AUTHENTICATED new_state = await self._gsa_authenticate() @@ -281,7 +313,9 @@ class AppleAccount: return await self._login_mobileme() @_require_login_state(LoginState.LOGGED_IN) - async def fetch_reports(self, keys: Sequence[KeyPair], date_from: datetime, date_to: datetime): + async def fetch_reports( + self, keys: Sequence[KeyPair], date_from: datetime, date_to: datetime + ): anisette_headers = await self.get_anisette_headers() return await fetch_reports( @@ -290,18 +324,24 @@ class AppleAccount: anisette_headers, date_from, date_to, - keys + keys, ) @_require_login_state(LoginState.LOGGED_IN) - async def fetch_last_reports(self, keys: Sequence[KeyPair], hours: int = 7 * 24): + async def fetch_last_reports( + self, + keys: Sequence[KeyPair], + hours: int = 7 * 24, + ): end = datetime.now() start = end - timedelta(hours=hours) return await self.fetch_reports(keys, start, end) @_require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA) - async def _gsa_authenticate(self, username: Optional[str] = None, password: Optional[str] = None) -> LoginState: + async def _gsa_authenticate( + self, username: Optional[str] = None, password: Optional[str] = None + ) -> LoginState: self._username = username or self._username self._password = password or self._password @@ -314,7 +354,9 @@ class AppleAccount: usr = srp.User(self._username, b"", hash_alg=srp.SHA256, ng_type=srp.NG_2048) _, a2k = usr.start_authentication() - r = await self._gsa_request({"A2k": a2k, "u": self._username, "ps": ["s2k", "s2k_fo"], "o": "init"}) + r = await self._gsa_request( + {"A2k": a2k, "u": self._username, "ps": ["s2k", "s2k_fo"], "o": "init"} + ) logging.debug("Verifying response to auth request") @@ -323,7 +365,9 @@ class AppleAccount: raise LoginException(f"Email verify failed: {message}") sp = r.get("sp") if sp != "s2k": - raise LoginException(f"This implementation only supports s2k. Server returned {sp}") + raise LoginException( + f"This implementation only supports s2k. Server returned {sp}" + ) logging.debug("Attempting password challenge") @@ -331,7 +375,9 @@ class AppleAccount: m1 = usr.process_challenge(r["s"], r["B"]) if m1 is None: raise LoginException("Failed to process challenge") - r = await self._gsa_request({"c": r["c"], "M1": m1, "u": self._username, "o": "complete"}) + r = await self._gsa_request( + {"c": r["c"], "M1": m1, "u": self._username, "o": "complete"} + ) logging.debug("Verifying password challenge response") @@ -369,7 +415,9 @@ class AppleAccount: logging.info("GSA authentication successful") idms_pet = spd.get("t", {}).get("com.apple.gs.idms.pet", {}).get("token", "") - return self._set_login_state(LoginState.AUTHENTICATED, {"idms_pet": idms_pet, "adsid": spd["adsid"]}) + return self._set_login_state( + LoginState.AUTHENTICATED, {"idms_pet": idms_pet, "adsid": spd["adsid"]} + ) @_require_login_state(LoginState.AUTHENTICATED) async def _login_mobileme(self): @@ -387,15 +435,15 @@ class AppleAccount: "X-Apple-ADSID": self._login_state_data["adsid"], "User-Agent": "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0", "X-Mme-Client-Info": " " - " ", + " ", } headers.update(await self.get_anisette_headers()) async with await self._http.post( - "https://setup.icloud.com/setup/iosbuddy/loginDelegates", - auth=(self._username, self._login_state_data["idms_pet"]), - data=data, - headers=headers + "https://setup.icloud.com/setup/iosbuddy/loginDelegates", + auth=(self._username, self._login_state_data["idms_pet"]), + data=data, + headers=headers, ) as r: content = await r.content.read() resp = _load_plist(content) @@ -403,14 +451,19 @@ class AppleAccount: mobileme_data = resp.get("delegates", {}).get("com.apple.mobileme", {}) status = mobileme_data.get("status") if status != 0: - message = mobileme_data.get('status-message') - raise LoginException(f"com.apple.mobileme login failed with status {status}: {message}") + message = mobileme_data.get("status-message") + raise LoginException( + f"com.apple.mobileme login failed with status {status}: {message}" + ) return self._set_login_state( - LoginState.LOGGED_IN, {"dsid": resp["dsid"], "mobileme_data": mobileme_data["service-data"]} + LoginState.LOGGED_IN, + {"dsid": resp["dsid"], "mobileme_data": mobileme_data["service-data"]}, ) - async def _sms_2fa_request(self, method: str, url: str, data: Optional[dict] = None) -> str: + async def _sms_2fa_request( + self, method: str, url: str, data: Optional[dict] = None + ) -> str: adsid = self._login_state_data["adsid"] idms_token = self._login_state_data["idms_token"] identity_token = base64.b64encode((adsid + ":" + idms_token).encode()).decode() @@ -422,12 +475,12 @@ class AppleAccount: "X-Apple-App-Info": "com.apple.gs.xcode.auth", "X-Xcode-Version": "11.2 (11B41)", "X-Mme-Client-Info": " " - " ", + " ", } headers.update(await self.get_anisette_headers()) async with await self._http.request( - method, url, json=data, headers=headers + method, url, json=data, headers=headers ) as r: if not r.ok: raise LoginException(f"HTTP request failed: {r.status_code}") @@ -457,13 +510,13 @@ class AppleAccount: "Accept": "*/*", "User-Agent": "akd/1.0 CFNetwork/978.0.7 Darwin/18.7.0", "X-MMe-Client-Info": " " - "", + "", } async with await self._http.post( - "https://gsa.apple.com/grandslam/GsService2", - headers=headers, - data=plistlib.dumps(body) + "https://gsa.apple.com/grandslam/GsService2", + headers=headers, + data=plistlib.dumps(body), ) as r: content = await r.content.read() return _load_plist(content)["Response"] diff --git a/findmy/anisette.py b/findmy/anisette.py index 070e085..71a0f01 100644 --- a/findmy/anisette.py +++ b/findmy/anisette.py @@ -7,7 +7,7 @@ from datetime import datetime from .http import HttpSession -def _gen_meta_headers(user_id: str, device_id: str, serial: str = '0'): +def _gen_meta_headers(user_id: str, device_id: str, serial: str = "0"): now = datetime.utcnow() locale_str = locale.getdefaultlocale()[0] or "en_US" @@ -16,11 +16,10 @@ def _gen_meta_headers(user_id: str, device_id: str, serial: str = '0'): "X-Apple-I-TimeZone": str(now.astimezone().tzinfo), "loc": locale_str, "X-Apple-Locale": locale_str, - "X-Apple-I-MD-RINFO": "17106176", "X-Apple-I-MD-LU": base64.b64encode(str(user_id).upper().encode()).decode(), "X-Mme-Device-Id": str(device_id).upper(), - "X-Apple-I-SRL-NO": serial + "X-Apple-I-SRL-NO": serial, } @@ -33,7 +32,9 @@ class AnisetteProvider(ABC): async def close(self): return NotImplemented - async def get_headers(self, user_id: str, device_id: str, serial: str = '0') -> dict[str, str]: + async def get_headers( + self, user_id: str, device_id: str, serial: str = "0" + ) -> dict[str, str]: base_headers = await self._get_base_headers() base_headers.update(_gen_meta_headers(user_id, device_id, serial)) @@ -52,7 +53,10 @@ class RemoteAnisetteProvider(AnisetteProvider): async with await self._http.get(self._server_url) as r: headers = await r.json() - return {"X-Apple-I-MD": headers["X-Apple-I-MD"], "X-Apple-I-MD-M": headers["X-Apple-I-MD-M"]} + return { + "X-Apple-I-MD": headers["X-Apple-I-MD"], + "X-Apple-I-MD-M": headers["X-Apple-I-MD-M"], + } async def close(self): await self._http.close() diff --git a/findmy/reports.py b/findmy/reports.py index 71d7d66..9348ff3 100644 --- a/findmy/reports.py +++ b/findmy/reports.py @@ -19,22 +19,37 @@ class ReportsError(RuntimeError): def _decrypt_payload(payload: bytes, key: KeyPair) -> bytes: - eph_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP224R1(), payload[5:62]) + eph_key = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP224R1(), payload[5:62] + ) shared_key = key.dh_exchange(eph_key) - symmetric_key = hashlib.sha256(shared_key + b'\x00\x00\x00\x01' + payload[5:62]).digest() + symmetric_key = hashlib.sha256( + shared_key + b"\x00\x00\x00\x01" + payload[5:62] + ).digest() decryption_key = symmetric_key[:16] iv = symmetric_key[16:] enc_data = payload[62:72] tag = payload[72:] - decryptor = Cipher(algorithms.AES(decryption_key), modes.GCM(iv, tag), default_backend()).decryptor() + decryptor = Cipher( + algorithms.AES(decryption_key), modes.GCM(iv, tag), default_backend() + ).decryptor() return decryptor.update(enc_data) + decryptor.finalize() class KeyReport: - def __init__(self, key: KeyPair, publish_date: datetime, timestamp: datetime, description: str, lat: float, - lng: float, confidence: int, status: int): + def __init__( + self, + key: KeyPair, + publish_date: datetime, + timestamp: datetime, + description: str, + lat: float, + lng: float, + confidence: int, + status: int, + ): self._key = key self._publish_date = publish_date self._timestamp = timestamp @@ -79,17 +94,28 @@ class KeyReport: return self._status @classmethod - def from_payload(cls, key: KeyPair, publish_date: datetime, description: str, payload: bytes) -> "KeyReport": - timestamp_int = int.from_bytes(payload[0:4], 'big') + (60 * 60 * 24 * 11323) + def from_payload( + cls, key: KeyPair, publish_date: datetime, description: str, payload: bytes + ) -> "KeyReport": + timestamp_int = int.from_bytes(payload[0:4], "big") + (60 * 60 * 24 * 11323) timestamp = datetime.utcfromtimestamp(timestamp_int) data = _decrypt_payload(payload, key) latitude = struct.unpack(">i", data[0:4])[0] / 10000000 longitude = struct.unpack(">i", data[4:8])[0] / 10000000 - confidence = int.from_bytes(data[8:9], 'big') - status = int.from_bytes(data[9:10], 'big') + confidence = int.from_bytes(data[8:9], "big") + status = int.from_bytes(data[9:10], "big") - return KeyReport(key, publish_date, timestamp, description, latitude, longitude, confidence, status) + return KeyReport( + key, + publish_date, + timestamp, + description, + latitude, + longitude, + confidence, + status, + ) def __lt__(self, other): if isinstance(other, KeyReport): @@ -97,12 +123,20 @@ class KeyReport: return NotImplemented def __repr__(self): - return (f"") + return ( + f"" + ) -async def fetch_reports(dsid: str, search_party_token: str, anisette_headers: dict[str, str], date_from: datetime, - date_to: datetime, keys: Sequence[KeyPair]): +async def fetch_reports( + dsid: str, + search_party_token: str, + anisette_headers: dict[str, str], + date_from: datetime, + date_to: datetime, + keys: Sequence[KeyPair], +): start_date = date_from.timestamp() * 1000 end_date = date_to.timestamp() * 1000 ids = [key.hashed_adv_key_b64 for key in keys] @@ -111,10 +145,10 @@ async def fetch_reports(dsid: str, search_party_token: str, anisette_headers: di # TODO: do not create a new session every time # probably needs a wrapper class to allow closing the connections async with await _session.post( - "https://gateway.icloud.com/acsnservice/fetch", - auth=(dsid, search_party_token), - headers=anisette_headers, - json=data + "https://gateway.icloud.com/acsnservice/fetch", + auth=(dsid, search_party_token), + headers=anisette_headers, + json=data, ) as r: resp = await r.json() if not r.ok or resp["statusCode"] != "200": @@ -126,7 +160,9 @@ async def fetch_reports(dsid: str, search_party_token: str, anisette_headers: di for report in resp.get("results", []): key = id_to_key[report["id"]] - date_published = datetime.utcfromtimestamp(report.get("datePublished", 0) / 1000) + date_published = datetime.utcfromtimestamp( + report.get("datePublished", 0) / 1000 + ) description = report.get("description", "") payload = base64.b64decode(report["payload"])