Skip to content

Instantly share code, notes, and snippets.

@ligfx
Created October 3, 2019 05:11
Show Gist options
  • Save ligfx/4fc0731ee4989135b611db6765624128 to your computer and use it in GitHub Desktop.
Save ligfx/4fc0731ee4989135b611db6765624128 to your computer and use it in GitHub Desktop.
import struct
# Maps the base-type byte found in a field definition to the type name used
# by the decoder. Looking up a byte that is not listed here raises KeyError.
FIELD_BASE_TYPES_TO_NAMES = {
    0x0: 'enum',
    0x1: 'sint8',
    0x2: 'uint8',
    0x7: 'string',
    # sint16 was missing even though the field decoder knows how to unpack it.
    0x83: 'sint16',
    0x84: 'uint16',
    0x85: 'sint32',
    0x86: 'uint32',
    0x8c: 'uint32z',
}
# Size in bytes of one element of each base type ('string' is counted per
# byte). Every type name in the base-type table has an entry here.
FIELD_NAME_TO_SIZE = {
    'enum': 1,
    'sint8': 1,
    'uint8': 1,
    'string': 1,
    'sint16': 2,
    'uint16': 2,
    'sint32': 4,
    'uint32': 4,
    'uint32z': 4,
}
# Maps a global message number (the 16-bit value read from a definition
# message) to a human-readable message name. Numbers that are not listed are
# rendered as their decimal string by global_message_number_to_name().
# NOTE(review): the entries presumably mirror the FIT SDK profile's mesg_num
# table -- confirm against the SDK version in use before extending.
GLOBAL_MESSAGE_NUMBER_TO_NAME = {
0: "file_id",
1: "capabilities",
2: "device_settings",
3: "user_profile",
4: "hrm_profile",
5: "sdm_profile",
6: "bike_profile",
7: "zones_target",
8: "hr_zone",
9: "power_zone",
10: "met_zone",
12: "sport",
15: "goal",
18: "session",
19: "lap",
20: "record",
21: "event",
23: "device_info",
26: "workout",
27: "workout_step",
28: "schedule",
30: "weight_scale",
31: "course",
32: "course_point",
33: "totals",
34: "activity",
35: "software",
37: "file_capabilities",
38: "mesg_capabilities",
39: "field_capabilities",
49: "file_creator",
51: "blood_pressure",
53: "speed_zone",
55: "monitoring",
72: "training_file",
78: "hrv",
80: "ant_rx",
81: "ant_tx",
82: "ant_channel_id",
101: "length",
103: "monitoring_info",
105: "pad",
106: "slave_device",
127: "connectivity",
128: "weather_conditions",
129: "weather_alert",
131: "cadence_zone",
132: "hr",
142: "segment_lap",
145: "memo_glob",
148: "segment_id",
149: "segment_leaderboard_entry",
150: "segment_point",
151: "segment_file",
158: "workout_session",
159: "watchface_settings",
160: "gps_metadata",
161: "camera_event",
162: "timestamp_correlation",
164: "gyroscope_data",
165: "accelerometer_data",
167: "three_d_sensor_calibration",
169: "video_frame",
174: "obdii_data",
177: "nmea_sentence",
178: "aviation_attitude",
184: "video",
185: "video_title",
186: "video_description",
187: "video_clip",
188: "ohr_settings",
200: "exd_screen_configuration",
201: "exd_data_field_configuration",
202: "exd_data_concept_configuration",
206: "field_description",
207: "developer_data_id",
208: "magnetometer_data",
209: "barometer_data",
210: "one_d_sensor_calibration",
225: "set",
227: "stress_level",
258: "dive_settings",
259: "dive_gas",
262: "dive_alarm",
264: "exercise_title",
268: "dive_summary",
285: "jump",
}
def global_message_number_to_name(global_message_number):
    """Return the message name for a global message number.

    Numbers without an entry in GLOBAL_MESSAGE_NUMBER_TO_NAME are returned
    as their ``str()`` form, so the result is always a string.
    """
    try:
        return GLOBAL_MESSAGE_NUMBER_TO_NAME[global_message_number]
    except KeyError:
        return str(global_message_number)
class Reader:
    """Sequential cursor over an in-memory buffer, plus a definition table.

    The definition table maps a local message type to the
    ``(global_message_number, fields)`` pair most recently registered for it.
    """

    def __init__(self, data):
        self._data = data
        self._p = 0
        self._definitions = {}

    def pos(self):
        """Return the current offset of the cursor within the buffer."""
        return self._p

    def remaining(self):
        """Return how many items lie between the cursor and the buffer end."""
        return len(self._data) - self._p

    def peek(self, n):
        """Return the next ``n`` items without moving the cursor.

        A request for exactly one item returns the element itself (an int
        when the buffer is ``bytes``); any other ``n`` returns a slice.
        """
        start = self._p
        if n != 1:
            return self._data[start:start + n]
        return self._data[start]

    def read(self, n):
        """Return the next ``n`` items (as ``peek`` does) and advance by ``n``."""
        start = self._p
        chunk = self._data[start] if n == 1 else self._data[start:start + n]
        self._p = start + n
        return chunk

    def add_definition(self, local_message_type, global_message_number, fields):
        """Register (or replace) the definition for a local message type."""
        entry = (global_message_number, fields)
        self._definitions[local_message_type] = entry

    def get_definition_fields(self, local_message_type):
        """Return the field list registered for ``local_message_type``."""
        _, fields = self._definitions[local_message_type]
        return fields

    def get_global_message_number(self, local_message_type):
        """Return the global message number registered for ``local_message_type``."""
        global_message_number, _ = self._definitions[local_message_type]
        return global_message_number
# struct format code for every field type that needs numeric decoding.
_FIELD_STRUCT_CODES = {
    'sint8': 'b',
    'uint16': 'H',
    'sint16': 'h',
    'uint32': 'I',
    'uint32z': 'I',
    'sint32': 'i',
}


def parse_field(reader, field_type, field_size):
    """Read ``field_size`` bytes from ``reader`` and decode them as ``field_type``.

    Args:
        reader: object whose ``read(n)`` returns the next ``n`` bytes (an int
            when ``n == 1``, a bytes-like slice otherwise).
        field_type: base type name, e.g. ``'uint16'``.
        field_size: total size of the field in bytes.

    Returns:
        * ``'enum'``, ``'uint8'`` and ``'string'``: the raw value exactly as
          returned by ``reader.read``.
        * numeric types: a little-endian decoded int when the field holds one
          element, or a tuple of ints when ``field_size`` is a larger multiple
          of the element size (array field).

    Raises:
        ValueError: ``field_type`` is not a supported type name.
        struct.error: ``field_size`` is not a multiple of the element size, or
            the reader returned fewer bytes than requested.
    """
    # The bytes are consumed before the type is validated, so the reader has
    # already advanced when an unsupported type is reported.
    buf = reader.read(field_size)
    if field_type in ('enum', 'uint8', 'string'):
        return buf

    code = _FIELD_STRUCT_CODES.get(field_type)
    if code is None:
        raise ValueError("unsupported field type: %r" % (field_type,))

    if isinstance(buf, int):
        # A one-byte read yields a bare int; wrap it so struct can decode it
        # (needed to recover the sign of a sint8).
        buf = bytes([buf])

    count = field_size // struct.calcsize('<' + code)
    values = struct.unpack('<%d%s' % (count, code), buf)
    return values[0] if count == 1 else values
# Field-number -> field-name tables, keyed by message name. Built once at
# import time instead of on every lookup.
_FIELD_NAMES_BY_MESSAGE = {
    'event': {
        0: "event",
        1: "event_type",
        2: "data16",
        3: "data",
        4: "event_group",
        7: "score",
        8: "opponent_score",
        9: "front_gear_num",
        10: "front_gear",
        11: "rear_gear_num",
        12: "rear_gear",
        13: "device_index",
    },
    'file_id': {
        0: "type",
        1: "manufacturer",
        2: "product",
        3: "serial_number",
        4: "time_created",
        5: "number",
        8: "product_name",
    },
    'lap': {
        254: "message_index",
        253: "timestamp",
        0: "event",
        1: "event_type",
        2: "start_time",
        3: "start_position_lat",
        4: "start_position_long",
        5: "end_position_lat",
        6: "end_position_long",
        7: "total_elapsed_time",
        8: "total_timer_time",
        9: "total_distance",
        10: "total_cycles,total_strides",
        11: "total_calories",
        12: "total_fat_calories",
        13: "avg_speed",
        14: "max_speed",
        15: "avg_heart_rate",
        16: "max_heart_rate",
        17: "avg_cadence,avg_running_cadence",
        18: "max_cadence,max_running_cadence",
        19: "avg_power",
        20: "max_power",
        21: "total_ascent",
        22: "total_descent",
        23: "intensity",
        24: "lap_trigger",
        25: "sport",
        26: "event_group",
        32: "num_lengths",
        33: "normalized_power",
        34: "left_right_balance",
        35: "first_length_index",
        37: "avg_stroke_distance",
        38: "swim_stroke",
        39: "sub_sport",
        40: "num_active_lengths",
        41: "total_work",
        42: "avg_altitude",
        43: "max_altitude",
        44: "gps_accuracy",
        45: "avg_grade",
        46: "avg_pos_grade",
        47: "avg_neg_grade",
        48: "max_pos_grade",
        49: "max_neg_grade",
        50: "avg_temperature",
        51: "max_temperature",
        52: "total_moving_time",
        53: "avg_pos_vertical_speed",
        54: "avg_neg_vertical_speed",
        55: "max_pos_vertical_speed",
        56: "max_neg_vertical_speed",
        57: "time_in_hr_zone",
        58: "time_in_speed_zone",
        59: "time_in_cadence_zone",
        60: "time_in_power_zone",
        61: "repetition_num",
        62: "min_altitude",
        63: "min_heart_rate",
        71: "wkt_step_index",
        74: "opponent_score",
        75: "stroke_count",
        76: "zone_count",
        77: "avg_vertical_oscillation",
        78: "avg_stance_time_percent",
        79: "avg_stance_time",
        80: "avg_fractional_cadence",
        81: "max_fractional_cadence",
        82: "total_fractional_cycles",
        83: "player_score",
        84: "avg_total_hemoglobin_conc",
        85: "min_total_hemoglobin_conc",
        86: "max_total_hemoglobin_conc",
        87: "avg_saturated_hemoglobin_percent",
        88: "min_saturated_hemoglobin_percent",
        89: "max_saturated_hemoglobin_percent",
        91: "avg_left_torque_effectiveness",
        92: "avg_right_torque_effectiveness",
        93: "avg_left_pedal_smoothness",
        94: "avg_right_pedal_smoothness",
        95: "avg_combined_pedal_smoothness",
        98: "time_standing",
        99: "stand_count",
        100: "avg_left_pco",
        101: "avg_right_pco",
        102: "avg_left_power_phase",
        103: "avg_left_power_phase_peak",
        104: "avg_right_power_phase",
        105: "avg_right_power_phase_peak",
        106: "avg_power_position",
        107: "max_power_position",
        108: "avg_cadence_position",
        109: "max_cadence_position",
        110: "enhanced_avg_speed",
        111: "enhanced_max_speed",
        112: "enhanced_avg_altitude",
        113: "enhanced_min_altitude",
        114: "enhanced_max_altitude",
        115: "avg_lev_motor_power",
        116: "max_lev_motor_power",
        117: "lev_battery_consumption",
        118: "avg_vertical_ratio",
        119: "avg_stance_time_balance",
        120: "avg_step_length",
        121: "avg_vam",
        149: "total_grit",
        150: "total_flow",
        151: "jump_count",
        153: "avg_grit",
        154: "avg_flow",
    },
    'record': {
        253: "timestamp",
        0: "position_lat",
        1: "position_long",
        2: "altitude",
        3: "heart_rate",
        4: "cadence",
        5: "distance",
        6: "speed",
        7: "power",
        8: "compressed_speed_distance",
        9: "grade",
        10: "resistance",
        11: "time_from_course",
        12: "cycle_length",
        13: "temperature",
        17: "speed_1s",
        18: "cycles",
        19: "total_cycles",
        28: "compressed_accumulated_power",
        29: "accumulated_power",
        30: "left_right_balance",
        31: "gps_accuracy",
        32: "vertical_speed",
        33: "calories",
        39: "vertical_oscillation",
        40: "stance_time_percent",
        41: "stance_time",
        42: "activity_type",
        43: "left_torque_effectiveness",
        44: "right_torque_effectiveness",
        45: "left_pedal_smoothness",
        46: "right_pedal_smoothness",
        47: "combined_pedal_smoothness",
        48: "time128",
        49: "stroke_type",
        50: "zone",
        51: "ball_speed",
        52: "cadence256",
        53: "fractional_cadence",
        54: "total_hemoglobin_conc",
        55: "total_hemoglobin_conc_min",
        56: "total_hemoglobin_conc_max",
        57: "saturated_hemoglobin_percent",
        58: "saturated_hemoglobin_percent_min",
        59: "saturated_hemoglobin_percent_max",
        62: "device_index",
        67: "left_pco",
        68: "right_pco",
        69: "left_power_phase",
        70: "left_power_phase_peak",
        71: "right_power_phase",
        72: "right_power_phase_peak",
        73: "enhanced_speed",
        78: "enhanced_altitude",
        81: "battery_soc",
        82: "motor_power",
        83: "vertical_ratio",
        84: "stance_time_balance",
        85: "step_length",
        91: "absolute_pressure",
        92: "depth",
        93: "next_stop_depth",
        94: "next_stop_time",
        95: "time_to_surface",
        96: "ndl_time",
        97: "cns_load",
        98: "n2_load",
        114: "grit",
        115: "flow",
    },
}


