From e31f9ac745964141cd35e04d0dcd62e616bae8cd Mon Sep 17 00:00:00 2001 From: Mike A Date: Wed, 3 Jan 2024 22:13:16 +0100 Subject: [PATCH] Refactor entire module --- examples/fetch_reports.py | 6 +- examples/fetch_reports_async.py | 6 +- findmy/__init__.py | 14 +- findmy/account.pyi | 117 ------------- findmy/anisette.pyi | 20 --- findmy/base.py | 198 --------------------- findmy/keys.pyi | 21 --- findmy/reports/__init__.py | 16 ++ findmy/{ => reports}/account.py | 289 ++++++++++++++++--------------- findmy/{ => reports}/anisette.py | 2 +- findmy/{ => reports}/keys.py | 0 findmy/{ => reports}/reports.py | 2 +- findmy/reports/state.py | 60 +++++++ findmy/reports/twofactor.py | 172 ++++++++++++++++++ findmy/util/__init__.py | 5 + findmy/util/errors.py | 21 +++ findmy/{ => util}/http.py | 18 +- findmy/util/parsers.py | 16 ++ 18 files changed, 459 insertions(+), 524 deletions(-) delete mode 100644 findmy/account.pyi delete mode 100644 findmy/anisette.pyi delete mode 100644 findmy/base.py delete mode 100644 findmy/keys.pyi create mode 100644 findmy/reports/__init__.py rename findmy/{ => reports}/account.py (77%) rename findmy/{ => reports}/anisette.py (98%) rename findmy/{ => reports}/keys.py (100%) rename findmy/{ => reports}/reports.py (99%) create mode 100644 findmy/reports/state.py create mode 100644 findmy/reports/twofactor.py create mode 100644 findmy/util/__init__.py create mode 100644 findmy/util/errors.py rename findmy/{ => util}/http.py (87%) create mode 100644 findmy/util/parsers.py diff --git a/examples/fetch_reports.py b/examples/fetch_reports.py index cb8e7c2..bee7789 100644 --- a/examples/fetch_reports.py +++ b/examples/fetch_reports.py @@ -2,12 +2,12 @@ import json import logging import os -from findmy import ( +from findmy.reports import ( AppleAccount, LoginState, RemoteAnisetteProvider, - SmsSecondFactor, keys, + SmsSecondFactorMethod, ) # URL to (public or local) anisette server @@ -35,7 +35,7 @@ def login(account: AppleAccount): # Print the (masked) phone numbers for method in methods: - if isinstance(method, SmsSecondFactor): + if isinstance(method, SmsSecondFactorMethod): print(method.phone_number) # Just take the first one to keep things simple diff --git a/examples/fetch_reports_async.py b/examples/fetch_reports_async.py index 6cdd69f..ed736bd 100644 --- a/examples/fetch_reports_async.py +++ b/examples/fetch_reports_async.py @@ -3,11 +3,11 @@ import json import logging import os -from findmy import ( +from findmy.reports import ( AsyncAppleAccount, LoginState, RemoteAnisetteProvider, - SmsSecondFactor, + SmsSecondFactorMethod, keys, ) @@ -36,7 +36,7 @@ async def login(account: AsyncAppleAccount): # Print the (masked) phone numbers for method in methods: - if isinstance(method, SmsSecondFactor): + if isinstance(method, SmsSecondFactorMethod): print(method.phone_number) # Just take the first one to keep things simple diff --git a/findmy/__init__.py b/findmy/__init__.py index 64d6efa..3701cdb 100644 --- a/findmy/__init__.py +++ b/findmy/__init__.py @@ -1,11 +1,5 @@ -"""A package providing everything you need to query Apple's FindMy network.""" -from .account import AppleAccount, AsyncAppleAccount, LoginState, SmsSecondFactor -from .anisette import RemoteAnisetteProvider +"""A package providing everything you need to work with Apple's FindMy network.""" +from . import reports +from .util import errors -__all__ = ( - "AppleAccount", - "AsyncAppleAccount", - "LoginState", - "SmsSecondFactor", - "RemoteAnisetteProvider", -) +__all__ = ("reports", "errors") diff --git a/findmy/account.pyi b/findmy/account.pyi deleted file mode 100644 index 7b17a90..0000000 --- a/findmy/account.pyi +++ /dev/null @@ -1,117 +0,0 @@ -from datetime import datetime -from typing import Sequence - -from .anisette import BaseAnisetteProvider as BaseAnisetteProvider -from .base import ( - BaseAppleAccount as BaseAppleAccount, -) -from .base import ( - BaseSecondFactorMethod as BaseSecondFactorMethod, -) -from .base import ( - LoginState as LoginState, -) -from .keys import KeyPair as KeyPair -from .reports import KeyReport as KeyReport - -class LoginError(Exception): ... -class InvalidStateError(RuntimeError): ... -class ExportRestoreError(ValueError): ... - -class AsyncSmsSecondFactor(BaseSecondFactorMethod): - def __init__( - self, - account: AsyncAppleAccount, - number_id: int, - phone_number: str, - ) -> None: ... - @property - def phone_number_id(self) -> int: ... - @property - def phone_number(self) -> str: ... - async def request(self) -> None: ... - async def submit(self, code: str) -> LoginState: ... - -class SmsSecondFactor(BaseSecondFactorMethod): - def __init__( - self, - account: AppleAccount, - number_id: int, - phone_number: str, - ) -> None: ... - @property - def phone_number_id(self) -> int: ... - @property - def phone_number(self) -> str: ... - def request(self) -> None: ... - def submit(self, code: str) -> LoginState: ... - -class AsyncAppleAccount(BaseAppleAccount): - def __init__( - self, - anisette: BaseAnisetteProvider, - user_id: str | None = None, - device_id: str | None = None, - ) -> None: ... - @property - def login_state(self) -> LoginState: ... - @property - def account_name(self) -> str | None: ... - @property - def first_name(self) -> str | None: ... - @property - def last_name(self) -> str | None: ... - def export(self) -> dict: ... - def restore(self, data: dict) -> None: ... - async def close(self) -> None: ... - async def login(self, username: str, password: str) -> LoginState: ... - async def get_2fa_methods(self) -> list[AsyncSmsSecondFactor]: ... - async def sms_2fa_request(self, phone_number_id: int) -> None: ... - async def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: ... - async def fetch_reports( - self, - keys: Sequence[KeyPair], - date_from: datetime, - date_to: datetime, - ) -> dict[KeyPair, list[KeyReport]]: ... - async def fetch_last_reports( - self, - keys: Sequence[KeyPair], - hours: int = ..., - ) -> dict[KeyPair, list[KeyReport]]: ... - async def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: ... - -class AppleAccount(BaseAppleAccount): - def __init__( - self, - anisette: BaseAnisetteProvider, - user_id: str | None = None, - device_id: str | None = None, - ) -> None: ... - def __del__(self) -> None: ... - @property - def login_state(self) -> LoginState: ... - @property - def account_name(self) -> str: ... - @property - def first_name(self) -> str | None: ... - @property - def last_name(self) -> str | None: ... - def export(self) -> dict: ... - def restore(self, data: dict) -> None: ... - def login(self, username: str, password: str) -> LoginState: ... - def get_2fa_methods(self) -> list[SmsSecondFactor]: ... - def sms_2fa_request(self, phone_number_id: int) -> None: ... - def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: ... - def fetch_reports( - self, - keys: Sequence[KeyPair], - date_from: datetime, - date_to: datetime, - ) -> dict[KeyPair, list[KeyReport]]: ... - def fetch_last_reports( - self, - keys: Sequence[KeyPair], - hours: int = ..., - ) -> dict[KeyPair, list[KeyReport]]: ... - def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: ... diff --git a/findmy/anisette.pyi b/findmy/anisette.pyi deleted file mode 100644 index 3e17c89..0000000 --- a/findmy/anisette.pyi +++ /dev/null @@ -1,20 +0,0 @@ -import abc -from abc import ABC, abstractmethod - -class BaseAnisetteProvider(ABC, metaclass=abc.ABCMeta): - @abstractmethod - async def close(self) -> None: ... - async def get_headers( - self, - user_id: str, - device_id: str, - serial: str = "0", - ) -> dict[str, str]: ... - -class RemoteAnisetteProvider(BaseAnisetteProvider): - def __init__(self, server_url: str) -> None: ... - async def close(self) -> None: ... - -class LocalAnisetteProvider(BaseAnisetteProvider): - def __init__(self) -> None: ... - async def close(self) -> None: ... diff --git a/findmy/base.py b/findmy/base.py deleted file mode 100644 index 1071692..0000000 --- a/findmy/base.py +++ /dev/null @@ -1,198 +0,0 @@ -"""Module that contains base classes for various other modules. For internal use only.""" -from __future__ import annotations - -from abc import ABC, abstractmethod -from enum import Enum -from typing import TYPE_CHECKING, Sequence, TypeVar - -if TYPE_CHECKING: - from datetime import datetime - - from .keys import KeyPair - from .reports import KeyReport - - -class LoginState(Enum): - """Enum of possible login states. Used for `AppleAccount`'s internal state machine.""" - - LOGGED_OUT = 0 - REQUIRE_2FA = 1 - AUTHENTICATED = 2 - LOGGED_IN = 3 - - def __lt__(self, other: LoginState) -> bool: - """ - Compare against another `LoginState`. - - A `LoginState` is said to be "less than" another `LoginState` iff it is in - an "earlier" stage of the login process, going from LOGGED_OUT to LOGGED_IN. - """ - if isinstance(other, LoginState): - return self.value < other.value - - return NotImplemented - - def __repr__(self) -> str: - """Human-readable string representation of the state.""" - return self.__str__() - - -T = TypeVar("T", bound="BaseAppleAccount") - - -class BaseSecondFactorMethod(ABC): - """Base class for a second-factor authentication method for an Apple account.""" - - def __init__(self, account: T) -> None: - """Initialize the second-factor method.""" - self._account: T = account - - @property - def account(self) -> T: - """The account associated with the second-factor method.""" - return self._account - - @abstractmethod - def request(self) -> None: - """ - Put in a request for the second-factor challenge. - - Exact meaning is up to the implementing class. - """ - raise NotImplementedError - - @abstractmethod - def submit(self, code: str) -> LoginState: - """Submit a code to complete the second-factor challenge.""" - raise NotImplementedError - - -class BaseAppleAccount(ABC): - """Base class for an Apple account.""" - - @property - @abstractmethod - def login_state(self) -> LoginState: - """The current login state of the account.""" - raise NotImplementedError - - @property - @abstractmethod - def account_name(self) -> str: - """ - The name of the account as reported by Apple. - - This is usually an e-mail address. - May be None in some cases, such as when not logged in. - """ - raise NotImplementedError - - @property - @abstractmethod - def first_name(self) -> str | None: - """ - First name of the account holder as reported by Apple. - - May be None in some cases, such as when not logged in. - """ - raise NotImplementedError - - @property - @abstractmethod - def last_name(self) -> str | None: - """ - Last name of the account holder as reported by Apple. - - May be None in some cases, such as when not logged in. - """ - raise NotImplementedError - - @abstractmethod - def export(self) -> dict: - """ - Export a representation of the current state of the account as a dictionary. - - The output of this method is guaranteed to be JSON-serializable, and passing - the return value of this function as an argument to `BaseAppleAccount.restore` - will always result in an exact copy of the internal state as it was when exported. - - This method is especially useful to avoid having to keep going through the login flow. - """ - raise NotImplementedError - - @abstractmethod - def restore(self, data: dict) -> None: - """ - Restore a previous export of the internal state of the account. - - See `BaseAppleAccount.export` for more information. - """ - raise NotImplementedError - - @abstractmethod - def login(self, username: str, password: str) -> LoginState: - """Log in to an Apple account using a username and password.""" - raise NotImplementedError - - @abstractmethod - def get_2fa_methods(self) -> list[BaseSecondFactorMethod]: - """ - Get a list of 2FA methods that can be used as a secondary challenge. - - Currently, only SMS-based 2FA methods are supported. - """ - raise NotImplementedError - - @abstractmethod - def sms_2fa_request(self, phone_number_id: int) -> None: - """ - Request a 2FA code to be sent to a specific phone number ID. - - Consider using `BaseSecondFactorMethod.request` instead. - """ - raise NotImplementedError - - @abstractmethod - def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: - """ - Submit a 2FA code that was sent to a specific phone number ID. - - Consider using `BaseSecondFactorMethod.submit` instead. - """ - raise NotImplementedError - - @abstractmethod - def fetch_reports( - self, - keys: Sequence[KeyPair], - date_from: datetime, - date_to: datetime, - ) -> dict[KeyPair, list[KeyReport]]: - """ - Fetch location reports for a sequence of `KeyPair`s between `date_from` and `date_end`. - - Returns a dictionary mapping `KeyPair`s to a list of their location reports. - """ - raise NotImplementedError - - @abstractmethod - def fetch_last_reports( - self, - keys: Sequence[KeyPair], - hours: int = 7 * 24, - ) -> dict[KeyPair, list[KeyReport]]: - """ - Fetch location reports for a sequence of `KeyPair`s for the last `hours` hours. - - Utility method as an alternative to using `BaseAppleAccount.fetch_reports` directly. - """ - raise NotImplementedError - - @abstractmethod - def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: - """ - Retrieve a complete dictionary of Anisette headers. - - Utility method for `AnisetteProvider.get_headers` using this account's user and device ID. - """ - raise NotImplementedError diff --git a/findmy/keys.pyi b/findmy/keys.pyi deleted file mode 100644 index fab1623..0000000 --- a/findmy/keys.pyi +++ /dev/null @@ -1,21 +0,0 @@ -from cryptography.hazmat.primitives.asymmetric import ec - -class KeyPair: - def __init__(self, private_key: bytes) -> None: ... - @classmethod - def generate(cls) -> KeyPair: ... - @classmethod - def from_b64(cls, key_b64: str) -> KeyPair: ... - @property - def private_key_bytes(self) -> bytes: ... - @property - def private_key_b64(self) -> str: ... - @property - def adv_key_bytes(self) -> bytes: ... - @property - def adv_key_b64(self) -> str: ... - @property - def hashed_adv_key_bytes(self) -> bytes: ... - @property - def hashed_adv_key_b64(self) -> str: ... - def dh_exchange(self, other_pub_key: ec.EllipticCurvePublicKey) -> bytes: ... diff --git a/findmy/reports/__init__.py b/findmy/reports/__init__.py new file mode 100644 index 0000000..932bba0 --- /dev/null +++ b/findmy/reports/__init__.py @@ -0,0 +1,16 @@ +"""Code related to fetching location reports.""" +from .account import AppleAccount, AsyncAppleAccount +from .anisette import RemoteAnisetteProvider +from .keys import KeyPair +from .state import LoginState +from .twofactor import SecondFactorType, SmsSecondFactorMethod + +__all__ = ( + "AppleAccount", + "AsyncAppleAccount", + "LoginState", + "RemoteAnisetteProvider", + "KeyPair", + "SecondFactorType", + "SmsSecondFactorMethod", +) diff --git a/findmy/account.py b/findmy/reports/account.py similarity index 77% rename from findmy/account.py rename to findmy/reports/account.py index 234f304..832ec9b 100644 --- a/findmy/account.py +++ b/findmy/reports/account.py @@ -9,17 +9,13 @@ import json import logging import plistlib import uuid +from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone -from functools import wraps from typing import ( TYPE_CHECKING, Any, - Callable, - Concatenate, - ParamSpec, Sequence, TypedDict, - TypeVar, ) import bs4 @@ -28,9 +24,18 @@ from cryptography.hazmat.primitives import hashes, padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from .base import BaseAppleAccount, BaseSecondFactorMethod, LoginState -from .http import HttpSession, decode_plist +from findmy.util import HttpSession, decode_plist +from findmy.util.errors import InvalidCredentialsError, UnhandledProtocolError + from .reports import KeyReport, fetch_reports +from .state import LoginState, require_login_state +from .twofactor import ( + AsyncSecondFactorMethod, + AsyncSmsSecondFactor, + BaseSecondFactorMethod, + SyncSecondFactorMethod, + SyncSmsSecondFactor, +) if TYPE_CHECKING: from .anisette import BaseAnisetteProvider @@ -48,22 +53,6 @@ class _AccountInfo(TypedDict): last_name: str -class LoginError(Exception): - """Raised when an error occurs during login, such as when the password is incorrect.""" - - -class InvalidStateError(RuntimeError): - """ - Raised when a method is used that is in conflict with the internal account state. - - For example: calling `BaseAppleAccount.login` while already logged in. - """ - - -class ExportRestoreError(ValueError): - """Raised when an error occurs while exporting or restoring the account's current state.""" - - def _encrypt_password(password: str, salt: bytes, iterations: int) -> bytes: p = hashlib.sha256(password.encode("utf-8")).digest() kdf = PBKDF2HMAC( @@ -101,108 +90,135 @@ def _extract_phone_numbers(html: str) -> list[dict]: return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", []) -P = ParamSpec("P") -R = TypeVar("R") -A = TypeVar("A", bound=BaseAppleAccount) -F = Callable[Concatenate[A, P], R] +class BaseAppleAccount(ABC): + """Base class for an Apple account.""" + @property + @abstractmethod + def login_state(self) -> LoginState: + """The current login state of the account.""" + raise NotImplementedError -def _require_login_state(*states: LoginState) -> Callable[[F], F]: - def decorator(func: F) -> F: - @wraps(func) - def wrapper(acc: A, *args: P.args, **kwargs: P.kwargs) -> R: - if acc.login_state not in states: - msg = ( - f"Invalid login state! Currently: {acc.login_state}" - f" but should be one of: {states}" - ) - raise InvalidStateError(msg) + @property + @abstractmethod + def account_name(self) -> str: + """ + The name of the account as reported by Apple. - return func(acc, *args, **kwargs) + This is usually an e-mail address. + May be None in some cases, such as when not logged in. + """ + raise NotImplementedError - return wrapper + @property + @abstractmethod + def first_name(self) -> str | None: + """ + First name of the account holder as reported by Apple. - return decorator + May be None in some cases, such as when not logged in. + """ + raise NotImplementedError + @property + @abstractmethod + def last_name(self) -> str | None: + """ + Last name of the account holder as reported by Apple. -class AsyncSmsSecondFactor(BaseSecondFactorMethod): - """An async implementation of a second-factor method.""" + May be None in some cases, such as when not logged in. + """ + raise NotImplementedError - def __init__( + @abstractmethod + def export(self) -> dict: + """ + Export a representation of the current state of the account as a dictionary. + + The output of this method is guaranteed to be JSON-serializable, and passing + the return value of this function as an argument to `BaseAppleAccount.restore` + will always result in an exact copy of the internal state as it was when exported. + + This method is especially useful to avoid having to keep going through the login flow. + """ + raise NotImplementedError + + @abstractmethod + def restore(self, data: dict) -> None: + """ + Restore a previous export of the internal state of the account. + + See `BaseAppleAccount.export` for more information. + """ + raise NotImplementedError + + @abstractmethod + def login(self, username: str, password: str) -> LoginState: + """Log in to an Apple account using a username and password.""" + raise NotImplementedError + + @abstractmethod + def get_2fa_methods(self) -> list[BaseSecondFactorMethod]: + """ + Get a list of 2FA methods that can be used as a secondary challenge. + + Currently, only SMS-based 2FA methods are supported. + """ + raise NotImplementedError + + @abstractmethod + def sms_2fa_request(self, phone_number_id: int) -> None: + """ + Request a 2FA code to be sent to a specific phone number ID. + + Consider using `BaseSecondFactorMethod.request` instead. + """ + raise NotImplementedError + + @abstractmethod + def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: + """ + Submit a 2FA code that was sent to a specific phone number ID. + + Consider using `BaseSecondFactorMethod.submit` instead. + """ + raise NotImplementedError + + @abstractmethod + def fetch_reports( self, - account: AsyncAppleAccount, - number_id: int, - phone_number: str, - ) -> None: + keys: Sequence[KeyPair], + date_from: datetime, + date_to: datetime, + ) -> dict[KeyPair, list[KeyReport]]: """ - Initialize the second factor method. + Fetch location reports for a sequence of `KeyPair`s between `date_from` and `date_end`. - Should not be done manually; use `BaseAppleAccount.get_2fa_methods` instead. + Returns a dictionary mapping `KeyPair`s to a list of their location reports. """ - super().__init__(account) + raise NotImplementedError - self._phone_number_id: int = number_id - self._phone_number: str = phone_number - - @property - def phone_number_id(self) -> int: - """The phone number's ID. You most likely don't need this.""" - return self._phone_number_id - - @property - def phone_number(self) -> str: - """ - The 2FA method's phone number. - - May be masked using unicode characters; should only be used for identification purposes. - """ - return self._phone_number - - async def request(self) -> None: - """Request an SMS to the corresponding phone number containing a 2FA code.""" - return await self.account.sms_2fa_request(self._phone_number_id) - - async def submit(self, code: str) -> LoginState: - """See `BaseSecondFactorMethod.submit`.""" - return await self.account.sms_2fa_submit(self._phone_number_id, code) - - -class SmsSecondFactor(BaseSecondFactorMethod): - """ - A sync implementation of `BaseSecondFactorMethod`. - - Uses `AsyncSmsSecondFactor` internally. - """ - - def __init__( + @abstractmethod + def fetch_last_reports( self, - account: AppleAccount, - number_id: int, - phone_number: str, - ) -> None: - """See `AsyncSmsSecondFactor.__init__`.""" - super().__init__(account) + keys: Sequence[KeyPair], + hours: int = 7 * 24, + ) -> dict[KeyPair, list[KeyReport]]: + """ + Fetch location reports for a sequence of `KeyPair`s for the last `hours` hours. - self._phone_number_id: int = number_id - self._phone_number: str = phone_number + Utility method as an alternative to using `BaseAppleAccount.fetch_reports` directly. + """ + raise NotImplementedError - @property - def phone_number_id(self) -> int: - """See `AsyncSmsSecondFactor.phone_number_id`.""" - return self._phone_number_id + @abstractmethod + def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: + """ + Retrieve a complete dictionary of Anisette headers. - @property - def phone_number(self) -> str: - """See `AsyncSmsSecondFactor.phone_number`.""" - return self._phone_number - - def request(self) -> None: - """See `AsyncSmsSecondFactor.request`.""" - return self.account.sms_2fa_request(self._phone_number_id) - - def submit(self, code: str) -> LoginState: - """See `AsyncSmsSecondFactor.submit`.""" - return self.account.sms_2fa_submit(self._phone_number_id, code) + Utility method for `AnisetteProvider.get_headers` using this account's user and device ID. + """ + raise NotImplementedError class AsyncAppleAccount(BaseAppleAccount): @@ -257,7 +273,7 @@ class AsyncAppleAccount(BaseAppleAccount): return self._login_state @property - @_require_login_state( + @require_login_state( LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA, @@ -267,7 +283,7 @@ class AsyncAppleAccount(BaseAppleAccount): return self._account_info["account_name"] if self._account_info else None @property - @_require_login_state( + @require_login_state( LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA, @@ -277,7 +293,7 @@ class AsyncAppleAccount(BaseAppleAccount): return self._account_info["first_name"] if self._account_info else None @property - @_require_login_state( + @require_login_state( LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA, @@ -315,7 +331,7 @@ class AsyncAppleAccount(BaseAppleAccount): self._login_state_data = data["login_state"]["data"] except KeyError as e: msg = f"Failed to restore account data: {e}" - raise ExportRestoreError(msg) from None + raise ValueError(msg) from None async def close(self) -> None: """ @@ -326,7 +342,7 @@ class AsyncAppleAccount(BaseAppleAccount): await self._anisette.close() await self._http.close() - @_require_login_state(LoginState.LOGGED_OUT) + @require_login_state(LoginState.LOGGED_OUT) async def login(self, username: str, password: str) -> LoginState: """See `BaseAppleAccount.login`.""" # LOGGED_OUT -> (REQUIRE_2FA or AUTHENTICATED) @@ -337,10 +353,10 @@ class AsyncAppleAccount(BaseAppleAccount): # AUTHENTICATED -> LOGGED_IN return await self._login_mobileme() - @_require_login_state(LoginState.REQUIRE_2FA) - async def get_2fa_methods(self) -> list[AsyncSmsSecondFactor]: + @require_login_state(LoginState.REQUIRE_2FA) + async def get_2fa_methods(self) -> list[AsyncSecondFactorMethod]: """See `BaseAppleAccount.get_2fa_methods`.""" - methods: list[AsyncSmsSecondFactor] = [] + methods: list[AsyncSecondFactorMethod] = [] # sms auth_page = await self._sms_2fa_request("GET", "https://gsa.apple.com/auth") @@ -359,7 +375,7 @@ class AsyncAppleAccount(BaseAppleAccount): return methods - @_require_login_state(LoginState.REQUIRE_2FA) + @require_login_state(LoginState.REQUIRE_2FA) async def sms_2fa_request(self, phone_number_id: int) -> None: """See `BaseAppleAccount.sms_2fa_request`.""" data = {"phoneNumber": {"id": phone_number_id}, "mode": "sms"} @@ -370,7 +386,7 @@ class AsyncAppleAccount(BaseAppleAccount): data, ) - @_require_login_state(LoginState.REQUIRE_2FA) + @require_login_state(LoginState.REQUIRE_2FA) async def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: """See `BaseAppleAccount.sms_2fa_submit`.""" data = { @@ -389,12 +405,12 @@ class AsyncAppleAccount(BaseAppleAccount): new_state = await self._gsa_authenticate() if new_state != LoginState.AUTHENTICATED: msg = f"Unexpected state after submitting 2FA: {new_state}" - raise LoginError(msg) + raise UnhandledProtocolError(msg) # AUTHENTICATED -> LOGGED_IN return await self._login_mobileme() - @_require_login_state(LoginState.LOGGED_IN) + @require_login_state(LoginState.LOGGED_IN) async def fetch_reports( self, keys: Sequence[KeyPair], @@ -413,7 +429,7 @@ class AsyncAppleAccount(BaseAppleAccount): keys, ) - @_require_login_state(LoginState.LOGGED_IN) + @require_login_state(LoginState.LOGGED_IN) async def fetch_last_reports( self, keys: Sequence[KeyPair], @@ -425,7 +441,7 @@ class AsyncAppleAccount(BaseAppleAccount): return await self.fetch_reports(keys, start, end) - @_require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA) + @require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA) async def _gsa_authenticate( self, username: str | None = None, @@ -452,12 +468,12 @@ class AsyncAppleAccount(BaseAppleAccount): logging.debug("Verifying response to auth request") if r["Status"].get("ec") != 0: - msg = "Email verify failed: " + r["Status"].get("em") - raise LoginError(msg) + msg = "Email verification failed: " + r["Status"].get("em") + raise InvalidCredentialsError(msg) sp = r.get("sp") if sp != "s2k": msg = f"This implementation only supports s2k. Server returned {sp}" - raise LoginError(msg) + raise UnhandledProtocolError(msg) logging.debug("Attempting password challenge") @@ -465,7 +481,7 @@ class AsyncAppleAccount(BaseAppleAccount): m1 = usr.process_challenge(r["s"], r["B"]) if m1 is None: msg = "Failed to process challenge" - raise LoginError(msg) + raise UnhandledProtocolError(msg) r = await self._gsa_request( {"c": r["c"], "M1": m1, "u": self._username, "o": "complete"}, ) @@ -474,11 +490,11 @@ class AsyncAppleAccount(BaseAppleAccount): if r["Status"].get("ec") != 0: msg = "Password authentication failed: " + r["Status"].get("em") - raise LoginError(msg) + raise InvalidCredentialsError(msg) usr.verify_session(r.get("M2")) if not usr.authenticated(): msg = "Failed to verify session" - raise LoginError(msg) + raise UnhandledProtocolError(msg) logging.debug("Decrypting SPD data in response") @@ -504,7 +520,7 @@ class AsyncAppleAccount(BaseAppleAccount): ) if au is not None: msg = f"Unknown auth value: {au}" - raise LoginError(msg) + raise UnhandledProtocolError(msg) logging.info("GSA authentication successful") @@ -514,7 +530,7 @@ class AsyncAppleAccount(BaseAppleAccount): {"idms_pet": idms_pet, "adsid": spd["adsid"]}, ) - @_require_login_state(LoginState.AUTHENTICATED) + @require_login_state(LoginState.AUTHENTICATED) async def _login_mobileme(self) -> LoginState: logging.info("Logging into com.apple.mobileme") data = plistlib.dumps( @@ -547,7 +563,7 @@ class AsyncAppleAccount(BaseAppleAccount): if status != 0: status_message = mobileme_data.get("status-message") msg = f"com.apple.mobileme login failed with status {status}: {status_message}" - raise LoginError(msg) + raise UnhandledProtocolError(msg) return self._set_login_state( LoginState.LOGGED_IN, @@ -582,8 +598,8 @@ class AsyncAppleAccount(BaseAppleAccount): headers=headers, ) if not r.ok: - msg = f"HTTP request failed: {r.status_code}" - raise LoginError(msg) + msg = f"SMS 2FA request failed: {r.status_code}" + raise UnhandledProtocolError(msg) return r.text() @@ -618,6 +634,9 @@ class AsyncAppleAccount(BaseAppleAccount): headers=headers, data=plistlib.dumps(body), ) + if not resp.ok: + msg = f"Error response for GSA request: {resp.status_code}" + raise UnhandledProtocolError(msg) return resp.plist()["Response"] async def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: @@ -685,7 +704,7 @@ class AppleAccount(BaseAppleAccount): coro = self._asyncacc.login(username, password) return self._loop.run_until_complete(coro) - def get_2fa_methods(self) -> list[SmsSecondFactor]: + def get_2fa_methods(self) -> list[SyncSecondFactorMethod]: """See `AsyncAppleAccount.get_2fa_methods`.""" coro = self._asyncacc.get_2fa_methods() methods = self._loop.run_until_complete(coro) @@ -693,7 +712,7 @@ class AppleAccount(BaseAppleAccount): res = [] for m in methods: if isinstance(m, AsyncSmsSecondFactor): - res.append(SmsSecondFactor(self, m.phone_number_id, m.phone_number)) + res.append(SyncSmsSecondFactor(self, m.phone_number_id, m.phone_number)) else: msg = ( f"Failed to cast 2FA object to sync alternative: {m}." diff --git a/findmy/anisette.py b/findmy/reports/anisette.py similarity index 98% rename from findmy/anisette.py rename to findmy/reports/anisette.py index c1c3f14..31df608 100644 --- a/findmy/anisette.py +++ b/findmy/reports/anisette.py @@ -7,7 +7,7 @@ import logging from abc import ABC, abstractmethod from datetime import datetime, timezone -from .http import HttpSession +from findmy.util import HttpSession def _gen_meta_headers( diff --git a/findmy/keys.py b/findmy/reports/keys.py similarity index 100% rename from findmy/keys.py rename to findmy/reports/keys.py diff --git a/findmy/reports.py b/findmy/reports/reports.py similarity index 99% rename from findmy/reports.py rename to findmy/reports/reports.py index ebd30cf..93da46f 100644 --- a/findmy/reports.py +++ b/findmy/reports/reports.py @@ -11,7 +11,7 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from .http import HttpSession +from findmy.util import HttpSession if TYPE_CHECKING: from .keys import KeyPair diff --git a/findmy/reports/state.py b/findmy/reports/state.py new file mode 100644 index 0000000..5cf751e --- /dev/null +++ b/findmy/reports/state.py @@ -0,0 +1,60 @@ +"""Code related to internal account state handling.""" +from enum import Enum +from functools import wraps +from typing import TYPE_CHECKING, Callable, Concatenate, ParamSpec, TypeVar + +from findmy.util.errors import InvalidStateError + +if TYPE_CHECKING: + # noinspection PyUnresolvedReferences + from .account import BaseAppleAccount + +P = ParamSpec("P") +R = TypeVar("R") +A = TypeVar("A", bound="BaseAppleAccount") +F = Callable[Concatenate[A, P], R] + + +class LoginState(Enum): + """Enum of possible login states. Used for `AppleAccount`'s internal state machine.""" + + LOGGED_OUT = 0 + REQUIRE_2FA = 1 + AUTHENTICATED = 2 + LOGGED_IN = 3 + + def __lt__(self, other: "LoginState") -> bool: + """ + Compare against another `LoginState`. + + A `LoginState` is said to be "less than" another `LoginState` iff it is in + an "earlier" stage of the login process, going from LOGGED_OUT to LOGGED_IN. + """ + if isinstance(other, LoginState): + return self.value < other.value + + return NotImplemented + + def __repr__(self) -> str: + """Human-readable string representation of the state.""" + return self.__str__() + + +def require_login_state(*states: LoginState) -> Callable[[F], F]: + """Enforce a login state as precondition for a method.""" + + def decorator(func: F) -> F: + @wraps(func) + def wrapper(acc: A, *args: P.args, **kwargs: P.kwargs) -> R: + if acc.login_state not in states: + msg = ( + f"Invalid login state! Currently: {acc.login_state}" + f" but should be one of: {states}" + ) + raise InvalidStateError(msg) + + return func(acc, *args, **kwargs) + + return wrapper + + return decorator diff --git a/findmy/reports/twofactor.py b/findmy/reports/twofactor.py new file mode 100644 index 0000000..c2ff81e --- /dev/null +++ b/findmy/reports/twofactor.py @@ -0,0 +1,172 @@ +"""Public classes related to handling two-factor authentication.""" +from abc import ABCMeta, abstractmethod +from typing import TYPE_CHECKING, TypeVar + +from .state import LoginState + +if TYPE_CHECKING: + # noinspection PyUnresolvedReferences + from .account import AppleAccount, AsyncAppleAccount, BaseAppleAccount + +T = TypeVar("T", bound="BaseAppleAccount") + + +class BaseSecondFactorMethod(metaclass=ABCMeta): + """Base class for a second-factor authentication method for an Apple account.""" + + def __init__(self, account: T) -> None: + """Initialize the second-factor method.""" + self._account: T = account + + @property + def account(self) -> T: + """The account associated with the second-factor method.""" + return self._account + + @abstractmethod + def request(self) -> None: + """ + Put in a request for the second-factor challenge. + + Exact meaning is up to the implementing class. + """ + raise NotImplementedError + + @abstractmethod + def submit(self, code: str) -> LoginState: + """Submit a code to complete the second-factor challenge.""" + raise NotImplementedError + + +class AsyncSecondFactorMethod(BaseSecondFactorMethod, metaclass=ABCMeta): + """ + An asynchronous implementation of a second-factor authentication method. + + Intended as a base class for actual implementations to inherit from. + """ + + def __init__(self, account: "AsyncAppleAccount") -> None: + """Initialize the second-factor method.""" + super().__init__(account) + + @property + def account(self) -> "AsyncAppleAccount": + """The account associated with the second-factor method.""" + return self._account + + +class SyncSecondFactorMethod(BaseSecondFactorMethod, metaclass=ABCMeta): + """ + A synchronous implementation of a second-factor authentication method. + + Intended as a base class for actual implementations to inherit from. + """ + + def __init__(self, account: "AppleAccount") -> None: + """Initialize the second-factor method.""" + super().__init__(account) + + @property + def account(self) -> "AppleAccount": + """The account associated with the second-factor method.""" + return self._account + + +class SmsSecondFactorMethod(BaseSecondFactorMethod, metaclass=ABCMeta): + """Base class for SMS-based two-factor authentication.""" + + @property + @abstractmethod + def phone_number_id(self) -> int: + """The phone number's ID. You most likely don't need this.""" + raise NotImplementedError + + @property + @abstractmethod + def phone_number(self) -> str: + """ + The 2FA method's phone number. + + May be masked using unicode characters; should only be used for identification purposes. + """ + raise NotImplementedError + + +class AsyncSmsSecondFactor(AsyncSecondFactorMethod, SmsSecondFactorMethod): + """An async implementation of a second-factor method.""" + + def __init__( + self, + account: "AsyncAppleAccount", + number_id: int, + phone_number: str, + ) -> None: + """ + Initialize the second factor method. + + Should not be done manually; use `AsyncAppleAccount.get_2fa_methods` instead. + """ + super().__init__(account) + + self._phone_number_id: int = number_id + self._phone_number: str = phone_number + + @property + def phone_number_id(self) -> int: + """The phone number's ID. You most likely don't need this.""" + return self._phone_number_id + + @property + def phone_number(self) -> str: + """ + The 2FA method's phone number. + + May be masked using unicode characters; should only be used for identification purposes. + """ + return self._phone_number + + async def request(self) -> None: + """Request an SMS to the corresponding phone number containing a 2FA code.""" + return await self.account.sms_2fa_request(self._phone_number_id) + + async def submit(self, code: str) -> LoginState: + """See `BaseSecondFactorMethod.submit`.""" + return await self.account.sms_2fa_submit(self._phone_number_id, code) + + +class SyncSmsSecondFactor(SyncSecondFactorMethod, SmsSecondFactorMethod): + """ + A sync implementation of `BaseSecondFactorMethod`. + + Uses `AsyncSmsSecondFactor` internally. + """ + + def __init__( + self, + account: "AppleAccount", + number_id: int, + phone_number: str, + ) -> None: + """See `AsyncSmsSecondFactor.__init__`.""" + super().__init__(account) + + self._phone_number_id: int = number_id + self._phone_number: str = phone_number + + @property + def phone_number_id(self) -> int: + """See `AsyncSmsSecondFactor.phone_number_id`.""" + return self._phone_number_id + + @property + def phone_number(self) -> str: + """See `AsyncSmsSecondFactor.phone_number`.""" + return self._phone_number + + def request(self) -> None: + """See `AsyncSmsSecondFactor.request`.""" + return self.account.sms_2fa_request(self._phone_number_id) + + def submit(self, code: str) -> LoginState: + """See `AsyncSmsSecondFactor.submit`.""" + return self.account.sms_2fa_submit(self._phone_number_id, code) diff --git a/findmy/util/__init__.py b/findmy/util/__init__.py new file mode 100644 index 0000000..6ed65f1 --- /dev/null +++ b/findmy/util/__init__.py @@ -0,0 +1,5 @@ +"""Utility functions and classes. Intended for internal use.""" +from .http import HttpResponse, HttpSession +from .parsers import decode_plist + +__all__ = ("HttpSession", "HttpResponse", "decode_plist") diff --git a/findmy/util/errors.py b/findmy/util/errors.py new file mode 100644 index 0000000..2eda554 --- /dev/null +++ b/findmy/util/errors.py @@ -0,0 +1,21 @@ +"""Exception classes.""" + + +class InvalidCredentialsError(Exception): + """Raised when credentials are incorrect.""" + + +class UnhandledProtocolError(RuntimeError): + """ + Raised when an unexpected error occurs while communicating with Apple servers. + + This is almost always a bug, so please report it. + """ + + +class InvalidStateError(RuntimeError): + """ + Raised when a method is used that is in conflict with the internal account state. + + For example: calling `BaseAppleAccount.login` while already logged in. + """ diff --git a/findmy/http.py b/findmy/util/http.py similarity index 87% rename from findmy/http.py rename to findmy/util/http.py index 11913d9..20c9d37 100644 --- a/findmy/http.py +++ b/findmy/util/http.py @@ -1,30 +1,18 @@ -"""Module to simplify asynchronous HTTP calls. For internal use only.""" +"""Module to simplify asynchronous HTTP calls.""" from __future__ import annotations import asyncio import json import logging -import plistlib from typing import Any, ParamSpec from aiohttp import BasicAuth, ClientSession, ClientTimeout +from .parsers import decode_plist + logging.getLogger(__name__) -def decode_plist(data: bytes) -> Any: # noqa: ANN401 - """Decode a plist file.""" - plist_header = ( - b"" - b"" - ) - - if not data.startswith(b" Any: # noqa: ANN401 + """Decode a plist file.""" + plist_header = ( + b"" + b"" + ) + + if not data.startswith(b"