-
-
Save benallard/4495131 to your computer and use it in GitHub Desktop.
Script to help parsing and analysing of data stored on disk during synchronisation of the Fitbit tracker by benallard/fitbit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, yaml | |
TRACKERID = '22WB2F' | |
def collect(dir): | |
dir = os.path.join(os.path.expanduser('~/.fitbit'), dir) | |
data = {} | |
for fname in os.listdir(dir): | |
if 'connection-' in fname: | |
f = open(os.path.join(dir, fname), 'r') | |
d = f.read() | |
f.close() | |
data[int(fname[11:21])] = yaml.load(d) | |
return data | |
import datetime | |
def p_0(data): | |
i = 0 | |
tstamp = 0 | |
while i < len(data): | |
if (data[i] & 0x80) == 0x80: | |
d = data[i:i+3] | |
d0 = d[0] - 0x80 # ??? | |
d1 = (d[1] - 10) / 10. # score | |
d2 = d[2] # steps | |
print "Time: %s ???: %d score: %d step: %d" % (datetime.datetime.fromtimestamp(tstamp), d0, d1, d2) | |
i += 3 | |
tstamp += 60 | |
continue | |
d = data[i:i+4] | |
tstamp = d[3] | d[2] << 8 | d[1] << 16 | d[0] << 24 | |
if i != 0: print '---xx--xx--xx--xx' | |
i += 4 | |
seen = set() | |
def p_1(data): | |
assert len(data) % 16 == 0 | |
for i in xrange(0, len(data), 16): | |
d = data[i:i+16] | |
date = d[0] | d[1] << 8 | d[2] << 16 | d[3] << 24 | |
if date in seen: continue | |
seen.add(date) | |
date = datetime.datetime.fromtimestamp(date) | |
if date.minute == 0 and date.hour == 0 and date.second == 0: | |
print a2s(d[4:]) | |
daily_steps = d[7] << 8 | d[6] | |
daily_dist = (d[11] << 8 | d[10] | d[13] << 24 | d[12] << 16) / 1000000. | |
daily_floors = (d[15] << 8 | d[14]) / 10 | |
daily_cals = (d[5] << 8 | d[4]) *.1103 - 7 | |
v = [] | |
v.append(daily_cals) | |
v.append(daily_steps) | |
v.append(d[9] << 8 | d[8]) | |
v.append(daily_dist) | |
v.append(daily_floors) | |
print "Time: %s Values: %s" % (date, str(v)) | |
start = 0 | |
def p_2(data): | |
assert len(data) % 15 == 0 | |
for i in xrange(0, len(data), 15): | |
d = data[i:i+15] | |
tstamp1 = d[0] | d[1] << 8 | d[2] << 16 | d[3] << 24 | |
if tstamp1 in seen: continue | |
seen.add(tstamp1) | |
date1 = datetime.datetime.fromtimestamp(tstamp1) | |
if d[6] == 0x1: #not in (0, 2, 8, 11): | |
elapsed = (d[5] << 8) | d[4] | |
steps = (d[9]<< 16) | (d[8] << 8) | d[7] | |
dist = (d[12] << 16) | (d[11]<< 8) | d[10] | |
floors = ((d[14] << 8) | d[13]) / 10 | |
print date1, "duration: %s, steps: %d, dist: %fkm, floors: %d" % (datetime.timedelta(seconds=elapsed), steps, dist/100000., floors),'\t', a2s(d[4:]) | |
elif d[4] == 0xd: | |
""" start """ | |
assert d[5:] == [0, 5, 0, 0, 0, 0, 0, 0, 0, 0], d[5:] | |
print date1, "<-- Start." | |
global start | |
start = tstamp1 | |
elif d[4] == 0x8: | |
""" second tstamp """ | |
tstamp2 = d[5] | d[6] << 8 | d[7] << 16 | d[8] << 24 | |
date2 = datetime.datetime.fromtimestamp(tstamp2) | |
print date1, date2, a2s(d[9:]) | |
else: | |
print date1, a2s(d[4:]) | |
def p_4(data): | |
""" hehe ...""" | |
return p_0w(data) | |
def p_5(data): | |
assert len(data) == 14 | |
print a2s(data) | |
print [data[i] | data[i+1] << 8 for i in range(0, 14, 2)] | |
def p_6(data): | |
i = 0 | |
tstamp = 0 | |
while i < len(data): | |
if data[i] == 0x80: | |
floors = data[i+1] / 10 | |
print "Time: %s: %d Floors" % (datetime.datetime.fromtimestamp(tstamp), floors) | |
i += 2 | |
tstamp += 60 | |
continue | |
d = data[i:i+4] | |
tstamp = d[3] | d[2] << 8 | d[1] << 16 | d[0] << 24 | |
i += 4 | |
def p_erase(data): | |
assert len(data) == 7 | |
index = data[1] | |
tstamp = data[2] | data[3] << 8 | data[4] << 16 | data[5] << 24 | |
print "Erase %d: %s" % (index, datetime.datetime.fromtimestamp(tstamp)) | |
def p_0w(payload): | |
assert len(payload) == 64 | |
print a2s(payload) | |
return | |
print payload[13] | |
print "Greeting: ", ''.join([chr(x) for x in payload[24:24+8]]) | |
print "Chatter: " | |
print " - ", ''.join([chr(x) for x in payload[34:34+8]]) | |
print " - ", ''.join([chr(x) for x in payload[44:44+8]]) | |
print " - ", ''.join([chr(x) for x in payload[54:54+8]]) | |
def p_1w(payload): | |
assert len(payload) == 16 | |
print a2s(payload) | |
data = collect(TRACKER_ID) | |
def a2s(array): | |
return ' '.join(['%02X' % x for x in array]) | |
for key in sorted(data.keys()): | |
#print len(data[key]) | |
#print datetime.datetime.fromtimestamp(key) | |
#print '---' | |
for conn in data[key]: | |
for req in conn: | |
code = req['request']['opcode'] | |
if code[:2] == [0x22, 0]: | |
p_0(req['response']) | |
continue | |
print '<- ', a2s(req['response']) | |
continue | |
# elif code[:2] == [0x23, 1]: | |
# print '-> ', a2s(req['request']['payload']) | |
continue | |
if code[1] != 4: continue | |
continue | |
print a2s(code), len(req['request']['payload']) | |
print '---' | |
print a2s(req['response']) | |
#p_6(req['response']) | |
#p_erase(req['request']['opcode']) | |
continue | |
s = str(code) | |
if req['request']['payload']: s += '\t -> %d ' % len(req['request']['payload']) | |
if req['response']: s += '\t <- %d' % len(req['response']) | |
print s |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment