refactor: combine utils into single import internally

This commit is contained in:
Mike A.
2025-09-11 16:29:01 +02:00
parent 34a324128c
commit e8d05b69bf
6 changed files with 43 additions and 43 deletions

View File

@@ -13,9 +13,7 @@ from typing import TYPE_CHECKING, Literal, TypedDict, overload
from typing_extensions import override from typing_extensions import override
from findmy.util.abc import Serializable from . import util
from findmy.util.files import read_data_json, read_data_plist, save_and_return_json
from .keys import KeyGenerator, KeyPair, KeyPairType from .keys import KeyGenerator, KeyPair, KeyPairType
from .util import crypto from .util import crypto
@@ -93,7 +91,7 @@ class RollingKeyPairSource(ABC):
yield ind, key yield ind, key
class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]): class FindMyAccessory(RollingKeyPairSource, util.abc.Serializable[FindMyAccessoryMapping]):
"""A findable Find My-accessory using official key rollover.""" """A findable Find My-accessory using official key rollover."""
def __init__( # noqa: PLR0913 def __init__( # noqa: PLR0913
@@ -270,7 +268,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]
name: str | None = None, name: str | None = None,
) -> FindMyAccessory: ) -> FindMyAccessory:
"""Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" """Create a FindMyAccessory from a .plist file dumped from the FindMy app."""
device_data = read_data_plist(plist) device_data = util.files.read_data_plist(plist)
# PRIVATE master key. 28 (?) bytes. # PRIVATE master key. 28 (?) bytes.
master_key = device_data["privateKey"]["key"]["data"][-28:] master_key = device_data["privateKey"]["key"]["data"][-28:]
@@ -295,7 +293,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]
alignment_date = None alignment_date = None
index = None index = None
if key_alignment_plist: if key_alignment_plist:
alignment_data = read_data_plist(key_alignment_plist) alignment_data = util.files.read_data_plist(key_alignment_plist)
# last observed date # last observed date
alignment_date = alignment_data["lastIndexObservationDate"].replace( alignment_date = alignment_data["lastIndexObservationDate"].replace(
@@ -335,7 +333,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]
"alignment_index": self._alignment_index, "alignment_index": self._alignment_index,
} }
return save_and_return_json(res, path) return util.files.save_and_return_json(res, path)
@classmethod @classmethod
@override @override
@@ -344,7 +342,7 @@ class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]
val: str | Path | FindMyAccessoryMapping, val: str | Path | FindMyAccessoryMapping,
/, /,
) -> FindMyAccessory: ) -> FindMyAccessory:
val = read_data_json(val) val = util.files.read_data_json(val)
assert val["type"] == "accessory" assert val["type"] == "accessory"
try: try:

View File

@@ -26,18 +26,15 @@ import bs4
import srp._pysrp as srp import srp._pysrp as srp
from typing_extensions import Concatenate, ParamSpec, override from typing_extensions import Concatenate, ParamSpec, override
from findmy import util
from findmy.errors import ( from findmy.errors import (
InvalidCredentialsError, InvalidCredentialsError,
InvalidStateError, InvalidStateError,
UnauthorizedError, UnauthorizedError,
UnhandledProtocolError, UnhandledProtocolError,
) )
from findmy.reports.anisette import AnisetteMapping, get_provider_from_mapping
from findmy.util import crypto
from findmy.util.abc import Closable, Serializable
from findmy.util.files import read_data_json, save_and_return_json
from findmy.util.http import HttpResponse, HttpSession, decode_plist
from .anisette import AnisetteMapping, get_provider_from_mapping
from .reports import LocationReport, LocationReportsFetcher from .reports import LocationReport, LocationReportsFetcher
from .state import LoginState from .state import LoginState
from .twofactor import ( from .twofactor import (
@@ -141,7 +138,7 @@ def _extract_phone_numbers(html: str) -> list[dict]:
return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", []) return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", [])
class BaseAppleAccount(Closable, Serializable[AccountStateMapping], ABC): class BaseAppleAccount(util.abc.Closable, util.abc.Serializable[AccountStateMapping], ABC):
"""Base class for an Apple account.""" """Base class for an Apple account."""
@property @property
@@ -376,7 +373,7 @@ class AsyncAppleAccount(BaseAppleAccount):
state_info["account"]["info"] if state_info else None state_info["account"]["info"] if state_info else None
) )
self._http: HttpSession = HttpSession() self._http: util.http.HttpSession = util.http.HttpSession()
self._reports: LocationReportsFetcher = LocationReportsFetcher(self) self._reports: LocationReportsFetcher = LocationReportsFetcher(self)
self._closed: bool = False self._closed: bool = False
@@ -452,7 +449,7 @@ class AsyncAppleAccount(BaseAppleAccount):
"anisette": self._anisette.to_json(), "anisette": self._anisette.to_json(),
} }
return save_and_return_json(res, path) return util.files.save_and_return_json(res, path)
@classmethod @classmethod
@override @override
@@ -463,7 +460,7 @@ class AsyncAppleAccount(BaseAppleAccount):
*, *,
anisette_libs_path: str | Path | None = None, anisette_libs_path: str | Path | None = None,
) -> AsyncAppleAccount: ) -> AsyncAppleAccount:
val = read_data_json(val) val = util.files.read_data_json(val)
assert val["type"] == "account" assert val["type"] == "account"
try: try:
@@ -647,7 +644,7 @@ class AsyncAppleAccount(BaseAppleAccount):
], ],
} }
async def _do_request() -> HttpResponse: async def _do_request() -> util.http.HttpResponse:
return await self._http.post( return await self._http.post(
self._ENDPOINT_REPORTS_FETCH, self._ENDPOINT_REPORTS_FETCH,
auth=auth, auth=auth,
@@ -795,7 +792,7 @@ class AsyncAppleAccount(BaseAppleAccount):
logger.debug("Attempting password challenge") logger.debug("Attempting password challenge")
usr.p = crypto.encrypt_password(self._password, r["s"], r["i"], sp) usr.p = util.crypto.encrypt_password(self._password, r["s"], r["i"], sp)
m1 = usr.process_challenge(r["s"], r["B"]) m1 = usr.process_challenge(r["s"], r["B"])
if m1 is None: if m1 is None:
msg = "Failed to process challenge" msg = "Failed to process challenge"
@@ -816,8 +813,8 @@ class AsyncAppleAccount(BaseAppleAccount):
logger.debug("Decrypting SPD data in response") logger.debug("Decrypting SPD data in response")
spd = decode_plist( spd = util.parsers.decode_plist(
crypto.decrypt_spd_aes_cbc( util.crypto.decrypt_spd_aes_cbc(
usr.get_session_key() or b"", usr.get_session_key() or b"",
r["spd"], r["spd"],
), ),
@@ -1036,7 +1033,7 @@ class AppleAccount(BaseAppleAccount):
*, *,
anisette_libs_path: str | Path | None = None, anisette_libs_path: str | Path | None = None,
) -> AppleAccount: ) -> AppleAccount:
val = read_data_json(val) val = util.files.read_data_json(val)
try: try:
ani_provider = get_provider_from_mapping(val["anisette"], libs_path=anisette_libs_path) ani_provider = get_provider_from_mapping(val["anisette"], libs_path=anisette_libs_path)
return cls(ani_provider, state_info=val) return cls(ani_provider, state_info=val)

View File

@@ -15,9 +15,7 @@ from typing import BinaryIO, Literal, TypedDict, Union
from anisette import Anisette, AnisetteHeaders from anisette import Anisette, AnisetteHeaders
from typing_extensions import override from typing_extensions import override
from findmy.util.abc import Closable, Serializable from findmy import util
from findmy.util.files import read_data_json, save_and_return_json
from findmy.util.http import HttpSession
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -53,7 +51,7 @@ def get_provider_from_mapping(
raise ValueError(msg) raise ValueError(msg)
class BaseAnisetteProvider(Closable, Serializable, ABC): class BaseAnisetteProvider(util.abc.Closable, util.abc.Serializable, ABC):
""" """
Abstract base class for Anisette providers. Abstract base class for Anisette providers.
@@ -188,7 +186,7 @@ class BaseAnisetteProvider(Closable, Serializable, ABC):
return cpd return cpd
class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMapping]): class RemoteAnisetteProvider(BaseAnisetteProvider, util.abc.Serializable[RemoteAnisetteMapping]):
"""Anisette provider. Fetches headers from a remote Anisette server.""" """Anisette provider. Fetches headers from a remote Anisette server."""
_ANISETTE_DATA_VALID_FOR = 30 _ANISETTE_DATA_VALID_FOR = 30
@@ -199,7 +197,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa
self._server_url = server_url self._server_url = server_url
self._http = HttpSession() self._http = util.http.HttpSession()
self._anisette_data: dict[str, str] | None = None self._anisette_data: dict[str, str] | None = None
self._anisette_data_expires_at: float = 0 self._anisette_data_expires_at: float = 0
@@ -208,7 +206,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa
@override @override
def to_json(self, dst: str | Path | None = None, /) -> RemoteAnisetteMapping: def to_json(self, dst: str | Path | None = None, /) -> RemoteAnisetteMapping:
"""See :meth:`BaseAnisetteProvider.serialize`.""" """See :meth:`BaseAnisetteProvider.serialize`."""
return save_and_return_json( return util.files.save_and_return_json(
{ {
"type": "aniRemote", "type": "aniRemote",
"url": self._server_url, "url": self._server_url,
@@ -220,7 +218,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa
@override @override
def from_json(cls, val: str | Path | RemoteAnisetteMapping) -> RemoteAnisetteProvider: def from_json(cls, val: str | Path | RemoteAnisetteMapping) -> RemoteAnisetteProvider:
"""See :meth:`BaseAnisetteProvider.deserialize`.""" """See :meth:`BaseAnisetteProvider.deserialize`."""
val = read_data_json(val) val = util.files.read_data_json(val)
assert val["type"] == "aniRemote" assert val["type"] == "aniRemote"
@@ -282,7 +280,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMa
logger.warning("Error closing anisette HTTP session: %s", e) logger.warning("Error closing anisette HTTP session: %s", e)
class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapping]): class LocalAnisetteProvider(BaseAnisetteProvider, util.abc.Serializable[LocalAnisetteMapping]):
"""Local anisette provider using the `anisette` library.""" """Local anisette provider using the `anisette` library."""
def __init__( def __init__(
@@ -333,7 +331,7 @@ class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapp
self._ani.save_provisioning(buf) self._ani.save_provisioning(buf)
prov_data = base64.b64encode(buf.getvalue()).decode("utf-8") prov_data = base64.b64encode(buf.getvalue()).decode("utf-8")
return save_and_return_json( return util.files.save_and_return_json(
{ {
"type": "aniLocal", "type": "aniLocal",
"prov_data": prov_data, "prov_data": prov_data,
@@ -350,7 +348,7 @@ class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapp
libs_path: str | Path | None = None, libs_path: str | Path | None = None,
) -> LocalAnisetteProvider: ) -> LocalAnisetteProvider:
"""See :meth:`BaseAnisetteProvider.deserialize`.""" """See :meth:`BaseAnisetteProvider.deserialize`."""
val = read_data_json(val) val = util.files.read_data_json(val)
assert val["type"] == "aniLocal" assert val["type"] == "aniLocal"

View File

@@ -16,10 +16,9 @@ from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from typing_extensions import override from typing_extensions import override
from findmy import util
from findmy.accessory import RollingKeyPairSource from findmy.accessory import RollingKeyPairSource
from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping, KeyPairType from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping, KeyPairType
from findmy.util.abc import Serializable
from findmy.util.files import read_data_json, save_and_return_json
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Sequence from collections.abc import Sequence
@@ -52,7 +51,7 @@ class LocationReportDecryptedMapping(TypedDict):
LocationReportMapping = Union[LocationReportEncryptedMapping, LocationReportDecryptedMapping] LocationReportMapping = Union[LocationReportEncryptedMapping, LocationReportDecryptedMapping]
class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]): class LocationReport(HasHashedPublicKey, util.abc.Serializable[LocationReportMapping]):
"""Location report corresponding to a certain :meth:`HasHashedPublicKey`.""" """Location report corresponding to a certain :meth:`HasHashedPublicKey`."""
def __init__( def __init__(
@@ -239,7 +238,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]):
include_key = self.is_decrypted include_key = self.is_decrypted
if include_key: if include_key:
return save_and_return_json( return util.files.save_and_return_json(
{ {
"type": "locReportDecrypted", "type": "locReportDecrypted",
"payload": base64.b64encode(self._payload).decode("utf-8"), "payload": base64.b64encode(self._payload).decode("utf-8"),
@@ -248,7 +247,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]):
}, },
dst, dst,
) )
return save_and_return_json( return util.files.save_and_return_json(
{ {
"type": "locReportEncrypted", "type": "locReportEncrypted",
"payload": base64.b64encode(self._payload).decode("utf-8"), "payload": base64.b64encode(self._payload).decode("utf-8"),
@@ -260,7 +259,7 @@ class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]):
@classmethod @classmethod
@override @override
def from_json(cls, val: str | Path | LocationReportMapping, /) -> LocationReport: def from_json(cls, val: str | Path | LocationReportMapping, /) -> LocationReport:
val = read_data_json(val) val = util.files.read_data_json(val)
assert val["type"] == "locReportEncrypted" or val["type"] == "locReportDecrypted" assert val["type"] == "locReportEncrypted" or val["type"] == "locReportDecrypted"
try: try:

View File

@@ -1,6 +1,14 @@
"""Utility functions and classes. Intended for internal use.""" """Utility functions and classes. Intended for internal use."""
from .http import HttpResponse, HttpSession from . import abc, crypto, files, http, parsers, plist, session, types
from .parsers import decode_plist
__all__ = ("HttpResponse", "HttpSession", "decode_plist") __all__ = (
"abc",
"crypto",
"files",
"http",
"parsers",
"plist",
"session",
"types",
)

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import random import random
from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, Union from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, Union
from findmy.util.abc import Closable, Serializable from .abc import Closable, Serializable
if TYPE_CHECKING: if TYPE_CHECKING:
from pathlib import Path from pathlib import Path