mirror of
https://github.com/malmeloo/FindMy.py.git
synced 2026-04-17 21:53:57 +02:00
scanner: detect OF devices in Nearby state
This commit is contained in:
@@ -1,11 +1,36 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from findmy.scanner import OfflineFindingScanner
|
||||
from findmy.scanner import (
|
||||
NearbyOfflineFindingDevice,
|
||||
OfflineFindingScanner,
|
||||
SeparatedOfflineFindingDevice,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
def _print_nearby(device: NearbyOfflineFindingDevice) -> None:
|
||||
print(f"NEARBY Device - {device.mac_address}")
|
||||
print(f" Status byte: {device.status:x}")
|
||||
print(" Extra data:")
|
||||
for k, v in sorted(device.additional_data.items()):
|
||||
print(f" {k:20}: {v}")
|
||||
print()
|
||||
|
||||
|
||||
def _print_separated(device: SeparatedOfflineFindingDevice) -> None:
|
||||
print(f"SEPARATED Device - {device.mac_address}")
|
||||
print(f" Public key: {device.adv_key_b64}")
|
||||
print(f" Lookup key: {device.hashed_adv_key_b64}")
|
||||
print(f" Status byte: {device.status:x}")
|
||||
print(f" Hint byte: {device.hint:x}")
|
||||
print(" Extra data:")
|
||||
for k, v in sorted(device.additional_data.items()):
|
||||
print(f" {k:20}: {v}")
|
||||
print()
|
||||
|
||||
|
||||
async def scan() -> None:
|
||||
scanner = await OfflineFindingScanner.create()
|
||||
|
||||
@@ -13,15 +38,13 @@ async def scan() -> None:
|
||||
print()
|
||||
|
||||
async for device in scanner.scan_for(10, extend_timeout=True):
|
||||
print(f"Device - {device.mac_address}")
|
||||
print(f" Public key: {device.adv_key_b64}")
|
||||
print(f" Lookup key: {device.hashed_adv_key_b64}")
|
||||
print(f" Status byte: {device.status:x}")
|
||||
print(f" Hint byte: {device.hint:x}")
|
||||
print(" Extra data:")
|
||||
for k, v in sorted(device.additional_data.items()):
|
||||
print(f" {k:20}: {v}")
|
||||
print()
|
||||
if isinstance(device, NearbyOfflineFindingDevice):
|
||||
_print_nearby(device)
|
||||
elif isinstance(device, SeparatedOfflineFindingDevice):
|
||||
_print_separated(device)
|
||||
else:
|
||||
print(f"Unknown device: {device}")
|
||||
print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -1,4 +1,12 @@
|
||||
"""Utilities related to physically discoverable FindMy-devices."""
|
||||
from .scanner import OfflineFindingScanner
|
||||
from .scanner import (
|
||||
NearbyOfflineFindingDevice,
|
||||
OfflineFindingScanner,
|
||||
SeparatedOfflineFindingDevice,
|
||||
)
|
||||
|
||||
__all__ = ("OfflineFindingScanner",)
|
||||
__all__ = (
|
||||
"OfflineFindingScanner",
|
||||
"NearbyOfflineFindingDevice",
|
||||
"SeparatedOfflineFindingDevice",
|
||||
)
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
"""Airtag scanner."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import TYPE_CHECKING, Any, AsyncGenerator
|
||||
|
||||
from bleak import BleakScanner
|
||||
@@ -18,12 +20,151 @@ if TYPE_CHECKING:
|
||||
logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OfflineFindingDevice(HasPublicKey):
|
||||
class OfflineFindingDevice(ABC):
|
||||
"""Device discoverable through Apple's bluetooth-based Offline Finding protocol."""
|
||||
|
||||
OF_HEADER_SIZE = 2
|
||||
OF_TYPE = 0x12
|
||||
OF_DATA_LEN = 25
|
||||
|
||||
@classmethod
|
||||
@property
|
||||
@abstractmethod
|
||||
def payload_len(cls) -> int:
|
||||
"""Length of OfflineFinding data payload in bytes."""
|
||||
raise NotImplementedError
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mac_bytes: bytes,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> None:
|
||||
"""Instantiate an OfflineFindingDevice."""
|
||||
self._mac_bytes: bytes = mac_bytes
|
||||
self._additional_data: dict[Any, Any] = additional_data or {}
|
||||
|
||||
@property
|
||||
def mac_address(self) -> str:
|
||||
"""MAC address of the device in AA:BB:CC:DD:EE:FF format."""
|
||||
mac = self._mac_bytes.hex().upper()
|
||||
return ":".join(mac[i : i + 2] for i in range(0, len(mac), 2))
|
||||
|
||||
@property
|
||||
def additional_data(self) -> dict[Any, Any]:
|
||||
"""Any additional data. No guarantees about the contents of this dictionary."""
|
||||
return self._additional_data
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def from_payload(
|
||||
cls,
|
||||
mac_address: str,
|
||||
payload: bytes,
|
||||
additional_data: dict[Any, Any] | None,
|
||||
) -> OfflineFindingDevice | None:
|
||||
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def from_ble_payload(
|
||||
cls,
|
||||
mac_address: str,
|
||||
ble_payload: bytes,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> OfflineFindingDevice | None:
|
||||
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
|
||||
if len(ble_payload) < cls.OF_HEADER_SIZE:
|
||||
logging.error("Not enough bytes to decode: %s", len(ble_payload))
|
||||
return None
|
||||
if ble_payload[0] != cls.OF_TYPE:
|
||||
logging.debug("Unsupported OF type: %s", ble_payload[0])
|
||||
return None
|
||||
|
||||
device_type = next(
|
||||
(dev for dev in cls.__subclasses__() if dev.payload_len == ble_payload[1]),
|
||||
None,
|
||||
)
|
||||
if device_type is None:
|
||||
logging.error("Invalid OF payload length: %s", ble_payload[1])
|
||||
return None
|
||||
|
||||
return device_type.from_payload(
|
||||
mac_address,
|
||||
ble_payload[cls.OF_HEADER_SIZE :],
|
||||
additional_data,
|
||||
)
|
||||
|
||||
@override
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if isinstance(other, OfflineFindingDevice):
|
||||
return self.mac_address == other.mac_address
|
||||
|
||||
return NotImplemented
|
||||
|
||||
@override
|
||||
def __hash__(self) -> int:
|
||||
return int.from_bytes(self._mac_bytes)
|
||||
|
||||
|
||||
class NearbyOfflineFindingDevice(OfflineFindingDevice):
|
||||
"""Offline-Finding device in nearby state."""
|
||||
|
||||
@classmethod
|
||||
@property
|
||||
@override
|
||||
def payload_len(cls) -> int:
|
||||
"""Length of OfflineFinding data payload in bytes."""
|
||||
return 0x02 # 2
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mac_bytes: bytes,
|
||||
status_byte: int,
|
||||
extra_byte: int,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> None:
|
||||
"""Instantiate a NearbyOfflineFindingDevice."""
|
||||
super().__init__(mac_bytes, additional_data)
|
||||
|
||||
self._status_byte: int = status_byte
|
||||
self._extra_byte: int = extra_byte
|
||||
|
||||
@property
|
||||
def status(self) -> int:
|
||||
"""Status value as reported by the device."""
|
||||
return self._status_byte % 255
|
||||
|
||||
@classmethod
|
||||
@override
|
||||
def from_payload(
|
||||
cls,
|
||||
mac_address: str,
|
||||
payload: bytes,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> NearbyOfflineFindingDevice | None:
|
||||
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
|
||||
if len(payload) != cls.payload_len:
|
||||
logging.error(
|
||||
"Invalid OF data length: %s instead of %s",
|
||||
len(payload),
|
||||
payload[1],
|
||||
)
|
||||
|
||||
mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
|
||||
status_byte = payload[0]
|
||||
extra_byte = payload[1]
|
||||
|
||||
return NearbyOfflineFindingDevice(mac_bytes, status_byte, extra_byte, additional_data)
|
||||
|
||||
|
||||
class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
|
||||
"""Offline-Finding device in separated state."""
|
||||
|
||||
@classmethod
|
||||
@property
|
||||
@override
|
||||
def payload_len(cls) -> int:
|
||||
"""Length of OfflineFinding data in bytes."""
|
||||
return 0x19 # 25
|
||||
|
||||
def __init__( # noqa: PLR0913
|
||||
self,
|
||||
@@ -33,20 +174,13 @@ class OfflineFindingDevice(HasPublicKey):
|
||||
hint: int,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> None:
|
||||
"""Initialize an `OfflineFindingDevice`."""
|
||||
self._mac_bytes: bytes = mac_bytes
|
||||
"""Initialize a `SeparatedOfflineFindingDevice`."""
|
||||
super().__init__(mac_bytes, additional_data)
|
||||
|
||||
self._status: int = status
|
||||
self._public_key: bytes = public_key
|
||||
self._hint: int = hint
|
||||
|
||||
self._additional_data: dict[Any, Any] = additional_data or {}
|
||||
|
||||
@property
|
||||
def mac_address(self) -> str:
|
||||
"""MAC address of the device in AA:BB:CC:DD:EE:FF format."""
|
||||
mac = self._mac_bytes.hex().upper()
|
||||
return ":".join(mac[i : i + 2] for i in range(0, len(mac), 2))
|
||||
|
||||
@property
|
||||
def status(self) -> int:
|
||||
"""Status value as reported by the device."""
|
||||
@@ -57,11 +191,6 @@ class OfflineFindingDevice(HasPublicKey):
|
||||
"""Hint value as reported by the device."""
|
||||
return self._hint % 255
|
||||
|
||||
@property
|
||||
def additional_data(self) -> dict[Any, Any]:
|
||||
"""Any additional data. No guarantees about the contents of this dictionary."""
|
||||
return self._additional_data
|
||||
|
||||
@property
|
||||
@override
|
||||
def adv_key_bytes(self) -> bytes:
|
||||
@@ -69,44 +198,36 @@ class OfflineFindingDevice(HasPublicKey):
|
||||
return self._public_key
|
||||
|
||||
@classmethod
|
||||
@override
|
||||
def from_payload(
|
||||
cls,
|
||||
mac_address: str,
|
||||
payload: bytes,
|
||||
additional_data: dict[Any, Any],
|
||||
) -> OfflineFindingDevice | None:
|
||||
"""Get an OfflineFindingDevice object from a BLE payload."""
|
||||
if len(payload) < cls.OF_HEADER_SIZE:
|
||||
logging.error("Not enough bytes to decode: %s", len(payload))
|
||||
return None
|
||||
if payload[0] != cls.OF_TYPE:
|
||||
logging.debug("Unsupported OF type: %s", payload[0])
|
||||
return None
|
||||
if payload[1] != cls.OF_DATA_LEN:
|
||||
logging.debug("Unknown OF data length: %s", payload[1])
|
||||
return None
|
||||
if len(payload) != cls.OF_HEADER_SIZE + cls.OF_DATA_LEN:
|
||||
logging.debug(
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> SeparatedOfflineFindingDevice | None:
|
||||
"""Get a SeparatedOfflineFindingDevice object from an OF message payload."""
|
||||
if len(payload) != cls.payload_len:
|
||||
logging.error(
|
||||
"Invalid OF data length: %s instead of %s",
|
||||
len(payload) - cls.OF_HEADER_SIZE,
|
||||
len(payload),
|
||||
payload[1],
|
||||
)
|
||||
return None
|
||||
|
||||
mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
|
||||
|
||||
status = payload[cls.OF_HEADER_SIZE + 0]
|
||||
status = payload[0]
|
||||
|
||||
pubkey_end = payload[cls.OF_HEADER_SIZE + 1 : cls.OF_HEADER_SIZE + 23]
|
||||
pubkey_end = payload[1:23]
|
||||
pubkey_middle = mac_bytes[1:]
|
||||
pubkey_start_ms = payload[cls.OF_HEADER_SIZE + 23] << 6
|
||||
pubkey_start_ms = payload[23] << 6
|
||||
pubkey_start_ls = mac_bytes[0] & 0b00111111
|
||||
pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big")
|
||||
pubkey = pubkey_start + pubkey_middle + pubkey_end
|
||||
|
||||
hint = payload[cls.OF_HEADER_SIZE + 24]
|
||||
hint = payload[24]
|
||||
|
||||
return OfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data)
|
||||
return SeparatedOfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data)
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
@@ -179,7 +300,7 @@ class OfflineFindingScanner:
|
||||
# Likely Windows host, where details is a '_RawAdvData' object.
|
||||
# See: https://github.com/malmeloo/FindMy.py/issues/24
|
||||
additional_data = {}
|
||||
return OfflineFindingDevice.from_payload(device.address, apple_data, additional_data)
|
||||
return OfflineFindingDevice.from_ble_payload(device.address, apple_data, additional_data)
|
||||
|
||||
async def scan_for(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user