Merge branch 'main' into feat/better-docs

This commit is contained in:
Mike A.
2025-08-04 15:11:51 +02:00
43 changed files with 3751 additions and 2602 deletions

1
.github/FUNDING.yml vendored
View File

@@ -1 +0,0 @@
github: [malmeloo]

View File

@@ -1,43 +1,25 @@
name: Common Python + Poetry Setup name: Common Python + UV Setup
inputs: inputs:
dependency-groups:
description: 'A comma-separated list of dependency groups to install'
default: 'main'
python-version: python-version:
description: 'The Python version to use' description: 'The Python version to install'
default: '3.10' required: false
runs: runs:
using: 'composite' using: 'composite'
steps: steps:
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
python-version: ${{ matrix.python-version }}
- name: Set up Python - name: Set up Python
uses: actions/setup-python@v5 if: ${{ inputs.python-version != '' }}
with:
python-version: ${{ inputs.python-version }}
- name: Install poetry
shell: bash shell: bash
run: | run: uv python install
python -m pip install poetry
poetry config virtualenvs.in-project true
- name: Get cache key
id: cache-key
shell: bash
run: |
key=$(echo "${{ inputs.dependency-groups }}" | sed 's/,/+/')
echo "key=$key" >> "$GITHUB_OUTPUT"
- name: Load cached venv
id: cache-dependencies
uses: actions/cache@v4
with:
path: .venv
key: venv-${{ runner.os }}-python-${{ inputs.python-version }}-groups-${{ steps.cache-key.outputs.key }}-${{ hashFiles('**/poetry.lock') }}
- name: Install dependencies - name: Install dependencies
if: steps.cache-dependencies.outputs.cache-hit != 'true'
shell: bash shell: bash
run: poetry install --with ${{ inputs.dependency-groups }} run: uv sync --all-extras --all-groups

View File

@@ -17,14 +17,15 @@ jobs:
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: './.github/actions/setup-project' - name: Install uv and set the python version
uses: astral-sh/setup-uv@v6
with: with:
dependency-groups: 'docs' python-version: ${{ matrix.python-version }}
- name: Build documentation - name: Build documentation
run: | run: |
cd docs cd docs
poetry run make html uv run make html
- name: Setup Pages - name: Setup Pages
uses: actions/configure-pages@v5 uses: actions/configure-pages@v5

View File

@@ -3,6 +3,8 @@ name: Pre-commit
on: on:
workflow_dispatch: workflow_dispatch:
push: push:
branches: [main]
pull_request:
jobs: jobs:
check: check:
@@ -12,10 +14,8 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: './.github/actions/setup-project' - uses: './.github/actions/setup-project'
with:
dependency-groups: 'dev,test'
- uses: pre-commit/action@v3.0.1 - uses: pre-commit/action@v3.0.1
- uses: pre-commit-ci/lite-action@v1.0.2 - uses: pre-commit-ci/lite-action@v1.1.0
if: always() if: always()

View File

@@ -17,19 +17,16 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: './.github/actions/setup-project' - uses: './.github/actions/setup-project'
with:
dependency-groups: 'dev'
- name: Prepare README - name: Prepare README
run: ./scripts/refactor_readme.py README.md run: ./scripts/refactor_readme.py README.md
- name: Build package - name: Build package
run: poetry build run: uv build
- name: Publish package - name: Publish package
run: | run: |
poetry config pypi-token.pypi ${{ secrets.PYPI_API_TOKEN }} uv publish --token ${{ secrets.PYPI_API_TOKEN }}
poetry publish
- name: Create release - name: Create release
uses: softprops/action-gh-release@v2 uses: softprops/action-gh-release@v2

View File

@@ -3,6 +3,8 @@ name: Run unit tests
on: on:
workflow_dispatch: workflow_dispatch:
push: push:
branches: [main]
pull_request:
jobs: jobs:
versions: versions:
@@ -15,14 +17,12 @@ jobs:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: './.github/actions/setup-project' - uses: './.github/actions/setup-project'
with:
dependency-groups: 'dev'
- id: supported-versions - id: supported-versions
name: Get supported versions name: Get supported versions
run: | run: |
set -e set -e
echo "py-versions=$(poetry run ./scripts/supported_py_versions.py)" >> "$GITHUB_OUTPUT" echo "py-versions=$(uv run ./scripts/supported_py_versions.py)" >> "$GITHUB_OUTPUT"
test: test:
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -38,10 +38,9 @@ jobs:
- uses: './.github/actions/setup-project' - uses: './.github/actions/setup-project'
with: with:
python-version: ${{ matrix.py-version }} python-version: ${{ matrix.py-version }}
dependency-groups: 'test'
- name: Run unit tests - name: Run unit tests
run: poetry run pytest run: uv run pytest
results: results:
runs-on: ubuntu-latest runs-on: ubuntu-latest

1
.gitignore vendored
View File

@@ -164,3 +164,4 @@ account.json
airtag.plist airtag.plist
DO_NOT_COMMIT* DO_NOT_COMMIT*
.direnv/ .direnv/
accessories/

View File

@@ -1,11 +1,31 @@
default_install_hook_types:
- pre-commit
- post-checkout
- post-merge
- post-rewrite
repos: repos:
- repo: https://github.com/astral-sh/ruff-pre-commit - repo: https://github.com/astral-sh/uv-pre-commit
rev: v0.6.3 rev: 0.8.4
hooks: hooks:
- id: ruff - id: uv-sync
args: ["--fix"] args: ["--all-groups"]
- id: ruff-format
- repo: https://github.com/RobertCraigie/pyright-python - repo: local
rev: v1.1.378
hooks: hooks:
- id: pyright - id: uv-basedpyright
name: Run basedpyright via uv
entry: uv run basedpyright
language: system
types: [python]
- id: uv-ruff-check
name: Run ruff check via uv
entry: uv run ruff check --fix
language: system
types: [python]
- id: uv-ruff-fmt
name: Run ruff format via uv
entry: uv run ruff format
language: system
types: [python]

View File

@@ -1,13 +1,27 @@
# FindMy.py <div align="center">
<img alt="FindMy.py Logo" src="assets/icon.png" width="500">
<h1>FindMy.py</h1>
</div>
[![](https://img.shields.io/pypi/v/FindMy)](https://pypi.org/project/FindMy/) <div align="center">
[![](https://img.shields.io/pypi/dm/FindMy)](#)
[![](https://img.shields.io/github/license/malmeloo/FindMy.py)](LICENSE.md)
[![](https://img.shields.io/pypi/pyversions/FindMy)](#)
The all-in-one library that provides everything you need _Query Apple's FindMy network with Python!_
to query Apple's FindMy network!
<h5>
<a href="https://docs.mikealmel.ooo/FindMy.py">
Docs
</a>
<span> | </span>
<a href="examples/">
Examples
</a>
<span> | </span>
<a href="https://pypi.org/project/FindMy/">
PyPI
</a>
</div>
## 🚀 Overview
The current "Find My-scene" is quite fragmented, with code The current "Find My-scene" is quite fragmented, with code
being all over the place across multiple repositories, being all over the place across multiple repositories,
written by [several authors](#Credits). This project aims to written by [several authors](#Credits). This project aims to
@@ -22,7 +36,7 @@ application wishing to integrate with the Find My network.
> You are encouraged to report any issues you can find on the > You are encouraged to report any issues you can find on the
> [issue tracker](https://github.com/malmeloo/FindMy.py/issues/)! > [issue tracker](https://github.com/malmeloo/FindMy.py/issues/)!
### Features ## 🧪 Features
- [x] Cross-platform: no Mac needed - [x] Cross-platform: no Mac needed
- [x] Fetch and decrypt location reports - [x] Fetch and decrypt location reports
@@ -36,12 +50,7 @@ application wishing to integrate with the Find My network.
- [x] Import or create your own accessory keys - [x] Import or create your own accessory keys
- [x] Both async and sync APIs - [x] Both async and sync APIs
### Roadmap ## 📥 Installation
- [ ] Local anisette generation (without server)
- More information: [#2](https://github.com/malmeloo/FindMy.py/issues/2)
## Installation
The package can be installed from [PyPi](https://pypi.org/project/findmy/): The package can be installed from [PyPi](https://pypi.org/project/findmy/):
@@ -49,9 +58,11 @@ The package can be installed from [PyPi](https://pypi.org/project/findmy/):
pip install findmy pip install findmy
``` ```
For usage examples, see the [examples](examples) directory. Documentation can be found [here](http://docs.mikealmel.ooo/FindMy.py/). For usage examples, see the [examples](examples) directory.
We are also building out a CLI. Try `python -m findmy` to see the current state of it.
Documentation can be found [here](http://docs.mikealmel.ooo/FindMy.py/).
## Contributing ## 🤝 Contributing
Want to contribute code? That's great! For new features, please open an Want to contribute code? That's great! For new features, please open an
[issue](https://github.com/malmeloo/FindMy.py/issues) first so we can discuss. [issue](https://github.com/malmeloo/FindMy.py/issues) first so we can discuss.
@@ -61,22 +72,26 @@ Before opening a pull request, please ensure that your code adheres to these rul
There are pre-commit hooks included to help you with this, which you can set up as follows: There are pre-commit hooks included to help you with this, which you can set up as follows:
```shell ```shell
pip install poetry ruff pip install uv
poetry install # this installs pre-commit into your environment uv sync # this installs ruff & pre-commit into your environment
pre-commit install pre-commit install
``` ```
After following the above steps, your code will be linted and formatted automatically After following the above steps, your code will be linted and formatted automatically
before committing it. before committing it.
## Derivative projects ## 🧠 Derivative projects
There are several other cool projects based on this library! Some of them have been listed below, make sure to check them out as well. There are several other cool projects based on this library! Some of them have been listed below, make sure to check them out as well.
* [OfflineFindRecovery](https://github.com/hajekj/OfflineFindRecovery) - Set of scripts to be able to precisely locate your lost MacBook via Apple's Offline Find through Bluetooth Low Energy. * [OfflineFindRecovery](https://github.com/hajekj/OfflineFindRecovery) - Set of scripts to precisely locate your lost MacBook.
* [SwiftFindMy](https://github.com/airy10/SwiftFindMy) - Swift port of FindMy.py * [SwiftFindMy](https://github.com/airy10/SwiftFindMy) - Swift port of FindMy.py.
* [FindMy Home Assistant (1)](https://github.com/malmeloo/hass-FindMy) - Home Assistant integration made by the author of FindMy.py.
* [FindMy Home Assistant (2)](github.com/krmax44/homeassistant-findmy) - Home Assistant integration made by [krmax44](https://github.com/krmax44).
* [OpenTagViewer](https://github.com/parawanderer/OpenTagViewer) - Android App to locate your AirTags.
* [Find My Dad](https://github.com/NickCrews/findmydad) - Geofencing application for AirTags using Google Sheets and SMS.
## Credits ## 🏅 Credits
While I designed the library, the vast majority of actual functionality While I designed the library, the vast majority of actual functionality
is made possible by the following wonderful people and organizations: is made possible by the following wonderful people and organizations:

BIN
assets/banner.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 915 KiB

BIN
assets/icon.gox Normal file

Binary file not shown.

BIN
assets/icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 227 KiB

View File

@@ -1,4 +1,5 @@
# Configuration file for the Sphinx documentation builder. """Configuration file for the Sphinx documentation builder."""
# ruff: noqa: A001
# #
# For the full list of built-in configuration values, see the documentation: # For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html # https://www.sphinx-doc.org/en/master/usage/configuration.html
@@ -19,7 +20,7 @@ extensions = [
"sphinx.ext.duration", "sphinx.ext.duration",
"sphinx.ext.autodoc", "sphinx.ext.autodoc",
"sphinx.ext.inheritance_diagram", "sphinx.ext.inheritance_diagram",
"autoapi.extension" "autoapi.extension",
] ]
templates_path = ["_templates"] templates_path = ["_templates"]

View File

@@ -1,18 +1,13 @@
# ruff: noqa: ASYNC230 from __future__ import annotations
import json
from pathlib import Path
from findmy.reports import ( from findmy.reports import (
AppleAccount, AppleAccount,
AsyncAppleAccount, AsyncAppleAccount,
BaseAnisetteProvider,
LoginState, LoginState,
SmsSecondFactorMethod, SmsSecondFactorMethod,
TrustedDeviceSecondFactorMethod, TrustedDeviceSecondFactorMethod,
) )
from findmy.reports.anisette import LocalAnisetteProvider, RemoteAnisetteProvider
ACCOUNT_STORE = "account.json"
def _login_sync(account: AppleAccount) -> None: def _login_sync(account: AppleAccount) -> None:
@@ -69,35 +64,45 @@ async def _login_async(account: AsyncAppleAccount) -> None:
await method.submit(code) await method.submit(code)
def get_account_sync(anisette: BaseAnisetteProvider) -> AppleAccount: def get_account_sync(
store_path: str,
anisette_url: str | None,
libs_path: str | None,
) -> AppleAccount:
"""Tries to restore a saved Apple account, or prompts the user for login otherwise. (sync)""" """Tries to restore a saved Apple account, or prompts the user for login otherwise. (sync)"""
acc = AppleAccount(anisette)
# Save / restore account logic
acc_store = Path("account.json")
try: try:
with acc_store.open() as f: acc = AppleAccount.from_json(store_path, anisette_libs_path=libs_path)
acc.restore(json.load(f))
except FileNotFoundError: except FileNotFoundError:
ani = (
LocalAnisetteProvider(libs_path=libs_path)
if anisette_url is None
else RemoteAnisetteProvider(anisette_url)
)
acc = AppleAccount(ani)
_login_sync(acc) _login_sync(acc)
with acc_store.open("w+") as f:
json.dump(acc.export(), f) acc.to_json(store_path)
return acc return acc
async def get_account_async(anisette: BaseAnisetteProvider) -> AsyncAppleAccount: async def get_account_async(
store_path: str,
anisette_url: str | None,
libs_path: str | None,
) -> AsyncAppleAccount:
"""Tries to restore a saved Apple account, or prompts the user for login otherwise. (async)""" """Tries to restore a saved Apple account, or prompts the user for login otherwise. (async)"""
acc = AsyncAppleAccount(anisette)
# Save / restore account logic
acc_store = Path("account.json")
try: try:
with acc_store.open() as f: acc = AsyncAppleAccount.from_json(store_path, anisette_libs_path=libs_path)
acc.restore(json.load(f))
except FileNotFoundError: except FileNotFoundError:
ani = (
LocalAnisetteProvider(libs_path=libs_path)
if anisette_url is None
else RemoteAnisetteProvider(anisette_url)
)
acc = AsyncAppleAccount(ani)
await _login_async(acc) await _login_async(acc)
with acc_store.open("w+") as f:
json.dump(acc.export(), f) acc.to_json(store_path)
return acc return acc

View File

@@ -1,5 +1,8 @@
from __future__ import annotations
import asyncio import asyncio
import logging import logging
import sys
from findmy import KeyPair from findmy import KeyPair
from findmy.scanner import ( from findmy.scanner import (
@@ -10,11 +13,6 @@ from findmy.scanner import (
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
# Set if you want to check whether a specific key (or accessory!) is in the scan results.
# Make sure to enter its private key!
# Leave empty (= None) to not check.
CHECK_KEY = KeyPair.from_b64("")
def _print_nearby(device: NearbyOfflineFindingDevice) -> None: def _print_nearby(device: NearbyOfflineFindingDevice) -> None:
print(f"NEARBY Device - {device.mac_address}") print(f"NEARBY Device - {device.mac_address}")
@@ -37,7 +35,7 @@ def _print_separated(device: SeparatedOfflineFindingDevice) -> None:
print() print()
async def scan() -> None: async def scan(check_key: KeyPair | None = None) -> None:
scanner = await OfflineFindingScanner.create() scanner = await OfflineFindingScanner.create()
print("Scanning for FindMy-devices...") print("Scanning for FindMy-devices...")
@@ -55,14 +53,18 @@ async def scan() -> None:
print() print()
continue continue
if CHECK_KEY and device.is_from(CHECK_KEY): if check_key and device.is_from(check_key):
scan_device = device scan_device = device
if scan_device: if scan_device:
print("Key or accessory was found in scan results! :D") print("Key or accessory was found in scan results! :D")
elif CHECK_KEY: elif check_key:
print("Selected key or accessory was not found in scan results... :c") print("Selected key or accessory was not found in scan results... :c")
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(scan()) key = None
if len(sys.argv) >= 2:
key = KeyPair.from_b64(sys.argv[1])
asyncio.run(scan(key))

View File

@@ -4,28 +4,47 @@ import sys
from _login import get_account_sync from _login import get_account_sync
from findmy import KeyPair from findmy import KeyPair
from findmy.reports import RemoteAnisetteProvider
# URL to (public or local) anisette server # Path where login session will be stored.
ANISETTE_SERVER = "http://localhost:6969" # This is necessary to avoid generating a new session every time we log in.
STORE_PATH = "account.json"
# URL to LOCAL anisette server. Set to None to use built-in Anisette generator instead (recommended)
# IF YOU USE A PUBLIC SERVER, DO NOT COMPLAIN THAT YOU KEEP RUNNING INTO AUTHENTICATION ERRORS!
# If you change this value, make sure to remove the account store file.
ANISETTE_SERVER = None
# Path where Anisette libraries will be stored.
# This is only relevant when using the built-in Anisette server.
# It can be omitted (set to None) to avoid saving to disk,
# but specifying a path is highly recommended to avoid downloading the bundle on every run.
ANISETTE_LIBS_PATH = "ani_libs.bin"
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
def fetch_reports(priv_key: str) -> int: def fetch_reports(priv_key: str) -> int:
key = KeyPair.from_b64(priv_key) # Step 0: construct an account instance
acc = get_account_sync( # We use a helper for this to simplify interactive authentication
RemoteAnisetteProvider(ANISETTE_SERVER), acc = get_account_sync(STORE_PATH, ANISETTE_SERVER, ANISETTE_LIBS_PATH)
)
print(f"Logged in as: {acc.account_name} ({acc.first_name} {acc.last_name})") print(f"Logged in as: {acc.account_name} ({acc.first_name} {acc.last_name})")
# It's that simple! # Step 1: construct a key object and get its location reports
key = KeyPair.from_b64(priv_key)
reports = acc.fetch_last_reports(key) reports = acc.fetch_last_reports(key)
# Step 2: print the reports!
for report in sorted(reports): for report in sorted(reports):
print(report) print(report)
return 1 # We can save the report to a file if we want
report.to_json("last_report.json")
# Step 3: Make sure to save account state when you're done!
acc.to_json(STORE_PATH)
return 0
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -5,30 +5,49 @@ import sys
from _login import get_account_async from _login import get_account_async
from findmy import KeyPair from findmy import KeyPair
from findmy.reports import RemoteAnisetteProvider
# URL to (public or local) anisette server # Path where login session will be stored.
ANISETTE_SERVER = "http://localhost:6969" # This is necessary to avoid generating a new session every time we log in.
STORE_PATH = "account.json"
# URL to LOCAL anisette server. Set to None to use built-in Anisette generator instead (recommended)
# IF YOU USE A PUBLIC SERVER, DO NOT COMPLAIN THAT YOU KEEP RUNNING INTO AUTHENTICATION ERRORS!
# If you change this value, make sure to remove the account store file.
ANISETTE_SERVER = None
# Path where Anisette libraries will be stored.
# This is only relevant when using the built-in Anisette server.
# It can be omitted (set to None) to avoid saving to disk,
# but specifying a path is highly recommended to avoid downloading the bundle on every run.
ANISETTE_LIBS_PATH = "ani_libs.bin"
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
async def fetch_reports(priv_key: str) -> int: async def fetch_reports(priv_key: str) -> int:
key = KeyPair.from_b64(priv_key) # Step 0: construct an account instance
acc = await get_account_async( # We use a helper for this to simplify interactive authentication
RemoteAnisetteProvider(ANISETTE_SERVER), acc = await get_account_async(STORE_PATH, ANISETTE_SERVER, ANISETTE_LIBS_PATH)
)
try: try:
print(f"Logged in as: {acc.account_name} ({acc.first_name} {acc.last_name})") print(f"Logged in as: {acc.account_name} ({acc.first_name} {acc.last_name})")
# It's that simple! # Step 1: construct a key object and get its location reports
key = KeyPair.from_b64(priv_key)
reports = await acc.fetch_last_reports(key) reports = await acc.fetch_last_reports(key)
# Step 2: print the reports!
for report in sorted(reports): for report in sorted(reports):
print(report) print(report)
# We can save the report to a file if we want
report.to_json("last_report.json")
finally: finally:
await acc.close() await acc.close()
# Make sure to save account state when you're done!
acc.to_json(STORE_PATH)
return 0 return 0

View File

@@ -11,10 +11,21 @@ from pathlib import Path
from _login import get_account_sync from _login import get_account_sync
from findmy import FindMyAccessory from findmy import FindMyAccessory
from findmy.reports import RemoteAnisetteProvider
# URL to (public or local) anisette server # Path where login session will be stored.
ANISETTE_SERVER = "http://localhost:6969" # This is necessary to avoid generating a new session every time we log in.
STORE_PATH = "account.json"
# URL to LOCAL anisette server. Set to None to use built-in Anisette generator instead (recommended)
# IF YOU USE A PUBLIC SERVER, DO NOT COMPLAIN THAT YOU KEEP RUNNING INTO AUTHENTICATION ERRORS!
# If you change this value, make sure to remove the account store file.
ANISETTE_SERVER = None
# Path where Anisette libraries will be stored.
# This is only relevant when using the built-in Anisette server.
# It can be omitted (set to None) to avoid saving to disk,
# but specifying a path is highly recommended to avoid downloading the bundle on every run.
ANISETTE_LIBS_PATH = "ani_libs.bin"
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@@ -26,8 +37,7 @@ def main(plist_path: str) -> int:
# Step 1: log into an Apple account # Step 1: log into an Apple account
print("Logging into account") print("Logging into account")
anisette = RemoteAnisetteProvider(ANISETTE_SERVER) acc = get_account_sync(STORE_PATH, ANISETTE_SERVER, ANISETTE_LIBS_PATH)
acc = get_account_sync(anisette)
# step 2: fetch reports! # step 2: fetch reports!
print("Fetching reports") print("Fetching reports")
@@ -39,6 +49,9 @@ def main(plist_path: str) -> int:
for report in sorted(reports): for report in sorted(reports):
print(f" - {report}") print(f" - {report}")
# step 4: save current account state to disk
acc.to_json(STORE_PATH)
return 0 return 0

View File

@@ -1,14 +1,15 @@
"""A package providing everything you need to work with Apple's FindMy network.""" """A package providing everything you need to work with Apple's FindMy network."""
from . import errors, keys, reports, scanner from . import errors, keys, plist, reports, scanner
from .accessory import FindMyAccessory from .accessory import FindMyAccessory
from .keys import KeyPair from .keys import KeyPair
__all__ = ( __all__ = (
"keys",
"reports",
"scanner",
"errors",
"FindMyAccessory", "FindMyAccessory",
"KeyPair", "KeyPair",
"errors",
"keys",
"plist",
"reports",
"scanner",
) )

106
findmy/__main__.py Normal file
View File

@@ -0,0 +1,106 @@
"""usage: python -m findmy""" # noqa: D400, D415
from __future__ import annotations
import argparse
import json
import logging
from importlib.metadata import version
from pathlib import Path
from .plist import get_key, list_accessories
def main() -> None: # noqa: D103
parser = argparse.ArgumentParser(prog="findmy", description="FindMy.py CLI tool")
parser.add_argument(
"-v",
"--version",
action="version",
version=version("FindMy"),
)
parser.add_argument(
"--log-level",
type=str,
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="INFO",
help="Set the logging level (default: INFO)",
)
subparsers = parser.add_subparsers(dest="command", title="commands")
subparsers.required = True
decrypt_parser = subparsers.add_parser(
"decrypt",
help="""
Decrypt and print (in json) all the local FindMy accessories.
This looks through the local FindMy accessory plist files,
decrypts them using the system keychain, and prints the
decrypted JSON representation of each accessory.
eg
```
[
{
"master_key": "e01ae426431867e92d512ae1cb6c9e5bbc20a2b7d1c677d7",
"skn": "e01ae426431867e92d512ae1cb6c9e5bbc20a2b7d1c677d7",
"sks": "e01ae426431867e92d512ae1cb6c9e5bbc20a2b7d1c677d7",
"paired_at": "2020-01-08T21:26:36.177409+00:00",
"name": "Nick's MacBook Pro",
"model": "MacBookPro11,5",
"identifier": "03FF9E28-2508-425B-BD57-D738F2D2F6C0"
},
{
"master_key": "e01ae426431867e92d512ae1cb6c9e5bbc20a2b7d1c677d7",
"skn": "e01ae426431867e92d512ae1cb6c9e5bbc20a2b7d1c677d7",
"sks": "e01ae426431867e92d512ae1cb6c9e5bbc20a2b7d1c677d7",
"paired_at": "2023-10-22T20:40:39.285225+00:00",
"name": "ncmbp",
"model": "MacBookPro18,2",
"identifier": "71D276DF-A8FA-47C8-A93C-9B3B714BDFEC"
}
]
```
You can chain the output with jq or similar tools.
eg `python -m findmy decrypt | jq '.[] | select(.name == "my airtag")' > my_airtag.json`
""",
)
decrypt_parser.add_argument(
"--out-dir",
type=Path,
default=None,
help="Output directory for decrypted files. If not specified, files will not be saved to disk.", # noqa: E501
)
args = parser.parse_args()
logging.basicConfig(level=args.log_level.upper())
if args.command == "decrypt":
decrypt_all(args.out_dir)
else:
# This else block should ideally not be reached if subparsers.required is True
# and a default command isn't set, or if a command is always given.
# However, it's good practice for unexpected cases or if the logic changes.
parser.print_help()
parser.exit(1)
def decrypt_all(out_dir: str | Path | None = None) -> None:
"""Decrypt all accessories and save them to the specified directory as JSON files."""
def get_path(d, acc) -> Path | None: # noqa: ANN001
if out_dir is None:
return None
d = Path(d)
d = d.resolve().absolute()
d.mkdir(parents=True, exist_ok=True)
return d / f"{acc.identifier}.json"
key = get_key()
accs = list_accessories(key=key)
jsons = [acc.to_json(get_path(out_dir, acc)) for acc in accs]
print(json.dumps(jsons, indent=4, ensure_ascii=False)) # noqa: T201
if __name__ == "__main__":
main()

View File

@@ -10,14 +10,34 @@ import logging
import plistlib import plistlib
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import IO, Generator, overload from pathlib import Path
from typing import IO, TYPE_CHECKING, Literal, TypedDict, overload
from typing_extensions import override from typing_extensions import override
from findmy.util.abc import Serializable
from findmy.util.files import read_data_json, save_and_return_json
from .keys import KeyGenerator, KeyPair, KeyType from .keys import KeyGenerator, KeyPair, KeyType
from .util import crypto from .util import crypto
logging.getLogger(__name__) if TYPE_CHECKING:
from collections.abc import Generator
logger = logging.getLogger(__name__)
class FindMyAccessoryMapping(TypedDict):
"""JSON mapping representing state of a FindMyAccessory instance."""
type: Literal["accessory"]
master_key: str
skn: str
sks: str
paired_at: str
name: str | None
model: str | None
identifier: str | None
class RollingKeyPairSource(ABC): class RollingKeyPairSource(ABC):
@@ -62,16 +82,19 @@ class RollingKeyPairSource(ABC):
return keys return keys
class FindMyAccessory(RollingKeyPairSource): class FindMyAccessory(RollingKeyPairSource, Serializable[FindMyAccessoryMapping]):
"""A findable Find My-accessory using official key rollover.""" """A findable Find My-accessory using official key rollover."""
def __init__( def __init__( # noqa: PLR0913
self, self,
*,
master_key: bytes, master_key: bytes,
skn: bytes, skn: bytes,
sks: bytes, sks: bytes,
paired_at: datetime, paired_at: datetime,
name: str | None = None, name: str | None = None,
model: str | None = None,
identifier: str | None = None,
) -> None: ) -> None:
""" """
Initialize a FindMyAccessory. These values are usually obtained during pairing. Initialize a FindMyAccessory. These values are usually obtained during pairing.
@@ -85,12 +108,53 @@ class FindMyAccessory(RollingKeyPairSource):
self._paired_at: datetime = paired_at self._paired_at: datetime = paired_at
if self._paired_at.tzinfo is None: if self._paired_at.tzinfo is None:
self._paired_at = self._paired_at.astimezone() self._paired_at = self._paired_at.astimezone()
logging.warning( logger.warning(
"Pairing datetime is timezone-naive. Assuming system tz: %s.", "Pairing datetime is timezone-naive. Assuming system tz: %s.",
self._paired_at.tzname(), self._paired_at.tzname(),
) )
self._name = name self._name = name
self._model = model
self._identifier = identifier
@property
def master_key(self) -> bytes:
"""The private master key."""
return self._primary_gen.master_key
@property
def skn(self) -> bytes:
"""The SKN for the primary key."""
return self._primary_gen.initial_sk
@property
def sks(self) -> bytes:
"""The SKS for the secondary key."""
return self._secondary_gen.initial_sk
@property
def paired_at(self) -> datetime:
"""Date and time at which this accessory was paired with an Apple account."""
return self._paired_at
@property
def name(self) -> str | None:
"""Name of this accessory."""
return self._name
@name.setter
def name(self, name: str | None) -> None:
self._name = name
@property
def model(self) -> str | None:
"""Model string of this accessory, as provided by the manufacturer."""
return self._model
@property
def identifier(self) -> str | None:
"""Internal identifier of this accessory."""
return self._identifier
@property @property
@override @override
@@ -146,9 +210,22 @@ class FindMyAccessory(RollingKeyPairSource):
return possible_keys return possible_keys
@classmethod @classmethod
def from_plist(cls, plist: IO[bytes]) -> FindMyAccessory: def from_plist(
cls,
plist: str | Path | dict | bytes | IO[bytes],
*,
name: str | None = None,
) -> FindMyAccessory:
"""Create a FindMyAccessory from a .plist file dumped from the FindMy app.""" """Create a FindMyAccessory from a .plist file dumped from the FindMy app."""
device_data = plistlib.load(plist) if isinstance(plist, bytes):
# plist is a bytes object
device_data = plistlib.loads(plist)
elif isinstance(plist, (str, Path)):
device_data = plistlib.loads(Path(plist).read_bytes())
elif isinstance(plist, IO):
device_data = plistlib.load(plist)
else:
device_data = plist
# PRIVATE master key. 28 (?) bytes. # PRIVATE master key. 28 (?) bytes.
master_key = device_data["privateKey"]["key"]["data"][-28:] master_key = device_data["privateKey"]["key"]["data"][-28:]
@@ -167,7 +244,57 @@ class FindMyAccessory(RollingKeyPairSource):
# "Paired at" timestamp (UTC) # "Paired at" timestamp (UTC)
paired_at = device_data["pairingDate"].replace(tzinfo=timezone.utc) paired_at = device_data["pairingDate"].replace(tzinfo=timezone.utc)
return cls(master_key, skn, sks, paired_at) model = device_data["model"]
identifier = device_data["identifier"]
return cls(
master_key=master_key,
skn=skn,
sks=sks,
paired_at=paired_at,
name=name,
model=model,
identifier=identifier,
)
@override
def to_json(self, path: str | Path | None = None, /) -> FindMyAccessoryMapping:
res: FindMyAccessoryMapping = {
"type": "accessory",
"master_key": self._primary_gen.master_key.hex(),
"skn": self.skn.hex(),
"sks": self.sks.hex(),
"paired_at": self._paired_at.isoformat(),
"name": self.name,
"model": self.model,
"identifier": self.identifier,
}
return save_and_return_json(res, path)
@classmethod
@override
def from_json(
cls,
val: str | Path | FindMyAccessoryMapping,
/,
) -> FindMyAccessory:
val = read_data_json(val)
assert val["type"] == "accessory"
try:
return cls(
master_key=bytes.fromhex(val["master_key"]),
skn=bytes.fromhex(val["skn"]),
sks=bytes.fromhex(val["sks"]),
paired_at=datetime.fromisoformat(val["paired_at"]),
name=val["name"],
model=val["model"],
identifier=val["identifier"],
)
except KeyError as e:
msg = f"Failed to restore account data: {e}"
raise ValueError(msg) from None
class AccessoryKeyGenerator(KeyGenerator[KeyPair]): class AccessoryKeyGenerator(KeyGenerator[KeyPair]):
@@ -202,6 +329,21 @@ class AccessoryKeyGenerator(KeyGenerator[KeyPair]):
self._iter_ind = 0 self._iter_ind = 0
@property
def master_key(self) -> bytes:
    """The private master key this generator was initialized with."""
    return self._master_key

@property
def initial_sk(self) -> bytes:
    """The initial secret key (the generator resets to this when rolling backwards)."""
    return self._initial_sk

@property
def key_type(self) -> KeyType:
    """The type of key this generator produces."""
    return self._key_type
def _get_sk(self, ind: int) -> bytes: def _get_sk(self, ind: int) -> bytes:
if ind < self._cur_sk_ind: # behind us; need to reset :( if ind < self._cur_sk_ind: # behind us; need to reset :(
self._cur_sk = self._initial_sk self._cur_sk = self._initial_sk

View File

@@ -7,12 +7,19 @@ import hashlib
import secrets import secrets
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from enum import Enum from enum import Enum
from typing import Generator, Generic, TypeVar, overload from typing import TYPE_CHECKING, Generic, Literal, TypedDict, TypeVar, overload
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from typing_extensions import override from typing_extensions import override
from .util import crypto from findmy.util.abc import Serializable
from findmy.util.files import read_data_json, save_and_return_json
from .util import crypto, parsers
if TYPE_CHECKING:
from collections.abc import Generator
from pathlib import Path
class KeyType(Enum): class KeyType(Enum):
@@ -23,6 +30,16 @@ class KeyType(Enum):
SECONDARY = 2 SECONDARY = 2
class KeyPairMapping(TypedDict):
    """JSON mapping representing a KeyPair."""

    # Discriminator tag; always the literal string "keypair".
    type: Literal["keypair"]
    # Base64-encoded private key bytes.
    private_key: str
    # Integer value of the KeyType enum member.
    key_type: int
    # Optional user-assigned name.
    name: str | None
class HasHashedPublicKey(ABC): class HasHashedPublicKey(ABC):
""" """
ABC for anything that has a public, hashed FindMy-key. ABC for anything that has a public, hashed FindMy-key.
@@ -77,11 +94,48 @@ class HasPublicKey(HasHashedPublicKey, ABC):
"""See `HasHashedPublicKey.hashed_adv_key_bytes`.""" """See `HasHashedPublicKey.hashed_adv_key_bytes`."""
return hashlib.sha256(self.adv_key_bytes).digest() return hashlib.sha256(self.adv_key_bytes).digest()
@property
def mac_address(self) -> str:
    """Get the BLE mac address derived from the public key."""
    # The two most significant bits of the first key byte are forced on.
    # Pass byteorder explicitly: int.to_bytes() only gained default
    # arguments in Python 3.11, so bare .to_bytes(1) is a TypeError on 3.10.
    first_byte = (self.adv_key_bytes[0] | 0b11000000).to_bytes(1, "big")
    return ":".join([parsers.format_hex_byte(x) for x in first_byte + self.adv_key_bytes[1:6]])
class KeyPair(HasPublicKey): def adv_data(self, status: int = 0, hint: int = 0) -> bytes:
"""Get the BLE advertisement data that should be broadcast to advertise this key."""
return bytes(
[
# apple company id
0x4C,
0x00,
],
) + self.of_data(status, hint)
def of_data(self, status: int = 0, hint: int = 0) -> bytes:
    """Get the Offline Finding data that should be broadcast to advertise this key."""
    # Payload layout: OF type (0x12), payload length (25), status byte,
    # public key bytes 6..27, the top two bits of key byte 0, then a hint byte.
    key = self.adv_key_bytes
    head = bytes((0x12, 25, status))
    tail = bytes((key[0] >> 6, hint))
    return head + key[6:] + tail
class KeyPair(HasPublicKey, Serializable[KeyPairMapping]):
"""A private-public keypair for a trackable FindMy accessory.""" """A private-public keypair for a trackable FindMy accessory."""
def __init__(self, private_key: bytes, key_type: KeyType = KeyType.UNKNOWN) -> None: def __init__(
self,
private_key: bytes,
key_type: KeyType = KeyType.UNKNOWN,
name: str | None = None,
) -> None:
"""Initialize the `KeyPair` with the private key bytes.""" """Initialize the `KeyPair` with the private key bytes."""
priv_int = crypto.bytes_to_int(private_key) priv_int = crypto.bytes_to_int(private_key)
self._priv_key = ec.derive_private_key( self._priv_key = ec.derive_private_key(
@@ -90,12 +144,22 @@ class KeyPair(HasPublicKey):
) )
self._key_type = key_type self._key_type = key_type
self._name = name
@property @property
def key_type(self) -> KeyType: def key_type(self) -> KeyType:
"""Type of this key.""" """Type of this key."""
return self._key_type return self._key_type
@property
def name(self) -> str | None:
    """Name of this KeyPair, or None if it has not been given one."""
    return self._name

@name.setter
def name(self, name: str | None) -> None:
    # Allows renaming after construction; passing None clears the name.
    self._name = name
@classmethod @classmethod
def new(cls) -> KeyPair: def new(cls) -> KeyPair:
"""Generate a new random `KeyPair`.""" """Generate a new random `KeyPair`."""
@@ -132,13 +196,41 @@ class KeyPair(HasPublicKey):
key_bytes = self._priv_key.public_key().public_numbers().x key_bytes = self._priv_key.public_key().public_numbers().x
return int.to_bytes(key_bytes, 28, "big") return int.to_bytes(key_bytes, 28, "big")
@override
def to_json(self, dst: str | Path | None = None, /) -> KeyPairMapping:
    """
    Serialize this keypair to a JSON mapping.

    :param dst: Optional destination; when given, the mapping is also written there.
    :returns: The serialized mapping.
    """
    mapping: KeyPairMapping = {
        "type": "keypair",
        "private_key": base64.b64encode(self.private_key_bytes).decode("ascii"),
        "key_type": self._key_type.value,
        "name": self.name,
    }
    return save_and_return_json(mapping, dst)
@classmethod
@override
def from_json(cls, val: str | Path | KeyPairMapping, /) -> KeyPair:
    """
    Restore a KeyPair from a JSON mapping or a path to a JSON file.

    :raises ValueError: If the mapping is missing required fields.
    """
    data = read_data_json(val)
    assert data["type"] == "keypair"
    try:
        key_bytes = base64.b64decode(data["private_key"])
        kind = KeyType(data["key_type"])
        label = data["name"]
    except KeyError as e:
        msg = f"Failed to restore KeyPair data: {e}"
        raise ValueError(msg) from None
    return cls(private_key=key_bytes, key_type=kind, name=label)
def dh_exchange(self, other_pub_key: ec.EllipticCurvePublicKey) -> bytes: def dh_exchange(self, other_pub_key: ec.EllipticCurvePublicKey) -> bytes:
"""Do a Diffie-Hellman key exchange using another EC public key.""" """Do a Diffie-Hellman key exchange using another EC public key."""
return self._priv_key.exchange(ec.ECDH(), other_pub_key) return self._priv_key.exchange(ec.ECDH(), other_pub_key)
@override @override
def __repr__(self) -> str: def __repr__(self) -> str:
return f'KeyPair(public_key="{self.adv_key_b64}", type={self.key_type})' return f'KeyPair(name="{self.name}", public_key="{self.adv_key_b64}", type={self.key_type})'
K = TypeVar("K") K = TypeVar("K")

90
findmy/plist.py Normal file
View File

@@ -0,0 +1,90 @@
"""Utils for decrypting the encypted .record files into .plist files."""
from __future__ import annotations
import plistlib
import subprocess
from pathlib import Path
from typing import IO
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from .accessory import FindMyAccessory
# Originally from:
# Author: Shane B. <shane@wander.dev>
# in https://github.com/parawanderer/OpenTagViewer/blob/08a59cab551721afb9dc9f829ad31dae8d5bd400/python/airtag_decryptor.py
# which was based on:
# Based on: https://gist.github.com/airy10/5205dc851fbd0715fcd7a5cdde25e7c8
# consider switching to this library https://github.com/microsoft/keyper
# once they publish a version of it that includes my MR with the changes to make it compatible
# with keys that are non-utf-8 encoded (like the BeaconStore one)
# if I contribute this, properly escape the label argument here...
def get_key() -> bytes:
    """Get the decryption key for BeaconStore using the system password prompt window."""
    # NOTE: this triggers two macOS password prompts via the `security` CLI.
    cmd = "/usr/bin/security find-generic-password -l 'BeaconStore' -w"
    hex_key = subprocess.getoutput(cmd)  # noqa: S605
    return bytes.fromhex(hex_key)
def decrypt_plist(encrypted: str | Path | bytes | IO[bytes], key: bytes) -> dict:
"""
Decrypts the encrypted plist file at `encrypted` using the provided `key`.
:param encrypted: If bytes or IO, the encrypted plist data.
If str or Path, the path to the encrypted plist file, which is
generally something like `/Users/<username>/Library/com.apple.icloud.searchpartyd/OwnedBeacons/<UUID>.record`
:param key: Raw key to decrypt plist file with.
See: `get_key()`
:returns: The decoded plist dict
""" # noqa: E501
if isinstance(encrypted, (str, Path)):
with Path(encrypted).open("rb") as f:
encrypted_bytes = f.read()
elif isinstance(encrypted, bytes):
encrypted_bytes = encrypted
elif isinstance(encrypted, IO):
encrypted_bytes = encrypted.read()
else:
raise TypeError("encrypted must be a str, Path, bytes, or IO[bytes]") # noqa: EM101, TRY003
plist = plistlib.loads(encrypted_bytes)
if not isinstance(plist, list) or len(plist) < 3:
raise ValueError(plist, "encrypted plist should be a list of 3 elements")
nonce, tag, ciphertext = plist[0], plist[1], plist[2]
cipher = Cipher(algorithms.AES(key), modes.GCM(nonce, tag))
decryptor = cipher.decryptor()
decrypted_plist_bytes = decryptor.update(ciphertext) + decryptor.finalize()
decrypted_plist = plistlib.loads(decrypted_plist_bytes)
if not isinstance(decrypted_plist, dict):
raise ValueError(decrypted_plist, "decrypted plist should be a dictionary") # noqa: TRY004
return decrypted_plist
def list_accessories(
    *,
    key: bytes | None = None,
    search_path: str | Path | None = None,
) -> list[FindMyAccessory]:
    """Get all accessories from the encrypted .plist files dumped from the FindMy app."""
    if search_path is None:
        search_path = Path.home() / "Library" / "com.apple.icloud.searchpartyd"
    base = Path(search_path)
    decryption_key = get_key() if key is None else key

    accessories: list[FindMyAccessory] = []
    for record_path in base.glob("OwnedBeacons/*.record"):
        owned = decrypt_plist(record_path, decryption_key)
        # The accessory's display name lives in a separate BeaconNamingRecord plist.
        naming_path = next((base / "BeaconNamingRecord" / record_path.stem).glob("*.record"))
        naming = decrypt_plist(naming_path, decryption_key)
        accessories.append(FindMyAccessory.from_plist(owned, name=naming["name"]))
    return accessories

View File

@@ -8,8 +8,8 @@ from .twofactor import SmsSecondFactorMethod, TrustedDeviceSecondFactorMethod
__all__ = ( __all__ = (
"AppleAccount", "AppleAccount",
"AsyncAppleAccount", "AsyncAppleAccount",
"LoginState",
"BaseAnisetteProvider", "BaseAnisetteProvider",
"LoginState",
"RemoteAnisetteProvider", "RemoteAnisetteProvider",
"SmsSecondFactorMethod", "SmsSecondFactorMethod",
"TrustedDeviceSecondFactorMethod", "TrustedDeviceSecondFactorMethod",

View File

@@ -15,7 +15,7 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
Callable, Callable,
Sequence, Literal,
TypedDict, TypedDict,
TypeVar, TypeVar,
cast, cast,
@@ -32,8 +32,10 @@ from findmy.errors import (
UnauthorizedError, UnauthorizedError,
UnhandledProtocolError, UnhandledProtocolError,
) )
from findmy.reports.anisette import AnisetteMapping, get_provider_from_mapping
from findmy.util import crypto from findmy.util import crypto
from findmy.util.closable import Closable from findmy.util.abc import Closable, Serializable
from findmy.util.files import read_data_json, save_and_return_json
from findmy.util.http import HttpResponse, HttpSession, decode_plist from findmy.util.http import HttpResponse, HttpSession, decode_plist
from .reports import LocationReport, LocationReportsFetcher from .reports import LocationReport, LocationReportsFetcher
@@ -49,13 +51,16 @@ from .twofactor import (
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Sequence
from pathlib import Path
from findmy.accessory import RollingKeyPairSource from findmy.accessory import RollingKeyPairSource
from findmy.keys import HasHashedPublicKey from findmy.keys import HasHashedPublicKey
from findmy.util.types import MaybeCoro from findmy.util.types import MaybeCoro
from .anisette import BaseAnisetteProvider from .anisette import BaseAnisetteProvider
logging.getLogger(__name__) logger = logging.getLogger(__name__)
srp.rfc5054_enable() srp.rfc5054_enable()
srp.no_username_in_x() srp.no_username_in_x()
@@ -68,6 +73,33 @@ class _AccountInfo(TypedDict):
trusted_device_2fa: bool trusted_device_2fa: bool
class _AccountStateMappingIds(TypedDict):
    # User ID (UUID string; auto-generated when not restored from state).
    uid: str
    # Device ID (UUID string; auto-generated when not restored from state).
    devid: str


class _AccountStateMappingAccount(TypedDict):
    # Apple ID username; None until set by login.
    username: str | None
    # Apple ID password; None until set by login.
    password: str | None
    # Cached account information; None until authenticated.
    info: _AccountInfo | None


class _AccountStateMappingLoginState(TypedDict):
    # Integer value of the LoginState enum member.
    state: int
    data: dict  # TODO: make typed  # noqa: TD002, TD003


class AccountStateMapping(TypedDict):
    """JSON mapping representing state of an Apple account instance."""

    # Discriminator tag; always the literal string "account".
    type: Literal["account"]
    # User/device identifiers.
    ids: _AccountStateMappingIds
    # Credentials and cached account info.
    account: _AccountStateMappingAccount
    # Serialized login state (LoginState value plus state-specific data).
    login: _AccountStateMappingLoginState
    # Serialized anisette provider state.
    anisette: AnisetteMapping
_P = ParamSpec("_P") _P = ParamSpec("_P")
_R = TypeVar("_R") _R = TypeVar("_R")
_A = TypeVar("_A", bound="BaseAppleAccount") _A = TypeVar("_A", bound="BaseAppleAccount")
@@ -109,7 +141,7 @@ def _extract_phone_numbers(html: str) -> list[dict]:
return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", []) return data.get("direct", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", [])
class BaseAppleAccount(Closable, ABC): class BaseAppleAccount(Closable, Serializable[AccountStateMapping], ABC):
"""Base class for an Apple account.""" """Base class for an Apple account."""
@property @property
@@ -149,28 +181,6 @@ class BaseAppleAccount(Closable, ABC):
""" """
raise NotImplementedError raise NotImplementedError
@abstractmethod
def export(self) -> dict:
"""
Export a representation of the current state of the account as a dictionary.
The output of this method is guaranteed to be JSON-serializable, and passing
the return value of this function as an argument to `BaseAppleAccount.restore`
will always result in an exact copy of the internal state as it was when exported.
This method is especially useful to avoid having to keep going through the login flow.
"""
raise NotImplementedError
@abstractmethod
def restore(self, data: dict) -> None:
"""
Restore a previous export of the internal state of the account.
See `BaseAppleAccount.export` for more information.
"""
raise NotImplementedError
@abstractmethod @abstractmethod
def login(self, username: str, password: str) -> MaybeCoro[LoginState]: def login(self, username: str, password: str) -> MaybeCoro[LoginState]:
"""Log in to an Apple account using a username and password.""" """Log in to an Apple account using a username and password."""
@@ -230,15 +240,6 @@ class BaseAppleAccount(Closable, ABC):
date_to: datetime | None, date_to: datetime | None,
) -> MaybeCoro[list[LocationReport]]: ... ) -> MaybeCoro[list[LocationReport]]: ...
@overload
@abstractmethod
def fetch_reports(
self,
keys: Sequence[HasHashedPublicKey],
date_from: datetime,
date_to: datetime | None,
) -> MaybeCoro[dict[HasHashedPublicKey, list[LocationReport]]]: ...
@overload @overload
@abstractmethod @abstractmethod
def fetch_reports( def fetch_reports(
@@ -248,13 +249,26 @@ class BaseAppleAccount(Closable, ABC):
date_to: datetime | None, date_to: datetime | None,
) -> MaybeCoro[list[LocationReport]]: ... ) -> MaybeCoro[list[LocationReport]]: ...
@overload
@abstractmethod @abstractmethod
def fetch_reports( def fetch_reports(
self, self,
keys: HasHashedPublicKey | Sequence[HasHashedPublicKey] | RollingKeyPairSource, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource],
date_from: datetime, date_from: datetime,
date_to: datetime | None, date_to: datetime | None,
) -> MaybeCoro[list[LocationReport] | dict[HasHashedPublicKey, list[LocationReport]]]: ) -> MaybeCoro[dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]]: ...
@abstractmethod
def fetch_reports(
self,
keys: HasHashedPublicKey
| Sequence[HasHashedPublicKey | RollingKeyPairSource]
| RollingKeyPairSource,
date_from: datetime,
date_to: datetime | None,
) -> MaybeCoro[
list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
]:
""" """
Fetch location reports for `HasHashedPublicKey`s between `date_from` and `date_end`. Fetch location reports for `HasHashedPublicKey`s between `date_from` and `date_end`.
@@ -274,24 +288,28 @@ class BaseAppleAccount(Closable, ABC):
@abstractmethod @abstractmethod
def fetch_last_reports( def fetch_last_reports(
self, self,
keys: Sequence[HasHashedPublicKey], keys: RollingKeyPairSource,
hours: int = 7 * 24, hours: int = 7 * 24,
) -> MaybeCoro[dict[HasHashedPublicKey, list[LocationReport]]]: ... ) -> MaybeCoro[list[LocationReport]]: ...
@overload @overload
@abstractmethod @abstractmethod
def fetch_last_reports( def fetch_last_reports(
self, self,
keys: RollingKeyPairSource, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource],
hours: int = 7 * 24, hours: int = 7 * 24,
) -> MaybeCoro[list[LocationReport]]: ... ) -> MaybeCoro[dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]]: ...
@abstractmethod @abstractmethod
def fetch_last_reports( def fetch_last_reports(
self, self,
keys: HasHashedPublicKey | Sequence[HasHashedPublicKey] | RollingKeyPairSource, keys: HasHashedPublicKey
| RollingKeyPairSource
| Sequence[HasHashedPublicKey | RollingKeyPairSource],
hours: int = 7 * 24, hours: int = 7 * 24,
) -> MaybeCoro[list[LocationReport] | dict[HasHashedPublicKey, list[LocationReport]]]: ) -> MaybeCoro[
list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
]:
""" """
Fetch location reports for a sequence of `HasHashedPublicKey`s for the last `hours` hours. Fetch location reports for a sequence of `HasHashedPublicKey`s for the last `hours` hours.
@@ -328,37 +346,41 @@ class AsyncAppleAccount(BaseAppleAccount):
_ENDPOINT_2FA_TD_SUBMIT = "https://gsa.apple.com/grandslam/GsService2/validate" _ENDPOINT_2FA_TD_SUBMIT = "https://gsa.apple.com/grandslam/GsService2/validate"
# reports endpoints # reports endpoints
_ENDPOINT_REPORTS_FETCH = "https://gateway.icloud.com/acsnservice/fetch" _ENDPOINT_REPORTS_FETCH = "https://gateway.icloud.com/findmyservice/v2/fetch"
def __init__( def __init__(
self, self,
anisette: BaseAnisetteProvider, anisette: BaseAnisetteProvider,
user_id: str | None = None, *,
device_id: str | None = None, state_info: AccountStateMapping | None = None,
) -> None: ) -> None:
""" """
Initialize the apple account. Initialize the apple account.
:param anisette: An instance of `AsyncAnisetteProvider`. :param anisette: An instance of `AsyncAnisetteProvider`.
:param user_id: An optional user ID to use. Will be auto-generated if missing.
:param device_id: An optional device ID to use. Will be auto-generated if missing.
""" """
super().__init__() super().__init__()
self._anisette: BaseAnisetteProvider = anisette self._anisette: BaseAnisetteProvider = anisette
self._uid: str = user_id or str(uuid.uuid4()) self._uid: str = state_info["ids"]["uid"] if state_info else str(uuid.uuid4())
self._devid: str = device_id or str(uuid.uuid4()) self._devid: str = state_info["ids"]["devid"] if state_info else str(uuid.uuid4())
self._username: str | None = None # TODO: combine, user/pass should be "all or nothing" # noqa: TD002, TD003
self._password: str | None = None self._username: str | None = state_info["account"]["username"] if state_info else None
self._password: str | None = state_info["account"]["password"] if state_info else None
self._login_state: LoginState = LoginState.LOGGED_OUT self._login_state: LoginState = (
self._login_state_data: dict = {} LoginState(state_info["login"]["state"]) if state_info else LoginState.LOGGED_OUT
)
self._login_state_data: dict = state_info["login"]["data"] if state_info else {}
self._account_info: _AccountInfo | None = None self._account_info: _AccountInfo | None = (
state_info["account"]["info"] if state_info else None
)
self._http: HttpSession = HttpSession() self._http: HttpSession = HttpSession()
self._reports: LocationReportsFetcher = LocationReportsFetcher(self) self._reports: LocationReportsFetcher = LocationReportsFetcher(self)
self._closed: bool = False
def _set_login_state( def _set_login_state(
self, self,
@@ -367,10 +389,10 @@ class AsyncAppleAccount(BaseAppleAccount):
) -> LoginState: ) -> LoginState:
# clear account info if downgrading state (e.g. LOGGED_IN -> LOGGED_OUT) # clear account info if downgrading state (e.g. LOGGED_IN -> LOGGED_OUT)
if state < self._login_state: if state < self._login_state:
logging.debug("Clearing cached account information") logger.debug("Clearing cached account information")
self._account_info = None self._account_info = None
logging.info("Transitioning login state: %s -> %s", self._login_state, state) logger.info("Transitioning login state: %s -> %s", self._login_state, state)
self._login_state = state self._login_state = state
self._login_state_data = data or {} self._login_state_data = data or {}
@@ -416,34 +438,39 @@ class AsyncAppleAccount(BaseAppleAccount):
return self._account_info["last_name"] if self._account_info else None return self._account_info["last_name"] if self._account_info else None
@override @override
def export(self) -> dict: def to_json(self, path: str | Path | None = None, /) -> AccountStateMapping:
"""See `BaseAppleAccount.export`.""" res: AccountStateMapping = {
return { "type": "account",
"ids": {"uid": self._uid, "devid": self._devid}, "ids": {"uid": self._uid, "devid": self._devid},
"account": { "account": {
"username": self._username, "username": self._username,
"password": self._password, "password": self._password,
"info": self._account_info, "info": self._account_info,
}, },
"login_state": { "login": {
"state": self._login_state.value, "state": self._login_state.value,
"data": self._login_state_data, "data": self._login_state_data,
}, },
"anisette": self._anisette.to_json(),
} }
return save_and_return_json(res, path)
@classmethod
@override @override
def restore(self, data: dict) -> None: def from_json(
"""See `BaseAppleAccount.restore`.""" cls,
val: str | Path | AccountStateMapping,
/,
*,
anisette_libs_path: str | Path | None = None,
) -> AsyncAppleAccount:
val = read_data_json(val)
assert val["type"] == "account"
try: try:
self._uid = data["ids"]["uid"] ani_provider = get_provider_from_mapping(val["anisette"], libs_path=anisette_libs_path)
self._devid = data["ids"]["devid"] return cls(ani_provider, state_info=val)
self._username = data["account"]["username"]
self._password = data["account"]["password"]
self._account_info = data["account"]["info"]
self._login_state = LoginState(data["login_state"]["state"])
self._login_state_data = data["login_state"]["data"]
except KeyError as e: except KeyError as e:
msg = f"Failed to restore account data: {e}" msg = f"Failed to restore account data: {e}"
raise ValueError(msg) from None raise ValueError(msg) from None
@@ -455,8 +482,21 @@ class AsyncAppleAccount(BaseAppleAccount):
Should be called when the object will no longer be used. Should be called when the object will no longer be used.
""" """
await self._anisette.close() if self._closed:
await self._http.close() return # Already closed, make it idempotent
self._closed = True
# Close in proper order: anisette first, then HTTP session
try:
await self._anisette.close()
except (RuntimeError, OSError, ConnectionError) as e:
logger.warning("Error closing anisette provider: %s", e)
try:
await self._http.close()
except (RuntimeError, OSError, ConnectionError) as e:
logger.warning("Error closing HTTP session: %s", e)
@require_login_state(LoginState.LOGGED_OUT) @require_login_state(LoginState.LOGGED_OUT)
@override @override
@@ -495,7 +535,7 @@ class AsyncAppleAccount(BaseAppleAccount):
for number in phone_numbers for number in phone_numbers
) )
except RuntimeError: except RuntimeError:
logging.warning("Unable to extract phone numbers from login page") logger.warning("Unable to extract phone numbers from login page")
return methods return methods
@@ -575,13 +615,37 @@ class AsyncAppleAccount(BaseAppleAccount):
return await self._login_mobileme() return await self._login_mobileme()
@require_login_state(LoginState.LOGGED_IN) @require_login_state(LoginState.LOGGED_IN)
async def fetch_raw_reports(self, start: int, end: int, ids: list[str]) -> dict[str, Any]: async def fetch_raw_reports(
self,
start: datetime,
end: datetime,
devices: list[list[str]],
) -> dict[str, Any]:
"""Make a request for location reports, returning raw data.""" """Make a request for location reports, returning raw data."""
auth = ( auth = (
self._login_state_data["dsid"], self._login_state_data["dsid"],
self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"], self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"],
) )
data = {"search": [{"startDate": start, "endDate": end, "ids": ids}]} start_ts = int(start.timestamp() * 1000)
end_ts = int(end.timestamp() * 1000)
data = {
"clientContext": {
"clientBundleIdentifier": "com.apple.icloud.searchpartyuseragent",
"policy": "foregroundClient",
},
"fetch": [
{
"ownedDeviceIds": [],
"keyType": 1,
"startDate": start_ts,
"startDateSecondary": start_ts,
"endDate": end_ts,
# passing all keys as primary seems to work fine
"primaryIds": device_keys,
}
for device_keys in devices
],
}
async def _do_request() -> HttpResponse: async def _do_request() -> HttpResponse:
return await self._http.post( return await self._http.post(
@@ -593,7 +657,7 @@ class AsyncAppleAccount(BaseAppleAccount):
r = await _do_request() r = await _do_request()
if r.status_code == 401: if r.status_code == 401:
logging.info("Got 401 while fetching reports, redoing login") logger.info("Got 401 while fetching reports, redoing login")
new_state = await self._gsa_authenticate() new_state = await self._gsa_authenticate()
if new_state != LoginState.AUTHENTICATED: if new_state != LoginState.AUTHENTICATED:
@@ -611,11 +675,11 @@ class AsyncAppleAccount(BaseAppleAccount):
resp = r.json() resp = r.json()
except json.JSONDecodeError: except json.JSONDecodeError:
resp = {} resp = {}
if not r.ok or resp.get("statusCode") != "200": if not r.ok or resp.get("acsnLocations", {}).get("statusCode") != "200":
msg = f"Failed to fetch reports: {resp.get('statusCode')}" msg = f"Failed to fetch reports: {resp.get('statusCode')}"
raise UnhandledProtocolError(msg) raise UnhandledProtocolError(msg)
return resp return resp["acsnLocations"]
@overload @overload
async def fetch_reports( async def fetch_reports(
@@ -625,14 +689,6 @@ class AsyncAppleAccount(BaseAppleAccount):
date_to: datetime | None, date_to: datetime | None,
) -> list[LocationReport]: ... ) -> list[LocationReport]: ...
@overload
async def fetch_reports(
self,
keys: Sequence[HasHashedPublicKey],
date_from: datetime,
date_to: datetime | None,
) -> dict[HasHashedPublicKey, list[LocationReport]]: ...
@overload @overload
async def fetch_reports( async def fetch_reports(
self, self,
@@ -641,14 +697,26 @@ class AsyncAppleAccount(BaseAppleAccount):
date_to: datetime | None, date_to: datetime | None,
) -> list[LocationReport]: ... ) -> list[LocationReport]: ...
@overload
async def fetch_reports(
self,
keys: Sequence[HasHashedPublicKey | RollingKeyPairSource],
date_from: datetime,
date_to: datetime | None,
) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ...
@require_login_state(LoginState.LOGGED_IN) @require_login_state(LoginState.LOGGED_IN)
@override @override
async def fetch_reports( async def fetch_reports(
self, self,
keys: HasHashedPublicKey | Sequence[HasHashedPublicKey] | RollingKeyPairSource, keys: HasHashedPublicKey
| RollingKeyPairSource
| Sequence[HasHashedPublicKey | RollingKeyPairSource],
date_from: datetime, date_from: datetime,
date_to: datetime | None, date_to: datetime | None,
) -> list[LocationReport] | dict[HasHashedPublicKey, list[LocationReport]]: ) -> (
list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
):
"""See `BaseAppleAccount.fetch_reports`.""" """See `BaseAppleAccount.fetch_reports`."""
date_to = date_to or datetime.now().astimezone() date_to = date_to or datetime.now().astimezone()
@@ -668,24 +736,28 @@ class AsyncAppleAccount(BaseAppleAccount):
@overload @overload
async def fetch_last_reports( async def fetch_last_reports(
self, self,
keys: Sequence[HasHashedPublicKey], keys: RollingKeyPairSource,
hours: int = 7 * 24, hours: int = 7 * 24,
) -> dict[HasHashedPublicKey, list[LocationReport]]: ... ) -> list[LocationReport]: ...
@overload @overload
async def fetch_last_reports( async def fetch_last_reports(
self, self,
keys: RollingKeyPairSource, keys: Sequence[HasHashedPublicKey | RollingKeyPairSource],
hours: int = 7 * 24, hours: int = 7 * 24,
) -> list[LocationReport]: ... ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ...
@require_login_state(LoginState.LOGGED_IN) @require_login_state(LoginState.LOGGED_IN)
@override @override
async def fetch_last_reports( async def fetch_last_reports(
self, self,
keys: HasHashedPublicKey | Sequence[HasHashedPublicKey] | RollingKeyPairSource, keys: HasHashedPublicKey
| RollingKeyPairSource
| Sequence[HasHashedPublicKey | RollingKeyPairSource],
hours: int = 7 * 24, hours: int = 7 * 24,
) -> list[LocationReport] | dict[HasHashedPublicKey, list[LocationReport]]: ) -> (
list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
):
"""See `BaseAppleAccount.fetch_last_reports`.""" """See `BaseAppleAccount.fetch_last_reports`."""
end = datetime.now(tz=timezone.utc) end = datetime.now(tz=timezone.utc)
start = end - timedelta(hours=hours) start = end - timedelta(hours=hours)
@@ -702,13 +774,13 @@ class AsyncAppleAccount(BaseAppleAccount):
self._username = username or self._username self._username = username or self._username
self._password = password or self._password self._password = password or self._password
logging.info("Attempting authentication for user %s", self._username) logger.info("Attempting authentication for user %s", self._username)
if not self._username or not self._password: if not self._username or not self._password:
msg = "No username or password specified" msg = "No username or password specified"
raise ValueError(msg) raise ValueError(msg)
logging.debug("Starting authentication with username") logger.debug("Starting authentication with username")
usr = srp.User(self._username, b"", hash_alg=srp.SHA256, ng_type=srp.NG_2048) usr = srp.User(self._username, b"", hash_alg=srp.SHA256, ng_type=srp.NG_2048)
_, a2k = usr.start_authentication() _, a2k = usr.start_authentication()
@@ -716,7 +788,7 @@ class AsyncAppleAccount(BaseAppleAccount):
{"A2k": a2k, "u": self._username, "ps": ["s2k", "s2k_fo"], "o": "init"}, {"A2k": a2k, "u": self._username, "ps": ["s2k", "s2k_fo"], "o": "init"},
) )
logging.debug("Verifying response to auth request") logger.debug("Verifying response to auth request")
if r["Status"].get("ec") != 0: if r["Status"].get("ec") != 0:
msg = "Email verification failed: " + r["Status"].get("em") msg = "Email verification failed: " + r["Status"].get("em")
@@ -726,7 +798,7 @@ class AsyncAppleAccount(BaseAppleAccount):
msg = f"This implementation only supports s2k and sk2_fo. Server returned {sp}" msg = f"This implementation only supports s2k and sk2_fo. Server returned {sp}"
raise UnhandledProtocolError(msg) raise UnhandledProtocolError(msg)
logging.debug("Attempting password challenge") logger.debug("Attempting password challenge")
usr.p = crypto.encrypt_password(self._password, r["s"], r["i"], sp) usr.p = crypto.encrypt_password(self._password, r["s"], r["i"], sp)
m1 = usr.process_challenge(r["s"], r["B"]) m1 = usr.process_challenge(r["s"], r["B"])
@@ -737,7 +809,7 @@ class AsyncAppleAccount(BaseAppleAccount):
{"c": r["c"], "M1": m1, "u": self._username, "o": "complete"}, {"c": r["c"], "M1": m1, "u": self._username, "o": "complete"},
) )
logging.debug("Verifying password challenge response") logger.debug("Verifying password challenge response")
if r["Status"].get("ec") != 0: if r["Status"].get("ec") != 0:
msg = "Password authentication failed: " + r["Status"].get("em") msg = "Password authentication failed: " + r["Status"].get("em")
@@ -747,7 +819,7 @@ class AsyncAppleAccount(BaseAppleAccount):
msg = "Failed to verify session" msg = "Failed to verify session"
raise UnhandledProtocolError(msg) raise UnhandledProtocolError(msg)
logging.debug("Decrypting SPD data in response") logger.debug("Decrypting SPD data in response")
spd = decode_plist( spd = decode_plist(
crypto.decrypt_spd_aes_cbc( crypto.decrypt_spd_aes_cbc(
@@ -756,9 +828,9 @@ class AsyncAppleAccount(BaseAppleAccount):
), ),
) )
logging.debug("Received account information") logger.debug("Received account information")
self._account_info = cast( self._account_info = cast(
_AccountInfo, "_AccountInfo",
{ {
"account_name": spd.get("acname"), "account_name": spd.get("acname"),
"first_name": spd.get("fn"), "first_name": spd.get("fn"),
@@ -769,7 +841,7 @@ class AsyncAppleAccount(BaseAppleAccount):
au = r["Status"].get("au") au = r["Status"].get("au")
if au in ("secondaryAuth", "trustedDeviceSecondaryAuth"): if au in ("secondaryAuth", "trustedDeviceSecondaryAuth"):
logging.info("Detected 2FA requirement: %s", au) logger.info("Detected 2FA requirement: %s", au)
self._account_info["trusted_device_2fa"] = au == "trustedDeviceSecondaryAuth" self._account_info["trusted_device_2fa"] = au == "trustedDeviceSecondaryAuth"
@@ -778,7 +850,7 @@ class AsyncAppleAccount(BaseAppleAccount):
{"adsid": spd["adsid"], "idms_token": spd["GsIdmsToken"]}, {"adsid": spd["adsid"], "idms_token": spd["GsIdmsToken"]},
) )
if au is None: if au is None:
logging.info("GSA authentication successful") logger.info("GSA authentication successful")
idms_pet = spd.get("t", {}).get("com.apple.gs.idms.pet", {}).get("token", "") idms_pet = spd.get("t", {}).get("com.apple.gs.idms.pet", {}).get("token", "")
return self._set_login_state( return self._set_login_state(
@@ -791,7 +863,7 @@ class AsyncAppleAccount(BaseAppleAccount):
@require_login_state(LoginState.AUTHENTICATED) @require_login_state(LoginState.AUTHENTICATED)
async def _login_mobileme(self) -> LoginState: async def _login_mobileme(self) -> LoginState:
logging.info("Logging into com.apple.mobileme") logger.info("Logging into com.apple.mobileme")
data = plistlib.dumps( data = plistlib.dumps(
{ {
"apple-id": self._username, "apple-id": self._username,
@@ -913,11 +985,11 @@ class AppleAccount(BaseAppleAccount):
def __init__( def __init__(
self, self,
anisette: BaseAnisetteProvider, anisette: BaseAnisetteProvider,
user_id: str | None = None, *,
device_id: str | None = None, state_info: AccountStateMapping | None = None,
) -> None: ) -> None:
"""See `AsyncAppleAccount.__init__`.""" """See `AsyncAppleAccount.__init__`."""
self._asyncacc = AsyncAppleAccount(anisette, user_id, device_id) self._asyncacc = AsyncAppleAccount(anisette=anisette, state_info=state_info)
try: try:
self._evt_loop = asyncio.get_running_loop() self._evt_loop = asyncio.get_running_loop()
@@ -957,14 +1029,25 @@ class AppleAccount(BaseAppleAccount):
return self._asyncacc.last_name return self._asyncacc.last_name
@override @override
def export(self) -> dict: def to_json(self, dst: str | Path | None = None, /) -> AccountStateMapping:
"""See `AsyncAppleAccount.export`.""" return self._asyncacc.to_json(dst)
return self._asyncacc.export()
@classmethod
@override @override
def restore(self, data: dict) -> None: def from_json(
"""See `AsyncAppleAccount.restore`.""" cls,
return self._asyncacc.restore(data) val: str | Path | AccountStateMapping,
/,
*,
anisette_libs_path: str | Path | None = None,
) -> AppleAccount:
val = read_data_json(val)
try:
ani_provider = get_provider_from_mapping(val["anisette"], libs_path=anisette_libs_path)
return cls(ani_provider, state_info=val)
except KeyError as e:
msg = f"Failed to restore account data: {e}"
raise ValueError(msg) from None
@override @override
def login(self, username: str, password: str) -> LoginState: def login(self, username: str, password: str) -> LoginState:
@@ -1025,14 +1108,6 @@ class AppleAccount(BaseAppleAccount):
date_to: datetime | None, date_to: datetime | None,
) -> list[LocationReport]: ... ) -> list[LocationReport]: ...
@overload
def fetch_reports(
self,
keys: Sequence[HasHashedPublicKey],
date_from: datetime,
date_to: datetime | None,
) -> dict[HasHashedPublicKey, list[LocationReport]]: ...
@overload @overload
def fetch_reports( def fetch_reports(
self, self,
@@ -1041,13 +1116,25 @@ class AppleAccount(BaseAppleAccount):
date_to: datetime | None, date_to: datetime | None,
) -> list[LocationReport]: ... ) -> list[LocationReport]: ...
@overload
def fetch_reports(
self,
keys: Sequence[HasHashedPublicKey | RollingKeyPairSource],
date_from: datetime,
date_to: datetime | None,
) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ...
@override @override
def fetch_reports( def fetch_reports(
self, self,
keys: HasHashedPublicKey | Sequence[HasHashedPublicKey] | RollingKeyPairSource, keys: HasHashedPublicKey
| Sequence[HasHashedPublicKey | RollingKeyPairSource]
| RollingKeyPairSource,
date_from: datetime, date_from: datetime,
date_to: datetime | None, date_to: datetime | None,
) -> list[LocationReport] | dict[HasHashedPublicKey, list[LocationReport]]: ) -> (
list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
):
"""See `AsyncAppleAccount.fetch_reports`.""" """See `AsyncAppleAccount.fetch_reports`."""
coro = self._asyncacc.fetch_reports(keys, date_from, date_to) coro = self._asyncacc.fetch_reports(keys, date_from, date_to)
return self._evt_loop.run_until_complete(coro) return self._evt_loop.run_until_complete(coro)
@@ -1059,13 +1146,6 @@ class AppleAccount(BaseAppleAccount):
hours: int = 7 * 24, hours: int = 7 * 24,
) -> list[LocationReport]: ... ) -> list[LocationReport]: ...
@overload
def fetch_last_reports(
self,
keys: Sequence[HasHashedPublicKey],
hours: int = 7 * 24,
) -> dict[HasHashedPublicKey, list[LocationReport]]: ...
@overload @overload
def fetch_last_reports( def fetch_last_reports(
self, self,
@@ -1073,12 +1153,23 @@ class AppleAccount(BaseAppleAccount):
hours: int = 7 * 24, hours: int = 7 * 24,
) -> list[LocationReport]: ... ) -> list[LocationReport]: ...
@overload
def fetch_last_reports(
self,
keys: Sequence[HasHashedPublicKey | RollingKeyPairSource],
hours: int = 7 * 24,
) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ...
@override @override
def fetch_last_reports( def fetch_last_reports(
self, self,
keys: HasHashedPublicKey | Sequence[HasHashedPublicKey] | RollingKeyPairSource, keys: HasHashedPublicKey
| RollingKeyPairSource
| Sequence[HasHashedPublicKey | RollingKeyPairSource],
hours: int = 7 * 24, hours: int = 7 * 24,
) -> list[LocationReport] | dict[HasHashedPublicKey, list[LocationReport]]: ) -> (
list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
):
"""See `AsyncAppleAccount.fetch_last_reports`.""" """See `AsyncAppleAccount.fetch_last_reports`."""
coro = self._asyncacc.fetch_last_reports(keys, hours) coro = self._asyncacc.fetch_last_reports(keys, hours)
return self._evt_loop.run_until_complete(coro) return self._evt_loop.run_until_complete(coro)

View File

@@ -8,14 +8,52 @@ import logging
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime, timezone from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import BinaryIO, Literal, TypedDict, Union
from anisette import Anisette, AnisetteHeaders
from typing_extensions import override from typing_extensions import override
from findmy.util.closable import Closable from findmy.util.abc import Closable, Serializable
from findmy.util.files import read_data_json, save_and_return_json
from findmy.util.http import HttpSession from findmy.util.http import HttpSession
logger = logging.getLogger(__name__)
class BaseAnisetteProvider(Closable, ABC):
class RemoteAnisetteMapping(TypedDict):
"""JSON mapping representing state of a remote Anisette provider."""
type: Literal["aniRemote"]
url: str
class LocalAnisetteMapping(TypedDict):
"""JSON mapping representing state of a local Anisette provider."""
type: Literal["aniLocal"]
prov_data: str
AnisetteMapping = Union[RemoteAnisetteMapping, LocalAnisetteMapping]
def get_provider_from_mapping(
mapping: AnisetteMapping,
*,
libs_path: str | Path | None = None,
) -> RemoteAnisetteProvider | LocalAnisetteProvider:
"""Get the correct Anisette provider instance from saved JSON data."""
if mapping["type"] == "aniRemote":
return RemoteAnisetteProvider.from_json(mapping)
if mapping["type"] == "aniLocal":
return LocalAnisetteProvider.from_json(mapping, libs_path=libs_path)
msg = f"Unknown anisette type: {mapping['type']}"
raise ValueError(msg)
class BaseAnisetteProvider(Closable, Serializable, ABC):
""" """
Abstract base class for Anisette providers. Abstract base class for Anisette providers.
@@ -25,22 +63,13 @@ class BaseAnisetteProvider(Closable, ABC):
@property @property
@abstractmethod @abstractmethod
def otp(self) -> str: def otp(self) -> str:
""" """A seemingly random base64 string containing 28 bytes."""
A seemingly random base64 string containing 28 bytes.
TODO: Figure out how to generate this.
"""
raise NotImplementedError raise NotImplementedError
@property @property
@abstractmethod @abstractmethod
def machine(self) -> str: def machine(self) -> str:
""" """A base64 encoded string of 60 'random' bytes."""
A base64 encoded string of 60 'random' bytes.
We're not sure how this is generated, we have to rely on the server.
TODO: Figure out how to generate this.
"""
raise NotImplementedError raise NotImplementedError
@property @property
@@ -159,7 +188,7 @@ class BaseAnisetteProvider(Closable, ABC):
return cpd return cpd
class RemoteAnisetteProvider(BaseAnisetteProvider): class RemoteAnisetteProvider(BaseAnisetteProvider, Serializable[RemoteAnisetteMapping]):
"""Anisette provider. Fetches headers from a remote Anisette server.""" """Anisette provider. Fetches headers from a remote Anisette server."""
_ANISETTE_DATA_VALID_FOR = 30 _ANISETTE_DATA_VALID_FOR = 30
@@ -174,6 +203,30 @@ class RemoteAnisetteProvider(BaseAnisetteProvider):
self._anisette_data: dict[str, str] | None = None self._anisette_data: dict[str, str] | None = None
self._anisette_data_expires_at: float = 0 self._anisette_data_expires_at: float = 0
self._closed = False
@override
def to_json(self, dst: str | Path | None = None, /) -> RemoteAnisetteMapping:
"""See `BaseAnisetteProvider.serialize`."""
return save_and_return_json(
{
"type": "aniRemote",
"url": self._server_url,
},
dst,
)
@classmethod
@override
def from_json(cls, val: str | Path | RemoteAnisetteMapping) -> RemoteAnisetteProvider:
"""See `BaseAnisetteProvider.deserialize`."""
val = read_data_json(val)
assert val["type"] == "aniRemote"
server_url = val["url"]
return cls(server_url)
@property @property
@override @override
@@ -181,7 +234,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider):
"""See `BaseAnisetteProvider.otp`_.""" """See `BaseAnisetteProvider.otp`_."""
otp = (self._anisette_data or {}).get("X-Apple-I-MD") otp = (self._anisette_data or {}).get("X-Apple-I-MD")
if otp is None: if otp is None:
logging.warning("X-Apple-I-MD header not found! Returning fallback...") logger.warning("X-Apple-I-MD header not found! Returning fallback...")
return otp or "" return otp or ""
@property @property
@@ -190,7 +243,7 @@ class RemoteAnisetteProvider(BaseAnisetteProvider):
"""See `BaseAnisetteProvider.machine`_.""" """See `BaseAnisetteProvider.machine`_."""
machine = (self._anisette_data or {}).get("X-Apple-I-MD-M") machine = (self._anisette_data or {}).get("X-Apple-I-MD-M")
if machine is None: if machine is None:
logging.warning("X-Apple-I-MD-M header not found! Returning fallback...") logger.warning("X-Apple-I-MD-M header not found! Returning fallback...")
return machine or "" return machine or ""
@override @override
@@ -202,10 +255,14 @@ class RemoteAnisetteProvider(BaseAnisetteProvider):
with_client_info: bool = False, with_client_info: bool = False,
) -> dict[str, str]: ) -> dict[str, str]:
"""See `BaseAnisetteProvider.get_headers`_.""" """See `BaseAnisetteProvider.get_headers`_."""
if self._anisette_data is None or time.time() >= self._anisette_data_expires_at: if self._closed:
logging.info("Fetching anisette data from %s", self._server_url) msg = "RemoteAnisetteProvider has been closed and cannot be used"
raise RuntimeError(msg)
r = await self._http.get(self._server_url) if self._anisette_data is None or time.time() >= self._anisette_data_expires_at:
logger.info("Fetching anisette data from %s", self._server_url)
r = await self._http.get(self._server_url, auto_retry=True)
self._anisette_data = r.json() self._anisette_data = r.json()
self._anisette_data_expires_at = time.time() + self._ANISETTE_DATA_VALID_FOR self._anisette_data_expires_at = time.time() + self._ANISETTE_DATA_VALID_FOR
@@ -214,25 +271,123 @@ class RemoteAnisetteProvider(BaseAnisetteProvider):
@override @override
async def close(self) -> None: async def close(self) -> None:
"""See `AnisetteProvider.close`.""" """See `AnisetteProvider.close`."""
await self._http.close() if self._closed:
return # Already closed, make it idempotent
self._closed = True
try:
await self._http.close()
except (RuntimeError, OSError, ConnectionError) as e:
logger.warning("Error closing anisette HTTP session: %s", e)
# TODO(malmeloo): implement using pyprovision class LocalAnisetteProvider(BaseAnisetteProvider, Serializable[LocalAnisetteMapping]):
# https://github.com/malmeloo/FindMy.py/issues/2 """Anisette provider. Generates headers without a remote server using the `anisette` library."""
class LocalAnisetteProvider(BaseAnisetteProvider):
"""Anisette provider. Generates headers without a remote server using pyprovision.""" def __init__(
self,
*,
state_blob: BinaryIO | None = None,
libs_path: str | Path | None = None,
) -> None:
"""Initialize the provider."""
super().__init__()
if isinstance(libs_path, str):
libs_path = Path(libs_path)
if libs_path is None or not libs_path.is_file():
logger.info(
"The Anisette engine will download libraries required for operation, "
"this may take a few seconds...",
)
if libs_path is None:
logger.info(
"To speed up future local Anisette initializations, "
"provide a filesystem path to load the libraries from.",
)
files: list[BinaryIO | Path] = []
if state_blob is not None:
files.append(state_blob)
if libs_path is not None and libs_path.exists():
files.append(libs_path)
self._ani = Anisette.load(*files)
self._ani_data: AnisetteHeaders | None = None
self._libs_path: Path | None = libs_path
if libs_path is not None:
self._ani.save_libs(libs_path)
if state_blob is not None and not self._ani.is_provisioned:
logger.warning(
"The Anisette state that was loaded has not yet been provisioned. "
"Was the previous session saved properly?",
)
@override
def to_json(self, dst: str | Path | None = None, /) -> LocalAnisetteMapping:
"""See `BaseAnisetteProvider.serialize`."""
with BytesIO() as buf:
self._ani.save_provisioning(buf)
prov_data = base64.b64encode(buf.getvalue()).decode("utf-8")
return save_and_return_json(
{
"type": "aniLocal",
"prov_data": prov_data,
},
dst,
)
@classmethod
@override
def from_json(
cls,
val: str | Path | LocalAnisetteMapping,
*,
libs_path: str | Path | None = None,
) -> LocalAnisetteProvider:
"""See `BaseAnisetteProvider.deserialize`."""
val = read_data_json(val)
assert val["type"] == "aniLocal"
state_blob = BytesIO(base64.b64decode(val["prov_data"]))
return cls(state_blob=state_blob, libs_path=libs_path)
@override
async def get_headers(
self,
user_id: str,
device_id: str,
serial: str = "0",
with_client_info: bool = False,
) -> dict[str, str]:
"""See `BaseAnisetteProvider.get_headers`_."""
self._ani_data = self._ani.get_data()
return await super().get_headers(user_id, device_id, serial, with_client_info)
@property @property
@override @override
def otp(self) -> str: def otp(self) -> str:
"""See `BaseAnisetteProvider.otp`_.""" """See `BaseAnisetteProvider.otp`_."""
raise NotImplementedError machine = (self._ani_data or {}).get("X-Apple-I-MD")
if machine is None:
logger.warning("X-Apple-I-MD header not found! Returning fallback...")
return machine or ""
@property @property
@override @override
def machine(self) -> str: def machine(self) -> str:
"""See `BaseAnisetteProvider.machine`_.""" """See `BaseAnisetteProvider.machine`_."""
raise NotImplementedError machine = (self._ani_data or {}).get("X-Apple-I-MD-M")
if machine is None:
logger.warning("X-Apple-I-MD-M header not found! Returning fallback...")
return machine or ""
@override @override
async def close(self) -> None: async def close(self) -> None:

View File

@@ -6,8 +6,9 @@ import base64
import hashlib import hashlib
import logging import logging
import struct import struct
from collections import defaultdict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Sequence, overload from typing import TYPE_CHECKING, Literal, TypedDict, Union, cast, overload
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
@@ -15,29 +16,52 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from typing_extensions import override from typing_extensions import override
from findmy.accessory import RollingKeyPairSource from findmy.accessory import RollingKeyPairSource
from findmy.keys import HasHashedPublicKey, KeyPair from findmy.keys import HasHashedPublicKey, KeyPair, KeyPairMapping
from findmy.util.abc import Serializable
from findmy.util.files import read_data_json, save_and_return_json
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import Sequence
from pathlib import Path
from .account import AsyncAppleAccount from .account import AsyncAppleAccount
logging.getLogger(__name__) logger = logging.getLogger(__name__)
class LocationReport(HasHashedPublicKey): class LocationReportEncryptedMapping(TypedDict):
"""JSON mapping representing an encrypted location report."""
type: Literal["locReportEncrypted"]
payload: str
hashed_adv_key: str
class LocationReportDecryptedMapping(TypedDict):
"""JSON mapping representing a decrypted location report."""
type: Literal["locReportDecrypted"]
payload: str
hashed_adv_key: str
key: KeyPairMapping
LocationReportMapping = Union[LocationReportEncryptedMapping, LocationReportDecryptedMapping]
class LocationReport(HasHashedPublicKey, Serializable[LocationReportMapping]):
"""Location report corresponding to a certain `HasHashedPublicKey`.""" """Location report corresponding to a certain `HasHashedPublicKey`."""
def __init__( def __init__(
self, self,
payload: bytes, payload: bytes,
hashed_adv_key: bytes, hashed_adv_key: bytes,
published_at: datetime,
description: str = "",
) -> None: ) -> None:
"""Initialize a `KeyReport`. You should probably use `KeyReport.from_payload` instead.""" """Initialize a `KeyReport`. You should probably use `KeyReport.from_payload` instead."""
self._payload: bytes = payload self._payload: bytes = payload
self._hashed_adv_key: bytes = hashed_adv_key self._hashed_adv_key: bytes = hashed_adv_key
self._published_at: datetime = published_at
self._description: str = description
self._decrypted_data: tuple[KeyPair, bytes] | None = None self._decrypted_data: tuple[KeyPair, bytes] | None = None
@@ -67,9 +91,13 @@ class LocationReport(HasHashedPublicKey):
"""Whether the report is currently decrypted.""" """Whether the report is currently decrypted."""
return self._decrypted_data is not None return self._decrypted_data is not None
def can_decrypt(self, key: KeyPair, /) -> bool:
"""Whether the report can be decrypted using the given key."""
return key.hashed_adv_key_bytes == self._hashed_adv_key
def decrypt(self, key: KeyPair) -> None: def decrypt(self, key: KeyPair) -> None:
"""Decrypt the report using its corresponding `KeyPair`.""" """Decrypt the report using its corresponding `KeyPair`."""
if key.hashed_adv_key_bytes != self._hashed_adv_key: if not self.can_decrypt(key):
msg = "Cannot decrypt with this key!" msg = "Cannot decrypt with this key!"
raise ValueError(msg) raise ValueError(msg)
@@ -106,22 +134,20 @@ class LocationReport(HasHashedPublicKey):
self._decrypted_data = (key, decrypted_payload) self._decrypted_data = (key, decrypted_payload)
@property
def published_at(self) -> datetime:
"""The `datetime` when this report was published by a device."""
return self._published_at
@property
def description(self) -> str:
"""Description of the location report as published by Apple."""
return self._description
@property @property
def timestamp(self) -> datetime: def timestamp(self) -> datetime:
"""The `datetime` when this report was recorded by a device.""" """The `datetime` when this report was recorded by a device."""
timestamp_int = int.from_bytes(self._payload[0:4], "big") + (60 * 60 * 24 * 11323) timestamp_int = int.from_bytes(self._payload[0:4], "big") + (60 * 60 * 24 * 11323)
return datetime.fromtimestamp(timestamp_int, tz=timezone.utc).astimezone() return datetime.fromtimestamp(timestamp_int, tz=timezone.utc).astimezone()
@property
def confidence(self) -> int:
"""Confidence of the location of this report. Int between 1 and 3."""
# If the payload length is 88, the confidence is the 5th byte, otherwise it's the 6th byte
if len(self._payload) == 88:
return self._payload[4]
return self._payload[5]
@property @property
def latitude(self) -> float: def latitude(self) -> float:
"""Latitude of the location of this report.""" """Latitude of the location of this report."""
@@ -145,10 +171,10 @@ class LocationReport(HasHashedPublicKey):
return struct.unpack(">i", lon_bytes)[0] / 10000000 return struct.unpack(">i", lon_bytes)[0] / 10000000
@property @property
def confidence(self) -> int: def horizontal_accuracy(self) -> int:
"""Confidence of the location of this report.""" """Horizontal accuracy of the location of this report."""
if not self.is_decrypted: if not self.is_decrypted:
msg = "Confidence is unavailable while the report is encrypted." msg = "Horizontal accuracy is unavailable while the report is encrypted."
raise RuntimeError(msg) raise RuntimeError(msg)
assert self._decrypted_data is not None assert self._decrypted_data is not None
@@ -166,6 +192,86 @@ class LocationReport(HasHashedPublicKey):
status_bytes = self._decrypted_data[1][9:10] status_bytes = self._decrypted_data[1][9:10]
return int.from_bytes(status_bytes, "big") return int.from_bytes(status_bytes, "big")
@overload
def to_json(
self,
dst: str | Path | None = None,
/,
*,
include_key: Literal[True],
) -> LocationReportEncryptedMapping:
pass
@overload
def to_json(
self,
dst: str | Path | None = None,
/,
*,
include_key: Literal[False],
) -> LocationReportDecryptedMapping:
pass
@overload
def to_json(
self,
dst: str | Path | None = None,
/,
*,
include_key: None = None,
) -> LocationReportMapping:
pass
@override
def to_json(
self,
dst: str | Path | None = None,
/,
*,
include_key: bool | None = None,
) -> LocationReportMapping:
if include_key is None:
include_key = self.is_decrypted
if include_key:
return save_and_return_json(
{
"type": "locReportDecrypted",
"payload": base64.b64encode(self._payload).decode("utf-8"),
"hashed_adv_key": base64.b64encode(self._hashed_adv_key).decode("utf-8"),
"key": self.key.to_json(),
},
dst,
)
return save_and_return_json(
{
"type": "locReportEncrypted",
"payload": base64.b64encode(self._payload).decode("utf-8"),
"hashed_adv_key": base64.b64encode(self._hashed_adv_key).decode("utf-8"),
},
dst,
)
@classmethod
@override
def from_json(cls, val: str | Path | LocationReportMapping, /) -> LocationReport:
val = read_data_json(val)
assert val["type"] == "locReportEncrypted" or val["type"] == "locReportDecrypted"
try:
report = cls(
payload=base64.b64decode(val["payload"]),
hashed_adv_key=base64.b64decode(val["hashed_adv_key"]),
)
if val["type"] == "locReportDecrypted":
key = KeyPair.from_json(val["key"])
report.decrypt(key)
except KeyError as e:
msg = f"Failed to restore account data: {e}"
raise ValueError(msg) from None
else:
return report
@override @override
def __eq__(self, other: object) -> bool: def __eq__(self, other: object) -> bool:
""" """
@@ -239,97 +345,132 @@ class LocationReportsFetcher:
self, self,
date_from: datetime, date_from: datetime,
date_to: datetime, date_to: datetime,
device: Sequence[HasHashedPublicKey], device: RollingKeyPairSource,
) -> dict[HasHashedPublicKey, list[LocationReport]]: ... ) -> list[LocationReport]: ...
@overload @overload
async def fetch_reports( async def fetch_reports(
self, self,
date_from: datetime, date_from: datetime,
date_to: datetime, date_to: datetime,
device: RollingKeyPairSource, device: Sequence[HasHashedPublicKey | RollingKeyPairSource],
) -> list[LocationReport]: ... ) -> dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]: ...
async def fetch_reports( async def fetch_reports( # noqa: C901
self, self,
date_from: datetime, date_from: datetime,
date_to: datetime, date_to: datetime,
device: HasHashedPublicKey | Sequence[HasHashedPublicKey] | RollingKeyPairSource, device: HasHashedPublicKey
) -> list[LocationReport] | dict[HasHashedPublicKey, list[LocationReport]]: | RollingKeyPairSource
| Sequence[HasHashedPublicKey | RollingKeyPairSource],
) -> (
list[LocationReport] | dict[HasHashedPublicKey | RollingKeyPairSource, list[LocationReport]]
):
""" """
Fetch location reports for a certain device. Fetch location reports for a certain device.
When ``device`` is a single :class:`.HasHashedPublicKey`, this method will return When ``device`` is a single :class:`.HasHashedPublicKey`, this method will return
a list of location reports corresponding to that key. a list of location reports corresponding to that key.
When ``device`` is a sequence of :class:`.HasHashedPublicKey`s, it will return a dictionary
with the :class:`.HasHashedPublicKey` as key, and a list of location reports as value.
When ``device`` is a :class:`.RollingKeyPairSource`, it will return a list of When ``device`` is a :class:`.RollingKeyPairSource`, it will return a list of
location reports corresponding to that source. location reports corresponding to that source.
When ``device`` is a sequence of :class:`.HasHashedPublicKey`s or RollingKeyPairSource's,
it will return a dictionary with the :class:`.HasHashedPublicKey` or `.RollingKeyPairSource`
as key, and a list of location reports as value.
""" """
# single key key_devs: dict[HasHashedPublicKey, HasHashedPublicKey | RollingKeyPairSource] = {}
key_batches: list[list[HasHashedPublicKey]] = []
if isinstance(device, HasHashedPublicKey): if isinstance(device, HasHashedPublicKey):
return await self._fetch_reports(date_from, date_to, [device]) # single key
key_devs = {device: device}
# key generator key_batches.append([device])
# add 12h margin to the generator elif isinstance(device, RollingKeyPairSource):
if isinstance(device, RollingKeyPairSource): # key generator
keys = list( # add 12h margin to the generator
device.keys_between( keys = device.keys_between(
date_from - timedelta(hours=12), date_from - timedelta(hours=12),
date_to + timedelta(hours=12), date_to + timedelta(hours=12),
),
) )
key_devs = dict.fromkeys(keys, device)
key_batches.append(list(keys))
elif isinstance(device, list) and all(
isinstance(x, HasHashedPublicKey | RollingKeyPairSource) for x in device
):
# multiple key generators
# add 12h margin to each generator
device = cast("list[HasHashedPublicKey | RollingKeyPairSource]", device)
for dev in device:
if isinstance(dev, HasHashedPublicKey):
key_devs[dev] = dev
key_batches.append([dev])
elif isinstance(dev, RollingKeyPairSource):
keys = dev.keys_between(
date_from - timedelta(hours=12),
date_to + timedelta(hours=12),
)
for key in keys:
key_devs[key] = dev
key_batches.append(list(keys))
else: else:
keys = device msg = "Unknown device type: %s"
raise ValueError(msg, type(device))
# sequence of keys (fetch 256 max at a time) # sequence of keys (fetch 256 max at a time)
reports: list[LocationReport] = [] key_reports: dict[HasHashedPublicKey, list[LocationReport]] = await self._fetch_reports(
for key_offset in range(0, len(keys), 256): date_from,
chunk = keys[key_offset : key_offset + 256] date_to,
reports.extend(await self._fetch_reports(date_from, date_to, chunk)) key_batches,
)
if isinstance(device, RollingKeyPairSource): # combine (key -> list[report]) and (key -> device) into (device -> list[report])
return reports device_reports = defaultdict(list)
for key, reports in key_reports.items():
device_reports[key_devs[key]].extend(reports)
for dev in device_reports:
device_reports[dev] = sorted(device_reports[dev])
res: dict[HasHashedPublicKey, list[LocationReport]] = {key: [] for key in keys} # result
for report in reports: if isinstance(device, (HasHashedPublicKey, RollingKeyPairSource)):
for key in res: # single key or generator
if key.hashed_adv_key_bytes == report.hashed_adv_key_bytes: return device_reports[device]
res[key].append(report) # multiple static keys or key generators
break return device_reports
return res
async def _fetch_reports( async def _fetch_reports(
self, self,
date_from: datetime, date_from: datetime,
date_to: datetime, date_to: datetime,
keys: Sequence[HasHashedPublicKey], device_keys: Sequence[Sequence[HasHashedPublicKey]],
) -> list[LocationReport]: ) -> dict[HasHashedPublicKey, list[LocationReport]]:
logging.debug("Fetching reports for %s keys", len(keys)) logger.debug("Fetching reports for %s device(s)", len(device_keys))
start_date = int(date_from.timestamp() * 1000) # lock requested time range to the past 7 days, +- 12 hours, then filter the response.
end_date = int(date_to.timestamp() * 1000) # this is due to an Apple backend bug where the time range is not respected.
ids = [key.hashed_adv_key_b64 for key in keys] # More info: https://github.com/biemster/FindMy/issues/7
now = datetime.now().astimezone()
start_date = now - timedelta(days=7, hours=12)
end_date = now + timedelta(hours=12)
ids = [[key.hashed_adv_key_b64 for key in keys] for keys in device_keys]
data = await self._account.fetch_raw_reports(start_date, end_date, ids) data = await self._account.fetch_raw_reports(start_date, end_date, ids)
id_to_key: dict[bytes, HasHashedPublicKey] = {key.hashed_adv_key_bytes: key for key in keys} id_to_key: dict[bytes, HasHashedPublicKey] = {
reports: list[LocationReport] = [] key.hashed_adv_key_bytes: key for keys in device_keys for key in keys
for report in data.get("results", []): }
payload = base64.b64decode(report["payload"]) reports: dict[HasHashedPublicKey, list[LocationReport]] = defaultdict(list)
hashed_adv_key = base64.b64decode(report["id"]) for key_reports in data.get("locationPayload", []):
date_published = datetime.fromtimestamp( hashed_adv_key_bytes = base64.b64decode(key_reports["id"])
report.get("datePublished", 0) / 1000, key = id_to_key[hashed_adv_key_bytes]
tz=timezone.utc,
).astimezone()
description = report.get("description", "")
loc_report = LocationReport(payload, hashed_adv_key, date_published, description) for report in key_reports.get("locationInfo", []):
payload = base64.b64decode(report)
loc_report = LocationReport(payload, hashed_adv_key_bytes)
# pre-decrypt if possible if loc_report.timestamp < date_from or loc_report.timestamp > date_to:
key = id_to_key[hashed_adv_key] continue
if isinstance(key, KeyPair):
loc_report.decrypt(key)
reports.append(loc_report) # pre-decrypt if possible
if isinstance(key, KeyPair):
loc_report.decrypt(key)
reports[key].append(loc_report)
return reports return reports

View File

@@ -7,7 +7,7 @@ from .scanner import (
) )
__all__ = ( __all__ = (
"OfflineFindingScanner",
"NearbyOfflineFindingDevice", "NearbyOfflineFindingDevice",
"OfflineFindingScanner",
"SeparatedOfflineFindingDevice", "SeparatedOfflineFindingDevice",
) )

View File

@@ -6,8 +6,8 @@ import asyncio
import logging import logging
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, AsyncGenerator from typing import TYPE_CHECKING, Any
from bleak import BleakScanner from bleak import BleakScanner
from typing_extensions import override from typing_extensions import override
@@ -16,10 +16,12 @@ from findmy.accessory import RollingKeyPairSource
from findmy.keys import HasPublicKey from findmy.keys import HasPublicKey
if TYPE_CHECKING: if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from bleak.backends.device import BLEDevice from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData from bleak.backends.scanner import AdvertisementData
logging.getLogger(__name__) logger = logging.getLogger(__name__)
class OfflineFindingDevice(ABC): class OfflineFindingDevice(ABC):
@@ -28,13 +30,6 @@ class OfflineFindingDevice(ABC):
OF_HEADER_SIZE = 2 OF_HEADER_SIZE = 2
OF_TYPE = 0x12 OF_TYPE = 0x12
@classmethod
@property
@abstractmethod
def payload_len(cls) -> int:
"""Length of OfflineFinding data payload in bytes."""
raise NotImplementedError
def __init__( def __init__(
self, self,
mac_bytes: bytes, mac_bytes: bytes,
@@ -96,18 +91,22 @@ class OfflineFindingDevice(ABC):
) -> OfflineFindingDevice | None: ) -> OfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from a BLE packet payload.""" """Get a NearbyOfflineFindingDevice object from a BLE packet payload."""
if len(ble_payload) < cls.OF_HEADER_SIZE: if len(ble_payload) < cls.OF_HEADER_SIZE:
logging.error("Not enough bytes to decode: %s", len(ble_payload)) logger.error("Not enough bytes to decode: %s", len(ble_payload))
return None return None
if ble_payload[0] != cls.OF_TYPE: if ble_payload[0] != cls.OF_TYPE:
logging.debug("Unsupported OF type: %s", ble_payload[0]) logger.debug("Unsupported OF type: %s", ble_payload[0])
return None return None
device_type = next( device_type = next(
(dev for dev in cls.__subclasses__() if dev.payload_len == ble_payload[1]), (
dev
for dev in _DEVICE_TYPES
if getattr(dev, "OF_PAYLOAD_LEN", None) == ble_payload[1]
),
None, None,
) )
if device_type is None: if device_type is None:
logging.error("Invalid OF payload length: %s", ble_payload[1]) logger.error("Invalid OF payload length: %s", ble_payload[1])
return None return None
return device_type.from_payload( return device_type.from_payload(
@@ -132,12 +131,7 @@ class OfflineFindingDevice(ABC):
class NearbyOfflineFindingDevice(OfflineFindingDevice): class NearbyOfflineFindingDevice(OfflineFindingDevice):
"""Offline-Finding device in nearby state.""" """Offline-Finding device in nearby state."""
@classmethod OF_PAYLOAD_LEN = 0x02 # 2
@property
@override
def payload_len(cls) -> int:
"""Length of OfflineFinding data payload in bytes."""
return 0x02 # 2
def __init__( def __init__(
self, self,
@@ -158,7 +152,12 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
if isinstance(other_device, HasPublicKey): if isinstance(other_device, HasPublicKey):
return other_device.adv_key_bytes.startswith(self._first_adv_key_bytes) return other_device.adv_key_bytes.startswith(self._first_adv_key_bytes)
if isinstance(other_device, RollingKeyPairSource): if isinstance(other_device, RollingKeyPairSource):
return any(self.is_from(key) for key in other_device.keys_at(self.detected_at)) # 1 hour margin around the detected time
potential_keys = other_device.keys_between(
self.detected_at - timedelta(hours=1),
self.detected_at + timedelta(hours=1),
)
return any(self.is_from(key) for key in potential_keys)
msg = f"Cannot compare against {type(other_device)}" msg = f"Cannot compare against {type(other_device)}"
raise ValueError(msg) raise ValueError(msg)
@@ -173,8 +172,8 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
additional_data: dict[Any, Any] | None = None, additional_data: dict[Any, Any] | None = None,
) -> NearbyOfflineFindingDevice | None: ) -> NearbyOfflineFindingDevice | None:
"""Get a NearbyOfflineFindingDevice object from an OF message payload.""" """Get a NearbyOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.payload_len: if len(payload) != cls.OF_PAYLOAD_LEN:
logging.error( logger.error(
"Invalid OF data length: %s instead of %s", "Invalid OF data length: %s instead of %s",
len(payload), len(payload),
payload[1], payload[1],
@@ -201,12 +200,7 @@ class NearbyOfflineFindingDevice(OfflineFindingDevice):
class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey): class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
"""Offline-Finding device in separated state.""" """Offline-Finding device in separated state."""
@classmethod OF_PAYLOAD_LEN = 0x19 # 25
@property
@override
def payload_len(cls) -> int:
"""Length of OfflineFinding data in bytes."""
return 0x19 # 25
def __init__( # noqa: PLR0913 def __init__( # noqa: PLR0913
self, self,
@@ -240,7 +234,12 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
if isinstance(other_device, HasPublicKey): if isinstance(other_device, HasPublicKey):
return self.adv_key_bytes == other_device.adv_key_bytes return self.adv_key_bytes == other_device.adv_key_bytes
if isinstance(other_device, RollingKeyPairSource): if isinstance(other_device, RollingKeyPairSource):
return any(self.is_from(key) for key in other_device.keys_at(self.detected_at)) # 12 hour margin around the detected time
potential_keys = other_device.keys_between(
self.detected_at - timedelta(hours=12),
self.detected_at + timedelta(hours=12),
)
return any(self.is_from(key) for key in potential_keys)
msg = f"Cannot compare against {type(other_device)}" msg = f"Cannot compare against {type(other_device)}"
raise ValueError(msg) raise ValueError(msg)
@@ -255,8 +254,8 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
additional_data: dict[Any, Any] | None = None, additional_data: dict[Any, Any] | None = None,
) -> SeparatedOfflineFindingDevice | None: ) -> SeparatedOfflineFindingDevice | None:
"""Get a SeparatedOfflineFindingDevice object from an OF message payload.""" """Get a SeparatedOfflineFindingDevice object from an OF message payload."""
if len(payload) != cls.payload_len: if len(payload) != cls.OF_PAYLOAD_LEN:
logging.error( logger.error(
"Invalid OF data length: %s instead of %s", "Invalid OF data length: %s instead of %s",
len(payload), len(payload),
payload[1], payload[1],
@@ -294,6 +293,12 @@ class SeparatedOfflineFindingDevice(OfflineFindingDevice, HasPublicKey):
) )
_DEVICE_TYPES = {
NearbyOfflineFindingDevice,
SeparatedOfflineFindingDevice,
}
class OfflineFindingScanner: class OfflineFindingScanner:
"""BLE scanner that searches for `OfflineFindingDevice`s.""" """BLE scanner that searches for `OfflineFindingDevice`s."""
@@ -324,7 +329,7 @@ class OfflineFindingScanner:
async def _start_scan(self) -> None: async def _start_scan(self) -> None:
async with self._scan_ctrl_lock: async with self._scan_ctrl_lock:
if self._scanner_count == 0: if self._scanner_count == 0:
logging.info("Starting BLE scanner") logger.info("Starting BLE scanner")
await self._scanner.start() await self._scanner.start()
self._scanner_count += 1 self._scanner_count += 1
@@ -332,7 +337,7 @@ class OfflineFindingScanner:
async with self._scan_ctrl_lock: async with self._scan_ctrl_lock:
self._scanner_count -= 1 self._scanner_count -= 1
if self._scanner_count == 0: if self._scanner_count == 0:
logging.info("Stopping BLE scanner") logger.info("Stopping BLE scanner")
await self._scanner.stop() await self._scanner.stop()
async def _scan_callback( async def _scan_callback(
@@ -393,7 +398,8 @@ class OfflineFindingScanner:
yield device yield device
time_left = stop_at - time.time() time_left = stop_at - time.time()
except (asyncio.CancelledError, asyncio.TimeoutError): # timeout reached except asyncio.TimeoutError: # timeout reached
self._device_fut = self._loop.create_future()
return return
finally: finally:
await self._stop_scan() await self._stop_scan()

View File

@@ -3,4 +3,4 @@
from .http import HttpResponse, HttpSession from .http import HttpResponse, HttpSession
from .parsers import decode_plist from .parsers import decode_plist
__all__ = ("HttpSession", "HttpResponse", "decode_plist") __all__ = ("HttpResponse", "HttpSession", "decode_plist")

79
findmy/util/abc.py Normal file
View File

@@ -0,0 +1,79 @@
"""Various utility ABCs for internal and external classes."""
from __future__ import annotations
import asyncio
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import TYPE_CHECKING, Generic, Self, TypeVar
if TYPE_CHECKING:
from pathlib import Path
logger = logging.getLogger(__name__)
class Closable(ABC):
"""ABC for async classes that need to be cleaned up before exiting."""
def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None:
"""
Initialize the ``Closable``.
If an event loop is given, the ``Closable`` will attempt to close itself
using the loop when it is garbage collected.
"""
self._loop: asyncio.AbstractEventLoop | None = loop
@abstractmethod
async def close(self) -> None:
"""Clean up."""
raise NotImplementedError
def __del__(self) -> None:
"""Attempt to automatically clean up when garbage collected."""
try:
loop = self._loop or asyncio.get_running_loop()
if loop.is_running():
loop.call_soon_threadsafe(loop.create_task, self.close())
else:
loop.run_until_complete(self.close())
except RuntimeError:
pass
_T = TypeVar("_T", bound=Mapping)
class Serializable(Generic[_T], ABC):
"""ABC for serializable classes."""
@abstractmethod
def to_json(self, dst: str | Path | None = None, /) -> _T:
"""
Export the current state of the object as a JSON-serializable dictionary.
If an argument is provided, the output will also be written to that file.
The output of this method is guaranteed to be JSON-serializable, and passing
the return value of this function as an argument to `Serializable.from_json`
will always result in an exact copy of the internal state as it was when exported.
You are encouraged to save and load object states to and from disk whenever possible,
to prevent unnecessary API calls or otherwise unexpected behavior.
"""
raise NotImplementedError
@classmethod
@abstractmethod
def from_json(cls, val: str | Path | _T, /) -> Self:
"""
Restore state from a previous `Closable.to_json` export.
If given a str or Path, it must point to a json file from `Serializable.to_json`.
Otherwise, it should be the Mapping itself.
See `Serializable.to_json` for more information.
"""
raise NotImplementedError

View File

@@ -1,38 +0,0 @@
"""ABC for async classes that need to be cleaned up before exiting."""
from __future__ import annotations
import asyncio
import logging
from abc import ABC, abstractmethod
logging.getLogger(__name__)
class Closable(ABC):
"""ABC for async classes that need to be cleaned up before exiting."""
def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None:
"""
Initialize the ``Closable``.
If an event loop is given, the ``Closable`` will attempt to close itself
using the loop when it is garbage collected.
"""
self._loop: asyncio.AbstractEventLoop | None = loop
@abstractmethod
async def close(self) -> None:
"""Clean up."""
raise NotImplementedError
def __del__(self) -> None:
"""Attempt to automatically clean up when garbage collected."""
try:
loop = self._loop or asyncio.get_running_loop()
if loop.is_running():
loop.call_soon_threadsafe(loop.create_task, self.close())
else:
loop.run_until_complete(self.close())
except RuntimeError:
pass

34
findmy/util/files.py Normal file
View File

@@ -0,0 +1,34 @@
"""Utilities to simplify reading and writing data from and to files."""
from __future__ import annotations
import json
from collections.abc import Mapping
from pathlib import Path
from typing import TypeVar, cast
T = TypeVar("T", bound=Mapping)
def save_and_return_json(data: T, dst: str | Path | None) -> T:
"""Save and return a JSON-serializable data structure."""
if dst is None:
return data
if isinstance(dst, str):
dst = Path(dst)
dst.write_text(json.dumps(data, indent=4))
return data
def read_data_json(val: str | Path | T) -> T:
"""Read JSON data from a file if a path is passed, or return the argument itself."""
if isinstance(val, str):
val = Path(val)
if isinstance(val, Path):
val = cast("T", json.loads(val.read_text()))
return val

View File

@@ -2,22 +2,25 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import json import json
import logging import logging
from typing import Any, TypedDict, cast from typing import Any, TypedDict, cast
import aiohttp
from aiohttp import BasicAuth, ClientSession, ClientTimeout from aiohttp import BasicAuth, ClientSession, ClientTimeout
from typing_extensions import Unpack, override from typing_extensions import Unpack, override
from .closable import Closable from .abc import Closable
from .parsers import decode_plist from .parsers import decode_plist
logging.getLogger(__name__) logger = logging.getLogger(__name__)
class _RequestOptions(TypedDict, total=False): class _RequestOptions(TypedDict, total=False):
json: dict[str, Any] | None json: dict[str, Any] | None
headers: dict[str, str] headers: dict[str, str]
auto_retry: bool
data: bytes data: bytes
@@ -72,22 +75,36 @@ class HttpSession(Closable):
super().__init__() super().__init__()
self._session: ClientSession | None = None self._session: ClientSession | None = None
self._closed: bool = False
async def _get_session(self) -> ClientSession: async def _get_session(self) -> ClientSession:
if self._closed:
msg = "HttpSession has been closed and cannot be used"
raise RuntimeError(msg)
if self._session is not None: if self._session is not None:
return self._session return self._session
logging.debug("Creating aiohttp session") logger.debug("Creating aiohttp session")
self._session = ClientSession(timeout=ClientTimeout(total=5)) self._session = ClientSession(timeout=ClientTimeout(total=5))
return self._session return self._session
@override @override
async def close(self) -> None: async def close(self) -> None:
"""Close the underlying session. Should be called when session will no longer be used.""" """Close the underlying session. Should be called when session will no longer be used."""
if self._closed:
return # Already closed, make it idempotent
self._closed = True
if self._session is not None: if self._session is not None:
logging.debug("Closing aiohttp session") logger.debug("Closing aiohttp session")
await self._session.close() try:
self._session = None await self._session.close()
except (RuntimeError, OSError, ConnectionError) as e:
logger.warning("Error closing aiohttp session: %s", e)
finally:
self._session = None
async def request( async def request(
self, self,
@@ -103,20 +120,37 @@ class HttpSession(Closable):
session = await self._get_session() session = await self._get_session()
# cast from http options to library supported options # cast from http options to library supported options
auth = kwargs.get("auth") auth = kwargs.pop("auth", None)
if isinstance(auth, tuple): if isinstance(auth, tuple):
kwargs["auth"] = BasicAuth(auth[0], auth[1]) kwargs["auth"] = BasicAuth(auth[0], auth[1])
else: options = cast("_AiohttpRequestOptions", kwargs)
kwargs.pop("auth")
options = cast(_AiohttpRequestOptions, kwargs)
async with await session.request( auto_retry = kwargs.pop("auto_retry", False)
method,
url, retry_count = 1
ssl=False, while True: # if auto_retry is set, raise for status and retry on error
**options, try:
) as r: async with await session.request(
return HttpResponse(r.status, await r.content.read()) method,
url,
ssl=False,
raise_for_status=auto_retry,
**options,
) as r:
return HttpResponse(r.status, await r.content.read())
except aiohttp.ClientError as e: # noqa: PERF203
if not auto_retry or retry_count > 3:
raise e from None
retry_after = 5 * retry_count
logger.warning(
"Error while making HTTP request; retrying after %i seconds. %s",
retry_after,
e,
)
await asyncio.sleep(retry_after)
retry_count += 1
async def get(self, url: str, **kwargs: Unpack[_HttpRequestOptions]) -> HttpResponse: async def get(self, url: str, **kwargs: Unpack[_HttpRequestOptions]) -> HttpResponse:
"""Alias for `HttpSession.request("GET", ...)`.""" """Alias for `HttpSession.request("GET", ...)`."""

View File

@@ -15,3 +15,8 @@ def decode_plist(data: bytes) -> Any: # noqa: ANN401
data = plist_header + data data = plist_header + data
return plistlib.loads(data) return plistlib.loads(data)
def format_hex_byte(byte: int) -> str:
"""Format a byte as a two character hex string in uppercase."""
return f"{byte:02x}".upper()

144
findmy/util/session.py Normal file
View File

@@ -0,0 +1,144 @@
"""Logic related to serializable classes."""
from __future__ import annotations
import random
from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, Union
from findmy.util.abc import Closable, Serializable
if TYPE_CHECKING:
from pathlib import Path
from types import TracebackType
_S = TypeVar("_S", bound=Serializable)
_SC = TypeVar("_SC", bound=Union[Serializable, Closable])
class _BaseSessionManager(Generic[_SC]):
"""Base class for session managers."""
def __init__(self) -> None:
self._sessions: dict[_SC, str | Path | None] = {}
def _add(self, obj: _SC, path: str | Path | None) -> None:
self._sessions[obj] = path
def remove(self, obj: _SC) -> None:
self._sessions.pop(obj, None)
def save(self) -> None:
for obj, path in self._sessions.items():
if isinstance(obj, Serializable):
obj.to_json(path)
async def close(self) -> None:
for obj in self._sessions:
if isinstance(obj, Closable):
await obj.close()
async def save_and_close(self) -> None:
for obj, path in self._sessions.items():
if isinstance(obj, Serializable):
obj.to_json(path)
if isinstance(obj, Closable):
await obj.close()
def get_random(self) -> _SC:
if not self._sessions:
msg = "No objects in the session manager."
raise ValueError(msg)
return random.choice(list(self._sessions.keys())) # noqa: S311
def __len__(self) -> int:
return len(self._sessions)
def __enter__(self) -> Self:
return self
def __exit__(
self,
_exc_type: type[BaseException] | None,
_exc_val: BaseException | None,
_exc_tb: TracebackType | None,
) -> None:
self.save()
class MixedSessionManager(_BaseSessionManager[Union[Serializable, Closable]]):
"""Allows any Serializable or Closable object."""
def new(
self,
c_type: type[_SC],
path: str | Path | None = None,
/,
*args: Any, # noqa: ANN401
**kwargs: Any, # noqa: ANN401
) -> _SC:
"""Add an object to the manager by instantiating it using its constructor."""
obj = c_type(*args, **kwargs)
if isinstance(obj, Serializable) and path is not None:
obj.to_json(path)
self._add(obj, path)
return obj
def add_from_json(
self,
c_type: type[_S],
path: str | Path,
/,
**kwargs: Any, # noqa: ANN401
) -> _S:
"""Add an object to the manager by deserializing it from its JSON representation."""
obj = c_type.from_json(path, **kwargs)
self._add(obj, path)
return obj
def add(self, obj: Serializable | Closable, path: str | Path | None = None, /) -> None:
"""Add an object to the session manager."""
self._add(obj, path)
class UniformSessionManager(Generic[_SC], _BaseSessionManager[_SC]):
"""Only allows a single type of Serializable object."""
def __init__(self, obj_type: type[_SC]) -> None:
"""Create a new session manager."""
super().__init__()
self._obj_type = obj_type
def new(
self,
path: str | Path | None = None,
/,
*args: Any, # noqa: ANN401
**kwargs: Any, # noqa: ANN401
) -> _SC:
"""Add an object to the manager by instantiating it using its constructor."""
obj = self._obj_type(*args, **kwargs)
if isinstance(obj, Serializable) and path is not None:
obj.to_json(path)
self._add(obj, path)
return obj
def add_from_json(
self,
path: str | Path,
/,
**kwargs: Any, # noqa: ANN401
) -> _SC:
"""Add an object to the manager by deserializing it from its JSON representation."""
if not issubclass(self._obj_type, Serializable):
msg = "Can only add objects of type Serializable."
raise TypeError(msg)
obj = self._obj_type.from_json(path, **kwargs)
self._add(obj, path)
return obj
def add(self, obj: _SC, path: str | Path | None = None, /) -> None:
"""Add an object to the session manager."""
if not isinstance(obj, self._obj_type):
msg = f"Object must be of type {self._obj_type.__name__}"
raise TypeError(msg)
self._add(obj, path)

View File

@@ -1,6 +1,7 @@
"""Utility types.""" """Utility types."""
from typing import Coroutine, TypeVar, Union from collections.abc import Coroutine
from typing import TypeVar, Union
T = TypeVar("T") T = TypeVar("T")

2051
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,44 +1,39 @@
[tool.poetry] [project]
name = "FindMy" name = "FindMy"
version = "v0.7.3" version = "0.8.0"
description = "Everything you need to work with Apple's Find My network!" description = "Everything you need to work with Apple's Find My network!"
authors = ["Mike Almeloo <git@mikealmel.ooo>"]
readme = "README.md" readme = "README.md"
packages = [{ include = "findmy" }] authors = [{ name = "Mike Almeloo", email = "git@mikealmel.ooo" }]
license-files = ["LICENSE.md"]
requires-python = ">=3.9,<3.14"
dependencies = [
"srp>=1.0.21,<2.0.0",
"cryptography>=42.0.0,<46.0.0",
"beautifulsoup4>=4.12.3,<5.0.0",
"aiohttp>=3.9.5,<4.0.0",
"bleak>=1.0.0,<2.0.0",
"typing-extensions>=4.12.2,<5.0.0",
"anisette>=1.2.1,<2.0.0",
]
[tool.poetry.dependencies] [tool.uv.dependency-groups.docs]
python = ">=3.9,<3.13" requires-python = ">=3.11"
srp = "^1.0.21"
cryptography = ">=42.0.0,<44.0.0"
beautifulsoup4 = "^4.12.3"
aiohttp = "^3.9.5"
bleak = "^0.22.2"
typing-extensions = "^4.12.2"
[tool.poetry.group.dev] [dependency-groups]
optional = true dev = [
"pre-commit>=4.0.0,<5.0.0",
[tool.poetry.group.dev.dependencies] "basedpyright>=1.31.1,<2.0.0",
pre-commit = "^3.8.0" "ruff>=0.8.4,<1.0.0",
pyright = "1.1.378" "tomli>=2.0.1,<3.0.0",
ruff = "0.6.3" "packaging>=25.0,<26.0",
tomli = "^2.0.1" ]
packaging = "^24.1" test = ["pytest>=8.3.2,<9.0.0"]
docs = [
[tool.poetry.group.test] "furo>=2025.7.19",
optional = true "myst-parser>=4.0.1",
"sphinx>=8.2.3,<8.3.0",
[tool.poetry.group.test.dependencies] "sphinx-autoapi==3.6.0",
pytest = "^8.3.2" ]
[tool.poetry.group.docs]
optional = true
[tool.poetry.group.docs.dependencies]
sphinx = "^7.2.6"
sphinx-autoapi = "3.3.1"
furo = "^2024.1.29"
myst-parser = "^2.0.0"
[tool.pyright] [tool.pyright]
venvPath = "." venvPath = "."
@@ -48,51 +43,50 @@ venv = ".venv"
typeCheckingMode = "standard" typeCheckingMode = "standard"
reportImplicitOverride = true reportImplicitOverride = true
# examples should be run from their own directory
executionEnvironments = [
{ root = "examples/" }
]
[tool.ruff] [tool.ruff]
line-length = 100 line-length = 100
exclude = [ exclude = ["docs/", "tests/"]
"docs/",
"tests/"
]
[tool.ruff.lint] [tool.ruff.lint]
select = [ select = ["ALL"]
"ALL",
]
ignore = [ ignore = [
"ANN101", # annotations on `self`
"ANN102", # annotations on `cls`
"FIX002", # resolving TODOs "FIX002", # resolving TODOs
"D203", # one blank line before class docstring "D203", # one blank line before class docstring
"D212", # multi-line docstring start at first line "D212", # multi-line docstring start at first line
"D105", # docstrings in magic methods "D105", # docstrings in magic methods
"S101", # assert statements "S101", # assert statements
"S603", # false-positive subprocess call (https://github.com/astral-sh/ruff/issues/4045) "S603", # false-positive subprocess call (https://github.com/astral-sh/ruff/issues/4045)
"PLR2004", # "magic" values >.> "PLR2004", # "magic" values >.>
"FBT", # boolean "traps" "FBT", # boolean "traps"
"COM812", # trailing commas
] ]
[tool.ruff.lint.per-file-ignores] [tool.ruff.lint.per-file-ignores]
"docs/*" = [
"INP001", # implicit namespaces
]
"examples/*" = [ "examples/*" = [
"T201", # use of "print" "T201", # use of "print"
"S101", # use of "assert" "S101", # use of "assert"
"D", # documentation "D", # documentation
"INP001", # namespacing "INP001", # namespacing
] ]
"scripts/*" = [ "scripts/*" = [
"T201", # use of "print" "T201", # use of "print"
"D", # documentation "D", # documentation
]
"tests/*" = [
"INP001", # implicit namespaces
"PLC0415", # import not on top of file
] ]
[tool.setuptools]
py-modules = ["findmy"]
[build-system] [build-system]
requires = ["poetry-core"] requires = ["setuptools", "setuptools-scm"]
build-backend = "poetry.core.masonry.api" build-backend = "setuptools.build_meta"

View File

@@ -1,9 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
from collections.abc import Generator
from itertools import count from itertools import count
from pathlib import Path from pathlib import Path
from typing import Generator
import tomli import tomli
from packaging.specifiers import SpecifierSet from packaging.specifiers import SpecifierSet
@@ -15,7 +15,7 @@ def get_python_versions() -> Generator[str, None, None]:
with Path("pyproject.toml").open("rb") as f: with Path("pyproject.toml").open("rb") as f:
pyproject_data = tomli.load(f) pyproject_data = tomli.load(f)
specifier = SpecifierSet(pyproject_data["tool"]["poetry"]["dependencies"]["python"]) specifier = SpecifierSet(pyproject_data["project"]["requires-python"])
below_spec = True below_spec = True
for v_minor in count(): for v_minor in count():

View File

@@ -1,9 +1,13 @@
{ pkgs ? import <nixpkgs> {} }: { pkgs ? import <nixpkgs> {} }:
let
unstable = import (fetchTarball https://channels.nixos.org/nixos-unstable/nixexprs.tar.xz) { };
in
pkgs.mkShell { pkgs.mkShell {
packages = with pkgs; [ packages = with pkgs; [
python312 python312
poetry unstable.uv
gh
]; ];
shellHook = '' shellHook = ''

View File

@@ -1,8 +1,11 @@
"""Key generation tests."""
import pytest import pytest
@pytest.mark.parametrize('execution_number', range(100)) @pytest.mark.parametrize("execution_number", range(100))
def test_import(execution_number): def test_keypair(execution_number: int) -> None: # noqa: ARG001
"""Test generation of new keypairs."""
import findmy import findmy
kp = findmy.KeyPair.new() kp = findmy.KeyPair.new()

2043
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff