Skip to content

Instantly share code, notes, and snippets.

@perillo
Last active October 30, 2023 09:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save perillo/816876d348be36b7727c9cc8c96a53f7 to your computer and use it in GitHub Desktop.
Save perillo/816876d348be36b7727c9cc8c96a53f7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from math import nan
import sys
from garmin_fit_sdk import Decoder, Stream
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
def load(path):
"""load loads a FIT file.
Returns the FIT messages and errors.
"""
stream = Stream.from_file(path)
decoder = Decoder(stream)
ok = decoder.check_integrity()
messages, errors = decoder.read()
if not ok:
# Ensure that an integrity error is always reported.
errors.insert(0, f"{path}: integrity error")
return messages, errors
def average(x):
"""Returns the average of the array elements, ignoring NaNs."""
return np.nanmean(x)
def average_nonzero(x):
"""Returns the average of the array elements, ignoring zeroes."""
return np.nanmean(x[np.nonzero(x)])
def moving_average(x, window, rate=1):
"""Return the moving average (aka rolling average) of array x, using the
specified window, as seconds.
The array x is assumed to be sampled at a constant rate, in seconds. The
sampling rate rate is measured in hertz.
NaNs are ignored when computing the average value of each sliding window.
"""
windowsize = window * rate
if x.size < windowsize <= 0:
return 0
return np.nanmean(sliding_window_view(x, window_shape=window), axis=1)
def normalized_power(x, rate=1):
"""Returns the normalized power.
The sampling rate is measured in hertz.
"""
rolling = moving_average(x, 30, rate)
tmp = np.power(rolling, 4)
avg = np.average(tmp)
return pow(avg, 1 / 4)
def main():
messages, errors = load(sys.argv[1])
records = messages["record_mesgs"]
size = len(records)
cadence_array = np.empty(size)
heart_rate_array = np.empty(size)
power_array = np.empty(size)
speed_array = np.empty(size)
temperature_array = np.empty(size)
for i, record in enumerate(records):
cadence_array[i] = record.get("cadence", nan)
heart_rate_array[i] = record.get("heart_rate", nan)
power_array[i] = record.get("power", nan)
speed_array[i] = record.get("speed", nan)
temperature_array[i] = record.get("temperature", nan)
print("avg_cadence:", average_nonzero(cadence_array))
print("avg_heart_rate:", average(heart_rate_array))
print("avg_power:", average(power_array))
print("avg_speed:", average(speed_array))
print("agv_temperature:", average(temperature_array))
print("normalized power:", normalized_power(power_array))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment