mirror of
https://github.com/malmeloo/FindMy.py.git
synced 2026-04-17 21:53:57 +02:00
scanner: support checking for device - key correspondence
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from findmy import KeyPair
|
||||
from findmy.scanner import (
|
||||
NearbyOfflineFindingDevice,
|
||||
OfflineFindingScanner,
|
||||
@@ -9,6 +10,11 @@ from findmy.scanner import (
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
# Set if you want to check whether a specific key (or accessory!) is in the scan results.
|
||||
# Make sure to enter its private key!
|
||||
# Leave empty (= None) to not check.
|
||||
CHECK_KEY = KeyPair.from_b64("")
|
||||
|
||||
|
||||
def _print_nearby(device: NearbyOfflineFindingDevice) -> None:
|
||||
print(f"NEARBY Device - {device.mac_address}")
|
||||
@@ -37,6 +43,8 @@ async def scan() -> None:
|
||||
print("Scanning for FindMy-devices...")
|
||||
print()
|
||||
|
||||
scan_device = None
|
||||
|
||||
async for device in scanner.scan_for(10, extend_timeout=True):
|
||||
if isinstance(device, NearbyOfflineFindingDevice):
|
||||
_print_nearby(device)
|
||||
@@ -45,6 +53,15 @@ async def scan() -> None:
|
||||
else:
|
||||
print(f"Unknown device: {device}")
|
||||
print()
|
||||
continue
|
||||
|
||||
if CHECK_KEY and device.is_from(CHECK_KEY):
|
||||
scan_device = device
|
||||
|
||||
if scan_device:
|
||||
print("Key or accessory was found in scan results! :D")
|
||||
elif CHECK_KEY:
|
||||
print("Selected key or accessory was not found in scan results... :c")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -6,11 +6,13 @@ import asyncio
|
||||
import logging
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Any, AsyncGenerator
|
||||
|
||||
from bleak import BleakScanner
|
||||
from typing_extensions import override
|
||||
|
||||
from findmy.accessory import RollingKeyPairSource
|
||||
from findmy.keys import HasPublicKey
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -37,11 +39,13 @@ class OfflineFindingDevice(ABC):
|
||||
self,
|
||||
mac_bytes: bytes,
|
||||
status_byte: int,
|
||||
detected_at: datetime,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> None:
|
||||
"""Instantiate an OfflineFindingDevice."""
|
||||
self._mac_bytes: bytes = mac_bytes
|
||||
self._status: int = status_byte
|
||||
self._detected_at: datetime = detected_at
|
||||
self._additional_data: dict[Any, Any] = additional_data or {}
|
||||
|
||||
@property
|
||||
@@ -55,17 +59,28 @@ class OfflineFindingDevice(ABC):
|
||||
"""Status value as reported by the device."""
|
||||
return self._status % 255
|
||||
|
||||
@property
|
||||
def detected_at(self) -> datetime:
|
||||
"""Timezone-aware datetime of when the device was detected."""
|
||||
return self._detected_at
|
||||
|
||||
@property
|
||||
def additional_data(self) -> dict[Any, Any]:
|
||||
"""Any additional data. No guarantees about the contents of this dictionary."""
|
||||
return self._additional_data
|
||||
|
||||
@abstractmethod
|
||||
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
|
||||
"""Check whether the OF device's identity originates from a specific key source."""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def from_payload(
|
||||
cls,
|
||||
mac_address: str,
|
||||
payload: bytes,
|
||||
detected_at: datetime,
|
||||
additional_data: dict[Any, Any] | None,
|
||||
) -> OfflineFindingDevice | None:
|
||||
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
|
||||
@@ -76,6 +91,7 @@ class OfflineFindingDevice(ABC):
|
||||
cls,
|
||||
mac_address: str,
|
||||
ble_payload: bytes,
|
||||
detected_at: datetime | None = None,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> OfflineFindingDevice | None:
|
||||
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
|
||||
@@ -97,6 +113,7 @@ class OfflineFindingDevice(ABC):
|
||||
return device_type.from_payload(
|
||||
mac_address,
|
||||
ble_payload[cls.OF_HEADER_SIZE :],
|
||||
detected_at or datetime.now().astimezone(),
|
||||
additional_data,
|
||||
)
|
||||
|
||||
@@ -122,17 +139,29 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
|
||||
"""Length of OfflineFinding data payload in bytes."""
|
||||
return 0x02 # 2
|
||||
|
||||
def __init__(
|
||||
def __init__( # noqa: PLR0913
|
||||
self,
|
||||
mac_bytes: bytes,
|
||||
status_byte: int,
|
||||
extra_byte: int,
|
||||
first_adv_key_bytes: bytes,
|
||||
detected_at: datetime,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> None:
|
||||
"""Instantiate a NearbyOfflineFindingDevice."""
|
||||
super().__init__(mac_bytes, status_byte, additional_data)
|
||||
super().__init__(mac_bytes, status_byte, detected_at, additional_data)
|
||||
|
||||
self._extra_byte: int = extra_byte
|
||||
self._first_adv_key_bytes: bytes = first_adv_key_bytes
|
||||
|
||||
@override
|
||||
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
|
||||
"""Check whether the OF device's identity originates from a specific key source."""
|
||||
if isinstance(other_device, HasPublicKey):
|
||||
return other_device.adv_key_bytes.startswith(self._first_adv_key_bytes)
|
||||
if isinstance(other_device, RollingKeyPairSource):
|
||||
return any(self.is_from(key) for key in other_device.keys_at(self.detected_at))
|
||||
|
||||
msg = f"Cannot compare against {type(other_device)}"
|
||||
raise ValueError(msg)
|
||||
|
||||
@classmethod
|
||||
@override
|
||||
@@ -140,6 +169,7 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
|
||||
cls,
|
||||
mac_address: str,
|
||||
payload: bytes,
|
||||
detected_at: datetime,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> NearbyOfflineFindingDevice | None:
|
||||
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
|
||||
@@ -152,9 +182,20 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
|
||||
|
||||
mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
|
||||
status_byte = payload[0]
|
||||
extra_byte = payload[1]
|
||||
|
||||
return NearbyOfflineFindingDevice(mac_bytes, status_byte, extra_byte, additional_data)
|
||||
pubkey_middle = mac_bytes[1:]
|
||||
pubkey_start_ms = payload[1] << 6
|
||||
pubkey_start_ls = mac_bytes[0] & 0b00111111
|
||||
pubkey_start = (pubkey_start_ms | pubkey_start_ls).to_bytes(1, "big")
|
||||
partial_pubkey = pubkey_start + pubkey_middle
|
||||
|
||||
return NearbyOfflineFindingDevice(
|
||||
mac_bytes,
|
||||
status_byte,
|
||||
partial_pubkey,
|
||||
detected_at,
|
||||
additional_data,
|
||||
)
|
||||
|
||||
|
||||
class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
|
||||
@@ -173,10 +214,11 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
|
||||
status: int,
|
||||
public_key: bytes,
|
||||
hint: int,
|
||||
detected_at: datetime,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> None:
|
||||
"""Initialize a `SeparatedOfflineFindingDevice`."""
|
||||
super().__init__(mac_bytes, status, additional_data)
|
||||
super().__init__(mac_bytes, status, detected_at, additional_data)
|
||||
|
||||
self._public_key: bytes = public_key
|
||||
self._hint: int = hint
|
||||
@@ -192,12 +234,24 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
|
||||
"""See `HasPublicKey.adv_key_bytes`."""
|
||||
return self._public_key
|
||||
|
||||
@override
|
||||
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
|
||||
"""Check whether the OF device's identity originates from a specific key source."""
|
||||
if isinstance(other_device, HasPublicKey):
|
||||
return self.adv_key_bytes == other_device.adv_key_bytes
|
||||
if isinstance(other_device, RollingKeyPairSource):
|
||||
return any(self.is_from(key) for key in other_device.keys_at(self.detected_at))
|
||||
|
||||
msg = f"Cannot compare against {type(other_device)}"
|
||||
raise ValueError(msg)
|
||||
|
||||
@classmethod
|
||||
@override
|
||||
def from_payload(
|
||||
cls,
|
||||
mac_address: str,
|
||||
payload: bytes,
|
||||
detected_at: datetime,
|
||||
additional_data: dict[Any, Any] | None = None,
|
||||
) -> SeparatedOfflineFindingDevice | None:
|
||||
"""Get a SeparatedOfflineFindingDevice object from an OF message payload."""
|
||||
@@ -222,7 +276,14 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
|
||||
|
||||
hint = payload[24]
|
||||
|
||||
return SeparatedOfflineFindingDevice(mac_bytes, status, pubkey, hint, additional_data)
|
||||
return SeparatedOfflineFindingDevice(
|
||||
mac_bytes,
|
||||
status,
|
||||
pubkey,
|
||||
hint,
|
||||
detected_at,
|
||||
additional_data,
|
||||
)
|
||||
|
||||
@override
|
||||
def __repr__(self) -> str:
|
||||
@@ -289,13 +350,20 @@ class OfflineFindingScanner:
|
||||
if not apple_data:
|
||||
return None
|
||||
|
||||
detected_at = datetime.now().astimezone()
|
||||
|
||||
try:
|
||||
additional_data = device.details.get("props", {})
|
||||
except AttributeError:
|
||||
# Likely Windows host, where details is a '_RawAdvData' object.
|
||||
# See: https://github.com/malmeloo/FindMy.py/issues/24
|
||||
additional_data = {}
|
||||
return OfflineFindingDevice.from_ble_payload(device.address, apple_data, additional_data)
|
||||
return OfflineFindingDevice.from_ble_payload(
|
||||
device.address,
|
||||
apple_data,
|
||||
detected_at,
|
||||
additional_data,
|
||||
)
|
||||
|
||||
async def scan_for(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user