Skip to content

Instantly share code, notes, and snippets.

@eliotb
Created September 3, 2015 02:33
Show Gist options
  • Save eliotb/5b647e5498831710f551 to your computer and use it in GitHub Desktop.
Save eliotb/5b647e5498831710f551 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
General CAN functions using Kvaser canlib
"""
from __future__ import print_function
import logging
from sys import exit
# kvaser canlib
from canlib import *
def setup(port_id, bitrate=canBITRATE_250K):
    """Open a Kvaser CAN channel, configure it and put it on the bus.

    Args:
        port_id: Either a canlib channel number or a device serial number.
        bitrate: canlib bitrate constant handed to ``setBusParams``.

    Returns:
        A ``(channel, canlib)`` tuple: the opened channel (already on bus)
        and the canlib instance that opened it.

    If *port_id* matches neither a channel number nor a serial number, the
    known ports are listed on stdout and the process exits with status 1.
    """
    cl = canlib()
    channels = cl.getNumberOfChannels()
    logging.info("Kvaser canlib version: %s, %d channels available",
                 cl.getVersion(), channels)

    # Index every channel by its channel number and, where it cannot be
    # confused with a channel number, by its device serial number as well.
    chans = {}
    for ch in range(channels):
        try:
            info = (ch,
                    int(cl.getChannelData_Serial(ch)),
                    cl.getChannelData_Name(ch),
                    cl.getChannelData_EAN(ch))
        except canError as ex:
            # Best effort: report the channel that could not be queried
            # and carry on with the remaining ones.
            print(ex)
            continue
        chans[ch] = info  # key on channel number
        if info[1] >= channels:
            chans[info[1]] = info  # key on serial number

    try:
        ch = chans[port_id][0]
    except KeyError:
        print('A valid port number is required for --port option:')
        for key, info in sorted(chans.items()):
            print('%d: %s' % (key, info))
        print()
        # Non-zero status so that calling scripts can detect the failure.
        exit(1)

    timer_scale = 100  # microseconds per tick
    chan = cl.openChannel(ch, canOPEN_ACCEPT_VIRTUAL | canOPEN_REQUIRE_EXTENDED)
    chan.ioCtl_set_timer_scale(timer_scale)
    chan.timer_scale = timer_scale  # remembered on the channel object
    chan.setBusOutputControl(canDRIVER_NORMAL)
    chan.setBusParams(bitrate)
    chan.busOn()
    return chan, cl
# Bitmask combining every message flag that marks a received frame as
# erroneous: error frames, NERR, and hardware/software buffer overruns.
error_flags = (
    canMSGERR_SW_OVERRUN
    | canMSGERR_HW_OVERRUN
    | canMSG_NERR
    | canMSG_ERROR_FRAME
)
def flags_str(flags):
    """Return a human-readable, ``|``-separated rendering of CAN message flags.

    Args:
        flags: Integer bitmask of canlib ``canMSG_*`` / ``canMSGERR_*`` flags.

    Returns:
        The short names of all set flags joined by ``'|'`` (for example
        ``'RTR|STD'``), or an empty string when no known flag is set.
    """
    flag_names = (
        (canMSG_RTR, 'RTR'),              # 1: message is a remote request
        (canMSG_STD, 'STD'),              # 2: standard (11-bit) identifier
        (canMSG_EXT, 'EXT'),              # 4: extended (29-bit) identifier
        (canMSG_WAKEUP, 'WAKE'),          # 8: WAKEUP message (SWC hardware)
        (canMSG_ERROR_FRAME, 'ERR'),      # 32: message represents an error frame
        # The following flags can be returned from canRead() et al, but
        # cannot be passed to canWrite():
        (canMSG_NERR, 'NERR'),            # 16: NERR was active during the message
        (canMSG_TXACK, 'TXACK'),          # 64: TX ACK (message was really sent)
        (canMSG_TXRQ, 'TXRQ'),            # 128: TX REQ (handed to the CAN controller)
        (canMSGERR_HW_OVERRUN, 'HWOVR'),  # 512: hardware buffer overrun
        (canMSGERR_SW_OVERRUN, 'SWOVR'),  # 1024: software buffer overrun
    )
    # Labels carry no separator of their own; join() supplies it, so a set
    # RTR bit no longer produces a doubled '||' in the output.
    return '|'.join(label for bit, label in flag_names if flags & bit)
def add_options(p):
    '''Register the CAN related command line options on optparse parser *p*.

    Adds -b/--bus (int, default 1), -p/--port (int, default 0) and the
    -F/--fake store_true switch.
    '''
    option_specs = (
        (('-b', '--bus'),
         dict(type='int', default=1,
              help='CAN bus index 1 or 2. Default=%default')),
        (('-p', '--port'),
         dict(type='int', default=0,
              help='CAN channel #, typically 0 or 1, or device serial')),
        (('-F', '--fake'),
         dict(action='store_true',
              help='Send packets direct to localhost MQTT broker instead of serial port')),
    )
    for switches, settings in option_specs:
        p.add_option(*switches, **settings)
if __name__ == '__main__':
    # Self-test driver: with a first positional argument starting with 'w'
    # it writes random frames to the bus; otherwise (including when no
    # argument is given) it receives frames and prints them grouped by
    # identifier.
    from collections import defaultdict
    from optparse import OptionParser
    import random
    import time

    def parse_options():
        """Parse the command line; return ``(opts, args)``.

        ``opts.loglevel`` is translated from the 1-4 command line value to
        the matching ``logging`` level (anything else means DEBUG).
        """
        p = OptionParser()
        add_options(p)
        p.add_option('-l', '--loglevel', type='int', default=3,
                     help='Logging level. Default=%default')
        opts, args = p.parse_args()
        level = {1: logging.ERROR, 2: logging.WARNING, 3: logging.INFO,
                 4: logging.DEBUG}.get(opts.loglevel, logging.DEBUG)
        opts.loglevel = level
        return opts, args

    opts, args = parse_options()
    logging.basicConfig(level=opts.loglevel)
    ch, lib = setup(opts.port)
    num_messages = 100

    # A missing positional argument selects receive mode instead of
    # crashing with an IndexError.
    mode = args[0] if args else ''

    if mode.startswith('w'):
        max_std = 2047  # largest standard (11-bit) identifier
        logging.info('Generating %d packets to %s (port#%d)',
                     num_messages, ch.getChannelData_Name(), opts.port)
        sent = []
        for _ in range(num_messages):
            msg = [random.randint(0, 255)
                   for _ in range(random.randint(1, 8))]
            can_id = random.randint(0, max_std)
            sent.append(can_id)
            print(can_id, msg)
            ch.write(can_id, msg)
        print(sent)
    else:
        msgs_received = defaultdict(list)
        logging.info('Receiving %d packets from %s (port#%d)',
                     num_messages, ch.getChannelData_Name(), opts.port)
        for _ in range(num_messages):
            try:
                # NOTE: the read() timeout appears to be in milliseconds
                # (5000 == 5 s) rather than float seconds.
                msg = ch.read(5000)
            except canNoMsg:
                logging.warning('No messages received for 5 seconds')
                continue
            except Exception as e:
                logging.error(str(e))
                continue
            # msg[0] is used as the grouping key (the frame identifier).
            msgs_received[msg[0]].append(msg)

        # sorted() works on both Python 2 lists and Python 3 dict views;
        # keys are unique so only the identifiers are ever compared.
        for k, v in sorted(msgs_received.items()):
            print('Rx ID: %08X' % k)
            for msg in v:
                print("dlc: %08X msg: %s" % (msg[2], list(msg[1])))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment