Skip to content

Instantly share code, notes, and snippets.

@artizirk
Created August 2, 2021 15:49
Show Gist options
  • Save artizirk/e9f5e6f1d4d495b2f2d95f87e25f079c to your computer and use it in GitHub Desktop.
Save artizirk/e9f5e6f1d4d495b2f2d95f87e25f079c to your computer and use it in GitHub Desktop.
Minimal CANOpen in plain standalone Python3 using only standard lib
#!/usr/bin/python3
"""Minimal CANOpen in plain standalone Python3 using only standard lib
This is incomplete but works well enough to read and write small Objects using SDO protocol
"""
import ctypes
import enum
import socket
import struct
import time
# CAN frame packing/unpacking (see 'struct can_frame' in <linux/can.h>)
can_frame_fmt = "=IB3x8s"
can_frame_size = struct.calcsize(can_frame_fmt)
NODE_ID = 3
# Broadcast function codes
NMT = 0b0000 # 0x0
SYNC = 0b0001 # 0x1
TIME = 0b0010 # 0x2
# Peer to Peer object function codes
EMCY = 0b0001 # 0x1
PDO1_TX = 0b0011 # 0x3 3 From device to Rpi
PDO1_RX = 0b0100 # 0x4 4 From Rpi to Device
PDO2_TX = 0b0101 # 0x5 5
PDO2_RX = 0b0110 # 0x6 6
SDO_TX = 0b1011 # 0xB 11
SDO_RX = 0b1100 # 0xC 12
NMT_ERR = 0b1110 # 0xE 14
P2POFC = {
EMCY: "EMCY",
PDO1_TX: "PDO1_TX",
PDO1_RX: "PDO1_RX",
PDO2_TX: "PDO2_TX",
PDO2_RX: "PDO2_RX",
SDO_TX: "SDO_TX",
SDO_RX: "SDO_RX",
NMT_ERR: "NMT_ERR",
}
class SDO_CS(enum.IntEnum):
"""Service Data Object Command Specifier"""
DOWNLOAD_INIT_REQ = 1
DOWNLOAD_INIT_RESP = 3
DOWNLOAD_SEGMENT_REQ = 0
DOWNLOAD_SEGMENT_RESP = 1
UPLOAD_INIT_REQ = 2
UPLOAD_INIT_RESP = 2
UPLOAD_SEGMENT_REQ = 3
UPLOAD_SEGMENT_RESP = 0
ABORT = 4
class PrettyStructure(ctypes.BigEndianStructure):
def __repr__(self) -> str:
values = ", ".join(f"{name}={value}" for name, value in self._asdict().items())
return f"<{self.__class__.__name__}: {values}>"
def _asdict(self) -> dict:
return {field[0]: getattr(self, field[0]) for field in self._fields_}
def mkcob(func, node=NODE_ID):
"""Build COB-ID form Function Code and Node ID
COB-ID is a fancy name for can_id in the can packet"""
return (func & 0xf) << 7 | (node & 0x7f)
def parscob(cob):
"""Return Function Code and Node ID tuble form COB-ID"""
return (cob >> 7) & 0xf, cob & 0x7f
def build_can_frame(can_id, data):
can_dlc = len(data)
data = data.ljust(8, b'\x00')
return struct.pack(can_frame_fmt, can_id, can_dlc, data)
def dissect_can_frame(frame):
can_id, can_dlc, data = struct.unpack(can_frame_fmt, frame)
return can_id, can_dlc, data[:can_dlc]
def build_nmt(cs, node):
"""Build a Network Management packet form command specifier (cs) and node id"""
return build_can_frame(mkcob(NMT, 0), cs.to_bytes(1, 'little')+node.to_bytes(1, 'little'))
class SDOCmd(PrettyStructure):
"""SDO Protocol Initiate Command from CiA301 7.2.4.3.3
Used as Download Request or as Upload Response"""
_pack_ = 1
_fields_ = (
("cs", ctypes.c_uint8, 3), # command specifier in SDO_CS
("x", ctypes.c_uint8, 1), # not used, always 0
("n", ctypes.c_uint8, 2), # size of data if transfer is expedited and `s` is set, else 0
("e", ctypes.c_uint8, 1), # transfer type, 0: normal (use segmented transfer), 1: expedited (aka data is in last 4 bytes)
("s", ctypes.c_uint8, 1), # size indicator, 0: size is not indicated, 1: size is indicated
)
# SDO Packet format
# | 0 | 1 2 | 3 | 4 5 6 7|
# |CMD|OBJID|IDX| DATA |
# Where OBJID and IDX combo is often called multiplexer
def build_sdo_upload_init(node, object_id, index=0):
"""Read object_id from device"""
cmd = SDOCmd(cs=SDO_CS.UPLOAD_INIT_REQ)
multiplexer = object_id.to_bytes(2, "little") + index.to_bytes(1, "little")
return build_can_frame(mkcob(SDO_RX, node), bytes(cmd) + multiplexer + b'\x00\x00\x00\x00')
def build_sdo_download_init(node, object_id, index=0, data=b''):
"""Write data to object_id on device"""
cmd = SDOCmd(cs=SDO_CS.DOWNLOAD_INIT_REQ, e=1, s=1, n=4-len(data))
multiplexer = object_id.to_bytes(2, "little") + index.to_bytes(1, "little")
return build_can_frame(mkcob(SDO_RX, node), bytes(cmd) + multiplexer + data + (b"\x00"*cmd.n))
def recv(s):
"""Receive can messages from socket and pretty print them"""
cf, addr = s.recvfrom(can_frame_size)
cobid, l, data = dissect_can_frame(cf)
func_id, node = parscob(cobid)
print(f"From node {node} func {P2POFC.get(func_id)}({func_id}) with data {' '.join(['{:02x}'.format(x) for x in data])}")
if func_id in {SDO_TX, SDO_RX}:
sdo_cs = SDOCmd.from_buffer_copy(data[0:1])
object_id = int.from_bytes(data[1:3], 'little')
print(f" {str(SDO_CS(sdo_cs.cs))}:{sdo_cs} {hex(object_id)}:0x{data[3:4].hex()} {data[4:7].hex()}")
def main():
s = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
s.bind(('can0',))
print(f"Sending NMT reset node to {NODE_ID}")
s.send(build_nmt(129, NODE_ID))
recv(s)
# print("Sending NMT start remote node")
# s.send(build_nmt(1, NODE_ID))
print("get device type")
s.send(build_sdo_upload_init(NODE_ID, 0x1000))
recv(s)
print("get errors")
s.send(build_sdo_upload_init(NODE_ID, 0x1001))
recv(s)
print("Sending NMT start remote node")
s.send(build_nmt(1, NODE_ID))
recv(s)
recv(s)
print("get errors")
s.send(build_sdo_upload_init(NODE_ID, 0x1001))
recv(s)
print("get current mode of operation")
s.send(build_sdo_upload_init(NODE_ID, 0x6061))
recv(s)
print("set mode of operation to velocity mode")
s.send(build_sdo_download_init(NODE_ID, 0x6060, data=b'\x03'))
recv(s)
recv(s)
print("get current profile acceleration")
s.send(build_sdo_upload_init(NODE_ID, 0x6083))
recv(s)
print("set acceleration")
s.send(build_sdo_download_init(NODE_ID, 0x6083, data=(1000000).to_bytes(4, 'little')))
print("Switch statemachine to READY_TO_SWITCH_ON state")
s.send(build_sdo_download_init(NODE_ID, 0x6040, data=b'\x06\x00'))
recv(s)
recv(s)
print("Switch statemachine to SWITCHED_ON state")
s.send(build_sdo_download_init(NODE_ID, 0x6040, data=b'\x07\x00'))
recv(s)
recv(s)
print("Switch statemachine to OPERATION_ENABLED state")
s.send(build_sdo_download_init(NODE_ID, 0x6040, data=b'\x0f\x00'))
recv(s)
recv(s)
print("Set target velocity")
s.send(build_sdo_download_init(NODE_ID, 0x60ff, data=(5000000).to_bytes(4, 'little')))
time.sleep(3)
print("Set target velocity")
s.send(build_sdo_download_init(NODE_ID, 0x60ff, data=(0).to_bytes(4, 'little')))
while True:
recv(s)
# try:
# s.send(cf)
# except OSError:
# print('Error sending CAN frame')
#
# try:
# s.send(build_can_frame(0x01, b'\x01\x02\x03'))
# except OSError:
# print('Error sending CAN frame')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment