Skip to content

Instantly share code, notes, and snippets.

@julian-klode
Created May 25, 2021 09:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save julian-klode/b5f09837e28782cd81a00b4c3fbbdd97 to your computer and use it in GitHub Desktop.
Save julian-klode/b5f09837e28782cd81a00b4c3fbbdd97 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
#
# Copyright (C) 2020-2021 Julian Andres Klode <jak@jak-linux.org>
#
# This Source Code Form is subject to the terms of the Mozilla
# Public License, v. 2.0. If a copy of the MPL was not distributed
# with this file, You can obtain one at
# https://mozilla.org/MPL/2.0/.
#
"""HR plotting script.
Usage: plot-hr.py [--png]
Plot HR of oura ring vs elite hrv export.
"""
from typing import (
Iterator,
List,
Tuple,
NewType,
)
import datetime
import matplotlib.pyplot as plt # type: ignore
import matplotlib.animation as animation # type: ignore
import sys
import itertools
from hrvanalysis import ( # type: ignore
get_time_domain_features,
remove_outliers,
interpolate_nan_values,
remove_ectopic_beats,
)
import json
# Type declarations
Time = datetime.datetime
Interval = NewType("Interval", int)
HR = NewType("HR", float)
RMSSD = NewType("RMSSD", float)
NaN = float("NaN")
def windows(
times: List[Time], hrs: List[Interval], interval: Interval
) -> Iterator[Tuple[float, Iterator[int]]]:
"""Yields (_, indices) tuples for windows of length 5"""
return itertools.groupby(
range(len(hrs)), lambda k: times[k].timestamp() // interval
)
def expand_rmssd5(stamps: List[Time], rrs: List[Interval]) -> List[RMSSD]:
"""Calculate RMSSD over 5 minute intervals.
Takes a list of time stamps and the raw RR intervalls, and returns
a list of RMSSD values.
"""
out = []
for group, indices in windows(stamps, rrs, Interval(5 * 60)):
window_rrs = [rrs[i] for i in indices]
time_domain_features = get_time_domain_features(window_rrs)
for i in window_rrs:
out.append(time_domain_features["rmssd"])
return out
def read_elite() -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]:
"""Read EliteHRV RR export file"""
times: List[Time] = []
hrs: List[HR] = []
rrs: List[Interval] = []
stamp = datetime.datetime(
year=2021,
month=5,
day=24,
hour=23,
minute=4,
second=20,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
)
rr_intervals_list = [int(rr) for rr in open("2021-05-24 23-04-20.txt")]
# This remove outliers from signal
rr_intervals_without_outliers = remove_outliers(
rr_intervals=rr_intervals_list, low_rri=300, high_rri=2000
)
# This replace outliers nan values with linear interpolation
interpolated_rr_intervals = interpolate_nan_values(
rr_intervals=rr_intervals_without_outliers, interpolation_method="linear"
)
# This remove ectopic beats from signal
nn_intervals_list = remove_ectopic_beats(
rr_intervals=interpolated_rr_intervals, method="malik"
)
# This replace ectopic beats nan values with linear interpolation
interpolated_nn_intervals = interpolate_nan_values(rr_intervals=nn_intervals_list)
for rr in interpolated_rr_intervals:
stamp += datetime.timedelta(milliseconds=rr)
hr = 1.0 / rr
times.append(stamp)
hrs.append(hr)
rrs.append(rr)
time_domain_features = get_time_domain_features(rrs)
print("Elite:", time_domain_features["rmssd"])
rmssds = expand_rmssd5(times, rrs)
return times, hrs, rmssds, RMSSD(time_domain_features["rmssd"])
def read_oura() -> Tuple[List[Time], List[HR], List[RMSSD], RMSSD]:
"""Read oura ring export"""
with open("/home/jak/Downloads/oura_2021-05-25T07-45-08.json") as fobj:
oura = json.load(fobj)
times: List[Time] = []
hrs: List[HR] = []
rmssds: List[RMSSD] = []
sleep = oura["sleep"][-1]
stamp = datetime.datetime.fromisoformat(sleep["bedtime_start"])
for hr5, hrv5 in zip(sleep["hr_5min"], sleep["rmssd_5min"]):
for second in range(5 * 60):
times.append(stamp)
hrs.append(HR(hr5) if hr5 else HR(NaN))
rmssds.append(RMSSD(hrv5) if hrv5 else RMSSD(NaN))
stamp += datetime.timedelta(seconds=1)
return times, hrs, rmssds, sleep["rmssd"]
def do_plot(i: int) -> None:
"""Main entry point"""
times, hrs, rmssds, rmssd = read_elite()
times1, hrs1, rmssds1, rmssd1 = read_oura()
plt.gcf().clear()
# Plot the graphs
plt.plot(times1, rmssds1, "-", label="oura hrv")
plt.plot(times, rmssds, "-", label="tickr hrv", alpha=0.3)
# Plot the averages
plt.plot(times, [rmssd for _ in times], "--", label="tickr hrv average", alpha=0.5)
plt.plot(
times1, [rmssd1 for _ in times1], "--", label="oura hrv average", alpha=0.5
)
plt.grid()
plt.legend()
plt.gcf().autofmt_xdate()
def main(argv: List[str]) -> None:
fig = plt.figure()
if "--png" in argv:
do_plot(0)
plt.savefig("hr.png", bbox_inches="tight", dpi=150)
else:
ani = animation.FuncAnimation(fig, do_plot, interval=10000)
plt.show()
if __name__ == "__main__":
main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment