Skip to content

Instantly share code, notes, and snippets.

@moritzmhmk
Created February 6, 2015 10:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moritzmhmk/c83ea57dd124ea79b997 to your computer and use it in GitHub Desktop.
Save moritzmhmk/c83ea57dd124ea79b997 to your computer and use it in GitHub Desktop.
CAN
import time
import random
import socket
import struct
import argparse
class CANMessage:
    """A CAN frame reduced to its identifier, length code and payload.

    Attributes:
        id: Frame identifier. Only the 29 bits selected by ``EFF_MASK`` are
            kept when a frame is parsed or serialised.
        dlc: Data length code, i.e. the number of meaningful payload bytes.
        data: Payload bytes (``bytes`` or ``bytearray``).
    """

    # Selects the 29 identifier bits of the 32-bit id word.
    EFF_MASK = 0x1FFFFFFF
    # Set in the id word of every frame produced by toFrame(); this is the
    # value SocketCAN uses to mark an extended (29-bit identifier) frame.
    EFF_FLAG = 0x80000000

    def __init__(self):
        self.id = 0
        self.dlc = 0
        # Per-instance buffer: a class-level bytearray would be one object
        # shared by every message that never assigns ``data``.
        self.data = bytearray()

    def __str__(self):
        return 'CANMessage: id=%x, dlc=%x, data=%s' % (self.id, self.dlc, self.data)

    @classmethod
    def fromFrame(cls, frame, format):
        """Build a message from a packed frame.

        Args:
            frame: Packed frame bytes; its length must match ``format``.
            format: ``struct`` format string that unpacks to
                ``(id, dlc, data)``.

        Returns:
            A new instance of ``cls`` whose ``data`` is cut to ``dlc`` bytes.

        Raises:
            struct.error: If ``frame`` does not match ``format``.
        """
        can_id, can_dlc, data = struct.unpack(format, frame)
        msg = cls()
        # Drop the flag bits that share the id word with the identifier.
        msg.id = can_id & cls.EFF_MASK
        msg.dlc = can_dlc
        msg.data = data[:can_dlc]
        return msg

    @classmethod
    def fromTrackMessage(cls, trackmsg):
        """Build a message from an object with command/resp/hash/data fields.

        The identifier is laid out as ``command << 17 | resp << 16 | hash``.
        The fields are not masked here, so a value wider than its slot
        overlaps the neighbouring field.

        Args:
            trackmsg: Object exposing ``command``, ``resp``, ``hash`` and
                ``data`` attributes.

        Returns:
            A new instance of ``cls``; ``dlc`` is ``len(trackmsg.data)``.
        """
        msg = cls()
        msg.id = (trackmsg.command << 17) | (trackmsg.resp << 16) | trackmsg.hash
        msg.dlc = len(trackmsg.data)
        msg.data = trackmsg.data
        return msg

    def toFrame(self, format):
        """Serialise the message with the given ``struct`` format.

        The payload is right-padded with zero bytes to 8 bytes before
        packing. With the ``8s`` layouts used in this file a payload longer
        than 8 bytes is cut by ``struct`` while ``dlc`` is written as-is.

        Args:
            format: ``struct`` format string taking ``(id, dlc, data)``.

        Returns:
            The packed frame as ``bytes``.
        """
        frame_id = (self.id & self.EFF_MASK) | self.EFF_FLAG
        padded = self.data.ljust(8, b'\x00')
        return struct.pack(format, frame_id, self.dlc, padded)
class TrackMessage:
    """Protocol-level view of a CAN identifier plus its payload.

    Attributes:
        hash: Low 16 bits of the CAN identifier.
        resp: Bit 16 of the CAN identifier (0 or 1).
        command: Bits 17-24 of the CAN identifier.
        data: Payload bytes.
    """

    def __init__(self):
        self.hash = 0
        self.resp = 0
        self.command = 0
        # Per-instance buffer: a class-level bytearray would be one object
        # shared by every message that never assigns ``data``.
        self.data = bytearray()

    def __str__(self):
        return 'TrackMessage: hash=%x, resp=%x, command=%x, data=%s' % (self.hash, self.resp, self.command, self.data)

    @classmethod
    def fromCANMessage(cls, msg):
        """Split a CAN message's identifier into hash, resp and command.

        Args:
            msg: Object exposing ``id``, ``dlc`` and ``data`` attributes.

        Returns:
            A new instance of ``cls`` whose ``data`` is a ``bytearray`` copy
            of the first ``dlc`` payload bytes.
        """
        trackmsg = cls()
        trackmsg.hash = msg.id & 0xFFFF
        trackmsg.resp = (msg.id >> 16) & 0x1
        trackmsg.command = (msg.id >> 17) & 0xFF
        trackmsg.data = bytearray(msg.data[:msg.dlc])
        return trackmsg
class CanSocket(object):
    """Exchange TrackMessage objects over a CAN interface or a UDP pair.

    Attributes:
        can_frame_format: ``struct`` format of one frame on the wire.
            ``"!IB8s"`` (network byte order, 13 bytes) for UDP and
            ``"=IB3x8s"`` (native byte order, 3 pad bytes, 16 bytes) for a
            raw CAN socket.
        in_socket: Socket that frames are received from.
        out_socket: Socket that frames are sent on; for a raw CAN socket
            this is the same object as ``in_socket``.

    The object can be used as a context manager; leaving the ``with`` block
    closes the sockets.
    """

    def __init__(self, interface):
        """Open the sockets.

        Args:
            interface: ``"udp"`` to receive on UDP port 15730 (all local
                addresses) and send to ``localhost:15731``; any other value
                is taken as the name of a CAN interface to bind a raw CAN
                socket to.

        Raises:
            OSError: If a socket cannot be created, bound or connected. Any
                socket opened before the failure is closed first.
        """
        self.in_socket = None
        self.out_socket = None
        try:
            if interface == "udp":
                self.can_frame_format = "!IB8s"
                self.in_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
                self.in_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self.in_socket.bind(("", 15730))
                self.out_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
                self.out_socket.connect(("localhost", 15731))
            else:
                self.can_frame_format = "=IB3x8s"
                self.in_socket = socket.socket(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
                self.in_socket.bind((interface,))
                self.out_socket = self.in_socket
        except Exception:
            # Do not leave a half-initialised object holding open sockets.
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close both sockets; safe to call more than once."""
        # socket.close() is idempotent, so closing the shared CAN socket
        # through both attributes is harmless.
        for sock in (self.in_socket, self.out_socket):
            if sock is not None:
                sock.close()

    def send(self, trackmsg):
        """Pack ``trackmsg`` into one frame and send it on ``out_socket``."""
        msg = CANMessage.fromTrackMessage(trackmsg)
        can_frame = msg.toFrame(self.can_frame_format)
        self.out_socket.send(can_frame)

    def recv(self):
        """Receive one frame from ``in_socket`` and decode it.

        The sockets are left in their default blocking mode, so this waits
        until a frame arrives.

        Returns:
            The received frame as a ``TrackMessage``.

        Raises:
            struct.error: If the received data does not have the length
                required by ``can_frame_format``.
        """
        # 16 bytes is the size of the raw-CAN layout and also leaves room
        # for the 13-byte UDP layout.
        can_frame, _addr = self.in_socket.recvfrom(16)
        msg = CANMessage.fromFrame(can_frame, self.can_frame_format)
        return TrackMessage.fromCANMessage(msg)
def _uid_hash(uid):
    """Fold a 32-bit UID into the 16-bit hash placed in the CAN identifier.

    The high and low 16-bit halves are XORed, then bit 7 is cleared and
    bits 8 and 9 are set.
    NOTE(review): this looks like the hash rule of the Maerklin CS2 CAN
    protocol - confirm against the protocol document.
    """
    high = (uid >> 16) & 0xFFFF
    low = uid & 0xFFFF
    return ((high ^ low) & 0b1111111101111111) | 0b0000001100000000


def _command_number(text):
    """argparse type: parse an integer literal (``27``, ``0x1b``, ``0b11``)."""
    try:
        return int(text, 0)
    except ValueError:
        raise argparse.ArgumentTypeError("invalid command number: %r" % text)


def _hex_payload(text):
    """argparse type: decode hex text into a payload of at most 8 bytes."""
    try:
        payload = bytearray.fromhex(text)
    except ValueError as err:
        raise argparse.ArgumentTypeError("invalid hex data %r: %s" % (text, err))
    # The frame layouts used by CanSocket carry exactly 8 data bytes; a
    # longer payload would be sent with dlc > 8 and a truncated data field.
    if len(payload) > 8:
        raise argparse.ArgumentTypeError(
            "data is %d bytes long; a frame carries at most 8" % len(payload))
    return payload


def main(argv=None):
    """Send one command and wait for a response frame with the same command.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    The function prints the request, sends it, then blocks until a received
    frame has the same command number and its response bit set, at which
    point it prints ``SUCCESS!``. There is no timeout.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("interface", help='interface ("canX" or "udp")')
    parser.add_argument("command", type=_command_number, help="command to send")
    parser.add_argument("data", type=_hex_payload, help="data to send")
    args = parser.parse_args(argv)

    # Random 32-bit UID with the top bit forced on.
    # TODO (carried over from the original, which gave no detail).
    uid = random.getrandbits(31) | 1 << 31

    request = TrackMessage()
    request.hash = _uid_hash(uid)
    request.resp = 0b0
    request.command = args.command
    request.data = args.data

    can_socket = CanSocket(args.interface)
    try:
        print(request)
        can_socket.send(request)
        # recv() blocks until a frame arrives, so no sleep is needed between
        # iterations; sleeping would only delay draining the socket.
        while True:
            reply = can_socket.recv()
            if reply.command == request.command and bool(reply.resp):
                print("SUCCESS!")
                break
    finally:
        # Close through the socket attributes so this works whether or not
        # CanSocket offers a close() method of its own.
        can_socket.in_socket.close()
        can_socket.out_socket.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment