From e8d05b69bfd28a43526f6b6e2bf2adad7d5bd56c Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Thu, 11 Sep 2025 16:29:01 +0200 Subject: [PATCH] refactor: combine utils into single import internally --- findmy/accessory.py | 14 ++++++-------- findmy/reports/account.py | 25 +++++++++++-------------- findmy/reports/anisette.py | 20 +++++++++----------- findmy/reports/reports.py | 11 +++++------ findmy/util/__init__.py | 14 +++++++++++--- findmy/util/session.py | 2 +- 6 files changed, 43 insertions(+), 43 deletions(-) diff --git a/findmy/accessory.py b/findmy/accessory.py index 3cb8955..2567d28 100644 --- a/findmy/accessory.py +++ b/findmy/accessory.py @@ -13,9 +13,7 @@ from typing import TYPE_CHECKING, Literal, TypedDict, overload from typing_extensions import override -from findmy.util.abc import Serializable -from findmy.util.files import read_data_json, read_data_plist, save_and_return_json - +from . import util from .keys import KeyGenerator, KeyPair, KeyPairType from .util import crypto @@ -93,7 +91,7 @@ class RollingKeyPairSource(ABC): yield ind, key -class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]): +class FindMyAccessory(RollingKeyPairSource, util.abc.Serializable[FindMyAccessoryMapping]): """A findable Find My-accessory using official key rollover.""" def __init__( # noqa: PLR0913 @@ -270,7 +268,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] name: str | None = None, ) -> FindMyAccessory: """Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" - device_data = read_data_plist(plist) + device_data = util.files.read_data_plist(plist) # PRIVATE master key. 28 (?) bytes. master_key = device_data["privateKey"]["key"]["data"][-28:] @@ -295,7 +293,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] alignment_date = None index = None if key_alignment_plist: - alignment_data = read_data_plist(key_alignment_plist) + alignment_data = util.files.read_data_plist(key_alignment_plist) # last observed date alignment_date = alignment_data["lastIndexObservationDate"].replace( @@ -335,7 +333,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] "alignment_index": self._alignment_index, } - return save_and_return_json(res, path) + return util.files.save_and_return_json(res, path) @classmethod @override @@ -344,7 +342,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping] val: str | Path | FindMyAccessoryMapping, /, ) -> FindMyAccessory: - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "accessory" try: diff --git a/findmy/reports/account.py b/findmy/reports/account.py index 845593a..b751d62 100644 --- a/findmy/reports/account.py +++ b/findmy/reports/account.py @@ -26,18 +26,15 @@ import bs4 import srp._pysrp as srp from typing_extensions import Concatenate, ParamSpec, override +from findmy import util from findmy.errors import ( InvalidCredentialsError, InvalidStateError, UnauthorizedError, UnhandledProtocolError, ) -from findmy.reports.anisette import AnisetteMapping, get_provider_from_mapping -from findmy.util import crypto -from findmy.util.abc import Closable, Serializable -from findmy.util.files import read_data_json, save_and_return_json -from findmy.util.http import HttpResponse, HttpSession, decode_plist +from .anisette import AnisetteMapping, get_provider_from_mapping from .reports import LocationReport, LocationReportsFetcher from .state import LoginState from .twofactor import ( @@ -141,7 +138,7 @@ def _extract_phone_numbers(html: str) -> list[dict]: return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", []) -class BaseAppleAccount(Closable, Serializable[AccountStateMapping], ABC): +class BaseAppleAccount(util.abc.Closable, util.abc.Serializable[AccountStateMapping], ABC): """Base class for an Apple account.""" @property @@ -376,7 +373,7 @@ class AsyncAppleAccount(BaseAppleAccount): state_info["account"]["info"] if state_info else None ) - self._http: HttpSession = HttpSession() + self._http: util.http.HttpSession = util.http.HttpSession() self._reports: LocationReportsFetcher = LocationReportsFetcher(self) self._closed: bool = False @@ -452,7 +449,7 @@ class AsyncAppleAccount(BaseAppleAccount): "anisette": self._anisette.to_json(), } - return save_and_return_json(res, path) + return util.files.save_and_return_json(res, path) @classmethod @override @@ -463,7 +460,7 @@ class AsyncAppleAccount(BaseAppleAccount): *, anisette_libs_path: str | Path | None = None, ) -> AsyncAppleAccount: - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "account" try: @@ -647,7 +644,7 @@ class AsyncAppleAccount(BaseAppleAccount): ], } - async def _do_request() -> HttpResponse: + async def _do_request() -> util.http.HttpResponse: return await self._http.post( self._ENDPOINT_REPORTS_FETCH, auth=auth, @@ -795,7 +792,7 @@ class AsyncAppleAccount(BaseAppleAccount): logger.debug("Attempting password challenge") - usr.p = crypto.encrypt_password(self._password, r["s"], r["i"], sp) + usr.p = util.crypto.encrypt_password(self._password, r["s"], r["i"], sp) m1 = usr.process_challenge(r["s"], r["B"]) if m1 is None: msg = "Failed to process challenge" @@ -816,8 +813,8 @@ class AsyncAppleAccount(BaseAppleAccount): logger.debug("Decrypting SPD data in response") - spd = decode_plist( - crypto.decrypt_spd_aes_cbc( + spd = util.parsers.decode_plist( + util.crypto.decrypt_spd_aes_cbc( usr.get_session_key() or b"", r["spd"], ), @@ -1036,7 +1033,7 @@ class AppleAccount(BaseAppleAccount): *, anisette_libs_path: str | Path | None = None, ) -> AppleAccount: - val = read_data_json(val) + val = util.files.read_data_json(val) try: ani_provider = get_provider_from_mapping(val["anisette"], libs_path=anisette_libs_path) return cls(ani_provider, state_info=val) diff --git a/findmy/reports/anisette.py b/findmy/reports/anisette.py index a16eae1..dedf5e7 100644 --- a/findmy/reports/anisette.py +++ b/findmy/reports/anisette.py @@ -15,9 +15,7 @@ from typing import BinaryIO, Literal, TypedDict, Union from anisette import Anisette, AnisetteHeaders from typing_extensions import override -from findmy.util.abc import Closable, Serializable -from findmy.util.files import read_data_json, save_and_return_json -from findmy.util.http import HttpSession +from findmy import util logger = logging.getLogger(__name__) @@ -53,7 +51,7 @@ def get_provider_from_mapping( raise ValueError(msg) -class BaseAnisetteProvider(Closable, Serializable, ABC): +class BaseAnisetteProvider(util.abc.Closable, util.abc.Serializable, ABC): """ Abstract base class for Anisette providers. @@ -188,7 +186,7 @@ class BaseAnisetteProvider(Closable, Serializable, ABC): return cpd -class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMapping]): +class RemoteAnisetteProvider(BaseAnisetteProvider, util.abc.Serializable[RemoteAnisetteMapping]): """Anisette provider. Fetches headers from a remote Anisette server.""" _ANISETTE_DATA_VALID_FOR = 30 @@ -199,7 +197,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa self._server_url = server_url - self._http = HttpSession() + self._http = util.http.HttpSession() self._anisette_data: dict[str, str] | None = None self._anisette_data_expires_at: float = 0 @@ -208,7 +206,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa @override def to_json(self, dst: str | Path | None = None, /) -> RemoteAnisetteMapping: """See :meth:`BaseAnisetteProvider.serialize`.""" - return save_and_return_json( + return util.files.save_and_return_json( { "type": "aniRemote", "url": self._server_url, @@ -220,7 +218,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa @override def from_json(cls, val: str | Path | RemoteAnisetteMapping) -> RemoteAnisetteProvider: """See :meth:`BaseAnisetteProvider.deserialize`.""" - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "aniRemote" @@ -282,7 +280,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa logger.warning("Error closing anisette HTTP session: %s", e) -class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapping]): +class LocalAnisetteProvider(BaseAnisetteProvider, util.abc.Serializable[LocalAnisetteMapping]): """Local anisette provider using the `anisette` library.""" def __init__( @@ -333,7 +331,7 @@ class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapp self._ani.save_provisioning(buf) prov_data = base64.b64encode(buf.getvalue()).decode("utf-8") - return save_and_return_json( + return util.files.save_and_return_json( { "type": "aniLocal", "prov_data": prov_data, @@ -350,7 +348,7 @@ class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapp libs_path: str | Path | None = None, ) -> LocalAnisetteProvider: """See :meth:`BaseAnisetteProvider.deserialize`.""" - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "aniLocal" diff --git a/findmy/reports/reports.py b/findmy/reports/reports.py index dba4530..85173b2 100644 --- a/findmy/reports/reports.py +++ b/findmy/reports/reports.py @@ -16,10 +16,9 @@ from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from typing_extensions import override +from findmy import util from findmy.accessory import RollingKeyPairSource from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping, KeyPairType -from findmy.util.abc import Serializable -from findmy.util.files import read_data_json, save_and_return_json if TYPE_CHECKING: from collections.abc import Sequence @@ -52,7 +51,7 @@ class LocationReportDecryptedMapping(TypedDict): LocationReportMapping = Union[LocationReportEncryptedMapping, LocationReportDecryptedMapping] -class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): +class LocationReport(HasHashedPublicKey, util.abc.Serializable[LocationReportMapping]): """Location report corresponding to a certain :meth:`HasHashedPublicKey`.""" def __init__( @@ -239,7 +238,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): include_key = self.is_decrypted if include_key: - return save_and_return_json( + return util.files.save_and_return_json( { "type": "locReportDecrypted", "payload": base64.b64encode(self._payload).decode("utf-8"), @@ -248,7 +247,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): }, dst, ) - return save_and_return_json( + return util.files.save_and_return_json( { "type": "locReportEncrypted", "payload": base64.b64encode(self._payload).decode("utf-8"), @@ -260,7 +259,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): @classmethod @override def from_json(cls, val: str | Path | LocationReportMapping, /) -> LocationReport: - val = read_data_json(val) + val = util.files.read_data_json(val) assert val["type"] == "locReportEncrypted" or val["type"] == "locReportDecrypted" try: diff --git a/findmy/util/__init__.py b/findmy/util/__init__.py index f3a5e88..efc2b1d 100644 --- a/findmy/util/__init__.py +++ b/findmy/util/__init__.py @@ -1,6 +1,14 @@ """Utility functions and classes. Intended for internal use.""" -from .http import HttpResponse, HttpSession -from .parsers import decode_plist +from . import abc, crypto, files, http, parsers, plist, session, types -__all__ = ("HttpResponse", "HttpSession", "decode_plist") +__all__ = ( + "abc", + "crypto", + "files", + "http", + "parsers", + "plist", + "session", + "types", +) diff --git a/findmy/util/session.py b/findmy/util/session.py index a545292..2dc6a38 100644 --- a/findmy/util/session.py +++ b/findmy/util/session.py @@ -5,7 +5,7 @@ from __future__ import annotations import random from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, Union -from findmy.util.abc import Closable, Serializable +from .abc import Closable, Serializable if TYPE_CHECKING: from pathlib import Path