diff --git a/examples/scanner.py b/examples/scanner.py index d4a0414..893d574 100644 --- a/examples/scanner.py +++ b/examples/scanner.py @@ -24,9 +24,14 @@ async def scan(check_key: KeyPair | FindMyAccessory | None = None) -> bool: scan_device = None - async for device in scanner.scan_for(10, extend_timeout=True): + scan_out_file = Path("scan_results.jsonl") + # Clear previous scan results + if scan_out_file.exists(): + scan_out_file.unlink() + + async for device in scanner.scan_for(10, extend_timeout=True, print_summary=True): if isinstance(device, (SeparatedOfflineFindingDevice, NearbyOfflineFindingDevice)): - device.print_device() + device.print_device(out_file=scan_out_file) else: print(f"Unknown device: {device}") print() diff --git a/findmy/scanner/scanner.py b/findmy/scanner/scanner.py index 365525f..8c4739f 100644 --- a/findmy/scanner/scanner.py +++ b/findmy/scanner/scanner.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import json import logging import time from abc import ABC, abstractmethod @@ -17,6 +18,7 @@ from findmy.keys import HasPublicKey if TYPE_CHECKING: from collections.abc import AsyncGenerator + from pathlib import Path from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData @@ -24,13 +26,13 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) APPLE_DEVICE_TYPE = { - 0: "Apple Device", - 1: "AirTag", - 2: "Licensed 3rd Party Find My Device", - 3: "AirPods", + 0b00: "Apple Device", + 0b01: "AirTag", + 0b10: "Licensed 3rd Party Find My Device", + 0b11: "AirPods", } -BATTERY_LEVEL = {0: "Full", 1: "Medium", 2: "Low", 3: "Very Low"} +BATTERY_LEVEL = {0b00: "Full", 0b01: "Medium", 0b10: "Low", 0b11: "Very Low"} class OfflineFindingDevice(ABC): @@ -98,8 +100,8 @@ class OfflineFindingDevice(ABC): raise NotImplementedError @abstractmethod - def print_device(self) -> None: - """Print human-readable information about the device.""" + def print_device(self, out_file: Path | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" raise NotImplementedError @classmethod @@ -185,7 +187,7 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): self._first_adv_key_bytes: bytes = first_adv_key_bytes @property - def adv_key_bytes(self) -> bytes: + def partial_adv_key(self) -> bytes: """Although not a full public key, still identifies device like one.""" return self._first_adv_key_bytes @@ -252,16 +254,33 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice): ) @override - def print_device(self) -> None: - """Print human-readable information about the device.""" - logger.info("Nearby %s - %s", self.device_type, self.mac_address) - logger.info(" Status byte: 0x%x", self.status) - logger.info(" Battery lvl: %s", self.battery_level) - logger.info(" RSSI: %s", self.rssi) - logger.info(" Extra data:") - for k, v in sorted(self.additional_data.items()): - logger.info(" %s: %s", f"{k:20}", v) - logger.info("\n") + def print_device(self, out_file: Path | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" + # ruff: noqa: T201 + if out_file: + data = { + "mac_address": self.mac_address, + "status": self.status, + "device_type": self.device_type, + "battery_level": self.battery_level, + "rssi": self.rssi, + "detected_at": self.detected_at.isoformat(), + "additional_data": self.additional_data, + } + + with out_file.open("a", encoding="utf-8") as f: + # Indent json for readability + json.dump(data, f, indent=4) + f.write("\n") + else: + print(f"Nearby {self.device_type} - {self.mac_address}") + print(f" Status byte: 0x{self.status:x}") + print(f" Battery lvl: {self.battery_level}") + print(f" RSSI: {self.rssi}") + print(" Extra data:") + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}") + print("\n") class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): @@ -364,19 +383,38 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): ) @override - def print_device(self) -> None: - """Print human-readable information about the device.""" - logger.info("Separated %s - %s", self.device_type, self.mac_address) - logger.info(" Public key: %s", self.adv_key_b64) - logger.info(" Lookup key: %s", self.hashed_adv_key_b64) - logger.info(" Status byte: 0x%x", self.status) - logger.info(" Battery lvl: %s", self.battery_level) - logger.info(" Hint byte: 0x%x", self.hint) - logger.info(" RSSI: %s", self.rssi) - logger.info(" Extra data:") - for k, v in sorted(self.additional_data.items()): - logger.info(" %s: %s", f"{k:20}", v) - logger.info("\n") + def print_device(self, out_file: Path | None = None) -> None: + """Print human-readable information about the device to stdout or file.""" + # ruff: noqa: T201 + if out_file: + data = { + "mac_address": self.mac_address, + "public_key": self.adv_key_b64, + "lookup_key": self.hashed_adv_key_b64, + "status": self.status, + "device_type": self.device_type, + "battery_level": self.battery_level, + "hint": self.hint, + "rssi": self.rssi, + "detected_at": self.detected_at.isoformat(), + "additional_data": self.additional_data, + } + + with out_file.open("a", encoding="utf-8") as f: + json.dump(data, f, indent=4) + f.write("\n") + else: + print(f"Separated {self.device_type} - {self.mac_address}") + print(f" Public key: {self.adv_key_b64}") + print(f" Lookup key: {self.hashed_adv_key_b64}") + print(f" Status byte: 0x{self.status:x}") + print(f" Battery lvl: {self.battery_level}") + print(f" Hint byte: 0x{self.hint:x}") + print(f" RSSI: {self.rssi}") + print(" Extra data:") + for k, v in sorted(self.additional_data.items()): + print(f" {k:20}: {v}") + print("\n") @override def __repr__(self) -> str: @@ -416,27 +454,25 @@ class OfflineFindingScanner: def print_scanning_results(self, seen_devices: dict[str, list[OfflineFindingDevice]]) -> None: """Print summary of each device seen.""" - logger.info("================ RESULTS =========================") + # ruff: noqa: T201 + print("================ RESULTS =========================") for mac, devices in seen_devices.items(): avg_rssi = sum(d.rssi for d in devices if d.rssi is not None) / len( [d for d in devices if d.rssi is not None] ) - logger.info("Device %s seen %d times, average RSSI: %.1f", mac, len(devices), avg_rssi) + print(f"Device {mac} seen {len(devices)} times, average RSSI: {avg_rssi:.1f}") of_type = ( "nearby" if isinstance(devices[0], NearbyOfflineFindingDevice) else "separated" ) - logger.info( - " %s with %s battery (%s state)", - devices[0].device_type, - devices[0].battery_level, - of_type, + print( + f" {devices[0].device_type} with {devices[0].battery_level} battery" + f" ({of_type} state)" ) if isinstance(devices[0], SeparatedOfflineFindingDevice): - logger.info(" Public key: %s", devices[0].adv_key_b64) - logger.info(" Lookup key: %s", devices[0].hashed_adv_key_b64) - - logger.info("===============================================") + print(f" Public key: {devices[0].adv_key_b64}") + print(f" Lookup key: {devices[0].hashed_adv_key_b64}") + print("===============================================") device_type_counts: dict[str, int] = {} for devs in seen_devices.values(): @@ -444,7 +480,7 @@ class OfflineFindingScanner: device_type_counts[dev_type] = device_type_counts.get(dev_type, 0) + 1 for dev_type, count in device_type_counts.items(): - logger.info("Total %s: %d", dev_type, count) + print(f"Total {dev_type}: {count}") @classmethod async def create(cls) -> OfflineFindingScanner: @@ -485,10 +521,8 @@ class OfflineFindingScanner: detected_at = datetime.now().astimezone() - # Extract RSSI if it exists - rssi = None - if data.rssi is not None: - rssi = data.rssi + # Extract RSSI (which may be None) + rssi = data.rssi try: additional_data = device.details.get("props", {}) @@ -509,6 +543,7 @@ class OfflineFindingScanner: timeout: float = 10, *, extend_timeout: bool = False, + print_summary: bool = False, ) -> AsyncGenerator[OfflineFindingDevice, None]: """ Scan for :meth:`OfflineFindingDevice`s for up to :meth:`timeout` seconds. @@ -546,7 +581,8 @@ class OfflineFindingScanner: except asyncio.TimeoutError: # timeout reached self._device_fut = self._loop.create_future() - self.print_scanning_results(devices_seen) + if print_summary: + self.print_scanning_results(devices_seen) return finally: await self._stop_scan()