Extract rssi/device type, fix crash, and move device printing

This commit is contained in:
Andrew Gaylord
2025-11-21 11:52:53 -05:00
parent c0707dcd35
commit 688369b8b4
2 changed files with 149 additions and 45 deletions

View File

@@ -16,27 +16,6 @@ from findmy import (
logging.basicConfig(level=logging.INFO)
def _print_nearby(device: NearbyOfflineFindingDevice) -> None:
print(f"NEARBY Device - {device.mac_address}")
print(f" Status byte: {device.status:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print()
def _print_separated(device: SeparatedOfflineFindingDevice) -> None:
print(f"SEPARATED Device - {device.mac_address}")
print(f" Public key: {device.adv_key_b64}")
print(f" Lookup key: {device.hashed_adv_key_b64}")
print(f" Status byte: {device.status:x}")
print(f" Hint byte: {device.hint:x}")
print(" Extra data:")
for k, v in sorted(device.additional_data.items()):
print(f" {k:20}: {v}")
print()
async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool:
scanner = await OfflineFindingScanner.create()
@@ -46,10 +25,8 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool:
scan_device = None
async for device in scanner.scan_for(10, extend_timeout=True):
if isinstance(device, NearbyOfflineFindingDevice):
_print_nearby(device)
elif isinstance(device, SeparatedOfflineFindingDevice):
_print_separated(device)
if isinstance(device, (SeparatedOfflineFindingDevice, NearbyOfflineFindingDevice)):
device.print_device()
else:
print(f"Unknown device: {device}")
print()

View File

@@ -23,6 +23,15 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
APPLE_DEVICE_TYPE = {
0: "Apple Device",
1: "AirTag",
2: "Licensed 3rd Party Find My Device",
3: "AirPods",
}
BATTERY_LEVEL = {0: "Full", 1: "Medium", 2: "Low", 3: "Very Low"}
class OfflineFindingDevice(ABC):
"""Device discoverable through Apple's bluetooth-based Offline Finding protocol."""
@@ -35,12 +44,14 @@ class OfflineFindingDevice(ABC):
mac_bytes: bytes,
status_byte: int,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Instantiate an OfflineFindingDevice."""
self._mac_bytes: bytes = mac_bytes
self._status: int = status_byte
self._detected_at: datetime = detected_at
self._rssi: int | None = rssi
self._additional_data: dict[Any, Any] = additional_data or {}
@property
@@ -59,16 +70,38 @@ class OfflineFindingDevice(ABC):
"""Timezone-aware datetime of when the device was detected."""
return self._detected_at
@property
def rssi(self) -> int | None:
"""Received Signal Strength Indicator (RSSI) value."""
return self._rssi
@property
def additional_data(self) -> dict[Any, Any]:
"""Any additional data. No guarantees about the contents of this dictionary."""
return self._additional_data
@property
def device_type(self) -> str:
"""Get the device type from status byte."""
type_id = (self.status >> 4) & 0b00000011
return APPLE_DEVICE_TYPE.get(type_id, "Unknown")
@property
def battery_level(self) -> str:
"""Get the battery level from status byte."""
battery_id = (self.status >> 6) & 0b00000011
return BATTERY_LEVEL.get(battery_id, "Unknown")
@abstractmethod
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
"""Check whether the OF device's identity originates from a specific key source."""
raise NotImplementedError
@abstractmethod
def print_device(self) -> None:
"""Print human-readable information about the device."""
raise NotImplementedError
@classmethod
@abstractmethod
def from_payload(
@@ -76,7 +109,8 @@ class OfflineFindingDevice(ABC):
mac_address: str,
payload: bytes,
detected_at: datetime,
additional_data: dict[Any, Any] | None,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
raise NotImplementedError
@@ -87,6 +121,7 @@ class OfflineFindingDevice(ABC):
mac_address: str,
ble_payload: bytes,
detected_at: datetime | None = None,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
@@ -97,6 +132,7 @@ class OfflineFindingDevice(ABC):
logger.debug("Unsupported OF type: %s", ble_payload[0])
return None
# Differentiate between Nearby and Separated advertisements by payload length
device_type = next(
(
dev
@@ -113,6 +149,7 @@ class OfflineFindingDevice(ABC):
mac_address,
ble_payload[cls.OF_HEADER_SIZE :],
detected_at or datetime.now().astimezone(),
rssi,
additional_data,
)
@@ -131,21 +168,27 @@ class OfflineFindingDevice(ABC):
class NearbyOfflineFindingDevice(OfflineFindingDevice):
"""Offline-Finding device in nearby state."""
OF_PAYLOAD_LEN = 0x02 # 2
OF_PAYLOAD_LEN = 0x02 # 2 bytes (status, 2 bits of public key/mac address)
def __init__(
def __init__( # noqa: PLR0913
self,
mac_bytes: bytes,
status_byte: int,
first_adv_key_bytes: bytes,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Instantiate a NearbyOfflineFindingDevice."""
super().__init__(mac_bytes, status_byte, detected_at, additional_data)
super().__init__(mac_bytes, status_byte, detected_at, rssi, additional_data)
# When nearby, only the first 6 bytes of the public key are transmitted
self._first_adv_key_bytes: bytes = first_adv_key_bytes
@property
def adv_key_bytes(self) -> bytes:
"""Although not a full public key, still identifies device like one."""
return self._first_adv_key_bytes
@override
def is_from(self, other_device: HasPublicKey | RollingKeyPairSource) -> bool:
"""Check whether the OF device's identity originates from a specific key source."""
@@ -179,14 +222,15 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
mac_address: str,
payload: bytes,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> NearbyOfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.OF_PAYLOAD_LEN:
logger.error(
"Invalid OF data length: %s instead of %s",
"Invalid OF data length for NearbyOfflineFindingDevice: %s instead of %s",
len(payload),
payload[1],
cls.OF_PAYLOAD_LEN,
)
mac_bytes = bytes.fromhex(mac_address.replace(":", "").replace("-", ""))
@@ -203,14 +247,27 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
status_byte,
partial_pubkey,
detected_at,
rssi,
additional_data,
)
@override
def print_device(self) -> None:
"""Print human-readable information about the device."""
logger.info("Nearby %s - %s", self.device_type, self.mac_address)
logger.info(" Status byte: 0x%x", self.status)
logger.info(" Battery lvl: %s", self.battery_level)
logger.info(" RSSI: %s", self.rssi)
logger.info(" Extra data:")
for k, v in sorted(self.additional_data.items()):
logger.info(" %s: %s", f"{k:20}", v)
logger.info("\n")
class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
"""Offline-Finding device in separated state."""
OF_PAYLOAD_LEN = 0x19 # 25
OF_PAYLOAD_LEN = 0x19 # 25 bytes (status, pubkey(22), first 2 bits of MAC/pubkey, hint)
def __init__( # noqa: PLR0913
self,
@@ -219,11 +276,11 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
public_key: bytes,
hint: int,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> None:
"""Initialize a :meth:`SeparatedOfflineFindingDevice`."""
super().__init__(mac_bytes, status, detected_at, additional_data)
super().__init__(mac_bytes, status, detected_at, rssi, additional_data)
self._public_key: bytes = public_key
self._hint: int = hint
@@ -271,14 +328,15 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
mac_address: str,
payload: bytes,
detected_at: datetime,
rssi: int | None = None,
additional_data: dict[Any, Any] | None = None,
) -> SeparatedOfflineFindingDevice | None:
"""Get a SeparatedOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.OF_PAYLOAD_LEN:
logger.error(
"Invalid OF data length: %s instead of %s",
"Invalid OF data length for SeparatedOfflineFindingDevice: %s instead of %s",
len(payload),
payload[1],
cls.OF_PAYLOAD_LEN,
)
return None
@@ -301,9 +359,25 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
pubkey,
hint,
detected_at,
rssi,
additional_data,
)
@override
def print_device(self) -> None:
"""Print human-readable information about the device."""
logger.info("Separated %s - %s", self.device_type, self.mac_address)
logger.info(" Public key: %s", self.adv_key_b64)
logger.info(" Lookup key: %s", self.hashed_adv_key_b64)
logger.info(" Status byte: 0x%x", self.status)
logger.info(" Battery lvl: %s", self.battery_level)
logger.info(" Hint byte: 0x%x", self.hint)
logger.info(" RSSI: %s", self.rssi)
logger.info(" Extra data:")
for k, v in sorted(self.additional_data.items()):
logger.info(" %s: %s", f"{k:20}", v)
logger.info("\n")
@override
def __repr__(self) -> str:
"""Human-readable string representation of an OfflineFindingDevice."""
@@ -340,6 +414,38 @@ class OfflineFindingScanner:
self._scanner_count: int = 0
def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None:
"""Print summary of each device seen."""
logger.info("================ RESULTS =========================")
for mac, devices in seen_devices.items():
avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len(
[d for d in devices if d.rssi is not None]
)
logger.info("Device %s seen %d times, average RSSI: %.1f", mac, len(devices), avg_rssi)
of_type = (
"nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated"
)
logger.info(
" %s with %s battery (%s state)",
devices[0].device_type,
devices[0].battery_level,
of_type,
)
if isinstance(devices[0], SeparatedOfflineFindingDevice):
logger.info(" Public key: %s", devices[0].adv_key_b64)
logger.info(" Lookup key: %s", devices[0].hashed_adv_key_b64)
logger.info("===============================================")
device_type_counts: dict[str, int] = {}
for devs in seen_devices.values():
dev_type = devs[0].device_type
device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1
for dev_type, count in device_type_counts.items():
logger.info("Total %s: %d", dev_type, count)
@classmethod
async def create(cls) -> OfflineFindingScanner:
"""Create an instance of the scanner."""
@@ -365,8 +471,10 @@ class OfflineFindingScanner:
device: BLEDevice,
data: AdvertisementData,
) -> None:
self._device_fut.set_result((device, data))
self._device_fut = self._loop.create_future()
# Ensure that only one waiting coroutine is notified
if not self._device_fut.done():
self._device_fut.set_result((device, data))
self._device_fut = self._loop.create_future()
async def _wait_for_device(self, timeout: float) -> OfflineFindingDevice | None:
device, data = await asyncio.wait_for(self._device_fut, timeout=timeout)
@@ -377,6 +485,11 @@ class OfflineFindingScanner:
detected_at = datetime.now().astimezone()
# Extract RSSI if it exists
rssi = None
if data.rssi is not None:
rssi = data.rssi
try:
additional_data = device.details.get("props", {})
except AttributeError:
@@ -387,6 +500,7 @@ class OfflineFindingScanner:
device.address,
apple_data,
detected_at,
rssi,
additional_data,
)
@@ -405,21 +519,34 @@ class OfflineFindingScanner:
await self._start_scan()
stop_at = time.time() + timeout
devices_seen: set[OfflineFindingDevice] = set()
# Map MAC address to device objects (nearby doesn't send entire pubkey)
# This avoids double counting when device has different status/hint byte (but same pubkey)
devices_seen: dict[str, list[OfflineFindingDevice]] = {}
try:
time_left = stop_at - time.time()
while time_left > 0:
device = await self._wait_for_device(time_left)
if device is not None and device not in devices_seen:
devices_seen.add(device)
if extend_timeout:
stop_at = time.time() + timeout
yield device
if device is not None:
# Check if we have already seen this device
new_device = device.mac_address not in devices_seen
devices_seen[device.mac_address] = [
*devices_seen.get(device.mac_address, []),
device,
]
if new_device:
if extend_timeout:
stop_at = time.time() + timeout
yield device
time_left = stop_at - time.time()
except asyncio.TimeoutError: # timeout reached
self._device_fut = self._loop.create_future()
self.print_scanning_results(devices_seen)
return
finally:
await self._stop_scan()