Skip to content

Instantly share code, notes, and snippets.

@baderj
Last active July 20, 2023 03:19
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save baderj/c41d2bbe0aeded3506cf to your computer and use it in GitHub Desktop.
Save baderj/c41d2bbe0aeded3506cf to your computer and use it in GitHub Desktop.
Script to illustrate heart rate capturing with Ant+, see blog post at http://www.johannesbader.ch/2014/06/track-your-heartrate-on-raspberry-pi-with-ant
"""
Code based on:
https://github.com/mvillalba/python-ant/blob/develop/demos/ant.core/03-basicchannel.py
in the python-ant repository and
https://github.com/tomwardill/developerhealth
by Tom Wardill
"""
import sys
import time
from ant.core import driver, node, event, message, log
from ant.core.constants import CHANNEL_TYPE_TWOWAY_RECEIVE, TIMEOUT_NEVER
class HRM(event.EventCallback):
def __init__(self, serial, netkey):
self.serial = serial
self.netkey = netkey
self.antnode = None
self.channel = None
def start(self):
print("starting node")
self._start_antnode()
self._setup_channel()
self.channel.registerCallback(self)
print("start listening for hr events")
def stop(self):
if self.channel:
self.channel.close()
self.channel.unassign()
if self.antnode:
self.antnode.stop()
def __enter__(self):
return self
def __exit__(self, type_, value, traceback):
self.stop()
def _start_antnode(self):
stick = driver.USB2Driver(self.serial)
self.antnode = node.Node(stick)
self.antnode.start()
def _setup_channel(self):
key = node.NetworkKey('N:ANT+', self.netkey)
self.antnode.setNetworkKey(0, key)
self.channel = self.antnode.getFreeChannel()
self.channel.name = 'C:HRM'
self.channel.assign('N:ANT+', CHANNEL_TYPE_TWOWAY_RECEIVE)
self.channel.setID(120, 0, 0)
self.channel.setSearchTimeout(TIMEOUT_NEVER)
self.channel.setPeriod(8070)
self.channel.setFrequency(57)
self.channel.open()
def process(self, msg):
if isinstance(msg, message.ChannelBroadcastDataMessage):
print("heart rate is {}".format(ord(msg.payload[-1])))
SERIAL = '/dev/ttyUSB0'
NETKEY = 'B9A521FBBD72C345'.decode('hex')
with HRM(serial=SERIAL, netkey=NETKEY) as hrm:
hrm.start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment