Files
FindMy.py/findmy/account.py
2023-12-31 14:02:56 +01:00

526 lines
17 KiB
Python

import base64
import hashlib
import hmac
import json
import logging
import plistlib
import uuid
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from enum import Enum
from functools import wraps
from typing import Optional, TypedDict, Any
from typing import Sequence
import bs4
import srp._pysrp as srp
from cryptography.hazmat.primitives import padding, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .anisette import AnisetteProvider
from .http import HttpSession
from .keys import KeyPair
from .reports import fetch_reports
logging.getLogger(__name__)
srp.rfc5054_enable()
srp.no_username_in_x()
class LoginState(Enum):
LOGGED_OUT = 0
REQUIRE_2FA = 1
AUTHENTICATED = 2
LOGGED_IN = 3
def __lt__(self, other):
if isinstance(other, LoginState):
return self.value < other.value
return NotImplemented
def __repr__(self):
return self.__str__()
class AccountInfo(TypedDict):
account_name: str
first_name: str
last_name: str
class LoginException(Exception):
pass
class InvalidStateException(RuntimeError):
pass
class ExportRestoreError(ValueError):
pass
def _load_plist(data: bytes) -> Any:
plist_header = (
b"<?xml version='1.0' encoding='UTF-8'?>"
b"<!DOCTYPE plist PUBLIC '-//Apple//DTD PLIST 1.0//EN' 'http://www.apple.com/DTDs/PropertyList-1.0.dtd'>"
)
if not data.startswith(b"<?xml"):
data = plist_header + data
return plistlib.loads(data)
def _encrypt_password(password: str, salt: bytes, iterations: int) -> bytes:
p = hashlib.sha256(password.encode("utf-8")).digest()
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=iterations,
)
return kdf.derive(p)
def _decrypt_cbc(session_key: bytes, data: bytes) -> bytes:
extra_data_key = hmac.new(session_key, b"extra data key:", hashlib.sha256).digest()
extra_data_iv = hmac.new(session_key, b"extra data iv:", hashlib.sha256).digest()
# Get only the first 16 bytes of the iv
extra_data_iv = extra_data_iv[:16]
# Decrypt with AES CBC
cipher = Cipher(algorithms.AES(extra_data_key), modes.CBC(extra_data_iv))
decryptor = cipher.decryptor()
data = decryptor.update(data) + decryptor.finalize()
# Remove PKCS#7 padding
padder = padding.PKCS7(128).unpadder()
return padder.update(data) + padder.finalize()
def _extract_phone_numbers(html: str) -> list[dict]:
soup = bs4.BeautifulSoup(html, features="html.parser")
data_elem = soup.find("script", **{"class": "boot_args"})
if not data_elem:
raise RuntimeError("Could not find HTML element containing phone numbers")
data = json.loads(data_elem.text)
return (
data.get("direct", {})
.get("phoneNumberVerification", {})
.get("trustedPhoneNumbers", [])
)
def _require_login_state(*states: LoginState):
def decorator(func):
@wraps(func)
def wrapper(acc: "AppleAccount", *args, **kwargs):
if acc.login_state not in states:
raise InvalidStateException(
f"Invalid login state! Currently: {acc.login_state} but should be one of: {states}"
)
return func(acc, *args, **kwargs)
return wrapper
return decorator
class SecondFactorMethod(ABC):
def __init__(self, account: "AppleAccount"):
self._account = account
@property
def account(self):
return self._account
@abstractmethod
def request(self) -> None:
raise NotImplementedError()
@abstractmethod
def submit(self, code: str) -> LoginState:
raise NotImplementedError()
class SmsSecondFactor(SecondFactorMethod):
def __init__(self, account: "AppleAccount", number_id: int, phone_number: str):
super().__init__(account)
self._phone_number_id: int = number_id
self._phone_number: str = phone_number
@property
def phone_number(self):
return self._phone_number
async def request(self):
return await self.account.sms_2fa_request(self._phone_number_id)
async def submit(self, code: str) -> LoginState:
return await self.account.sms_2fa_submit(self._phone_number_id, code)
class AppleAccount:
def __init__(
self, anisette: AnisetteProvider, user_id: str = None, device_id: str = None
):
self._anisette: AnisetteProvider = anisette
self._uid: str = user_id or str(uuid.uuid4())
self._devid: str = device_id or str(uuid.uuid4())
self._username: Optional[str] = None
self._password: Optional[str] = None
self._login_state: LoginState = LoginState.LOGGED_OUT
self._login_state_data: dict = {}
self._account_info: Optional[AccountInfo] = None
self._http = HttpSession()
def _set_login_state(
self, state: LoginState, data: Optional[dict] = None
) -> LoginState:
# clear account info if downgrading state (e.g. LOGGED_IN -> LOGGED_OUT)
if state < self._login_state:
logging.debug("Clearing cached account information")
self._account_info = None
logging.info(f"Transitioning login state: {self._login_state} -> {state}")
self._login_state = state
self._login_state_data = data or {}
return state
@property
def login_state(self):
return self._login_state
@property
@_require_login_state(
LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA
)
def account_name(self):
return self._account_info["account_name"] if self._account_info else None
@property
@_require_login_state(
LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA
)
def first_name(self):
return self._account_info["first_name"] if self._account_info else None
@property
@_require_login_state(
LoginState.LOGGED_IN, LoginState.AUTHENTICATED, LoginState.REQUIRE_2FA
)
def last_name(self):
return self._account_info["last_name"] if self._account_info else None
def export(self) -> dict:
return {
"ids": {"uid": self._uid, "devid": self._devid},
"account": {
"username": self._username,
"password": self._password,
"info": self._account_info,
},
"login_state": {
"state": self._login_state.value,
"data": self._login_state_data,
},
}
def restore(self, data: dict) -> None:
try:
self._uid = data["ids"]["uid"]
self._devid = data["ids"]["devid"]
self._username = data["account"]["username"]
self._password = data["account"]["password"]
self._account_info = data["account"]["info"]
self._login_state = LoginState(data["login_state"]["state"])
self._login_state_data = data["login_state"]["data"]
except KeyError as e:
raise ExportRestoreError(f"Failed to restore account data: {e}")
async def close(self):
await self._anisette.close()
await self._http.close()
@_require_login_state(LoginState.LOGGED_OUT)
async def login(self, username: str, password: str) -> LoginState:
# LOGGED_OUT -> (REQUIRE_2FA or AUTHENTICATED)
new_state = await self._gsa_authenticate(username, password)
if new_state == LoginState.REQUIRE_2FA: # pass control back to handle 2FA
return new_state
# AUTHENTICATED -> LOGGED_IN
return await self._login_mobileme()
@_require_login_state(LoginState.REQUIRE_2FA)
async def get_2fa_methods(self) -> list[SecondFactorMethod]:
methods: list[SecondFactorMethod] = []
# sms
auth_page = await self._sms_2fa_request("GET", "https://gsa.apple.com/auth")
try:
phone_numbers = _extract_phone_numbers(auth_page)
methods.extend(
SmsSecondFactor(
self, number.get("id"), number.get("numberWithDialCode")
)
for number in phone_numbers
)
except RuntimeError:
logging.warning("Unable to extract phone numbers from login page")
return methods
@_require_login_state(LoginState.REQUIRE_2FA)
async def sms_2fa_request(self, phone_number_id):
data = {"phoneNumber": {"id": phone_number_id}, "mode": "sms"}
await self._sms_2fa_request(
"PUT", "https://gsa.apple.com/auth/verify/phone", data
)
@_require_login_state(LoginState.REQUIRE_2FA)
async def sms_2fa_submit(self, phone_number_id: int, code: str) -> LoginState:
data = {
"phoneNumber": {"id": phone_number_id},
"securityCode": {"code": str(code)},
"mode": "sms",
}
await self._sms_2fa_request(
"POST", "https://gsa.apple.com/auth/verify/phone/securitycode", data
)
# REQUIRE_2FA -> AUTHENTICATED
new_state = await self._gsa_authenticate()
if new_state != LoginState.AUTHENTICATED:
raise LoginException(f"Unexpected state after submitting 2FA: {new_state}")
# AUTHENTICATED -> LOGGED_IN
return await self._login_mobileme()
@_require_login_state(LoginState.LOGGED_IN)
async def fetch_reports(
self, keys: Sequence[KeyPair], date_from: datetime, date_to: datetime
):
anisette_headers = await self.get_anisette_headers()
return await fetch_reports(
self._login_state_data["dsid"],
self._login_state_data["mobileme_data"]["tokens"]["searchPartyToken"],
anisette_headers,
date_from,
date_to,
keys,
)
@_require_login_state(LoginState.LOGGED_IN)
async def fetch_last_reports(
self,
keys: Sequence[KeyPair],
hours: int = 7 * 24,
):
end = datetime.now()
start = end - timedelta(hours=hours)
return await self.fetch_reports(keys, start, end)
@_require_login_state(LoginState.LOGGED_OUT, LoginState.REQUIRE_2FA)
async def _gsa_authenticate(
self, username: Optional[str] = None, password: Optional[str] = None
) -> LoginState:
self._username = username or self._username
self._password = password or self._password
logging.info(f"Attempting authentication for user {self._username}")
if not self._username or not self._password:
raise ValueError("No username or password to log in")
logging.debug("Starting authentication with username")
usr = srp.User(self._username, b"", hash_alg=srp.SHA256, ng_type=srp.NG_2048)
_, a2k = usr.start_authentication()
r = await self._gsa_request(
{"A2k": a2k, "u": self._username, "ps": ["s2k", "s2k_fo"], "o": "init"}
)
logging.debug("Verifying response to auth request")
if r["Status"].get("ec") != 0:
message = r["Status"].get("em")
raise LoginException(f"Email verify failed: {message}")
sp = r.get("sp")
if sp != "s2k":
raise LoginException(
f"This implementation only supports s2k. Server returned {sp}"
)
logging.debug("Attempting password challenge")
usr.p = _encrypt_password(self._password, r["s"], r["i"])
m1 = usr.process_challenge(r["s"], r["B"])
if m1 is None:
raise LoginException("Failed to process challenge")
r = await self._gsa_request(
{"c": r["c"], "M1": m1, "u": self._username, "o": "complete"}
)
logging.debug("Verifying password challenge response")
if r["Status"].get("ec") != 0:
message = r["Status"].get("em")
raise LoginException(f"Password authentication failed: {message}")
usr.verify_session(r.get("M2"))
if not usr.authenticated():
raise LoginException("Failed to verify session")
logging.debug("Decrypting SPD data in response")
spd = _decrypt_cbc(usr.get_session_key(), r["spd"])
spd = _load_plist(spd)
logging.debug("Received account information")
self._account_info: AccountInfo = {
"account_name": spd.get("acname"),
"first_name": spd.get("fn"),
"last_name": spd.get("ln"),
}
# TODO: support trusted device auth (need account to test)
au = r["Status"].get("au")
if au in ("secondaryAuth",):
logging.info(f"Detected 2FA requirement: {au}")
return self._set_login_state(
LoginState.REQUIRE_2FA,
{"adsid": spd["adsid"], "idms_token": spd["GsIdmsToken"]},
)
elif au is not None:
raise LoginException(f"Unknown auth value: {au}")
logging.info("GSA authentication successful")
idms_pet = spd.get("t", {}).get("com.apple.gs.idms.pet", {}).get("token", "")
return self._set_login_state(
LoginState.AUTHENTICATED, {"idms_pet": idms_pet, "adsid": spd["adsid"]}
)
@_require_login_state(LoginState.AUTHENTICATED)
async def _login_mobileme(self):
logging.info("Logging into com.apple.mobileme")
data = plistlib.dumps(
{
"apple-id": self._username,
"delegates": {"com.apple.mobileme": {}},
"password": self._login_state_data["idms_pet"],
"client-id": self._uid,
}
)
headers = {
"X-Apple-ADSID": self._login_state_data["adsid"],
"User-Agent": "com.apple.iCloudHelper/282 CFNetwork/1408.0.4 Darwin/22.5.0",
"X-Mme-Client-Info": "<MacBookPro18,3> <Mac OS X;13.4.1;22F8>"
" <com.apple.AOSKit/282 (com.apple.accountsd/113)>",
}
headers.update(await self.get_anisette_headers())
async with await self._http.post(
"https://setup.icloud.com/setup/iosbuddy/loginDelegates",
auth=(self._username, self._login_state_data["idms_pet"]),
data=data,
headers=headers,
) as r:
content = await r.content.read()
resp = _load_plist(content)
mobileme_data = resp.get("delegates", {}).get("com.apple.mobileme", {})
status = mobileme_data.get("status")
if status != 0:
message = mobileme_data.get("status-message")
raise LoginException(
f"com.apple.mobileme login failed with status {status}: {message}"
)
return self._set_login_state(
LoginState.LOGGED_IN,
{"dsid": resp["dsid"], "mobileme_data": mobileme_data["service-data"]},
)
async def _sms_2fa_request(
self, method: str, url: str, data: Optional[dict] = None
) -> str:
adsid = self._login_state_data["adsid"]
idms_token = self._login_state_data["idms_token"]
identity_token = base64.b64encode((adsid + ":" + idms_token).encode()).decode()
headers = {
"User-Agent": "Xcode",
"Accept-Language": "en-us",
"X-Apple-Identity-Token": identity_token,
"X-Apple-App-Info": "com.apple.gs.xcode.auth",
"X-Xcode-Version": "11.2 (11B41)",
"X-Mme-Client-Info": "<MacBookPro18,3> <Mac OS X;13.4.1;22F8>"
" <com.apple.AOSKit/282 (com.apple.dt.Xcode/3594.4.19)>",
}
headers.update(await self.get_anisette_headers())
async with await self._http.request(
method, url, json=data, headers=headers
) as r:
if not r.ok:
raise LoginException(f"HTTP request failed: {r.status_code}")
return await r.text()
async def _gsa_request(self, params):
request_data = {
"cpd": {
"bootstrap": True,
"icscrec": True,
"pbe": False,
"prkgen": True,
"svct": "iCloud",
}
}
request_data["cpd"].update(await self.get_anisette_headers())
request_data.update(params)
body = {
"Header": {"Version": "1.0.1"},
"Request": request_data,
}
headers = {
"Content-Type": "text/x-xml-plist",
"Accept": "*/*",
"User-Agent": "akd/1.0 CFNetwork/978.0.7 Darwin/18.7.0",
"X-MMe-Client-Info": "<MacBookPro18,3> <Mac OS X;13.4.1;22F8> "
"<com.apple.AOSKit/282 (com.apple.dt.Xcode/3594.4.19)>",
}
async with await self._http.post(
"https://gsa.apple.com/grandslam/GsService2",
headers=headers,
data=plistlib.dumps(body),
) as r:
content = await r.content.read()
return _load_plist(content)["Response"]
async def get_anisette_headers(self, serial: str = "0") -> dict[str, str]:
return await self._anisette.get_headers(self._uid, self._devid, serial)