def get_field_name(global_message_number, field_id):
    """Return the name of field ``field_id`` within the given message.

    The message is identified by its global message number, which is first
    translated to a message name. When either the message or the field has no
    entry in the table, ``field_id`` itself is returned unchanged, so the
    result is a ``str`` for known fields and the original id otherwise.
    """
    message_name = global_message_number_to_name(global_message_number)
    return _FIELD_NAMES_BY_MESSAGE.get(message_name, {}).get(field_id, field_id)
# Field number that carries the timestamp; it is omitted from the payload of
# compressed-timestamp messages handled below.
_TIMESTAMP_FIELD_ID = 253


def _registered_message_name(reader, local_message_type):
    """Return the message name registered for a local type, or a placeholder."""
    try:
        global_message_number = reader.get_global_message_number(local_message_type)
    except KeyError:
        return "<undefined local message type>"
    return global_message_number_to_name(global_message_number)


def _parse_compressed_timestamp_message(reader, header):
    """Print a data message whose one-byte header carries a time offset."""
    print("compressed timestamp header")
    # Bits 5-6 select the local message type, bits 0-4 hold the time offset.
    local_message_type = (header >> 5) & 0b11
    print(bin(header))
    print(local_message_type)
    global_message_number = reader.get_global_message_number(local_message_type)
    print(global_message_number_to_name(global_message_number))
    time_offset = header & 0b11111
    print("time_offset", time_offset)
    fields = reader.get_definition_fields(local_message_type)
    # Only the timestamp field is skipped. Dropping the first field
    # unconditionally would leave its bytes unread whenever the definition
    # does not start with the timestamp, desynchronising the stream.
    # NOTE(review): this assumes a timestamp field listed in the definition
    # is absent from the payload -- confirm against the FIT protocol.
    for (field_id, field_size, field_type) in fields:
        if field_id == _TIMESTAMP_FIELD_ID:
            continue
        print(" %s = %s, # %s" % (get_field_name(global_message_number, field_id), parse_field(reader, field_type, field_size), field_type))


def _parse_definition_message(reader, local_message_type):
    """Read a definition message and register it on ``reader``."""
    reserved = reader.read(1)
    if reserved != 0:
        raise ValueError("definition message: reserved byte is %r, expected 0" % (reserved,))
    architecture = reader.read(1)
    if architecture != 0:
        # 1 would mean big-endian field data, which is not implemented.
        raise ValueError("definition message: unsupported architecture %r (only little-endian, 0, is handled)" % (architecture,))
    global_message_number = struct.unpack('<H', reader.read(2))[0]
    num_fields = reader.read(1)
    fields = []
    for _ in range(num_fields):
        definition_number = reader.read(1)
        field_size = reader.read(1)
        # A field whose size is a multiple of its base type size is an array.
        base_type = reader.read(1)
        try:
            field_base_type = FIELD_BASE_TYPES_TO_NAMES[base_type]
        except KeyError:
            raise KeyError("unknown field base type 0x%02x in definition of field %r" % (base_type, definition_number)) from None
        fields.append((definition_number, field_size, field_base_type))
    print("definition", local_message_type, "=", global_message_number_to_name(global_message_number), "{", fields, "}")
    reader.add_definition(local_message_type, global_message_number, fields)


def _parse_data_message(reader, local_message_type):
    """Print every field of a data message using its registered definition."""
    global_message_number = reader.get_global_message_number(local_message_type)
    fields = reader.get_definition_fields(local_message_type)
    print(global_message_number_to_name(global_message_number) + "(")
    for (field_id, field_size, field_type) in fields:
        print(" %s = %s, # %s" % (get_field_name(global_message_number, field_id), parse_field(reader, field_type, field_size), field_type))
    print(")")


def parse_record(reader):
    """Consume one record (header byte plus content) from ``reader`` and print it.

    A record is a compressed-timestamp data message (header bit 7 set), a
    definition message (bit 6 set), or a normal data message. Definitions are
    stored on ``reader``; data messages are decoded with the definition most
    recently registered for their local message type.

    Raises:
        ValueError: a reserved header bit or byte is set, the definition is
            big-endian, or header bit 5 is set for anything other than a
            ``file_creator`` local type.
        KeyError: a data message refers to an undefined local message type,
            or a definition uses an unknown base type.
    """
    header = reader.read(1)
    if (header >> 7) & 0b1 == 1:
        _parse_compressed_timestamp_message(reader, header)
        return

    message_type = (header >> 6) & 1
    message_type_specific = (header >> 5) & 1
    reserved = (header >> 4) & 1
    local_message_type = header & 0b1111

    if reserved != 0:
        raise ValueError("record header 0x%02x has its reserved bit (bit 4) set" % header)

    if message_type_specific != 0:
        # NOTE(review): in FIT protocol 2.0, bit 5 of a definition header
        # flags developer data fields, which are not parsed here -- confirm.
        # The name lookup tolerates a local type that is not defined yet, so
        # the diagnostic below is reported instead of a bare KeyError.
        message_name = _registered_message_name(reader, local_message_type)
        if message_name == "file_creator":
            print("WARNING: message_type_specific field in message header is set in a data message of type file_creator, not sure what that means")
        else:
            print("message_type %i local_message_type %i (%s) message_type_specific %i" % (message_type, local_message_type, message_name, message_type_specific))
            raise ValueError("record header 0x%02x has the message-type-specific bit (bit 5) set, which is not supported" % header)

    if message_type == 1:
        _parse_definition_message(reader, local_message_type)
    else:
        _parse_data_message(reader, local_message_type)
def main(filename, skip_at_message=2367, skip_to_offset=55796):
    """Dump the header and every record of the FIT file ``filename`` to stdout.

    File layout: a file header (12 bytes minimum, 14 preferred; the header
    CRC in the 14-byte form is optional and may be 0x0000), then a series of
    records, then a 2-byte CRC. Each record has a one-byte header and is
    either a definition message or a data message.

    Args:
        filename: path of the file to read.
        skip_at_message: 1-based index of the message before which the cursor
            jumps forward to ``skip_to_offset``; ``None`` disables the jump.
            The defaults are a workaround for one specific damaged file and
            are kept only for backward compatibility.
        skip_to_offset: absolute byte offset to jump to. The jump is applied
            only when it moves the cursor forward.

    Raises:
        ValueError: the file is shorter than the minimum 12-byte header.
    """
    with open(filename, 'rb') as f:
        data = f.read()

    if len(data) < 12:
        raise ValueError("%s is too short to contain a FIT header (%i bytes)" % (filename, len(data)))

    header_size = data[0]
    protocol_version = data[1]
    profile_version = struct.unpack('<H', data[2:4])[0]
    data_size = struct.unpack('<I', data[4:8])[0]  # in bytes
    magic = data[8:12]
    # Only the 14-byte header carries a CRC; in a 12-byte header those bytes
    # already belong to the first record.
    header_crc = data[12:14] if header_size >= 14 else None
    print(
        f"""fit_header(
header_size = {header_size},
protocol_version = {protocol_version},
profile_version = {profile_version},
data_size = {data_size},
magic = {magic},
header_crc = {header_crc},
)
"""
    )

    # The recorded data size covers the records only: it excludes the file
    # header and the trailing 2-byte CRC.
    actual_data_size = len(data) - header_size - 2
    if data_size != actual_data_size:
        print(f"WARNING: data size recorded in header ({data_size}) does not match actual data size ({actual_data_size})")

    reader = Reader(data)
    reader.read(header_size)

    message_index = 0
    # Stop before the trailing 2-byte file CRC.
    while reader.remaining() > 2:
        message_index += 1
        if (
            skip_at_message is not None
            and skip_to_offset is not None
            and message_index == skip_at_message
            and skip_to_offset > reader.pos()
        ):
            skipped = skip_to_offset - reader.pos()
            print("WARNING: Skipping %i bytes at %i" % (skipped, reader.pos()))
            reader.read(skipped)
        print()
        print("# message", message_index, "@", reader.pos())
        parse_record(reader)
# Run the dump only when executed as a script, so importing this module has
# no side effects.
if __name__ == "__main__":
    main('9A1I0426.FIT')
# with open('9A1I0426.FIT', 'rb') as f:
# data = f.read()
#
# with open('skipped.fit', 'wb') as f:
# f.write(data[:55762])
# f.write(data[55796:])
#
# ch2 fix-fit skipped.FIT --fix-header --fix-checksum -o fixed.fit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment