From 88bd5194bbb77e607d2d793eaa77cb6ee21a76c9 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Mon, 15 Jul 2024 16:53:44 +0200 Subject: [PATCH 1/4] scanner: detect OF devices in Nearby state --- examples/device_scanner.py | 43 ++++++-- findmy/scanner/__init__.py | 12 ++- findmy/scanner/scanner.py | 197 ++++++++++++++++++++++++++++++------- 3 files changed, 202 insertions(+), 50 deletions(-) diff --git a/examples/device_scanner.py b/examples/device_scanner.py index 1314a32..49eb76d 100644 --- a/examples/device_scanner.py +++ b/examples/device_scanner.py @@ -1,11 +1,36 @@ import asyncio import logging -from findmy.scanner import OfflineFindingScanner +from findmy.scanner import ( + NearbyOfflineFindingDevice, + OfflineFindingScanner, + SeparatedOfflineFindingDevice, +) logging.basicConfig(level=logging.INFO) +def _print_nearby(device: NearbyOfflineFindingDevice) -> None: + print(f"NEARBY Device - {device.mac_address}") + print(f" Status byte: {device.status:x}") + print(" Extra data:") + for k, v in sorted(device.additional_data.items()): + print(f" {k:20}: {v}") + print() + + +def _print_separated(device: SeparatedOfflineFindingDevice) -> None: + print(f"SEPARATED Device - {device.mac_address}") + print(f" Public key: {device.adv_key_b64}") + print(f" Lookup key: {device.hashed_adv_key_b64}") + print(f" Status byte: {device.status:x}") + print(f" Hint byte: {device.hint:x}") + print(" Extra data:") + for k, v in sorted(device.additional_data.items()): + print(f" {k:20}: {v}") + print() + + async def scan() -> None: scanner = await OfflineFindingScanner.create() @@ -13,15 +38,13 @@ async def scan() -> None: print() async for device in scanner.scan_for(10, extend_timeout=True): - print(f"Device - {device.mac_address}") - print(f" Public key: {device.adv_key_b64}") - print(f" Lookup key: {device.hashed_adv_key_b64}") - print(f" Status byte: {device.status:x}") - print(f" Hint byte: {device.hint:x}") - print(" Extra data:") - for k, v in sorted(device.additional_data.items()): - print(f" {k:20}: {v}") - print() + if isinstance(device, NearbyOfflineFindingDevice): + _print_nearby(device) + elif isinstance(device, SeparatedOfflineFindingDevice): + _print_separated(device) + else: + print(f"Unknown device: {device}") + print() if __name__ == "__main__": diff --git a/findmy/scanner/__init__.py b/findmy/scanner/__init__.py index 9336c6e..944547f 100644 --- a/findmy/scanner/__init__.py +++ b/findmy/scanner/__init__.py @@ -1,4 +1,12 @@ """Utilities related to physically discoverable FindMy-devices.""" -from .scanner import OfflineFindingScanner +from .scanner import ( + NearbyOfflineFindingDevice, + OfflineFindingScanner, + SeparatedOfflineFindingDevice, +) -__all__ = ("OfflineFindingScanner",) +__all__ = ( + "OfflineFindingScanner", + "NearbyOfflineFindingDevice", + "SeparatedOfflineFindingDevice", +) diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index dab1efa..f3204f0 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -1,9 +1,11 @@ """Airtag scanner.""" + from __future__ import annotations import asyncio import logging import time +from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Any, AsyncGenerator from bleak import BleakScanner @@ -18,12 +20,151 @@ if TYPE_CHECKING: logging.getLogger(__name__) -class OfflineFindingDevice(HasPublicKey): +class OfflineFindingDevice(ABC): """Device discoverable through Apple's bluetooth-based Offline Finding protocol.""" OF_HEADER_SIZE = 2 OF_TYPE = 0x12 - OF_DATA_LEN = 25 + + @classmethod + @property + @abstractmethod + def payload_len(cls) -> int: + """Length of OfflineFinding data payload in bytes.""" + raise NotImplementedError + + def __init__( + self, + mac_bytes: bytes, + additional_data: dict[Any, Any] | None = None, + ) -> None: + """Instantiate an OfflineFindingDevice.""" + self._mac_bytes: bytes = mac_bytes + self._additional_data: dict[Any, Any] = additional_data or {} + + @property + def mac_address(self) -> str: + """MAC address of the device in AA:BB:CC:DD:EE:FF format.""" + mac = self._mac_bytes.hex().upper() + return ":".join(mac[i : i + 2] for i in range(0, len(mac), 2)) + + @property + def additional_data(self) -> dict[Any, Any]: + """Any additional data. No guarantees about the contents of this dictionary.""" + return self._additional_data + + @classmethod + @abstractmethod + def from_payload( + cls, + mac_address: str, + payload: bytes, + additional_data: dict[Any, Any] | None, + ) -> OfflineFindingDevice | None: + """Get a NearbyOfflineFindingDevice object from an OF message payload.""" + raise NotImplementedError + + @classmethod + def from_ble_payload( + cls, + mac_address: str, + ble_payload: bytes, + additional_data: dict[Any, Any] | None = None, + ) -> OfflineFindingDevice | None: + """Get a NearbyOfflineFindingDevice object from a BLE packet payload.""" + if len(ble_payload) < cls.OF_HEADER_SIZE: + logging.error("Not enough bytes to decode: %s", len(ble_payload)) + return None + if ble_payload[0] != cls.OF_TYPE: + logging.debug("Unsupported OF type: %s", ble_payload[0]) + return None + + device_type = next( + (dev for dev in cls.__subclasses__() if dev.payload_len == ble_payload[1]), + None, + ) + if device_type is None: + logging.error("Invalid OF payload length: %s", ble_payload[1]) + return None + + return device_type.from_payload( + mac_address, + ble_payload[cls.OF_HEADER_SIZE :], + additional_data, + ) + + @override + def __eq__(self, other: object) -> bool: + if isinstance(other, OfflineFindingDevice): + return self.mac_address == other.mac_address + + return NotImplemented + + @override + def __hash__(self) -> int: + return int.from_bytes(self._mac_bytes) + + +class NearbyOfflineFindingDevice(OfflineFindingDevice): + """Offline-Finding device in nearby state.""" + + @classmethod + @property + @override + def payload_len(cls) -> int: + """Length of OfflineFinding data payload in bytes.""" + return 0x02 # 2 + + def __init__( + self, + mac_bytes: bytes, + status_byte: int, + extra_byte: int, + additional_data: dict[Any, Any] | None = None, + ) -> None: + """Instantiate a NearbyOfflineFindingDevice.""" + super().__init__(mac_bytes, additional_data) + + self._status_byte: int = status_byte + self._extra_byte: int = extra_byte + + @property + def status(self) -> int: + """Status value as reported by the device.""" + return self._status_byte % 255 + + @classmethod + @override + def from_payload( + cls, + mac_address: str, + payload: bytes, + additional_data: dict[Any, Any] | None = None, + ) -> NearbyOfflineFindingDevice | None: + """Get a NearbyOfflineFindingDevice object from an OF message payload.""" + if len(payload) != cls.payload_len: + logging.error( + "Invalid OF data length: %s instead of %s", + len(payload), + payload[1], + ) + + mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) + status_byte = payload[0] + extra_byte = payload[1] + + return NearbyOfflineFindingDevice(mac_bytes, status_byte, extra_byte, additional_data) + + +class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): + """Offline-Finding device in separated state.""" + + @classmethod + @property + @override + def payload_len(cls) -> int: + """Length of OfflineFinding data in bytes.""" + return 0x19 # 25 def __init__( # noqa: PLR0913 self, @@ -33,20 +174,13 @@ class OfflineFindingDevice(HasPublicKey): hint: int, additional_data: dict[Any, Any] | None = None, ) -> None: - """Initialize an `OfflineFindingDevice`.""" - self._mac_bytes: bytes = mac_bytes + """Initialize a `SeparatedOfflineFindingDevice`.""" + super().__init__(mac_bytes, additional_data) + self._status: int = status self._public_key: bytes = public_key self._hint: int = hint - self._additional_data: dict[Any, Any] = additional_data or {} - - @property - def mac_address(self) -> str: - """MAC address of the device in AA:BB:CC:DD:EE:FF format.""" - mac = self._mac_bytes.hex().upper() - return ":".join(mac[i : i + 2] for i in range(0, len(mac), 2)) - @property def status(self) -> int: """Status value as reported by the device.""" @@ -57,11 +191,6 @@ class OfflineFindingDevice(HasPublicKey): """Hint value as reported by the device.""" return self._hint % 255 - @property - def additional_data(self) -> dict[Any, Any]: - """Any additional data. No guarantees about the contents of this dictionary.""" - return self._additional_data - @property @override def adv_key_bytes(self) -> bytes: @@ -69,44 +198,36 @@ class OfflineFindingDevice(HasPublicKey): return self._public_key @classmethod + @override def from_payload( cls, mac_address: str, payload: bytes, - additional_data: dict[Any, Any], - ) -> OfflineFindingDevice | None: - """Get an OfflineFindingDevice object from a BLE payload.""" - if len(payload) < cls.OF_HEADER_SIZE: - logging.error("Not enough bytes to decode: %s", len(payload)) - return None - if payload[0] != cls.OF_TYPE: - logging.debug("Unsupported OF type: %s", payload[0]) - return None - if payload[1] != cls.OF_DATA_LEN: - logging.debug("Unknown OF data length: %s", payload[1]) - return None - if len(payload) != cls.OF_HEADER_SIZE + cls.OF_DATA_LEN: - logging.debug( + additional_data: dict[Any, Any] | None = None, + ) -> SeparatedOfflineFindingDevice | None: + """Get a SeparatedOfflineFindingDevice object from an OF message payload.""" + if len(payload) != cls.payload_len: + logging.error( "Invalid OF data length: %s instead of %s", - len(payload) - cls.OF_HEADER_SIZE, + len(payload), payload[1], ) return None mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) - status = payload[cls.OF_HEADER_SIZE + 0] + status = payload[0] - pubkey_end = payload[cls.OF_HEADER_SIZE + 1 : cls.OF_HEADER_SIZE + 23] + pubkey_end = payload[1:23] pubkey_middle = mac_bytes[1:] - pubkey_start_ms = payload[cls.OF_HEADER_SIZE + 23] << 6 + pubkey_start_ms = payload[23] << 6 pubkey_start_ls = mac_bytes[0] & 0b00111111 pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big") pubkey = pubkey_start + pubkey_middle + pubkey_end - hint = payload[cls.OF_HEADER_SIZE + 24] + hint = payload[24] - return OfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data) + return SeparatedOfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data) @override def __repr__(self) -> str: @@ -179,7 +300,7 @@ class OfflineFindingScanner: # Likely Windows host, where details is a '_RawAdvData' object. # See: https://github.com/malmeloo/FindMy.py/issues/24 additional_data = {} - return OfflineFindingDevice.from_payload(device.address, apple_data, additional_data) + return OfflineFindingDevice.from_ble_payload(device.address, apple_data, additional_data) async def scan_for( self, From c12fffe98e1ac11f8bd1c692643d576b1e5e4200 Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Mon, 15 Jul 2024 17:03:05 +0200 Subject: [PATCH 2/4] scanner: fix pyright failing --- findmy/scanner/scanner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index f3204f0..6eb7ed9 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -102,7 +102,7 @@ class OfflineFindingDevice(ABC): @override def __hash__(self) -> int: - return int.from_bytes(self._mac_bytes) + return int.from_bytes(self._mac_bytes, "big") class NearbyOfflineFindingDevice(OfflineFindingDevice): From a5f1ccdd68ea0b03ca610734bd2cdb43861ebbfe Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Mon, 15 Jul 2024 17:20:23 +0200 Subject: [PATCH 3/4] scanner: abstract status byte into `OfflineFindingDevice` --- findmy/scanner/scanner.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 6eb7ed9..3a51d13 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -36,10 +36,12 @@ class OfflineFindingDevice(ABC): def __init__( self, mac_bytes: bytes, + status_byte: int, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate an OfflineFindingDevice.""" self._mac_bytes: bytes = mac_bytes + self._status: int = status_byte self._additional_data: dict[Any, Any] = additional_data or {} @property @@ -48,6 +50,11 @@ class OfflineFindingDevice(ABC): mac = self._mac_bytes.hex().upper() return ":".join(mac[i : i + 2] for i in range(0, len(mac), 2)) + @property + def status(self) -> int: + """Status value as reported by the device.""" + return self._status % 255 + @property def additional_data(self) -> dict[Any, Any]: """Any additional data. No guarantees about the contents of this dictionary.""" @@ -123,16 +130,10 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate a NearbyOfflineFindingDevice.""" - super().__init__(mac_bytes, additional_data) + super().__init__(mac_bytes, status_byte, additional_data) - self._status_byte: int = status_byte self._extra_byte: int = extra_byte - @property - def status(self) -> int: - """Status value as reported by the device.""" - return self._status_byte % 255 - @classmethod @override def from_payload( @@ -175,17 +176,11 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): additional_data: dict[Any, Any] | None = None, ) -> None: """Initialize a `SeparatedOfflineFindingDevice`.""" - super().__init__(mac_bytes, additional_data) + super().__init__(mac_bytes, status, additional_data) - self._status: int = status self._public_key: bytes = public_key self._hint: int = hint - @property - def status(self) -> int: - """Status value as reported by the device.""" - return self._status % 255 - @property def hint(self) -> int: """Hint value as reported by the device.""" From 47ee3c6201ec8fac4eaace1876e60718f45a751e Mon Sep 17 00:00:00 2001 From: "Mike A." Date: Tue, 16 Jul 2024 22:29:23 +0200 Subject: [PATCH 4/4] scanner: support checking for device - key correspondence --- examples/device_scanner.py | 17 ++++++++ findmy/scanner/scanner.py | 86 ++++++++++++++++++++++++++++++++++---- 2 files changed, 94 insertions(+), 9 deletions(-) diff --git a/examples/device_scanner.py b/examples/device_scanner.py index 49eb76d..8ab61da 100644 --- a/examples/device_scanner.py +++ b/examples/device_scanner.py @@ -1,6 +1,7 @@ import asyncio import logging +from findmy import KeyPair from findmy.scanner import ( NearbyOfflineFindingDevice, OfflineFindingScanner, @@ -9,6 +10,11 @@ from findmy.scanner import ( logging.basicConfig(level=logging.INFO) +# Set if you want to check whether a specific key (or accessory!) is in the scan results. +# Make sure to enter its private key! +# Leave empty (= None) to not check. +CHECK_KEY = KeyPair.from_b64("") + def _print_nearby(device: NearbyOfflineFindingDevice) -> None: print(f"NEARBY Device - {device.mac_address}") @@ -37,6 +43,8 @@ async def scan() -> None: print("Scanning for FindMy-devices...") print() + scan_device = None + async for device in scanner.scan_for(10, extend_timeout=True): if isinstance(device, NearbyOfflineFindingDevice): _print_nearby(device) @@ -45,6 +53,15 @@ async def scan() -> None: else: print(f"Unknown device: {device}") print() + continue + + if CHECK_KEY and device.is_from(CHECK_KEY): + scan_device = device + + if scan_device: + print("Key or accessory was found in scan results! :D") + elif CHECK_KEY: + print("Selected key or accessory was not found in scan results... :c") if __name__ == "__main__": diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 3a51d13..a9ea170 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -6,11 +6,13 @@ import asyncio import logging import time from abc import ABC, abstractmethod +from datetime import datetime from typing import TYPE_CHECKING, Any, AsyncGenerator from bleak import BleakScanner from typing_extensions import override +from findmy.accessory import RollingKeyPairSource from findmy.keys import HasPublicKey if TYPE_CHECKING: @@ -37,11 +39,13 @@ class OfflineFindingDevice(ABC): self, mac_bytes: bytes, status_byte: int, + detected_at: datetime, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate an OfflineFindingDevice.""" self._mac_bytes: bytes = mac_bytes self._status: int = status_byte + self._detected_at: datetime = detected_at self._additional_data: dict[Any, Any] = additional_data or {} @property @@ -55,17 +59,28 @@ class OfflineFindingDevice(ABC): """Status value as reported by the device.""" return self._status % 255 + @property + def detected_at(self) -> datetime: + """Timezone-aware datetime of when the device was detected.""" + return self._detected_at + @property def additional_data(self) -> dict[Any, Any]: """Any additional data. No guarantees about the contents of this dictionary.""" return self._additional_data + @abstractmethod + def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: + """Check whether the OF device's identity originates from a specific key source.""" + raise NotImplementedError + @classmethod @abstractmethod def from_payload( cls, mac_address: str, payload: bytes, + detected_at: datetime, additional_data: dict[Any, Any] | None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" @@ -76,6 +91,7 @@ class OfflineFindingDevice(ABC): cls, mac_address: str, ble_payload: bytes, + detected_at: datetime | None = None, additional_data: dict[Any, Any] | None = None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from a BLE packet payload.""" @@ -97,6 +113,7 @@ class OfflineFindingDevice(ABC): return device_type.from_payload( mac_address, ble_payload[cls.OF_HEADER_SIZE :], + detected_at or datetime.now().astimezone(), additional_data, ) @@ -122,17 +139,29 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): """Length of OfflineFinding data payload in bytes.""" return 0x02 # 2 - def __init__( + def __init__( # noqa: PLR0913 self, mac_bytes: bytes, status_byte: int, - extra_byte: int, + first_adv_key_bytes: bytes, + detected_at: datetime, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate a NearbyOfflineFindingDevice.""" - super().__init__(mac_bytes, status_byte, additional_data) + super().__init__(mac_bytes, status_byte, detected_at, additional_data) - self._extra_byte: int = extra_byte + self._first_adv_key_bytes: bytes = first_adv_key_bytes + + @override + def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: + """Check whether the OF device's identity originates from a specific key source.""" + if isinstance(other_device, HasPublicKey): + return other_device.adv_key_bytes.startswith(self._first_adv_key_bytes) + if isinstance(other_device, RollingKeyPairSource): + return any(self.is_from(key) for key in other_device.keys_at(self.detected_at)) + + msg = f"Cannot compare against {type(other_device)}" + raise ValueError(msg) @classmethod @override @@ -140,6 +169,7 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): cls, mac_address: str, payload: bytes, + detected_at: datetime, additional_data: dict[Any, Any] | None = None, ) -> NearbyOfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" @@ -152,9 +182,20 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) status_byte = payload[0] - extra_byte = payload[1] - return NearbyOfflineFindingDevice(mac_bytes, status_byte, extra_byte, additional_data) + pubkey_middle = mac_bytes[1:] + pubkey_start_ms = payload[1] << 6 + pubkey_start_ls = mac_bytes[0] & 0b00111111 + pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big") + partial_pubkey = pubkey_start + pubkey_middle + + return NearbyOfflineFindingDevice( + mac_bytes, + status_byte, + partial_pubkey, + detected_at, + additional_data, + ) class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): @@ -173,10 +214,11 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): status: int, public_key: bytes, hint: int, + detected_at: datetime, additional_data: dict[Any, Any] | None = None, ) -> None: """Initialize a `SeparatedOfflineFindingDevice`.""" - super().__init__(mac_bytes, status, additional_data) + super().__init__(mac_bytes, status, detected_at, additional_data) self._public_key: bytes = public_key self._hint: int = hint @@ -192,12 +234,24 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): """See `HasPublicKey.adv_key_bytes`.""" return self._public_key + @override + def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: + """Check whether the OF device's identity originates from a specific key source.""" + if isinstance(other_device, HasPublicKey): + return self.adv_key_bytes == other_device.adv_key_bytes + if isinstance(other_device, RollingKeyPairSource): + return any(self.is_from(key) for key in other_device.keys_at(self.detected_at)) + + msg = f"Cannot compare against {type(other_device)}" + raise ValueError(msg) + @classmethod @override def from_payload( cls, mac_address: str, payload: bytes, + detected_at: datetime, additional_data: dict[Any, Any] | None = None, ) -> SeparatedOfflineFindingDevice | None: """Get a SeparatedOfflineFindingDevice object from an OF message payload.""" @@ -222,7 +276,14 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): hint = payload[24] - return SeparatedOfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data) + return SeparatedOfflineFindingDevice( + mac_bytes, + status, + pubkey, + hint, + detected_at, + additional_data, + ) @override def __repr__(self) -> str: @@ -289,13 +350,20 @@ class OfflineFindingScanner: if not apple_data: return None + detected_at = datetime.now().astimezone() + try: additional_data = device.details.get("props", {}) except AttributeError: # Likely Windows host, where details is a '_RawAdvData' object. # See: https://github.com/malmeloo/FindMy.py/issues/24 additional_data = {} - return OfflineFindingDevice.from_ble_payload(device.address, apple_data, additional_data) + return OfflineFindingDevice.from_ble_payload( + device.address, + apple_data, + detected_at, + additional_data, + ) async def scan_for( self,