Randomized key resolution for Apple device tracking
# AppDaemon apps.yaml entry for the tracker app
tracker_test:
  class: IrkTracker
  module: irk_tracker
  training_input_text: input_text.irk_tag
  data_loc: tracker_logs/
  rows_per_flush: 100

# ESPHome config for each base station: forward every BLE advertisement to
# Home Assistant as an esphome.ble_tracking_beacon event
esp32_ble_tracker:
  on_ble_advertise:
    - then:
        - homeassistant.event:
            event: esphome.ble_tracking_beacon
            data:
              source: ${device_name}
              rssi: !lambda |-
                return x.get_rssi();
              addr: !lambda |-
                return x.address_str();

How to use

Step 1: Get Identity Keys

To retrieve the IRK for an Apple Watch, follow these directions: https://espresense.com/beacons/apple#watch-os

To retrieve the IRK for an iPhone, I can't find the page with the exact directions, but the gist of it is: once you've paired your iPhone with a Windows machine, the IRK ends up in a registry key that you can read out (with some difficulty).
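For context, resolving one of these randomized addresses against an IRK is a single AES-ECB operation: encrypt the address's 24-bit prand with the IRK and check whether the low three bytes of the result match the address's 24-bit hash. Here's a minimal sketch of that check, using the same byte ordering as the tracker code further down; the IRK and address are placeholders.

from Crypto.Cipher import AES

def resolve_rpa(addr_str, irk):
    """Return True if the resolvable private address addr_str was generated from irk."""
    addr = bytes.fromhex(addr_str.replace(":", ""))
    # the upper 3 address bytes are the prand; pad them into a 16-byte AES block
    pt = bytearray(16)
    pt[13], pt[14], pt[15] = addr[0], addr[1], addr[2]
    msg = AES.new(irk, AES.MODE_ECB).encrypt(bytes(pt))
    # the lower 3 address bytes must match the low bytes of the AES output
    return msg[13] == addr[3] and msg[14] == addr[4] and msg[15] == addr[5]

# placeholder IRK and address, for illustration only
print(resolve_rpa("4A:DE:AD:BE:EF:01", bytes(16)))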

Step 2: Record training data

Make two automations, IRK Tracker Start Recording and IRK Tracker Stop Recording, which fire the events irk_tracker.start_recording and irk_tracker.stop_recording respectively. Also make a text input helper.
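In YAML, those pieces can look roughly like this; the aliases and the input_text.irk_tag helper name are just examples that happen to match the app config above, so adjust them to whatever you actually use.

# configuration.yaml (sketch)
input_text:
  irk_tag:
    name: IRK Tag

automation:
  - alias: "IRK Tracker Start Recording"
    trigger: []
    action:
      - event: irk_tracker.start_recording
  - alias: "IRK Tracker Stop Recording"
    trigger: []
    action:
      - event: irk_tracker.stop_recording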

Next, make a Home Assistant dashboard with two buttons: one that calls the automation.trigger service on the IRK Tracker Start Recording automation, and another that does the same for IRK Tracker Stop Recording. Also include the text input helper on the dashboard, and point the app's training_input_text option at it.
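A dashboard card along these lines works; the card types are standard Lovelace, but the automation entity IDs below are assumed from the aliases above, so check yours.

type: vertical-stack
cards:
  - type: entities
    entities:
      - input_text.irk_tag
  - type: button
    name: Start Recording
    tap_action:
      action: call-service
      service: automation.trigger
      target:
        entity_id: automation.irk_tracker_start_recording
  - type: button
    name: Stop Recording
    tap_action:
      action: call-service
      service: automation.trigger
      target:
        entity_id: automation.irk_tracker_stop_recording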

Now you can use this dashboard: type the name of the room you're calibrating into the text helper, start recording, walk around the room with your devices to generate training data, and stop recording when you're done.

Step 3: Tune your model

At this point, I hope you are comfortable with Jupyter and can adjust the data and model to predict your location. Once you're happy with the results, fire the irk_tracker.fit_model event (or restart the app) to refit the model on your recorded examples.
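As a rough starting point for that notebook, something like the following loads the recorded CSVs and cross-validates a simple k-NN on binned per-station RSSI means. The path assumes the default data_loc, and the binning is deliberately cruder than the app's fit_model pipeline; treat it as a sketch, not the final model.

import pandas as pd
from glob import glob
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_score

# Load every recorded example file (path assumes the default data_loc).
df = pd.concat(pd.read_csv(f, parse_dates=['time'])
               for f in glob('/config/appdaemon/tracker_logs/examples*.csv'))

# Bucket observations into 30-second windows and take the mean RSSI each
# base station saw for each device in that window (one column per station).
binned = (df.groupby(['tag', 'device', 'source', pd.Grouper(key='time', freq='30s')])['rssi']
            .mean()
            .unstack('source')
            .dropna()          # keep only windows where every station heard the device
            .reset_index())

stations = [c for c in binned.columns if c not in ('tag', 'device', 'time')]
X = binned[stations].to_numpy()
y = binned['tag'].to_numpy()

# Assumes you recorded at least a few minutes per room, so each tag has enough windows.
knn = KNeighborsClassifier(n_neighbors=7)
print("cross-validated accuracy:", cross_val_score(knn, X, y, cv=5).mean())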

# irk_tracker.py: the AppDaemon app (referenced as "module: irk_tracker" in the config above)
import hassapi as hass
import pandas as pd
from Crypto.Cipher import AES
import dateutil.parser as du
from datetime import timedelta, datetime, time
import os
from glob import glob
from sklearn.neighbors import KNeighborsClassifier
from sklearn import svm
from collections import defaultdict
import numpy as np

# 16-byte IRKs (identity resolving keys) for the devices to track, keyed by a friendly name.
identities = {
    'some phone': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
}
ciphers = {k: AES.new(v, AES.MODE_ECB) for k, v in identities.items()}

tracker_log_loc = '/config/appdaemon/tracker_logs/'
tracker_log_rows_per_flush = 100


class IrkTracker(hass.Hass):
    def initialize(self):
        if 'data_loc' in self.args:
            self.data_loc = f"/config/appdaemon/{self.args['data_loc']}"
        else:
            self.data_loc = tracker_log_loc
        if 'rows_per_flush' in self.args:
            self.rows_per_flush = int(self.args['rows_per_flush'])
        else:
            self.rows_per_flush = tracker_log_rows_per_flush
        all_examples = glob(f"{self.data_loc}examples*.csv")
        if len(all_examples) > 0:
            self.log(f"found {len(all_examples)} preexisting examples, fitting model...")
            self.fit_model(None, {}, {})
            self.log("model fit")
        else:
            self.knn = None
        self.listen_event(self.ble_tracker_cb, "esphome.ble_tracking_beacon")
        self.recording_df = None
        self.listen_event(self.start_recording, "irk_tracker.start_recording")
        self.listen_event(self.stop_recording, "irk_tracker.stop_recording")
        self.listen_event(self.fit_model, "irk_tracker.fit_model")
        self.recent_observations = defaultdict(lambda: [])
        self.run_minutely(self.inference, time(0, 0, 0))

    def fit_model(self, event_name, data, kwargs):
        dfs = []
        for x in glob(f'{self.data_loc}examples*.csv'):
            df = pd.read_csv(x, parse_dates=[0])
            dfs.append(df)
        df = pd.concat(dfs)
        df = df.sort_values(by=['time'])
        base_station_names = list(df['source'].unique())
        # Smooth the raw RSSI readings with a 3-minute rolling mean per (device, station) pair.
        rolling_rssi = df.set_index('time').groupby(['device', 'source']).rolling("3min", min_periods=10)['rssi'].mean().reset_index()
        rolling_labeled = pd.merge(df.drop('rssi', axis=1), rolling_rssi, on=['device', 'source', 'time']).dropna()
        # Pivot the long-format observations into one smoothed-RSSI column per base station,
        # matching each row with the most recent reading from every other station (within 5 seconds).
        with_station = rolling_labeled
        for station in base_station_names:
            specific_station = rolling_labeled.query(f"source == '{station}'").copy()
            specific_station = specific_station.drop('source', axis=1).rename(columns={'rssi': station})
            with_station = pd.merge_asof(with_station, specific_station,
                                         left_on='time',
                                         right_on='time',
                                         direction='backward',
                                         allow_exact_matches=False,
                                         tolerance=pd.Timedelta(seconds=5),
                                         by=['device', 'tag'])
        for station in base_station_names:
            with_station[station] = np.where(with_station['source'] == station, with_station['rssi'], with_station[station])
        with_station = with_station.drop(['source', 'rssi'], axis=1).dropna()
        self.knn_columns = base_station_names
        self.knn = KNeighborsClassifier(n_neighbors=7)
        self.knn.fit(with_station[base_station_names].to_numpy(), with_station['tag'].to_numpy())

    def flush_recording(self):
        df = pd.DataFrame(self.recording_df)
        df['tag'] = self.recording_tag
        save_path = self.data_loc + f"examples-{self.recording_tag}-{self.recording_index}.csv"
        self.log(f'recording data to {save_path}')
        df.to_csv(save_path, index=False, header=True)
        self.recording_index += 1
        self.recording_df = {'time': [], 'device': [], 'source': [], 'rssi': []}

    def start_recording(self, event_name, data, kwargs):
        input_irk_tag_ent = self.get_entity(self.args['training_input_text'])
        tag = input_irk_tag_ent.get_state('state')
        os.makedirs(self.data_loc, exist_ok=True)
        # delete all files in the data dir starting with "examples-<tag>" for a clean training set
        for f in glob(f"{self.data_loc}examples-{tag}*.csv"):
            os.unlink(f)
        self.log(f"started recording with tag {tag}")
        self.recording_df = {'time': [], 'device': [], 'source': [], 'rssi': []}
        self.recording_index = 0
        self.recording_tag = tag

    def stop_recording(self, event_name, data, kwargs):
        if self.recording_df is not None:
            self.flush_recording()
            self.log(f"stopped recording for {self.recording_tag}")
            self.recording_df = None

    def ble_tracker_cb(self, event_name, data, kwargs):
        #self.log(f'event: {event_name} : {data}')
        addr = bytes.fromhex(data['addr'].replace(":", ""))
        time = du.parse(data['metadata']['time_fired'])
        source = data['source']
        rssi = data['rssi']
        # Resolve the random private address: encrypt the 24-bit prand (upper address bytes)
        # with each device's IRK and compare against the 24-bit hash (lower address bytes).
        pt = bytearray(b'\0' * 16)
        pt[15] = addr[2]
        pt[14] = addr[1]
        pt[13] = addr[0]
        for name, cipher in ciphers.items():
            msg = cipher.encrypt(bytes(pt))
            if msg[15] == addr[5] and msg[14] == addr[4] and msg[13] == addr[3]:
                if self.recording_df is not None:
                    self.recording_df['time'].append(time)
                    self.recording_df['device'].append(name)
                    self.recording_df['source'].append(source)
                    self.recording_df['rssi'].append(rssi)
                    if len(self.recording_df['time']) > self.rows_per_flush:
                        self.flush_recording()
                # handle publishing update for appropriate entity
                obs = self.recent_observations[(source, name)]
                obs.append((time, rssi))
                self.prune_old_obs(obs)
                #self.log(f"found a match for {name} with rssi {data['rssi']} from {source} at {time} (mac: {data['addr']})")

    def prune_old_obs(self, obs):
        # drop observations older than 3 minutes
        while obs and (obs[0][0] + timedelta(minutes=3)).timestamp() < datetime.now().timestamp():
            obs.pop(0)

    def inference(self, kwargs):
        for (source, name) in self.recent_observations:
            if self.knn:
                # average the recent RSSI from every station that has seen this device
                means = defaultdict(lambda: 0)
                num_detected = 0
                for (source, otherdevice), obs in self.recent_observations.items():
                    if otherdevice == name:
                        self.prune_old_obs(obs)
                        if len(obs) >= 10:
                            for (_, rssi) in obs:
                                means[source] += float(rssi)
                            means[source] /= len(obs)
                            num_detected += 1
                        else:
                            self.log(f"not predicting for {name} b/c {source} only has {len(obs)} obs for {name}")
                            means[source] = -100.0
                device_entity = self.get_entity(f"sensor.ble_tracker_{name.replace(' ', '_')}")
                if num_detected != 0:
                    if num_detected != len(means):
                        self.log(f"only detected {num_detected} stations ({[x for x, v in means.items() if v != -100.0]})")
                    input_arg = [means[source] for source in self.knn_columns]
                    room = self.knn.predict([input_arg])[0]
                    m = {k: v for k, v in zip(self.knn_columns, input_arg)}
                    self.log(f"Localized {name} to knn:{room} {m}")
                    device_entity.set_state(state=room)
                else:
                    device_entity.set_state(state='insufficient')
                    vis = {k: v for k, v in means.items()}
                    self.log(f"Couldn't localize {name}; only obs from {vis}")