Merge pull request #50 from malmeloo/feat/nearby-of-device-scanner

scanner: detect OF devices in Nearby state
This commit is contained in:
Mike Almeloo
2024-07-21 21:25:04 +02:00
committed by GitHub
3 changed files with 279 additions and 47 deletions

View File

@@ -1,19 +1,32 @@
import asyncio import asyncio
import logging import logging
from findmy.scanner import OfflineFindingScanner from findmy import KeyPair
from findmy.scanner import (
NearbyOfflineFindingDevice,
OfflineFindingScanner,
SeparatedOfflineFindingDevice,
)
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
# Set if you want to check whether a specific key (or accessory!) is in the scan results.
# Make sure to enter its private key!
# Leave empty (= None) to not check.
CHECK_KEY = KeyPair.from_b64("")
async def scan() -> None:
scanner = await OfflineFindingScanner.create()
print("Scanning for FindMy-devices...") def _print_nearby(device: NearbyOfflineFindingDevice) -> None:
print(f"NEARBY Device - {device.mac_address}")
print(f" Status byte: {device.status:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print() print()
async for device in scanner.scan_for(10, extend_timeout=True):
print(f"Device - {device.mac_address}") def _print_separated(device: SeparatedOfflineFindingDevice) -> None:
print(f"SEPARATED Device - {device.mac_address}")
print(f" Public key: {device.adv_key_b64}") print(f" Public key: {device.adv_key_b64}")
print(f" Lookup key: {device.hashed_adv_key_b64}") print(f" Lookup key: {device.hashed_adv_key_b64}")
print(f" Status byte: {device.status:x}") print(f" Status byte: {device.status:x}")
@@ -24,5 +37,32 @@ async def scan() -> None:
print() print()
async def scan() -> None:
scanner = await OfflineFindingScanner.create()
print("Scanning for FindMy-devices...")
print()
scan_device = None
async for device in scanner.scan_for(10, extend_timeout=True):
if isinstance(device, NearbyOfflineFindingDevice):
_print_nearby(device)
elif isinstance(device, SeparatedOfflineFindingDevice):
_print_separated(device)
else:
print(f"Unknown device: {device}")
print()
continue
if CHECK_KEY and device.is_from(CHECK_KEY):
scan_device = device
if scan_device:
print("Key or accessory was found in scan results! :D")
elif CHECK_KEY:
print("Selected key or accessory was not found in scan results... :c")
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(scan()) asyncio.run(scan())

View File

@@ -1,4 +1,12 @@
"""Utilities related to physically discoverable FindMy-devices.""" """Utilities related to physically discoverable FindMy-devices."""
from .scanner import OfflineFindingScanner from .scanner import (
NearbyOfflineFindingDevice,
OfflineFindingScanner,
SeparatedOfflineFindingDevice,
)
__all__ = ("OfflineFindingScanner",) __all__ = (
"OfflineFindingScanner",
"NearbyOfflineFindingDevice",
"SeparatedOfflineFindingDevice",
)

View File

@@ -1,14 +1,18 @@
"""Airtag scanner.""" """Airtag scanner."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging import logging
import time import time
from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Any, AsyncGenerator from typing import TYPE_CHECKING, Any, AsyncGenerator
from bleak import BleakScanner from bleak import BleakScanner
from typing_extensions import override from typing_extensions import override
from findmy.accessory import RollingKeyPairSource
from findmy.keys import HasPublicKey from findmy.keys import HasPublicKey
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -18,27 +22,30 @@ if TYPE_CHECKING:
logging.getLogger(__name__) logging.getLogger(__name__)
class OfflineFindingDevice(HasPublicKey): class OfflineFindingDevice(ABC):
"""Device discoverable through Apple's bluetooth-based Offline Finding protocol.""" """Device discoverable through Apple's bluetooth-based Offline Finding protocol."""
OF_HEADER_SIZE = 2 OF_HEADER_SIZE = 2
OF_TYPE = 0x12 OF_TYPE = 0x12
OF_DATA_LEN = 25
def __init__( # noqa: PLR0913 @classmethod
@property
@abstractmethod
def payload_len(cls) -> int:
"""Length of OfflineFinding data payload in bytes."""
raise NotImplementedError
def __init__(
self, self,
mac_bytes: bytes, mac_bytes: bytes,
status: int, status_byte: int,
public_key: bytes, detected_at: datetime,
hint: int,
additional_data: dict[Any, Any] | None = None, additional_data: dict[Any, Any] | None = None,
) -> None: ) -> None:
"""Initialize an `OfflineFindingDevice`.""" """Instantiate an OfflineFindingDevice."""
self._mac_bytes: bytes = mac_bytes self._mac_bytes: bytes = mac_bytes
self._status: int = status self._status: int = status_byte
self._public_key: bytes = public_key self._detected_at: datetime = detected_at
self._hint: int = hint
self._additional_data: dict[Any, Any] = additional_data or {} self._additional_data: dict[Any, Any] = additional_data or {}
@property @property
@@ -53,60 +60,230 @@ class OfflineFindingDevice(HasPublicKey):
return self._status % 255 return self._status % 255
@property @property
def hint(self) -> int: def detected_at(self) -> datetime:
"""Hint value as reported by the device.""" """Timezone-aware datetime of when the device was detected."""
return self._hint % 255 return self._detected_at
@property @property
def additional_data(self) -> dict[Any, Any]: def additional_data(self) -> dict[Any, Any]:
"""Any additional data. No guarantees about the contents of this dictionary.""" """Any additional data. No guarantees about the contents of this dictionary."""
return self._additional_data return self._additional_data
@abstractmethod
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
"""Check whether the OF device's identity originates from a specific key source."""
raise NotImplementedError
@classmethod
@abstractmethod
def from_payload(
cls,
mac_address: str,
payload: bytes,
detected_at: datetime,
additional_data: dict[Any, Any] | None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
raise NotImplementedError
@classmethod
def from_ble_payload(
cls,
mac_address: str,
ble_payload: bytes,
detected_at: datetime | None = None,
additional_data: dict[Any, Any] | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
if len(ble_payload) < cls.OF_HEADER_SIZE:
logging.error("Not enough bytes to decode: %s", len(ble_payload))
return None
if ble_payload[0] != cls.OF_TYPE:
logging.debug("Unsupported OF type: %s", ble_payload[0])
return None
device_type = next(
(dev for dev in cls.__subclasses__() if dev.payload_len == ble_payload[1]),
None,
)
if device_type is None:
logging.error("Invalid OF payload length: %s", ble_payload[1])
return None
return device_type.from_payload(
mac_address,
ble_payload[cls.OF_HEADER_SIZE :],
detected_at or datetime.now().astimezone(),
additional_data,
)
@override
def __eq__(self, other: object) -> bool:
if isinstance(other, OfflineFindingDevice):
return self.mac_address == other.mac_address
return NotImplemented
@override
def __hash__(self) -> int:
return int.from_bytes(self._mac_bytes, "big")
class NearbyOfflineFindingDevice(OfflineFindingDevice):
"""Offline-Finding device in nearby state."""
@classmethod
@property
@override
def payload_len(cls) -> int:
"""Length of OfflineFinding data payload in bytes."""
return 0x02 # 2
def __init__( # noqa: PLR0913
self,
mac_bytes: bytes,
status_byte: int,
first_adv_key_bytes: bytes,
detected_at: datetime,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Instantiate a NearbyOfflineFindingDevice."""
super().__init__(mac_bytes, status_byte, detected_at, additional_data)
self._first_adv_key_bytes: bytes = first_adv_key_bytes
@override
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
"""Check whether the OF device's identity originates from a specific key source."""
if isinstance(other_device, HasPublicKey):
return other_device.adv_key_bytes.startswith(self._first_adv_key_bytes)
if isinstance(other_device, RollingKeyPairSource):
return any(self.is_from(key) for key in other_device.keys_at(self.detected_at))
msg = f"Cannot compare against {type(other_device)}"
raise ValueError(msg)
@classmethod
@override
def from_payload(
cls,
mac_address: str,
payload: bytes,
detected_at: datetime,
additional_data: dict[Any, Any] | None = None,
) -> NearbyOfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.payload_len:
logging.error(
"Invalid OF data length: %s instead of %s",
len(payload),
payload[1],
)
mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
status_byte = payload[0]
pubkey_middle = mac_bytes[1:]
pubkey_start_ms = payload[1] << 6
pubkey_start_ls = mac_bytes[0] & 0b00111111
pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big")
partial_pubkey = pubkey_start + pubkey_middle
return NearbyOfflineFindingDevice(
mac_bytes,
status_byte,
partial_pubkey,
detected_at,
additional_data,
)
class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
"""Offline-Finding device in separated state."""
@classmethod
@property
@override
def payload_len(cls) -> int:
"""Length of OfflineFinding data in bytes."""
return 0x19 # 25
def __init__( # noqa: PLR0913
self,
mac_bytes: bytes,
status: int,
public_key: bytes,
hint: int,
detected_at: datetime,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Initialize a `SeparatedOfflineFindingDevice`."""
super().__init__(mac_bytes, status, detected_at, additional_data)
self._public_key: bytes = public_key
self._hint: int = hint
@property
def hint(self) -> int:
"""Hint value as reported by the device."""
return self._hint % 255
@property @property
@override @override
def adv_key_bytes(self) -> bytes: def adv_key_bytes(self) -> bytes:
"""See `HasPublicKey.adv_key_bytes`.""" """See `HasPublicKey.adv_key_bytes`."""
return self._public_key return self._public_key
@override
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
"""Check whether the OF device's identity originates from a specific key source."""
if isinstance(other_device, HasPublicKey):
return self.adv_key_bytes == other_device.adv_key_bytes
if isinstance(other_device, RollingKeyPairSource):
return any(self.is_from(key) for key in other_device.keys_at(self.detected_at))
msg = f"Cannot compare against {type(other_device)}"
raise ValueError(msg)
@classmethod @classmethod
@override
def from_payload( def from_payload(
cls, cls,
mac_address: str, mac_address: str,
payload: bytes, payload: bytes,
additional_data: dict[Any, Any], detected_at: datetime,
) -> OfflineFindingDevice | None: additional_data: dict[Any, Any] | None = None,
"""Get an OfflineFindingDevice object from a BLE payload.""" ) -> SeparatedOfflineFindingDevice | None:
if len(payload) < cls.OF_HEADER_SIZE: """Get a SeparatedOfflineFindingDevice object from an OF message payload."""
logging.error("Not enough bytes to decode: %s", len(payload)) if len(payload) != cls.payload_len:
return None logging.error(
if payload[0] != cls.OF_TYPE:
logging.debug("Unsupported OF type: %s", payload[0])
return None
if payload[1] != cls.OF_DATA_LEN:
logging.debug("Unknown OF data length: %s", payload[1])
return None
if len(payload) != cls.OF_HEADER_SIZE + cls.OF_DATA_LEN:
logging.debug(
"Invalid OF data length: %s instead of %s", "Invalid OF data length: %s instead of %s",
len(payload) - cls.OF_HEADER_SIZE, len(payload),
payload[1], payload[1],
) )
return None return None
mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
status = payload[cls.OF_HEADER_SIZE + 0] status = payload[0]
pubkey_end = payload[cls.OF_HEADER_SIZE + 1 : cls.OF_HEADER_SIZE + 23] pubkey_end = payload[1:23]
pubkey_middle = mac_bytes[1:] pubkey_middle = mac_bytes[1:]
pubkey_start_ms = payload[cls.OF_HEADER_SIZE + 23] << 6 pubkey_start_ms = payload[23] << 6
pubkey_start_ls = mac_bytes[0] & 0b00111111 pubkey_start_ls = mac_bytes[0] & 0b00111111
pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big") pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big")
pubkey = pubkey_start + pubkey_middle + pubkey_end pubkey = pubkey_start + pubkey_middle + pubkey_end
hint = payload[cls.OF_HEADER_SIZE + 24] hint = payload[24]
return OfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data) return SeparatedOfflineFindingDevice(
mac_bytes,
status,
pubkey,
hint,
detected_at,
additional_data,
)
@override @override
def __repr__(self) -> str: def __repr__(self) -> str:
@@ -173,13 +350,20 @@ class OfflineFindingScanner:
if not apple_data: if not apple_data:
return None return None
detected_at = datetime.now().astimezone()
try: try:
additional_data = device.details.get("props", {}) additional_data = device.details.get("props", {})
except AttributeError: except AttributeError:
# Likely Windows host, where details is a '_RawAdvData' object. # Likely Windows host, where details is a '_RawAdvData' object.
# See: https://github.com/malmeloo/FindMy.py/issues/24 # See: https://github.com/malmeloo/FindMy.py/issues/24
additional_data = {} additional_data = {}
return OfflineFindingDevice.from_payload(device.address, apple_data, additional_data) return OfflineFindingDevice.from_ble_payload(
device.address,
apple_data,
detected_at,
additional_data,
)
async def scan_for( async def scan_for(
self, self,