Skip to content

Instantly share code, notes, and snippets.

@ramunasd
Last active August 20, 2023 22:23
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save ramunasd/a2e40cd13dd4cacf5685 to your computer and use it in GitHub Desktop.
Save ramunasd/a2e40cd13dd4cacf5685 to your computer and use it in GitHub Desktop.
ANT+ Garmin bike cadence and speed sensor reader
"""
Initialize a basic broadcast slave channel for listening to
an ANT+ HR monitor.
"""
import sys
import time
from ant.core import driver, node, event, message
from ant.core.constants import *
from config import *
def convertSB(raw):
value = int(raw[1]) << 8
value += int(raw[0])
return value
class CadenceListener(event.EventCallback):
lastEvent = None
def process(self, msg):
if isinstance(msg, message.ChannelBroadcastDataMessage):
page = msg.payload[1] & 0x7F
if page != 0:
return
eventTime = convertSB(msg.payload[5:7])
if eventTime == self.lastEvent:
return
self.lastEvent = eventTime
revolutions = convertSB(msg.payload[7:9])
print eventTime, ':', revolutions
NETKEY = '\xB9\xA5\x21\xFB\xBD\x72\xC3\x45'
# Initialize
stick = driver.USB1Driver(SERIAL, log=LOG, debug=DEBUG)
antnode = node.Node(stick)
antnode.start()
antnode.registerEventListener(CadenceListener())
# Set network key
network = node.Network(key=NETKEY, name='N:ANT+')
antnode.setNetworkKey(0, network)
# Get the first unused channel. Returns an instance of the node.Channel class.
channel = antnode.getFreeChannel()
# Initialize it as a receiving channel using our network key
channel.assign(network, CHANNEL_TYPE_TWOWAY_RECEIVE)
# Now set the channel id for pairing with an ANT+ HR monitor
channel.setID(122, 0, 0)
# Listen forever and ever (not really, but for a long time)
channel.searchTimeout = TIMEOUT_NEVER
# We want a ~4.06 Hz transmission period
channel.period = 8102
# And ANT frequency 57
channel.frequency = 57
try:
# Time to go live
channel.open()
print "Listening for events..."
while True:
time.sleep(60)
finally:
# Shutdown channel
channel.close()
channel.unassign()
# Shutdown
antnode.stop()
from ant.core import log
# USB1 ANT stick interface. Running `dmesg | tail -n 25` after plugging the
# stick on a USB port should tell you the exact interface.
SERIAL = '/dev/ttyUSB0'
# If set to True, the stick's driver will dump everything it reads/writes
# from/to the stick.
# Some demos depend on this setting being True, so unless you know what you
# are doing, leave it as is.
DEBUG = True
# Set to None to disable logging
LOG = None
#LOG = log.LogWriter()
# ========== DO NOT CHANGE ANYTHING BELOW THIS LINE ==========
#print "Using log file:", LOG.filename
#print ""
#!/usr/bin/python
import sys
import time
from ant.core import driver, node, event, message
from ant.core.constants import *
from config import *
NETKEY = '\xB9\xA5\x21\xFB\xBD\x72\xC3\x45'
CIRCUMFERENCE = 2.0
def convertSB(raw):
value = int(raw[1]) << 8
value += int(raw[0])
return value
class SpeedListener(event.EventCallback):
lastTime = None
lastRevolutions = None
def calcSpeed(self, time, revolutions):
if self.lastTime is None:
return None
if time < self.lastTime:
time += 64 * 1024
if revolutions < self.lastRevolutions:
revolutions += 65535
return (revolutions - self.lastRevolutions) * 1024.0 * CIRCUMFERENCE / (time - self.lastTime)
def process(self, msg):
if isinstance(msg, message.ChannelBroadcastDataMessage):
page = msg.payload[1] & 0x7F
if page != 0 and page != 5:
return
if page == 5:
stopped = (msg.payload[2] & 1 == 1)
if stopped:
print self.lastRevolutions, 0
eventTime = convertSB(msg.payload[5:7])
if eventTime == self.lastTime:
return
revolutions = convertSB(msg.payload[7:9])
speed = self.calcSpeed(eventTime, revolutions)
print revolutions, speed
self.lastTime = eventTime
self.lastRevolutions = revolutions
# Initialize
stick = driver.USB1Driver(SERIAL, log=LOG, debug=DEBUG)
antnode = node.Node(stick)
antnode.start()
antnode.registerEventListener(SpeedListener())
# Set network key
network = node.Network(key=NETKEY, name='N:ANT+')
antnode.setNetworkKey(0, network)
# Get the first unused channel. Returns an instance of the node.Channel class.
channel = antnode.getFreeChannel()
# Initialize it as a receiving channel using our network key
channel.assign(network, CHANNEL_TYPE_TWOWAY_RECEIVE)
# Now set the channel id for pairing with an ANT+ HR monitor
channel.setID(123, 0, 0)
# Listen forever and ever (not really, but for a long time)
channel.searchTimeout = TIMEOUT_NEVER
# We want a ~4.06 Hz transmission period
channel.period = 8118
# And ANT frequency 57
channel.frequency = 57
try:
# Time to go live
channel.open()
print "Listening for events..."
while True:
time.sleep(60)
finally:
# Shutdown channel
channel.close()
channel.unassign()
# Shutdown
antnode.stop()
@ryober
Copy link

ryober commented Nov 4, 2018

Hello. I wonder if speed.py should work for speed and cadence sensor with only corresponding setID and period?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment