diff --git a/examples/_login.py b/examples/_login.py index b602400..7753745 100644 --- a/examples/_login.py +++ b/examples/_login.py @@ -1,13 +1,14 @@ from __future__ import annotations -from findmy.reports import ( +from findmy import ( AppleAccount, AsyncAppleAccount, + LocalAnisetteProvider, LoginState, + RemoteAnisetteProvider, SmsSecondFactorMethod, TrustedDeviceSecondFactorMethod, ) -from findmy.reports.anisette import LocalAnisetteProvider, RemoteAnisetteProvider def _login_sync(account: AppleAccount) -> None: diff --git a/examples/scanner.py b/examples/scanner.py index 7728a6a..8f929cd 100644 --- a/examples/scanner.py +++ b/examples/scanner.py @@ -5,9 +5,9 @@ import asyncio import logging from pathlib import Path -from findmy import KeyPair -from findmy.accessory import FindMyAccessory -from findmy.scanner import ( +from findmy import ( + FindMyAccessory, + KeyPair, NearbyOfflineFindingDevice, OfflineFindingScanner, SeparatedOfflineFindingDevice, diff --git a/findmy/__init__.py b/findmy/__init__.py index 4492842..01c9876 100644 --- a/findmy/__init__.py +++ b/findmy/__init__.py @@ -1,15 +1,81 @@ """A package providing everything you need to work with Apple's FindMy network.""" -from . import errors, keys, plist, reports, scanner -from .accessory import FindMyAccessory -from .keys import KeyPair +from .accessory import FindMyAccessory, FindMyAccessoryMapping, RollingKeyPairSource +from .errors import ( + InvalidCredentialsError, + InvalidStateError, + UnauthorizedError, + UnhandledProtocolError, +) +from .keys import HasHashedPublicKey, HasPublicKey, KeyPair, KeyPairMapping, KeyPairType +from .reports import ( + AccountStateMapping, + AnisetteMapping, + AppleAccount, + AsyncAppleAccount, + AsyncSmsSecondFactor, + AsyncTrustedDeviceSecondFactor, + BaseAnisetteProvider, + BaseAppleAccount, + BaseSecondFactorMethod, + LocalAnisetteMapping, + LocalAnisetteProvider, + LocationReport, + LocationReportDecryptedMapping, + LocationReportEncryptedMapping, + LocationReportMapping, + LoginState, + RemoteAnisetteMapping, + RemoteAnisetteProvider, + SmsSecondFactorMethod, + SyncSmsSecondFactor, + SyncTrustedDeviceSecondFactor, + TrustedDeviceSecondFactorMethod, +) +from .scanner import ( + NearbyOfflineFindingDevice, + OfflineFindingDevice, + OfflineFindingScanner, + SeparatedOfflineFindingDevice, +) __all__ = ( + "AccountStateMapping", + "AnisetteMapping", + "AppleAccount", + "AsyncAppleAccount", + "AsyncSmsSecondFactor", + "AsyncTrustedDeviceSecondFactor", + "BaseAnisetteProvider", + "BaseAppleAccount", + "BaseSecondFactorMethod", "FindMyAccessory", + "FindMyAccessoryMapping", + "HasHashedPublicKey", + "HasPublicKey", + "InvalidCredentialsError", + "InvalidStateError", "KeyPair", - "errors", - "keys", - "plist", - "reports", - "scanner", + "KeyPairMapping", + "KeyPairType", + "LocalAnisetteMapping", + "LocalAnisetteProvider", + "LocationReport", + "LocationReportDecryptedMapping", + "LocationReportEncryptedMapping", + "LocationReportMapping", + "LoginState", + "NearbyOfflineFindingDevice", + "OfflineFindingDevice", + "OfflineFindingScanner", + "RemoteAnisetteMapping", + "RemoteAnisetteProvider", + "RollingKeyPairSource", + "SeparatedOfflineFindingDevice", + "SmsSecondFactorMethod", + "SyncSmsSecondFactor", + "SyncTrustedDeviceSecondFactor", + "TrustedDeviceSecondFactorMethod", + "UnauthorizedError", + "UnhandledProtocolError", ) diff --git a/findmy/accessory.py b/findmy/accessory.py index c2bfd73..2567d28 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -13,10 +13,8 @@ from typing import TYPE_CHECKING, Literal, TypedDict, overload from typing_extensions import override -from findmy.util.abc import Serializable -from findmy.util.files import read_data_json, read_data_plist, save_and_return_json - -from .keys import KeyGenerator, KeyPair, KeyType +from . import util +from .keys import KeyGenerator, KeyPair, KeyPairType from .util import crypto if TYPE_CHECKING: @@ -93,7 +91,7 @@ class RollingKeyPairSource(ABC): yield ind, key -class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]): +class FindMyAccessory(RollingKeyPairSource, util.abc.Serializable[FindMyAccessoryMapping]): """A findable Find My-accessory using official key rollover.""" def __init__( # noqa: PLR0913 @@ -116,8 +114,8 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] :param skn: The SKN for the primary key. :param sks: The SKS for the secondary key. """ - self._primary_gen = AccessoryKeyGenerator(master_key, skn, KeyType.PRIMARY) - self._secondary_gen = AccessoryKeyGenerator(master_key, sks, KeyType.SECONDARY) + self._primary_gen = _AccessoryKeyGenerator(master_key, skn, KeyPairType.PRIMARY) + self._secondary_gen = _AccessoryKeyGenerator(master_key, sks, KeyPairType.SECONDARY) self._paired_at: datetime = paired_at if self._paired_at.tzinfo is None: self._paired_at = self._paired_at.astimezone() @@ -270,7 +268,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] name: str | None = None, ) -> FindMyAccessory: """Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" - device_data = read_data_plist(plist) + device_data = util.files.read_data_plist(plist) # PRIVATE master key. 28 (?) bytes. master_key = device_data["privateKey"]["key"]["data"][-28:] @@ -295,7 +293,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] alignment_date = None index = None if key_alignment_plist: - alignment_data = read_data_plist(key_alignment_plist) + alignment_data = util.files.read_data_plist(key_alignment_plist) # last observed date alignment_date = alignment_data["lastIndexObservationDate"].replace( @@ -335,7 +333,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] "alignment_index": self._alignment_index, } - return save_and_return_json(res, path) + return util.files.save_and_return_json(res, path) @classmethod @override @@ -344,7 +342,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] val: str | Path | FindMyAccessoryMapping, /, ) -> FindMyAccessory: - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "accessory" try: @@ -368,14 +366,14 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] raise ValueError(msg) from None -class AccessoryKeyGenerator(KeyGenerator[KeyPair]): +class _AccessoryKeyGenerator(KeyGenerator[KeyPair]): """KeyPair generator. Uses the same algorithm internally as FindMy accessories do.""" def __init__( self, master_key: bytes, initial_sk: bytes, - key_type: KeyType = KeyType.UNKNOWN, + key_type: KeyPairType = KeyPairType.UNKNOWN, ) -> None: """ Initialize the key generator. @@ -411,7 +409,7 @@ class AccessoryKeyGenerator(KeyGenerator[KeyPair]): return self._initial_sk @property - def key_type(self) -> KeyType: + def key_type(self) -> KeyPairType: """The type of key this generator produces.""" return self._key_type diff --git a/findmy/keys.py b/findmy/keys.py index b899616..af584da 100644 --- a/findmy/keys.py +++ b/findmy/keys.py @@ -22,7 +22,7 @@ if TYPE_CHECKING: from pathlib import Path -class KeyType(Enum): +class KeyPairType(Enum): """Enum of possible key types.""" UNKNOWN = 0 @@ -133,7 +133,7 @@ class KeyPair(HasPublicKey, Serializable[KeyPairMapping]): def __init__( self, private_key: bytes, - key_type: KeyType = KeyType.UNKNOWN, + key_type: KeyPairType = KeyPairType.UNKNOWN, name: str | None = None, ) -> None: """Initialize the :meth:`KeyPair` with the private key bytes.""" @@ -147,7 +147,7 @@ class KeyPair(HasPublicKey, Serializable[KeyPairMapping]): self._name = name @property - def key_type(self) -> KeyType: + def key_type(self) -> KeyPairType: """Type of this key.""" return self._key_type @@ -217,7 +217,7 @@ class KeyPair(HasPublicKey, Serializable[KeyPairMapping]): try: return cls( private_key=base64.b64decode(val["private_key"]), - key_type=KeyType(val["key_type"]), + key_type=KeyPairType(val["key_type"]), name=val["name"], ) except KeyError as e: diff --git a/findmy/reports/__init__.py b/findmy/reports/__init__.py index 730ff68..91228b2 100644 --- a/findmy/reports/__init__.py +++ b/findmy/reports/__init__.py @@ -1,16 +1,52 @@ """Code related to fetching location reports.""" -from .account import AppleAccount, AsyncAppleAccount -from .anisette import BaseAnisetteProvider, RemoteAnisetteProvider +from .account import AccountStateMapping, AppleAccount, AsyncAppleAccount, BaseAppleAccount +from .anisette import ( + AnisetteMapping, + BaseAnisetteProvider, + LocalAnisetteMapping, + LocalAnisetteProvider, + RemoteAnisetteMapping, + RemoteAnisetteProvider, +) +from .reports import ( + LocationReport, + LocationReportDecryptedMapping, + LocationReportEncryptedMapping, + LocationReportMapping, +) from .state import LoginState -from .twofactor import SmsSecondFactorMethod, TrustedDeviceSecondFactorMethod +from .twofactor import ( + AsyncSmsSecondFactor, + AsyncTrustedDeviceSecondFactor, + BaseSecondFactorMethod, + SmsSecondFactorMethod, + SyncSmsSecondFactor, + SyncTrustedDeviceSecondFactor, + TrustedDeviceSecondFactorMethod, +) __all__ = ( + "AccountStateMapping", + "AnisetteMapping", "AppleAccount", "AsyncAppleAccount", + "AsyncSmsSecondFactor", + "AsyncTrustedDeviceSecondFactor", "BaseAnisetteProvider", + "BaseAppleAccount", + "BaseSecondFactorMethod", + "LocalAnisetteMapping", + "LocalAnisetteProvider", + "LocationReport", + "LocationReportDecryptedMapping", + "LocationReportEncryptedMapping", + "LocationReportMapping", "LoginState", + "RemoteAnisetteMapping", "RemoteAnisetteProvider", "SmsSecondFactorMethod", + "SyncSmsSecondFactor", + "SyncTrustedDeviceSecondFactor", "TrustedDeviceSecondFactorMethod", ) diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 3dade65..b751d62 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -26,18 +26,15 @@ import bs4 import srp._pysrp as srp from typing_extensions import Concatenate, ParamSpec, override +from findmy import util from findmy.errors import ( InvalidCredentialsError, InvalidStateError, UnauthorizedError, UnhandledProtocolError, ) -from findmy.reports.anisette import AnisetteMapping, get_provider_from_mapping -from findmy.util import crypto -from findmy.util.abc import Closable, Serializable -from findmy.util.files import read_data_json, save_and_return_json -from findmy.util.http import HttpResponse, HttpSession, decode_plist +from .anisette import AnisetteMapping, get_provider_from_mapping from .reports import LocationReport, LocationReportsFetcher from .state import LoginState from .twofactor import ( @@ -106,7 +103,7 @@ _A = TypeVar("_A", bound="BaseAppleAccount") _F = Callable[Concatenate[_A, _P], _R] -def require_login_state(*states: LoginState) -> Callable[[_F], _F]: +def _require_login_state(*states: LoginState) -> Callable[[_F], _F]: """Enforce a login state as precondition for a method.""" def decorator(func: _F) -> _F: @@ -141,7 +138,7 @@ def _extract_phone_numbers(html: str) -> list[dict]: return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", []) -class BaseAppleAccount(Closable, Serializable[AccountStateMapping], ABC): +class BaseAppleAccount(util.abc.Closable, util.abc.Serializable[AccountStateMapping], ABC): """Base class for an Apple account.""" @property @@ -376,7 +373,7 @@ class AsyncAppleAccount(BaseAppleAccount): state_info["account"]["info"] if state_info else None ) - self._http: HttpSession = HttpSession() + self._http: util.http.HttpSession = util.http.HttpSession() self._reports: LocationReportsFetcher = LocationReportsFetcher(self) self._closed: bool = False @@ -403,7 +400,7 @@ class AsyncAppleAccount(BaseAppleAccount): return self._login_state @property - @require_login_state( + @_require_login_state( LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA, @@ -414,7 +411,7 @@ class AsyncAppleAccount(BaseAppleAccount): return self._account_info["account_name"] if self._account_info else None @property - @require_login_state( + @_require_login_state( LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA, @@ -425,7 +422,7 @@ class AsyncAppleAccount(BaseAppleAccount): return self._account_info["first_name"] if self._account_info else None @property - @require_login_state( + @_require_login_state( LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA, @@ -452,7 +449,7 @@ class AsyncAppleAccount(BaseAppleAccount): "anisette": self._anisette.to_json(), } - return save_and_return_json(res, path) + return util.files.save_and_return_json(res, path) @classmethod @override @@ -463,7 +460,7 @@ class AsyncAppleAccount(BaseAppleAccount): *, anisette_libs_path: str | Path | None = None, ) -> AsyncAppleAccount: - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "account" try: @@ -496,7 +493,7 @@ class AsyncAppleAccount(BaseAppleAccount): except (RuntimeError, OSError, ConnectionError) as e: logger.warning("Error closing HTTP session: %s", e) - @require_login_state(LoginState.LOGGED_OUT) + @_require_login_state(LoginState.LOGGED_OUT) @override async def login(self, username: str, password: str) -> LoginState: """See :meth:`BaseAppleAccount.login`.""" @@ -508,7 +505,7 @@ class AsyncAppleAccount(BaseAppleAccount): # AUTHENTICATED -> LOGGED_IN return await self._login_mobileme() - @require_login_state(LoginState.REQUIRE_2FA) + @_require_login_state(LoginState.REQUIRE_2FA) @override async def get_2fa_methods(self) -> Sequence[AsyncSecondFactorMethod]: """See :meth:`BaseAppleAccount.get_2fa_methods`.""" @@ -537,7 +534,7 @@ class AsyncAppleAccount(BaseAppleAccount): return methods - @require_login_state(LoginState.REQUIRE_2FA) + @_require_login_state(LoginState.REQUIRE_2FA) @override async def sms_2fa_request(self, phone_number_id: int) -> None: """See :meth:`BaseAppleAccount.sms_2fa_request`.""" @@ -549,7 +546,7 @@ class AsyncAppleAccount(BaseAppleAccount): data, ) - @require_login_state(LoginState.REQUIRE_2FA) + @_require_login_state(LoginState.REQUIRE_2FA) @override async def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState: """See :meth:`BaseAppleAccount.sms_2fa_submit`.""" @@ -574,7 +571,7 @@ class AsyncAppleAccount(BaseAppleAccount): # AUTHENTICATED -> LOGGED_IN return await self._login_mobileme() - @require_login_state(LoginState.REQUIRE_2FA) + @_require_login_state(LoginState.REQUIRE_2FA) @override async def td_2fa_request(self) -> None: """See :meth:`BaseAppleAccount.td_2fa_request`.""" @@ -588,7 +585,7 @@ class AsyncAppleAccount(BaseAppleAccount): headers=headers, ) - @require_login_state(LoginState.REQUIRE_2FA) + @_require_login_state(LoginState.REQUIRE_2FA) @override async def td_2fa_submit(self, code: str) -> LoginState: """See :meth:`BaseAppleAccount.td_2fa_submit`.""" @@ -612,7 +609,7 @@ class AsyncAppleAccount(BaseAppleAccount): # AUTHENTICATED -> LOGGED_IN return await self._login_mobileme() - @require_login_state(LoginState.LOGGED_IN) + @_require_login_state(LoginState.LOGGED_IN) async def fetch_raw_reports( self, devices: list[tuple[list[str], list[str]]], @@ -647,7 +644,7 @@ class AsyncAppleAccount(BaseAppleAccount): ], } - async def _do_request() -> HttpResponse: + async def _do_request() -> util.http.HttpResponse: return await self._http.post( self._ENDPOINT_REPORTS_FETCH, auth=auth, @@ -740,7 +737,7 @@ class AsyncAppleAccount(BaseAppleAccount): keys: Sequence[HasHashedPublicKey | RollingKeyPairSource], ) -> dict[HasHashedPublicKey | RollingKeyPairSource, LocationReport | None]: ... - @require_login_state(LoginState.LOGGED_IN) + @_require_login_state(LoginState.LOGGED_IN) @override async def fetch_location( self, @@ -759,7 +756,7 @@ class AsyncAppleAccount(BaseAppleAccount): return {dev: sorted(reports)[-1] if reports else None for dev, reports in hist.items()} - @require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA, LoginState.LOGGED_IN) + @_require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA, LoginState.LOGGED_IN) async def _gsa_authenticate( self, username: str | None = None, @@ -795,7 +792,7 @@ class AsyncAppleAccount(BaseAppleAccount): logger.debug("Attempting password challenge") - usr.p = crypto.encrypt_password(self._password, r["s"], r["i"], sp) + usr.p = util.crypto.encrypt_password(self._password, r["s"], r["i"], sp) m1 = usr.process_challenge(r["s"], r["B"]) if m1 is None: msg = "Failed to process challenge" @@ -816,8 +813,8 @@ class AsyncAppleAccount(BaseAppleAccount): logger.debug("Decrypting SPD data in response") - spd = decode_plist( - crypto.decrypt_spd_aes_cbc( + spd = util.parsers.decode_plist( + util.crypto.decrypt_spd_aes_cbc( usr.get_session_key() or b"", r["spd"], ), @@ -856,7 +853,7 @@ class AsyncAppleAccount(BaseAppleAccount): msg = f"Unknown auth value: {au}" raise UnhandledProtocolError(msg) - @require_login_state(LoginState.AUTHENTICATED) + @_require_login_state(LoginState.AUTHENTICATED) async def _login_mobileme(self) -> LoginState: logger.info("Logging into com.apple.mobileme") data = plistlib.dumps( @@ -1036,7 +1033,7 @@ class AppleAccount(BaseAppleAccount): *, anisette_libs_path: str | Path | None = None, ) -> AppleAccount: - val = read_data_json(val) + val = util.files.read_data_json(val) try: ani_provider = get_provider_from_mapping(val["anisette"], libs_path=anisette_libs_path) return cls(ani_provider, state_info=val) diff --git a/findmy/reports/anisette.py b/findmy/reports/anisette.py index a16eae1..dedf5e7 100644 --- a/findmy/reports/anisette.py +++ b/findmy/reports/anisette.py @@ -15,9 +15,7 @@ from typing import BinaryIO, Literal, TypedDict, Union from anisette import Anisette, AnisetteHeaders from typing_extensions import override -from findmy.util.abc import Closable, Serializable -from findmy.util.files import read_data_json, save_and_return_json -from findmy.util.http import HttpSession +from findmy import util logger = logging.getLogger(__name__) @@ -53,7 +51,7 @@ def get_provider_from_mapping( raise ValueError(msg) -class BaseAnisetteProvider(Closable, Serializable, ABC): +class BaseAnisetteProvider(util.abc.Closable, util.abc.Serializable, ABC): """ Abstract base class for Anisette providers. @@ -188,7 +186,7 @@ class BaseAnisetteProvider(Closable, Serializable, ABC): return cpd -class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMapping]): +class RemoteAnisetteProvider(BaseAnisetteProvider, util.abc.Serializable[RemoteAnisetteMapping]): """Anisette provider. Fetches headers from a remote Anisette server.""" _ANISETTE_DATA_VALID_FOR = 30 @@ -199,7 +197,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa self._server_url = server_url - self._http = HttpSession() + self._http = util.http.HttpSession() self._anisette_data: dict[str, str] | None = None self._anisette_data_expires_at: float = 0 @@ -208,7 +206,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa @override def to_json(self, dst: str | Path | None = None, /) -> RemoteAnisetteMapping: """See :meth:`BaseAnisetteProvider.serialize`.""" - return save_and_return_json( + return util.files.save_and_return_json( { "type": "aniRemote", "url": self._server_url, @@ -220,7 +218,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa @override def from_json(cls, val: str | Path | RemoteAnisetteMapping) -> RemoteAnisetteProvider: """See :meth:`BaseAnisetteProvider.deserialize`.""" - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "aniRemote" @@ -282,7 +280,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa logger.warning("Error closing anisette HTTP session: %s", e) -class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapping]): +class LocalAnisetteProvider(BaseAnisetteProvider, util.abc.Serializable[LocalAnisetteMapping]): """Local anisette provider using the `anisette` library.""" def __init__( @@ -333,7 +331,7 @@ class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapp self._ani.save_provisioning(buf) prov_data = base64.b64encode(buf.getvalue()).decode("utf-8") - return save_and_return_json( + return util.files.save_and_return_json( { "type": "aniLocal", "prov_data": prov_data, @@ -350,7 +348,7 @@ class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapp libs_path: str | Path | None = None, ) -> LocalAnisetteProvider: """See :meth:`BaseAnisetteProvider.deserialize`.""" - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "aniLocal" diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index 3c4e77d..85173b2 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -16,10 +16,9 @@ from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from typing_extensions import override +from findmy import util from findmy.accessory import RollingKeyPairSource -from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping, KeyType -from findmy.util.abc import Serializable -from findmy.util.files import read_data_json, save_and_return_json +from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping, KeyPairType if TYPE_CHECKING: from collections.abc import Sequence @@ -52,7 +51,7 @@ class LocationReportDecryptedMapping(TypedDict): LocationReportMapping = Union[LocationReportEncryptedMapping, LocationReportDecryptedMapping] -class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): +class LocationReport(HasHashedPublicKey, util.abc.Serializable[LocationReportMapping]): """Location report corresponding to a certain :meth:`HasHashedPublicKey`.""" def __init__( @@ -239,7 +238,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): include_key = self.is_decrypted if include_key: - return save_and_return_json( + return util.files.save_and_return_json( { "type": "locReportDecrypted", "payload": base64.b64encode(self._payload).decode("utf-8"), @@ -248,7 +247,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): }, dst, ) - return save_and_return_json( + return util.files.save_and_return_json( { "type": "locReportEncrypted", "payload": base64.b64encode(self._payload).decode("utf-8"), @@ -260,7 +259,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): @classmethod @override def from_json(cls, val: str | Path | LocationReportMapping, /) -> LocationReport: - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "locReportEncrypted" or val["type"] == "locReportDecrypted" try: @@ -463,10 +462,10 @@ class LocationReportsFetcher: # split into primary and secondary keys # (UNKNOWN keys are filed as primary) new_keys_primary: set[str] = { - key.hashed_adv_key_b64 for key in key_batch if key.key_type == KeyType.PRIMARY + key.hashed_adv_key_b64 for key in key_batch if key.key_type == KeyPairType.PRIMARY } new_keys_secondary: set[str] = { - key.hashed_adv_key_b64 for key in key_batch if key.key_type != KeyType.PRIMARY + key.hashed_adv_key_b64 for key in key_batch if key.key_type != KeyPairType.PRIMARY } # 290 seems to be the maximum number of keys that Apple accepts in a single request, diff --git a/findmy/scanner/__init__.py b/findmy/scanner/__init__.py index de8a0b5..b605439 100644 --- a/findmy/scanner/__init__.py +++ b/findmy/scanner/__init__.py @@ -2,12 +2,14 @@ from .scanner import ( NearbyOfflineFindingDevice, + OfflineFindingDevice, OfflineFindingScanner, SeparatedOfflineFindingDevice, ) __all__ = ( "NearbyOfflineFindingDevice", + "OfflineFindingDevice", "OfflineFindingScanner", "SeparatedOfflineFindingDevice", ) diff --git a/findmy/util/__init__.py b/findmy/util/__init__.py index f3a5e88..48ade67 100644 --- a/findmy/util/__init__.py +++ b/findmy/util/__init__.py @@ -1,6 +1,13 @@ """Utility functions and classes. Intended for internal use.""" -from .http import HttpResponse, HttpSession -from .parsers import decode_plist +from . import abc, crypto, files, http, parsers, session, types -__all__ = ("HttpResponse", "HttpSession", "decode_plist") +__all__ = ( + "abc", + "crypto", + "files", + "http", + "parsers", + "session", + "types", +) diff --git a/findmy/util/session.py b/findmy/util/session.py index a545292..e5b7cba 100644 --- a/findmy/util/session.py +++ b/findmy/util/session.py @@ -3,9 +3,11 @@ from __future__ import annotations import random -from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, Union +from typing import TYPE_CHECKING, Any, Generic, TypeVar, Union -from findmy.util.abc import Closable, Serializable +from typing_extensions import Self + +from .abc import Closable, Serializable if TYPE_CHECKING: from pathlib import Path