diff --git a/examples/scanner.py b/examples/scanner.py index 8f929cd..d4a0414 100644 --- a/examples/scanner.py +++ b/examples/scanner.py @@ -16,27 +16,6 @@ from findmy import ( logging.basicConfig(level=logging.INFO) -def _print_nearby(device: NearbyOfflineFindingDevice) -> None: - print(f"NEARBY Device - {device.mac_address}") - print(f" Status byte: {device.status:x}") - print(" Extra data:") - for k, v in sorted(device.additional_data.items()): - print(f" {k:20}: {v}") - print() - - -def _print_separated(device: SeparatedOfflineFindingDevice) -> None: - print(f"SEPARATED Device - {device.mac_address}") - print(f" Public key: {device.adv_key_b64}") - print(f" Lookup key: {device.hashed_adv_key_b64}") - print(f" Status byte: {device.status:x}") - print(f" Hint byte: {device.hint:x}") - print(" Extra data:") - for k, v in sorted(device.additional_data.items()): - print(f" {k:20}: {v}") - print() - - async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scanner = await OfflineFindingScanner.create() @@ -46,10 +25,8 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scan_device = None async for device in scanner.scan_for(10, extend_timeout=True): - if isinstance(device, NearbyOfflineFindingDevice): - _print_nearby(device) - elif isinstance(device, SeparatedOfflineFindingDevice): - _print_separated(device) + if isinstance(device, (SeparatedOfflineFindingDevice, NearbyOfflineFindingDevice)): + device.print_device() else: print(f"Unknown device: {device}") print() diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 1f9d7b6..365525f 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -23,6 +23,15 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +APPLE_DEVICE_TYPE = { + 0: "Apple Device", + 1: "AirTag", + 2: "Licensed 3rd Party Find My Device", + 3: "AirPods", +} + +BATTERY_LEVEL = {0: "Full", 1: "Medium", 2: "Low", 3: "Very Low"} + class OfflineFindingDevice(ABC): """Device discoverable through Apple's bluetooth-based Offline Finding protocol.""" @@ -35,12 +44,14 @@ class OfflineFindingDevice(ABC): mac_bytes: bytes, status_byte: int, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate an OfflineFindingDevice.""" self._mac_bytes: bytes = mac_bytes self._status: int = status_byte self._detected_at: datetime = detected_at + self._rssi: int | None = rssi self._additional_data: dict[Any, Any] = additional_data or {} @property @@ -59,16 +70,38 @@ class OfflineFindingDevice(ABC): """Timezone-aware datetime of when the device was detected.""" return self._detected_at + @property + def rssi(self) -> int | None: + """Received Signal Strength Indicator (RSSI) value.""" + return self._rssi + @property def additional_data(self) -> dict[Any, Any]: """Any additional data. No guarantees about the contents of this dictionary.""" return self._additional_data + @property + def device_type(self) -> str: + """Get the device type from status byte.""" + type_id = (self.status >> 4) & 0b00000011 + return APPLE_DEVICE_TYPE.get(type_id, "Unknown") + + @property + def battery_level(self) -> str: + """Get the battery level from status byte.""" + battery_id = (self.status >> 6) & 0b00000011 + return BATTERY_LEVEL.get(battery_id, "Unknown") + @abstractmethod def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" raise NotImplementedError + @abstractmethod + def print_device(self) -> None: + """Print human-readable information about the device.""" + raise NotImplementedError + @classmethod @abstractmethod def from_payload( @@ -76,7 +109,8 @@ class OfflineFindingDevice(ABC): mac_address: str, payload: bytes, detected_at: datetime, - additional_data: dict[Any, Any] | None, + rssi: int | None = None, + additional_data: dict[Any, Any] | None = None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" raise NotImplementedError @@ -87,6 +121,7 @@ class OfflineFindingDevice(ABC): mac_address: str, ble_payload: bytes, detected_at: datetime | None = None, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> OfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from a BLE packet payload.""" @@ -97,6 +132,7 @@ class OfflineFindingDevice(ABC): logger.debug("Unsupported OF type: %s", ble_payload[0]) return None + # Differentiate between Nearby and Separated advertisements by payload length device_type = next( ( dev @@ -113,6 +149,7 @@ class OfflineFindingDevice(ABC): mac_address, ble_payload[cls.OF_HEADER_SIZE :], detected_at or datetime.now().astimezone(), + rssi, additional_data, ) @@ -131,21 +168,27 @@ class OfflineFindingDevice(ABC): class NearbyOfflineFindingDevice(OfflineFindingDevice): """Offline-Finding device in nearby state.""" - OF_PAYLOAD_LEN = 0x02 # 2 + OF_PAYLOAD_LEN = 0x02 # 2 bytes (status, 2 bits of public key/mac address) - def __init__( + def __init__( # noqa: PLR0913 self, mac_bytes: bytes, status_byte: int, first_adv_key_bytes: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Instantiate a NearbyOfflineFindingDevice.""" - super().__init__(mac_bytes, status_byte, detected_at, additional_data) - + super().__init__(mac_bytes, status_byte, detected_at, rssi, additional_data) + # When nearby, only the first 6 bytes of the public key are transmitted self._first_adv_key_bytes: bytes = first_adv_key_bytes + @property + def adv_key_bytes(self) -> bytes: + """Although not a full public key, still identifies device like one.""" + return self._first_adv_key_bytes + @override def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool: """Check whether the OF device's identity originates from a specific key source.""" @@ -179,14 +222,15 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): mac_address: str, payload: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> NearbyOfflineFindingDevice | None: """Get a NearbyOfflineFindingDevice object from an OF message payload.""" if len(payload) != cls.OF_PAYLOAD_LEN: logger.error( - "Invalid OF data length: %s instead of %s", + "Invalid OF data length for NearbyOfflineFindingDevice: %s instead of %s", len(payload), - payload[1], + cls.OF_PAYLOAD_LEN, ) mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", "")) @@ -203,14 +247,27 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): status_byte, partial_pubkey, detected_at, + rssi, additional_data, ) + @override + def print_device(self) -> None: + """Print human-readable information about the device.""" + logger.info("Nearby %s - %s", self.device_type, self.mac_address) + logger.info(" Status byte: 0x%x", self.status) + logger.info(" Battery lvl: %s", self.battery_level) + logger.info(" RSSI: %s", self.rssi) + logger.info(" Extra data:") + for k, v in sorted(self.additional_data.items()): + logger.info(" %s: %s", f"{k:20}", v) + logger.info("\n") + class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): """Offline-Finding device in separated state.""" - OF_PAYLOAD_LEN = 0x19 # 25 + OF_PAYLOAD_LEN = 0x19 # 25 bytes (status, pubkey(22), first 2 bits of MAC/pubkey, hint) def __init__( # noqa: PLR0913 self, @@ -219,11 +276,11 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): public_key: bytes, hint: int, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> None: """Initialize a :meth:`SeparatedOfflineFindingDevice`.""" - super().__init__(mac_bytes, status, detected_at, additional_data) - + super().__init__(mac_bytes, status, detected_at, rssi, additional_data) self._public_key: bytes = public_key self._hint: int = hint @@ -271,14 +328,15 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): mac_address: str, payload: bytes, detected_at: datetime, + rssi: int | None = None, additional_data: dict[Any, Any] | None = None, ) -> SeparatedOfflineFindingDevice | None: """Get a SeparatedOfflineFindingDevice object from an OF message payload.""" if len(payload) != cls.OF_PAYLOAD_LEN: logger.error( - "Invalid OF data length: %s instead of %s", + "Invalid OF data length for SeparatedOfflineFindingDevice: %s instead of %s", len(payload), - payload[1], + cls.OF_PAYLOAD_LEN, ) return None @@ -301,9 +359,25 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): pubkey, hint, detected_at, + rssi, additional_data, ) + @override + def print_device(self) -> None: + """Print human-readable information about the device.""" + logger.info("Separated %s - %s", self.device_type, self.mac_address) + logger.info(" Public key: %s", self.adv_key_b64) + logger.info(" Lookup key: %s", self.hashed_adv_key_b64) + logger.info(" Status byte: 0x%x", self.status) + logger.info(" Battery lvl: %s", self.battery_level) + logger.info(" Hint byte: 0x%x", self.hint) + logger.info(" RSSI: %s", self.rssi) + logger.info(" Extra data:") + for k, v in sorted(self.additional_data.items()): + logger.info(" %s: %s", f"{k:20}", v) + logger.info("\n") + @override def __repr__(self) -> str: """Human-readable string representation of an OfflineFindingDevice.""" @@ -340,6 +414,38 @@ class OfflineFindingScanner: self._scanner_count: int = 0 + def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: + """Print summary of each device seen.""" + logger.info("================ RESULTS =========================") + for mac, devices in seen_devices.items(): + avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( + [d for d in devices if d.rssi is not None] + ) + logger.info("Device %s seen %d times, average RSSI: %.1f", mac, len(devices), avg_rssi) + of_type = ( + "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" + ) + logger.info( + " %s with %s battery (%s state)", + devices[0].device_type, + devices[0].battery_level, + of_type, + ) + + if isinstance(devices[0], SeparatedOfflineFindingDevice): + logger.info(" Public key: %s", devices[0].adv_key_b64) + logger.info(" Lookup key: %s", devices[0].hashed_adv_key_b64) + + logger.info("===============================================") + + device_type_counts: dict[str, int] = {} + for devs in seen_devices.values(): + dev_type = devs[0].device_type + device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 + + for dev_type, count in device_type_counts.items(): + logger.info("Total %s: %d", dev_type, count) + @classmethod async def create(cls) -> OfflineFindingScanner: """Create an instance of the scanner.""" @@ -365,8 +471,10 @@ class OfflineFindingScanner: device: BLEDevice, data: AdvertisementData, ) -> None: - self._device_fut.set_result((device, data)) - self._device_fut = self._loop.create_future() + # Ensure that only one waiting coroutine is notified + if not self._device_fut.done(): + self._device_fut.set_result((device, data)) + self._device_fut = self._loop.create_future() async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None: device, data = await asyncio.wait_for(self._device_fut, timeout=timeout) @@ -377,6 +485,11 @@ class OfflineFindingScanner: detected_at = datetime.now().astimezone() + # Extract RSSI if it exists + rssi = None + if data.rssi is not None: + rssi = data.rssi + try: additional_data = device.details.get("props", {}) except AttributeError: @@ -387,6 +500,7 @@ class OfflineFindingScanner: device.address, apple_data, detected_at, + rssi, additional_data, ) @@ -405,21 +519,34 @@ class OfflineFindingScanner: await self._start_scan() stop_at = time.time() + timeout - devices_seen: set[OfflineFindingDevice] = set() + # Map MAC address to device objects (nearby doesn't send entire pubkey) + # This avoids double counting when device has different status/hint byte (but same pubkey) + devices_seen: dict[str, list[OfflineFindingDevice]] = {} try: time_left = stop_at - time.time() while time_left > 0: device = await self._wait_for_device(time_left) - if device is not None and device not in devices_seen: - devices_seen.add(device) - if extend_timeout: - stop_at = time.time() + timeout - yield device + + if device is not None: + # Check if we have already seen this device + new_device = device.mac_address not in devices_seen + + devices_seen[device.mac_address] = [ + *devices_seen.get(device.mac_address, []), + device, + ] + + if new_device: + if extend_timeout: + stop_at = time.time() + timeout + yield device time_left = stop_at - time.time() + except asyncio.TimeoutError: # timeout reached self._device_fut = self._loop.create_future() + self.print_scanning_results(devices_seen) return finally: await self._stop_scan()