diff --git a/README.md b/README.md index b5c27dc..55b3e72 100644 --- a/README.md +++ b/README.md @@ -15,9 +15,8 @@ application wishing to integrate with the Find My network. - [x] Apple Account log-in - [x] SMS 2FA support - [x] Fetch location reports -- [x] Generate new accessory keys -- [x] Import accessory keys -- [x] Fully async +- [x] Generate and import accessory keys +- [x] Both async and sync API - [x] Modular with a high degree of manual control ## Roadmap @@ -27,9 +26,6 @@ application wishing to integrate with the Find My network. - [ ] Local anisette generation (without server) - Can be done using [pyprovision](https://github.com/Dadoum/pyprovision/), however I want to wait until Python wheels are available. -- [ ] Sync API wrapper - - I realize not everyone may be comfortable using an async library; - building a synchronous wrapper around the `AppleAccount` class would be nice. # Installation diff --git a/examples/fetch_reports.py b/examples/fetch_reports.py index 27ec73c..a6ea8ab 100644 --- a/examples/fetch_reports.py +++ b/examples/fetch_reports.py @@ -1,4 +1,3 @@ -import asyncio import json import logging import os @@ -22,12 +21,12 @@ KEY_ADV = "" logging.basicConfig(level=logging.DEBUG) -async def login(account: AppleAccount): - state = await account.login(ACCOUNT_EMAIL, ACCOUNT_PASS) +def login(account: AppleAccount): + state = account.login(ACCOUNT_EMAIL, ACCOUNT_PASS) if state == LoginState.REQUIRE_2FA: # Account requires 2FA # This only supports SMS methods for now - methods = await account.get_2fa_methods() + methods = account.get_2fa_methods() # Print the (masked) phone numbers for method in methods: @@ -36,37 +35,33 @@ async def login(account: AppleAccount): # Just take the first one to keep things simple method = methods[0] - await method.request() + method.request() code = input("Code: ") # This automatically finishes the post-2FA login flow - await method.submit(code) + method.submit(code) return account -async def fetch_reports(lookup_key): +def fetch_reports(lookup_key): anisette = RemoteAnisetteProvider(ANISETTE_SERVER) acc = AppleAccount(anisette) - try: - # Save / restore account logic - if os.path.isfile("account.json"): - with open("account.json", "r") as f: - acc.restore(json.load(f)) - else: - await login(acc) - with open("account.json", "w+") as f: - json.dump(acc.export(), f) + # Save / restore account logic + if os.path.isfile("account.json"): + with open("account.json", "r") as f: + acc.restore(json.load(f)) + else: + login(acc) + # with open("account.json", "w+") as f: + # json.dump(acc.export(), f) - print(f"Logged in as: {acc.account_name} ({acc.first_name} {acc.last_name})") + print(f"Logged in as: {acc.account_name} ({acc.first_name} {acc.last_name})") - # It's that simple! - reports = await acc.fetch_last_reports([lookup_key]) - print(reports) - - finally: - await acc.close() + # It's that simple! + reports = acc.fetch_last_reports([lookup_key]) + print(reports) if __name__ == "__main__": @@ -74,4 +69,4 @@ if __name__ == "__main__": if KEY_ADV: # verify that your adv key is correct :D assert key.adv_key_b64 == KEY_ADV - asyncio.run(fetch_reports(key)) + fetch_reports(key) diff --git a/examples/fetch_reports_async.py b/examples/fetch_reports_async.py new file mode 100644 index 0000000..8c1ec8e --- /dev/null +++ b/examples/fetch_reports_async.py @@ -0,0 +1,82 @@ +import asyncio +import json +import logging +import os + +from findmy import ( + AsyncAppleAccount, + LoginState, + SmsSecondFactor, + RemoteAnisetteProvider, +) +from findmy import keys + +# URL to (public or local) anisette server +ANISETTE_SERVER = "http://localhost:6969" + +# Apple account details +ACCOUNT_EMAIL = "test@test.com" +ACCOUNT_PASS = "1234" + +# Private base64-encoded key to look up +KEY_PRIV = "" + +# Optional, to verify that advertisement key derivation works for your key +KEY_ADV = "" + +logging.basicConfig(level=logging.DEBUG) + + +async def login(account: AsyncAppleAccount): + state = await account.login(ACCOUNT_EMAIL, ACCOUNT_PASS) + + if state == LoginState.REQUIRE_2FA: # Account requires 2FA + # This only supports SMS methods for now + methods = await account.get_2fa_methods() + + # Print the (masked) phone numbers + for method in methods: + if isinstance(method, SmsSecondFactor): + print(method.phone_number) + + # Just take the first one to keep things simple + method = methods[0] + await method.request() + code = input("Code: ") + + # This automatically finishes the post-2FA login flow + await method.submit(code) + + return account + + +async def fetch_reports(lookup_key): + anisette = RemoteAnisetteProvider(ANISETTE_SERVER) + acc = AsyncAppleAccount(anisette) + + try: + # Save / restore account logic + if os.path.isfile("account.json"): + with open("account.json", "r") as f: + acc.restore(json.load(f)) + else: + await login(acc) + with open("account.json", "w+") as f: + json.dump(acc.export(), f) + + print(f"Logged in as: {acc.account_name} ({acc.first_name} {acc.last_name})") + + # It's that simple! + reports = await acc.fetch_last_reports([lookup_key]) + print(reports) + + finally: + await acc.close() + + +if __name__ == "__main__": + key = keys.KeyPair.from_b64(KEY_PRIV) + if KEY_ADV: # verify that your adv key is correct :D + assert key.adv_key_b64 == KEY_ADV + + asyncio.run(fetch_reports(key)) diff --git a/findmy/__init__.py b/findmy/__init__.py index 8f64ec7..d779ac9 100644 --- a/findmy/__init__.py +++ b/findmy/__init__.py @@ -1,4 +1,10 @@ -from .account import AppleAccount, LoginState, SmsSecondFactor +from .account import AppleAccount, AsyncAppleAccount, LoginState, SmsSecondFactor from .anisette import RemoteAnisetteProvider -__all__ = (AppleAccount, LoginState, SmsSecondFactor, RemoteAnisetteProvider) +__all__ = ( + AppleAccount, + AsyncAppleAccount, + LoginState, + SmsSecondFactor, + RemoteAnisetteProvider, +) diff --git a/findmy/account.py b/findmy/account.py index e926d8f..351c392 100644 --- a/findmy/account.py +++ b/findmy/account.py @@ -1,3 +1,4 @@ +import asyncio import base64 import hashlib import hmac @@ -5,9 +6,7 @@ import json import logging import plistlib import uuid -from abc import ABC, abstractmethod from datetime import datetime, timedelta -from enum import Enum from functools import wraps from typing import Optional, TypedDict, Any from typing import Sequence @@ -19,6 +18,7 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from .anisette import AnisetteProvider +from .base import BaseAppleAccount, BaseSecondFactorMethod, LoginState from .http import HttpSession from .keys import KeyPair from .reports import fetch_reports @@ -29,22 +29,6 @@ srp.rfc5054_enable() srp.no_username_in_x() -class LoginState(Enum): - LOGGED_OUT = 0 - REQUIRE_2FA = 1 - AUTHENTICATED = 2 - LOGGED_IN = 3 - - def __lt__(self, other): - if isinstance(other, LoginState): - return self.value < other.value - - return NotImplemented - - def __repr__(self): - return self.__str__() - - class AccountInfo(TypedDict): account_name: str first_name: str @@ -118,7 +102,7 @@ def _extract_phone_numbers(html: str) -> list[dict]: def _require_login_state(*states: LoginState): def decorator(func): @wraps(func) - def wrapper(acc: "AppleAccount", *args, **kwargs): + def wrapper(acc: "BaseAppleAccount", *args, **kwargs): if acc.login_state not in states: raise InvalidStateException( f"Invalid login state! Currently: {acc.login_state} but should be one of: {states}" @@ -131,30 +115,17 @@ def _require_login_state(*states: LoginState): return decorator -class SecondFactorMethod(ABC): - def __init__(self, account: "AppleAccount"): - self._account = account - - @property - def account(self): - return self._account - - @abstractmethod - def request(self) -> None: - raise NotImplementedError() - - @abstractmethod - def submit(self, code: str) -> LoginState: - raise NotImplementedError() - - -class SmsSecondFactor(SecondFactorMethod): - def __init__(self, account: "AppleAccount", number_id: int, phone_number: str): +class AsyncSmsSecondFactor(BaseSecondFactorMethod): + def __init__(self, account: "AsyncAppleAccount", number_id: int, phone_number: str): super().__init__(account) self._phone_number_id: int = number_id self._phone_number: str = phone_number + @property + def phone_number_id(self): + return self._phone_number_id + @property def phone_number(self): return self._phone_number @@ -166,7 +137,25 @@ class SmsSecondFactor(SecondFactorMethod): return await self.account.sms_2fa_submit(self._phone_number_id, code) -class AppleAccount: +class SmsSecondFactor(BaseSecondFactorMethod): + def __init__(self, account: "AppleAccount", number_id: int, phone_number: str): + super().__init__(account) + + self._phone_number_id: int = number_id + self._phone_number: str = phone_number + + @property + def phone_number(self): + return self._phone_number + + def request(self) -> None: + return self.account.sms_2fa_request(self._phone_number_id) + + def submit(self, code: str) -> LoginState: + return self.account.sms_2fa_submit(self._phone_number_id, code) + + +class AsyncAppleAccount(BaseAppleAccount): def __init__( self, anisette: AnisetteProvider, user_id: str = None, device_id: str = None ): @@ -237,7 +226,7 @@ class AppleAccount: }, } - def restore(self, data: dict) -> None: + def restore(self, data: dict): try: self._uid = data["ids"]["uid"] self._devid = data["ids"]["devid"] @@ -266,26 +255,27 @@ class AppleAccount: return await self._login_mobileme() @_require_login_state(LoginState.REQUIRE_2FA) - async def get_2fa_methods(self) -> list[SecondFactorMethod]: - methods: list[SecondFactorMethod] = [] + async def get_2fa_methods(self) -> list[BaseSecondFactorMethod]: + methods: list[BaseSecondFactorMethod] = [] # sms auth_page = await self._sms_2fa_request("GET", "https://gsa.apple.com/auth") try: phone_numbers = _extract_phone_numbers(auth_page) - methods.extend( - SmsSecondFactor( - self, number.get("id"), number.get("numberWithDialCode") - ) - for number in phone_numbers - ) except RuntimeError: logging.warning("Unable to extract phone numbers from login page") + methods.extend( + AsyncSmsSecondFactor( + self, number.get("id"), number.get("numberWithDialCode") + ) + for number in phone_numbers + ) + return methods @_require_login_state(LoginState.REQUIRE_2FA) - async def sms_2fa_request(self, phone_number_id): + async def sms_2fa_request(self, phone_number_id: int): data = {"phoneNumber": {"id": phone_number_id}, "mode": "sms"} await self._sms_2fa_request( @@ -523,3 +513,83 @@ class AppleAccount: async def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: return await self._anisette.get_headers(self._uid, self._devid, serial) + + +class AppleAccount(BaseAppleAccount): + def __init__( + self, anisette: AnisetteProvider, user_id: str = None, device_id: str = None + ): + self._asyncacc = AsyncAppleAccount(anisette, user_id, device_id) + + try: + self._loop = asyncio.get_running_loop() + except RuntimeError: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + def __del__(self) -> None: + coro = self._asyncacc.close() + return self._loop.run_until_complete(coro) + + @property + def login_state(self): + return self._asyncacc.login_state + + @property + def account_name(self): + return self._asyncacc.account_name + + @property + def first_name(self): + return self._asyncacc.first_name + + @property + def last_name(self): + return self._asyncacc.last_name + + def export(self) -> dict: + return self._asyncacc.export() + + def restore(self, data: dict): + return self._asyncacc.restore(data) + + def login(self, username: str, password: str) -> LoginState: + coro = self._asyncacc.login(username, password) + return self._loop.run_until_complete(coro) + + def get_2fa_methods(self) -> list[BaseSecondFactorMethod]: + coro = self._asyncacc.get_2fa_methods() + methods = self._loop.run_until_complete(coro) + + res = [] + for m in methods: + if isinstance(m, AsyncSmsSecondFactor): + res.append(SmsSecondFactor(self, m.phone_number_id, m.phone_number)) + else: + raise RuntimeError( + f"Failed to cast 2FA object to sync alternative: {m}. This is a bug, please report it." + ) + + return res + + def sms_2fa_request(self, phone_number_id: int): + coro = self._asyncacc.sms_2fa_request(phone_number_id) + return self._loop.run_until_complete(coro) + + def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: + coro = self._asyncacc.sms_2fa_submit(phone_number_id, code) + return self._loop.run_until_complete(coro) + + def fetch_reports( + self, keys: Sequence[KeyPair], date_from: datetime, date_to: datetime + ): + coro = self._asyncacc.fetch_reports(keys, date_from, date_to) + return self._loop.run_until_complete(coro) + + def fetch_last_reports(self, keys: Sequence[KeyPair], hours: int = 7 * 24): + coro = self._asyncacc.fetch_last_reports(keys, hours) + return self._loop.run_until_complete(coro) + + def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: + coro = self._asyncacc.get_anisette_headers() + return self._loop.run_until_complete(coro) diff --git a/findmy/base.py b/findmy/base.py new file mode 100644 index 0000000..971e367 --- /dev/null +++ b/findmy/base.py @@ -0,0 +1,103 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from enum import Enum +from typing import Sequence + +from .keys import KeyPair + + +class LoginState(Enum): + LOGGED_OUT = 0 + REQUIRE_2FA = 1 + AUTHENTICATED = 2 + LOGGED_IN = 3 + + def __lt__(self, other): + if isinstance(other, LoginState): + return self.value < other.value + + return NotImplemented + + def __repr__(self): + return self.__str__() + + +class BaseSecondFactorMethod(ABC): + def __init__(self, account: "BaseAppleAccount"): + self._account = account + + @property + def account(self): + return self._account + + @abstractmethod + def request(self) -> None: + raise NotImplementedError() + + @abstractmethod + def submit(self, code: str) -> LoginState: + raise NotImplementedError() + + +class BaseAppleAccount(ABC): + @property + @abstractmethod + def login_state(self): + return NotImplemented + + @property + @abstractmethod + def account_name(self): + return NotImplemented + + @property + @abstractmethod + def first_name(self): + return NotImplemented + + @property + @abstractmethod + def last_name(self): + return NotImplemented + + @abstractmethod + def export(self) -> dict: + return NotImplemented + + @abstractmethod + def restore(self, data: dict): + return NotImplemented + + @abstractmethod + def login(self, username: str, password: str) -> LoginState: + return NotImplemented + + @abstractmethod + def get_2fa_methods(self) -> list[BaseSecondFactorMethod]: + return NotImplemented + + @abstractmethod + def sms_2fa_request(self, phone_number_id: int): + return NotImplemented + + @abstractmethod + def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: + return NotImplemented + + @abstractmethod + def fetch_reports( + self, keys: Sequence[KeyPair], date_from: datetime, date_to: datetime + ): + return NotImplemented + + @abstractmethod + def fetch_last_reports( + self, + keys: Sequence[KeyPair], + hours: int = 7 * 24, + ): + return NotImplemented + + @abstractmethod + def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: + return NotImplemented diff --git a/findmy/http.py b/findmy/http.py index 772dc89..bdaeb1f 100644 --- a/findmy/http.py +++ b/findmy/http.py @@ -24,7 +24,7 @@ class HttpSession: def __del__(self) -> None: try: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() loop.call_soon_threadsafe(loop.create_task, self.close()) except RuntimeError: # cannot await closure pass