Refactor entire module

This commit is contained in:
Mike A
2024-01-03 22:13:16 +01:00
parent 1be1246675
commit e31f9ac745
18 changed files with 459 additions and 524 deletions

View File

@@ -2,12 +2,12 @@ import json
import logging
import os
from findmy import (
from findmy.reports import (
AppleAccount,
LoginState,
RemoteAnisetteProvider,
SmsSecondFactor,
keys,
SmsSecondFactorMethod,
)
# URL to (public or local) anisette server
@@ -35,7 +35,7 @@ def login(account: AppleAccount):
# Print the (masked) phone numbers
for method in methods:
if isinstance(method, SmsSecondFactor):
if isinstance(method, SmsSecondFactorMethod):
print(method.phone_number)
# Just take the first one to keep things simple

View File

@@ -3,11 +3,11 @@ import json
import logging
import os
from findmy import (
from findmy.reports import (
AsyncAppleAccount,
LoginState,
RemoteAnisetteProvider,
SmsSecondFactor,
SmsSecondFactorMethod,
keys,
)
@@ -36,7 +36,7 @@ async def login(account: AsyncAppleAccount):
# Print the (masked) phone numbers
for method in methods:
if isinstance(method, SmsSecondFactor):
if isinstance(method, SmsSecondFactorMethod):
print(method.phone_number)
# Just take the first one to keep things simple

View File

@@ -1,11 +1,5 @@
"""A package providing everything you need to query Apple's FindMy network."""
from .account import AppleAccount, AsyncAppleAccount, LoginState, SmsSecondFactor
from .anisette import RemoteAnisetteProvider
"""A package providing everything you need to work with Apple's FindMy network."""
from . import reports
from .util import errors
__all__ = (
"AppleAccount",
"AsyncAppleAccount",
"LoginState",
"SmsSecondFactor",
"RemoteAnisetteProvider",
)
__all__ = ("reports", "errors")

View File

@@ -1,117 +0,0 @@
from datetime import datetime
from typing import Sequence
from .anisette import BaseAnisetteProvider as BaseAnisetteProvider
from .base import (
BaseAppleAccount as BaseAppleAccount,
)
from .base import (
BaseSecondFactorMethod as BaseSecondFactorMethod,
)
from .base import (
LoginState as LoginState,
)
from .keys import KeyPair as KeyPair
from .reports import KeyReport as KeyReport
class LoginError(Exception): ...
class InvalidStateError(RuntimeError): ...
class ExportRestoreError(ValueError): ...
class AsyncSmsSecondFactor(BaseSecondFactorMethod):
def __init__(
self,
account: AsyncAppleAccount,
number_id: int,
phone_number: str,
) -> None: ...
@property
def phone_number_id(self) -> int: ...
@property
def phone_number(self) -> str: ...
async def request(self) -> None: ...
async def submit(self, code: str) -> LoginState: ...
class SmsSecondFactor(BaseSecondFactorMethod):
def __init__(
self,
account: AppleAccount,
number_id: int,
phone_number: str,
) -> None: ...
@property
def phone_number_id(self) -> int: ...
@property
def phone_number(self) -> str: ...
def request(self) -> None: ...
def submit(self, code: str) -> LoginState: ...
class AsyncAppleAccount(BaseAppleAccount):
def __init__(
self,
anisette: BaseAnisetteProvider,
user_id: str | None = None,
device_id: str | None = None,
) -> None: ...
@property
def login_state(self) -> LoginState: ...
@property
def account_name(self) -> str | None: ...
@property
def first_name(self) -> str | None: ...
@property
def last_name(self) -> str | None: ...
def export(self) -> dict: ...
def restore(self, data: dict) -> None: ...
async def close(self) -> None: ...
async def login(self, username: str, password: str) -> LoginState: ...
async def get_2fa_methods(self) -> list[AsyncSmsSecondFactor]: ...
async def sms_2fa_request(self, phone_number_id: int) -> None: ...
async def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: ...
async def fetch_reports(
self,
keys: Sequence[KeyPair],
date_from: datetime,
date_to: datetime,
) -> dict[KeyPair, list[KeyReport]]: ...
async def fetch_last_reports(
self,
keys: Sequence[KeyPair],
hours: int = ...,
) -> dict[KeyPair, list[KeyReport]]: ...
async def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: ...
class AppleAccount(BaseAppleAccount):
def __init__(
self,
anisette: BaseAnisetteProvider,
user_id: str | None = None,
device_id: str | None = None,
) -> None: ...
def __del__(self) -> None: ...
@property
def login_state(self) -> LoginState: ...
@property
def account_name(self) -> str: ...
@property
def first_name(self) -> str | None: ...
@property
def last_name(self) -> str | None: ...
def export(self) -> dict: ...
def restore(self, data: dict) -> None: ...
def login(self, username: str, password: str) -> LoginState: ...
def get_2fa_methods(self) -> list[SmsSecondFactor]: ...
def sms_2fa_request(self, phone_number_id: int) -> None: ...
def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: ...
def fetch_reports(
self,
keys: Sequence[KeyPair],
date_from: datetime,
date_to: datetime,
) -> dict[KeyPair, list[KeyReport]]: ...
def fetch_last_reports(
self,
keys: Sequence[KeyPair],
hours: int = ...,
) -> dict[KeyPair, list[KeyReport]]: ...
def get_anisette_headers(self, serial: str = "0") -> dict[str, str]: ...

View File

@@ -1,20 +0,0 @@
import abc
from abc import ABC, abstractmethod
class BaseAnisetteProvider(ABC, metaclass=abc.ABCMeta):
@abstractmethod
async def close(self) -> None: ...
async def get_headers(
self,
user_id: str,
device_id: str,
serial: str = "0",
) -> dict[str, str]: ...
class RemoteAnisetteProvider(BaseAnisetteProvider):
def __init__(self, server_url: str) -> None: ...
async def close(self) -> None: ...
class LocalAnisetteProvider(BaseAnisetteProvider):
def __init__(self) -> None: ...
async def close(self) -> None: ...

View File

@@ -1,198 +0,0 @@
"""Module that contains base classes for various other modules. For internal use only."""
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Sequence, TypeVar
if TYPE_CHECKING:
from datetime import datetime
from .keys import KeyPair
from .reports import KeyReport
class LoginState(Enum):
"""Enum of possible login states. Used for `AppleAccount`'s internal state machine."""
LOGGED_OUT = 0
REQUIRE_2FA = 1
AUTHENTICATED = 2
LOGGED_IN = 3
def __lt__(self, other: LoginState) -> bool:
"""
Compare against another `LoginState`.
A `LoginState` is said to be "less than" another `LoginState` iff it is in
an "earlier" stage of the login process, going from LOGGED_OUT to LOGGED_IN.
"""
if isinstance(other, LoginState):
return self.value < other.value
return NotImplemented
def __repr__(self) -> str:
"""Human-readable string representation of the state."""
return self.__str__()
T = TypeVar("T", bound="BaseAppleAccount")
class BaseSecondFactorMethod(ABC):
"""Base class for a second-factor authentication method for an Apple account."""
def __init__(self, account: T) -> None:
"""Initialize the second-factor method."""
self._account: T = account
@property
def account(self) -> T:
"""The account associated with the second-factor method."""
return self._account
@abstractmethod
def request(self) -> None:
"""
Put in a request for the second-factor challenge.
Exact meaning is up to the implementing class.
"""
raise NotImplementedError
@abstractmethod
def submit(self, code: str) -> LoginState:
"""Submit a code to complete the second-factor challenge."""
raise NotImplementedError
class BaseAppleAccount(ABC):
"""Base class for an Apple account."""
@property
@abstractmethod
def login_state(self) -> LoginState:
"""The current login state of the account."""
raise NotImplementedError
@property
@abstractmethod
def account_name(self) -> str:
"""
The name of the account as reported by Apple.
This is usually an e-mail address.
May be None in some cases, such as when not logged in.
"""
raise NotImplementedError
@property
@abstractmethod
def first_name(self) -> str | None:
"""
First name of the account holder as reported by Apple.
May be None in some cases, such as when not logged in.
"""
raise NotImplementedError
@property
@abstractmethod
def last_name(self) -> str | None:
"""
Last name of the account holder as reported by Apple.
May be None in some cases, such as when not logged in.
"""
raise NotImplementedError
@abstractmethod
def export(self) -> dict:
"""
Export a representation of the current state of the account as a dictionary.
The output of this method is guaranteed to be JSON-serializable, and passing
the return value of this function as an argument to `BaseAppleAccount.restore`
will always result in an exact copy of the internal state as it was when exported.
This method is especially useful to avoid having to keep going through the login flow.
"""
raise NotImplementedError
@abstractmethod
def restore(self, data: dict) -> None:
"""
Restore a previous export of the internal state of the account.
See `BaseAppleAccount.export` for more information.
"""
raise NotImplementedError
@abstractmethod
def login(self, username: str, password: str) -> LoginState:
"""Log in to an Apple account using a username and password."""
raise NotImplementedError
@abstractmethod
def get_2fa_methods(self) -> list[BaseSecondFactorMethod]:
"""
Get a list of 2FA methods that can be used as a secondary challenge.
Currently, only SMS-based 2FA methods are supported.
"""
raise NotImplementedError
@abstractmethod
def sms_2fa_request(self, phone_number_id: int) -> None:
"""
Request a 2FA code to be sent to a specific phone number ID.
Consider using `BaseSecondFactorMethod.request` instead.
"""
raise NotImplementedError
@abstractmethod
def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState:
"""
Submit a 2FA code that was sent to a specific phone number ID.
Consider using `BaseSecondFactorMethod.submit` instead.
"""
raise NotImplementedError
@abstractmethod
def fetch_reports(
self,
keys: Sequence[KeyPair],
date_from: datetime,
date_to: datetime,
) -> dict[KeyPair, list[KeyReport]]:
"""
Fetch location reports for a sequence of `KeyPair`s between `date_from` and `date_end`.
Returns a dictionary mapping `KeyPair`s to a list of their location reports.
"""
raise NotImplementedError
@abstractmethod
def fetch_last_reports(
self,
keys: Sequence[KeyPair],
hours: int = 7 * 24,
) -> dict[KeyPair, list[KeyReport]]:
"""
Fetch location reports for a sequence of `KeyPair`s for the last `hours` hours.
Utility method as an alternative to using `BaseAppleAccount.fetch_reports` directly.
"""
raise NotImplementedError
@abstractmethod
def get_anisette_headers(self, serial: str = "0") -> dict[str, str]:
"""
Retrieve a complete dictionary of Anisette headers.
Utility method for `AnisetteProvider.get_headers` using this account's user and device ID.
"""
raise NotImplementedError

View File

@@ -1,21 +0,0 @@
from cryptography.hazmat.primitives.asymmetric import ec
class KeyPair:
def __init__(self, private_key: bytes) -> None: ...
@classmethod
def generate(cls) -> KeyPair: ...
@classmethod
def from_b64(cls, key_b64: str) -> KeyPair: ...
@property
def private_key_bytes(self) -> bytes: ...
@property
def private_key_b64(self) -> str: ...
@property
def adv_key_bytes(self) -> bytes: ...
@property
def adv_key_b64(self) -> str: ...
@property
def hashed_adv_key_bytes(self) -> bytes: ...
@property
def hashed_adv_key_b64(self) -> str: ...
def dh_exchange(self, other_pub_key: ec.EllipticCurvePublicKey) -> bytes: ...

View File

@@ -0,0 +1,16 @@
"""Code related to fetching location reports."""
from .account import AppleAccount, AsyncAppleAccount
from .anisette import RemoteAnisetteProvider
from .keys import KeyPair
from .state import LoginState
from .twofactor import SecondFactorType, SmsSecondFactorMethod
__all__ = (
"AppleAccount",
"AsyncAppleAccount",
"LoginState",
"RemoteAnisetteProvider",
"KeyPair",
"SecondFactorType",
"SmsSecondFactorMethod",
)

View File

@@ -9,17 +9,13 @@ import json
import logging
import plistlib
import uuid
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Callable,
Concatenate,
ParamSpec,
Sequence,
TypedDict,
TypeVar,
)
import bs4
@@ -28,9 +24,18 @@ from cryptography.hazmat.primitives import hashes, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .base import BaseAppleAccount, BaseSecondFactorMethod, LoginState
from .http import HttpSession, decode_plist
from findmy.util import HttpSession, decode_plist
from findmy.util.errors import InvalidCredentialsError, UnhandledProtocolError
from .reports import KeyReport, fetch_reports
from .state import LoginState, require_login_state
from .twofactor import (
AsyncSecondFactorMethod,
AsyncSmsSecondFactor,
BaseSecondFactorMethod,
SyncSecondFactorMethod,
SyncSmsSecondFactor,
)
if TYPE_CHECKING:
from .anisette import BaseAnisetteProvider
@@ -48,22 +53,6 @@ class _AccountInfo(TypedDict):
last_name: str
class LoginError(Exception):
"""Raised when an error occurs during login, such as when the password is incorrect."""
class InvalidStateError(RuntimeError):
"""
Raised when a method is used that is in conflict with the internal account state.
For example: calling `BaseAppleAccount.login` while already logged in.
"""
class ExportRestoreError(ValueError):
"""Raised when an error occurs while exporting or restoring the account's current state."""
def _encrypt_password(password: str, salt: bytes, iterations: int) -> bytes:
p = hashlib.sha256(password.encode("utf-8")).digest()
kdf = PBKDF2HMAC(
@@ -101,108 +90,135 @@ def _extract_phone_numbers(html: str) -> list[dict]:
return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", [])
P = ParamSpec("P")
R = TypeVar("R")
A = TypeVar("A", bound=BaseAppleAccount)
F = Callable[Concatenate[A, P], R]
class BaseAppleAccount(ABC):
"""Base class for an Apple account."""
@property
@abstractmethod
def login_state(self) -> LoginState:
"""The current login state of the account."""
raise NotImplementedError
def _require_login_state(*states: LoginState) -> Callable[[F], F]:
def decorator(func: F) -> F:
@wraps(func)
def wrapper(acc: A, *args: P.args, **kwargs: P.kwargs) -> R:
if acc.login_state not in states:
msg = (
f"Invalid login state! Currently: {acc.login_state}"
f" but should be one of: {states}"
)
raise InvalidStateError(msg)
@property
@abstractmethod
def account_name(self) -> str:
"""
The name of the account as reported by Apple.
return func(acc, *args, **kwargs)
This is usually an e-mail address.
May be None in some cases, such as when not logged in.
"""
raise NotImplementedError
return wrapper
@property
@abstractmethod
def first_name(self) -> str | None:
"""
First name of the account holder as reported by Apple.
return decorator
May be None in some cases, such as when not logged in.
"""
raise NotImplementedError
@property
@abstractmethod
def last_name(self) -> str | None:
"""
Last name of the account holder as reported by Apple.
class AsyncSmsSecondFactor(BaseSecondFactorMethod):
"""An async implementation of a second-factor method."""
May be None in some cases, such as when not logged in.
"""
raise NotImplementedError
def __init__(
@abstractmethod
def export(self) -> dict:
"""
Export a representation of the current state of the account as a dictionary.
The output of this method is guaranteed to be JSON-serializable, and passing
the return value of this function as an argument to `BaseAppleAccount.restore`
will always result in an exact copy of the internal state as it was when exported.
This method is especially useful to avoid having to keep going through the login flow.
"""
raise NotImplementedError
@abstractmethod
def restore(self, data: dict) -> None:
"""
Restore a previous export of the internal state of the account.
See `BaseAppleAccount.export` for more information.
"""
raise NotImplementedError
@abstractmethod
def login(self, username: str, password: str) -> LoginState:
"""Log in to an Apple account using a username and password."""
raise NotImplementedError
@abstractmethod
def get_2fa_methods(self) -> list[BaseSecondFactorMethod]:
"""
Get a list of 2FA methods that can be used as a secondary challenge.
Currently, only SMS-based 2FA methods are supported.
"""
raise NotImplementedError
@abstractmethod
def sms_2fa_request(self, phone_number_id: int) -> None:
"""
Request a 2FA code to be sent to a specific phone number ID.
Consider using `BaseSecondFactorMethod.request` instead.
"""
raise NotImplementedError
@abstractmethod
def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState:
"""
Submit a 2FA code that was sent to a specific phone number ID.
Consider using `BaseSecondFactorMethod.submit` instead.
"""
raise NotImplementedError
@abstractmethod
def fetch_reports(
self,
account: AsyncAppleAccount,
number_id: int,
phone_number: str,
) -> None:
keys: Sequence[KeyPair],
date_from: datetime,
date_to: datetime,
) -> dict[KeyPair, list[KeyReport]]:
"""
Initialize the second factor method.
Fetch location reports for a sequence of `KeyPair`s between `date_from` and `date_end`.
Should not be done manually; use `BaseAppleAccount.get_2fa_methods` instead.
Returns a dictionary mapping `KeyPair`s to a list of their location reports.
"""
super().__init__(account)
raise NotImplementedError
self._phone_number_id: int = number_id
self._phone_number: str = phone_number
@property
def phone_number_id(self) -> int:
"""The phone number's ID. You most likely don't need this."""
return self._phone_number_id
@property
def phone_number(self) -> str:
"""
The 2FA method's phone number.
May be masked using unicode characters; should only be used for identification purposes.
"""
return self._phone_number
async def request(self) -> None:
"""Request an SMS to the corresponding phone number containing a 2FA code."""
return await self.account.sms_2fa_request(self._phone_number_id)
async def submit(self, code: str) -> LoginState:
"""See `BaseSecondFactorMethod.submit`."""
return await self.account.sms_2fa_submit(self._phone_number_id, code)
class SmsSecondFactor(BaseSecondFactorMethod):
"""
A sync implementation of `BaseSecondFactorMethod`.
Uses `AsyncSmsSecondFactor` internally.
"""
def __init__(
@abstractmethod
def fetch_last_reports(
self,
account: AppleAccount,
number_id: int,
phone_number: str,
) -> None:
"""See `AsyncSmsSecondFactor.__init__`."""
super().__init__(account)
keys: Sequence[KeyPair],
hours: int = 7 * 24,
) -> dict[KeyPair, list[KeyReport]]:
"""
Fetch location reports for a sequence of `KeyPair`s for the last `hours` hours.
self._phone_number_id: int = number_id
self._phone_number: str = phone_number
Utility method as an alternative to using `BaseAppleAccount.fetch_reports` directly.
"""
raise NotImplementedError
@property
def phone_number_id(self) -> int:
"""See `AsyncSmsSecondFactor.phone_number_id`."""
return self._phone_number_id
@abstractmethod
def get_anisette_headers(self, serial: str = "0") -> dict[str, str]:
"""
Retrieve a complete dictionary of Anisette headers.
@property
def phone_number(self) -> str:
"""See `AsyncSmsSecondFactor.phone_number`."""
return self._phone_number
def request(self) -> None:
"""See `AsyncSmsSecondFactor.request`."""
return self.account.sms_2fa_request(self._phone_number_id)
def submit(self, code: str) -> LoginState:
"""See `AsyncSmsSecondFactor.submit`."""
return self.account.sms_2fa_submit(self._phone_number_id, code)
Utility method for `AnisetteProvider.get_headers` using this account's user and device ID.
"""
raise NotImplementedError
class AsyncAppleAccount(BaseAppleAccount):
@@ -257,7 +273,7 @@ class AsyncAppleAccount(BaseAppleAccount):
return self._login_state
@property
@_require_login_state(
@require_login_state(
LoginState.LOGGED_IN,
LoginState.AUTHENTICATED,
LoginState.REQUIRE_2FA,
@@ -267,7 +283,7 @@ class AsyncAppleAccount(BaseAppleAccount):
return self._account_info["account_name"] if self._account_info else None
@property
@_require_login_state(
@require_login_state(
LoginState.LOGGED_IN,
LoginState.AUTHENTICATED,
LoginState.REQUIRE_2FA,
@@ -277,7 +293,7 @@ class AsyncAppleAccount(BaseAppleAccount):
return self._account_info["first_name"] if self._account_info else None
@property
@_require_login_state(
@require_login_state(
LoginState.LOGGED_IN,
LoginState.AUTHENTICATED,
LoginState.REQUIRE_2FA,
@@ -315,7 +331,7 @@ class AsyncAppleAccount(BaseAppleAccount):
self._login_state_data = data["login_state"]["data"]
except KeyError as e:
msg = f"Failed to restore account data: {e}"
raise ExportRestoreError(msg) from None
raise ValueError(msg) from None
async def close(self) -> None:
"""
@@ -326,7 +342,7 @@ class AsyncAppleAccount(BaseAppleAccount):
await self._anisette.close()
await self._http.close()
@_require_login_state(LoginState.LOGGED_OUT)
@require_login_state(LoginState.LOGGED_OUT)
async def login(self, username: str, password: str) -> LoginState:
"""See `BaseAppleAccount.login`."""
# LOGGED_OUT -> (REQUIRE_2FA or AUTHENTICATED)
@@ -337,10 +353,10 @@ class AsyncAppleAccount(BaseAppleAccount):
# AUTHENTICATED -> LOGGED_IN
return await self._login_mobileme()
@_require_login_state(LoginState.REQUIRE_2FA)
async def get_2fa_methods(self) -> list[AsyncSmsSecondFactor]:
@require_login_state(LoginState.REQUIRE_2FA)
async def get_2fa_methods(self) -> list[AsyncSecondFactorMethod]:
"""See `BaseAppleAccount.get_2fa_methods`."""
methods: list[AsyncSmsSecondFactor] = []
methods: list[AsyncSecondFactorMethod] = []
# sms
auth_page = await self._sms_2fa_request("GET", "https://gsa.apple.com/auth")
@@ -359,7 +375,7 @@ class AsyncAppleAccount(BaseAppleAccount):
return methods
@_require_login_state(LoginState.REQUIRE_2FA)
@require_login_state(LoginState.REQUIRE_2FA)
async def sms_2fa_request(self, phone_number_id: int) -> None:
"""See `BaseAppleAccount.sms_2fa_request`."""
data = {"phoneNumber": {"id": phone_number_id}, "mode": "sms"}
@@ -370,7 +386,7 @@ class AsyncAppleAccount(BaseAppleAccount):
data,
)
@_require_login_state(LoginState.REQUIRE_2FA)
@require_login_state(LoginState.REQUIRE_2FA)
async def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState:
"""See `BaseAppleAccount.sms_2fa_submit`."""
data = {
@@ -389,12 +405,12 @@ class AsyncAppleAccount(BaseAppleAccount):
new_state = await self._gsa_authenticate()
if new_state != LoginState.AUTHENTICATED:
msg = f"Unexpected state after submitting 2FA: {new_state}"
raise LoginError(msg)
raise UnhandledProtocolError(msg)
# AUTHENTICATED -> LOGGED_IN
return await self._login_mobileme()
@_require_login_state(LoginState.LOGGED_IN)
@require_login_state(LoginState.LOGGED_IN)
async def fetch_reports(
self,
keys: Sequence[KeyPair],
@@ -413,7 +429,7 @@ class AsyncAppleAccount(BaseAppleAccount):
keys,
)
@_require_login_state(LoginState.LOGGED_IN)
@require_login_state(LoginState.LOGGED_IN)
async def fetch_last_reports(
self,
keys: Sequence[KeyPair],
@@ -425,7 +441,7 @@ class AsyncAppleAccount(BaseAppleAccount):
return await self.fetch_reports(keys, start, end)
@_require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA)
@require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA)
async def _gsa_authenticate(
self,
username: str | None = None,
@@ -452,12 +468,12 @@ class AsyncAppleAccount(BaseAppleAccount):
logging.debug("Verifying response to auth request")
if r["Status"].get("ec") != 0:
msg = "Email verify failed: " + r["Status"].get("em")
raise LoginError(msg)
msg = "Email verification failed: " + r["Status"].get("em")
raise InvalidCredentialsError(msg)
sp = r.get("sp")
if sp != "s2k":
msg = f"This implementation only supports s2k. Server returned {sp}"
raise LoginError(msg)
raise UnhandledProtocolError(msg)
logging.debug("Attempting password challenge")
@@ -465,7 +481,7 @@ class AsyncAppleAccount(BaseAppleAccount):
m1 = usr.process_challenge(r["s"], r["B"])
if m1 is None:
msg = "Failed to process challenge"
raise LoginError(msg)
raise UnhandledProtocolError(msg)
r = await self._gsa_request(
{"c": r["c"], "M1": m1, "u": self._username, "o": "complete"},
)
@@ -474,11 +490,11 @@ class AsyncAppleAccount(BaseAppleAccount):
if r["Status"].get("ec") != 0:
msg = "Password authentication failed: " + r["Status"].get("em")
raise LoginError(msg)
raise InvalidCredentialsError(msg)
usr.verify_session(r.get("M2"))
if not usr.authenticated():
msg = "Failed to verify session"
raise LoginError(msg)
raise UnhandledProtocolError(msg)
logging.debug("Decrypting SPD data in response")
@@ -504,7 +520,7 @@ class AsyncAppleAccount(BaseAppleAccount):
)
if au is not None:
msg = f"Unknown auth value: {au}"
raise LoginError(msg)
raise UnhandledProtocolError(msg)
logging.info("GSA authentication successful")
@@ -514,7 +530,7 @@ class AsyncAppleAccount(BaseAppleAccount):
{"idms_pet": idms_pet, "adsid": spd["adsid"]},
)
@_require_login_state(LoginState.AUTHENTICATED)
@require_login_state(LoginState.AUTHENTICATED)
async def _login_mobileme(self) -> LoginState:
logging.info("Logging into com.apple.mobileme")
data = plistlib.dumps(
@@ -547,7 +563,7 @@ class AsyncAppleAccount(BaseAppleAccount):
if status != 0:
status_message = mobileme_data.get("status-message")
msg = f"com.apple.mobileme login failed with status {status}: {status_message}"
raise LoginError(msg)
raise UnhandledProtocolError(msg)
return self._set_login_state(
LoginState.LOGGED_IN,
@@ -582,8 +598,8 @@ class AsyncAppleAccount(BaseAppleAccount):
headers=headers,
)
if not r.ok:
msg = f"HTTP request failed: {r.status_code}"
raise LoginError(msg)
msg = f"SMS 2FA request failed: {r.status_code}"
raise UnhandledProtocolError(msg)
return r.text()
@@ -618,6 +634,9 @@ class AsyncAppleAccount(BaseAppleAccount):
headers=headers,
data=plistlib.dumps(body),
)
if not resp.ok:
msg = f"Error response for GSA request: {resp.status_code}"
raise UnhandledProtocolError(msg)
return resp.plist()["Response"]
async def get_anisette_headers(self, serial: str = "0") -> dict[str, str]:
@@ -685,7 +704,7 @@ class AppleAccount(BaseAppleAccount):
coro = self._asyncacc.login(username, password)
return self._loop.run_until_complete(coro)
def get_2fa_methods(self) -> list[SmsSecondFactor]:
def get_2fa_methods(self) -> list[SyncSecondFactorMethod]:
"""See `AsyncAppleAccount.get_2fa_methods`."""
coro = self._asyncacc.get_2fa_methods()
methods = self._loop.run_until_complete(coro)
@@ -693,7 +712,7 @@ class AppleAccount(BaseAppleAccount):
res = []
for m in methods:
if isinstance(m, AsyncSmsSecondFactor):
res.append(SmsSecondFactor(self, m.phone_number_id, m.phone_number))
res.append(SyncSmsSecondFactor(self, m.phone_number_id, m.phone_number))
else:
msg = (
f"Failed to cast 2FA object to sync alternative: {m}."

View File

@@ -7,7 +7,7 @@ import logging
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from .http import HttpSession
from findmy.util import HttpSession
def _gen_meta_headers(

View File

@@ -11,7 +11,7 @@ from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .http import HttpSession
from findmy.util import HttpSession
if TYPE_CHECKING:
from .keys import KeyPair

60
findmy/reports/state.py Normal file
View File

@@ -0,0 +1,60 @@
"""Code related to internal account state handling."""
from enum import Enum
from functools import wraps
from typing import TYPE_CHECKING, Callable, Concatenate, ParamSpec, TypeVar
from findmy.util.errors import InvalidStateError
if TYPE_CHECKING:
# noinspection PyUnresolvedReferences
from .account import BaseAppleAccount
P = ParamSpec("P")
R = TypeVar("R")
A = TypeVar("A", bound="BaseAppleAccount")
F = Callable[Concatenate[A, P], R]
class LoginState(Enum):
"""Enum of possible login states. Used for `AppleAccount`'s internal state machine."""
LOGGED_OUT = 0
REQUIRE_2FA = 1
AUTHENTICATED = 2
LOGGED_IN = 3
def __lt__(self, other: "LoginState") -> bool:
"""
Compare against another `LoginState`.
A `LoginState` is said to be "less than" another `LoginState` iff it is in
an "earlier" stage of the login process, going from LOGGED_OUT to LOGGED_IN.
"""
if isinstance(other, LoginState):
return self.value < other.value
return NotImplemented
def __repr__(self) -> str:
"""Human-readable string representation of the state."""
return self.__str__()
def require_login_state(*states: LoginState) -> Callable[[F], F]:
"""Enforce a login state as precondition for a method."""
def decorator(func: F) -> F:
@wraps(func)
def wrapper(acc: A, *args: P.args, **kwargs: P.kwargs) -> R:
if acc.login_state not in states:
msg = (
f"Invalid login state! Currently: {acc.login_state}"
f" but should be one of: {states}"
)
raise InvalidStateError(msg)
return func(acc, *args, **kwargs)
return wrapper
return decorator

172
findmy/reports/twofactor.py Normal file
View File

@@ -0,0 +1,172 @@
"""Public classes related to handling two-factor authentication."""
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, TypeVar
from .state import LoginState
if TYPE_CHECKING:
# noinspection PyUnresolvedReferences
from .account import AppleAccount, AsyncAppleAccount, BaseAppleAccount
T = TypeVar("T", bound="BaseAppleAccount")
class BaseSecondFactorMethod(metaclass=ABCMeta):
"""Base class for a second-factor authentication method for an Apple account."""
def __init__(self, account: T) -> None:
"""Initialize the second-factor method."""
self._account: T = account
@property
def account(self) -> T:
"""The account associated with the second-factor method."""
return self._account
@abstractmethod
def request(self) -> None:
"""
Put in a request for the second-factor challenge.
Exact meaning is up to the implementing class.
"""
raise NotImplementedError
@abstractmethod
def submit(self, code: str) -> LoginState:
"""Submit a code to complete the second-factor challenge."""
raise NotImplementedError
class AsyncSecondFactorMethod(BaseSecondFactorMethod, metaclass=ABCMeta):
"""
An asynchronous implementation of a second-factor authentication method.
Intended as a base class for actual implementations to inherit from.
"""
def __init__(self, account: "AsyncAppleAccount") -> None:
"""Initialize the second-factor method."""
super().__init__(account)
@property
def account(self) -> "AsyncAppleAccount":
"""The account associated with the second-factor method."""
return self._account
class SyncSecondFactorMethod(BaseSecondFactorMethod, metaclass=ABCMeta):
"""
A synchronous implementation of a second-factor authentication method.
Intended as a base class for actual implementations to inherit from.
"""
def __init__(self, account: "AppleAccount") -> None:
"""Initialize the second-factor method."""
super().__init__(account)
@property
def account(self) -> "AppleAccount":
"""The account associated with the second-factor method."""
return self._account
class SmsSecondFactorMethod(BaseSecondFactorMethod, metaclass=ABCMeta):
"""Base class for SMS-based two-factor authentication."""
@property
@abstractmethod
def phone_number_id(self) -> int:
"""The phone number's ID. You most likely don't need this."""
raise NotImplementedError
@property
@abstractmethod
def phone_number(self) -> str:
"""
The 2FA method's phone number.
May be masked using unicode characters; should only be used for identification purposes.
"""
raise NotImplementedError
class AsyncSmsSecondFactor(AsyncSecondFactorMethod, SmsSecondFactorMethod):
"""An async implementation of a second-factor method."""
def __init__(
self,
account: "AsyncAppleAccount",
number_id: int,
phone_number: str,
) -> None:
"""
Initialize the second factor method.
Should not be done manually; use `AsyncAppleAccount.get_2fa_methods` instead.
"""
super().__init__(account)
self._phone_number_id: int = number_id
self._phone_number: str = phone_number
@property
def phone_number_id(self) -> int:
"""The phone number's ID. You most likely don't need this."""
return self._phone_number_id
@property
def phone_number(self) -> str:
"""
The 2FA method's phone number.
May be masked using unicode characters; should only be used for identification purposes.
"""
return self._phone_number
async def request(self) -> None:
"""Request an SMS to the corresponding phone number containing a 2FA code."""
return await self.account.sms_2fa_request(self._phone_number_id)
async def submit(self, code: str) -> LoginState:
"""See `BaseSecondFactorMethod.submit`."""
return await self.account.sms_2fa_submit(self._phone_number_id, code)
class SyncSmsSecondFactor(SyncSecondFactorMethod, SmsSecondFactorMethod):
"""
A sync implementation of `BaseSecondFactorMethod`.
Uses `AsyncSmsSecondFactor` internally.
"""
def __init__(
self,
account: "AppleAccount",
number_id: int,
phone_number: str,
) -> None:
"""See `AsyncSmsSecondFactor.__init__`."""
super().__init__(account)
self._phone_number_id: int = number_id
self._phone_number: str = phone_number
@property
def phone_number_id(self) -> int:
"""See `AsyncSmsSecondFactor.phone_number_id`."""
return self._phone_number_id
@property
def phone_number(self) -> str:
"""See `AsyncSmsSecondFactor.phone_number`."""
return self._phone_number
def request(self) -> None:
"""See `AsyncSmsSecondFactor.request`."""
return self.account.sms_2fa_request(self._phone_number_id)
def submit(self, code: str) -> LoginState:
"""See `AsyncSmsSecondFactor.submit`."""
return self.account.sms_2fa_submit(self._phone_number_id, code)

5
findmy/util/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Utility functions and classes. Intended for internal use."""
from .http import HttpResponse, HttpSession
from .parsers import decode_plist
__all__ = ("HttpSession", "HttpResponse", "decode_plist")

21
findmy/util/errors.py Normal file
View File

@@ -0,0 +1,21 @@
"""Exception classes."""
class InvalidCredentialsError(Exception):
"""Raised when credentials are incorrect."""
class UnhandledProtocolError(RuntimeError):
"""
Raised when an unexpected error occurs while communicating with Apple servers.
This is almost always a bug, so please report it.
"""
class InvalidStateError(RuntimeError):
"""
Raised when a method is used that is in conflict with the internal account state.
For example: calling `BaseAppleAccount.login` while already logged in.
"""

View File

@@ -1,30 +1,18 @@
"""Module to simplify asynchronous HTTP calls. For internal use only."""
"""Module to simplify asynchronous HTTP calls."""
from __future__ import annotations
import asyncio
import json
import logging
import plistlib
from typing import Any, ParamSpec
from aiohttp import BasicAuth, ClientSession, ClientTimeout
from .parsers import decode_plist
logging.getLogger(__name__)
def decode_plist(data: bytes) -> Any: # noqa: ANN401
"""Decode a plist file."""
plist_header = (
b"<?xml version='1.0' encoding='UTF-8'?>"
b"<!DOCTYPE plist PUBLIC '-//Apple//DTD PLIST 1.0//EN' 'http://www.apple.com/DTDs/PropertyList-1.0.dtd'>"
)
if not data.startswith(b"<?xml"): # append header ourselves
data = plist_header + data
return plistlib.loads(data)
class HttpResponse:
"""Response of a request made by `HttpSession`."""

16
findmy/util/parsers.py Normal file
View File

@@ -0,0 +1,16 @@
"""Parsers for various forms of data formats."""
import plistlib
from typing import Any
def decode_plist(data: bytes) -> Any: # noqa: ANN401
"""Decode a plist file."""
plist_header = (
b"<?xml version='1.0' encoding='UTF-8'?>"
b"<!DOCTYPE plist PUBLIC '-//Apple//DTD PLIST 1.0//EN' 'http://www.apple.com/DTDs/PropertyList-1.0.dtd'>"
)
if not data.startswith(b"<?xml"): # append header ourselves
data = plist_header + data
return plistlib.loads(data)