Skip to content

Instantly share code, notes, and snippets.

@ryang14
Created December 19, 2020 15:16
Show Gist options
  • Save ryang14/8d09939b9aa15716d4fbeca4fe982905 to your computer and use it in GitHub Desktop.
Save ryang14/8d09939b9aa15716d4fbeca4fe982905 to your computer and use it in GitHub Desktop.
A canio based CAN node library
import struct
import time
import board
import canio
import digitalio
# Create a node on a CAN network
# Uses the 29-bit extended message ID which contains both a node ID and topic ID
# Node ID is the source when sending from a node and destination when sending from the controller to a node
# This library assumes a network with one controller and multiple nodes
#
# Usage:
# from cannode import Node
#
# node = Node(42)
#
# while True:
# node.run()
class Node:
def __init__(self, node_id, rx=board.CAN_RX, tx=board.CAN_TX, timeout=.001):
self.node_id = node_id
self.callbacks = dict()
self.matches = []
self.heartbeat_rate = 1
self.__last_heartbeat = time.monotonic()
# Setup CAN stuff
# If the CAN transceiver has a standby pin, bring it out of standby mode
if hasattr(board, 'CAN_STANDBY'):
standby = digitalio.DigitalInOut(board.CAN_STANDBY)
standby.switch_to_output(False)
# If the CAN transceiver is powered by a boost converter, turn on its supply
if hasattr(board, 'BOOST_ENABLE'):
boost_enable = digitalio.DigitalInOut(board.BOOST_ENABLE)
boost_enable.switch_to_output(True)
# canio library objects
self.can = canio.CAN(rx=rx, tx=tx,
baudrate=250_000, auto_restart=True)
self.listener = self.can.listen(matches=self.matches, timeout=timeout)
# Add a heartbeat listener for debug
self.add_listener(0, self.handle_heartbeat)
# Handle heartbeat messages
# This is used for debug
def handle_heartbeat(self, message):
node_id, now_ms = struct.unpack("<II", message.data)
print(f"received heartbeat from node {node_id}: now_ms={now_ms}")
# Add a callback function that will be called when a message with the corresponding topic ID is received
# The message object will be passed to the function as a parameter
def add_listener(self, topic_id, callback):
# Listen to messages addressed to this node
message_id = int.from_bytes(struct.pack(">BH", self.node_id, topic_id), 'big')
self.matches.append(canio.Match(message_id, extended=True))
# Also listen to broadcast messages
message_id = int.from_bytes(struct.pack(">BH", 255, topic_id), 'big')
self.matches.append(canio.Match(message_id, extended=True))
self.callbacks[topic_id] = callback
# Restart the CAN bus listener with the added matches
self.listener.deinit()
self.listener = self.can.listen(matches=self.matches, timeout=.01)
# Listen for new messages and pass them to the corresponding callback
def listen(self):
if self.listener == None:
return
message = self.listener.receive()
if message is None:
#print("No messsage received within timeout")
return
data = message.data
if len(data) != 8:
print(f"Unusual message length {len(data)}")
return
node_id, topic_id = struct.unpack(">xBH", message.id.to_bytes(4, 'big'))
if topic_id in self.callbacks:
self.callbacks[topic_id](message)
# Send a message
def send(self, topic_id, data):
message_id = int.from_bytes(struct.pack(">BH", self.node_id, topic_id), 'big')
self.can.send(canio.Message(message_id, data, extended=True))
# Send a heartbeat message
def heartbeat(self):
now_ms = (time.monotonic_ns() // 1_000_000) & 0xffffffff
self.send(0, struct.pack("<II", self.node_id, now_ms))
def run(self):
self.listen()
if time.monotonic() - self.__last_heartbeat >= self.heartbeat_rate:
self.__last_heartbeat = time.monotonic()
self.heartbeat()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment