Skip to content

Instantly share code, notes, and snippets.

@autrilla
Created November 9, 2014 13:26
Show Gist options
  • Save autrilla/89a6df8cdbc95df3b77c to your computer and use it in GitHub Desktop.
Save autrilla/89a6df8cdbc95df3b77c to your computer and use it in GitHub Desktop.
import crc16
from collections import namedtuple
from time import sleep
import struct
#### Header byte offsets, counted from the STX byte at index 0
(PACKET_STX,
 PACKET_ADDR_TO,
 PACKET_ADDR_FROM,
 PACKET_FLAGS,
 PACKET_LENGTH,
 PACKET_MESSAGE) = range(6)
#### Trailer offsets: add the value of the length byte to get the real index
PACKET_CRC1 = PACKET_MESSAGE    # CRC high byte, directly after the message
PACKET_CRC2 = PACKET_CRC1 + 1   # CRC low byte
PACKET_ETX = PACKET_CRC2 + 1    # closing ETX byte
#### Bit masks for the flags byte
FLAG_ACK = 1 << 0
FLAG_NACK = 1 << 1
FLAG_ACK_REQ = 1 << 2
FLAG_ID_REQ = 1 << 3
FLAG_ADDR_NEW = 1 << 4
FLAG_ADDR_CONFIRM = 1 << 5
# A1 sends an address offer on its first tick and on every tick that is a
# multiple of this value.
ADDR_NEW_INTERVAL = 50
# Once a device's timeout counter reaches this value, getPacket() resets the
# counter and drops the first byte of its buffer.
PACKET_TIMEOUT = 5
# Every simulated device; sendPacket() appends frames to each one's buffer.
devices = []
class Device(object):
    """
    Base class for a simulated device on the bus.

    State kept per device:
      _address      current address (starts at 0x00)
      _tickcounter  number of tick() calls so far
      _buf          bytearray of received, not yet parsed bytes
      _timeout      number of consecutive unsuccessful getPacket() calls
    """

    def __init__(self):
        self._address = 0x00
        self._tickcounter = 0
        self._buf = bytearray()
        self._timeout = 0

    def tick(self):
        """Count one tick and log a status line on every 10th tick."""
        self._tickcounter += 1
        if self._tickcounter % 10 == 0:
            self.log("Status update: [address=" + hex(self._address) + "]")

    def log(self, message):
        """Print *message* prefixed with the quoted class name."""
        print('"' + self.__class__.__name__ + '": ' + message)

    def getPacket(self):
        """
        Try to extract one complete, valid packet from the receive buffer.

        Frame layout: STX (0x02), target, sender, flags, length, ``length``
        message bytes, CRC high byte, CRC low byte, ETX (0x03).  The CRC is
        ``crc16.crc16xmodem`` over target, sender, flags, length and message.

        Returns the decoded ``Packet`` (its message is a bytearray) and
        removes the whole frame, ETX included, from the buffer.  Returns
        ``None`` when no complete, valid frame sits at the head of the
        buffer.  Every unsuccessful call increments the timeout counter;
        once it has reached ``PACKET_TIMEOUT`` the next call resets it and
        drops the first buffered byte, so parsing can resynchronise on a
        later STX.
        """
        buf = self._buf

        if self._timeout >= PACKET_TIMEOUT:
            # Timed out: restart the counter and discard the leading byte
            # (normally the STX of the frame that never became valid).
            self._timeout = 0
            if buf:
                del buf[0]

        # Resynchronise on the next STX.  Bytes in front of it can never be
        # part of a frame; with no STX at all the whole buffer is junk.
        stx_index = buf.find(b'\x02')
        if stx_index < 0:
            del buf[:]
        elif stx_index > 0:
            del buf[:stx_index]

        # The length byte lives at index PACKET_LENGTH, so it is only
        # readable once the buffer is strictly longer than that index.
        if len(buf) <= PACKET_LENGTH:
            self._timeout += 1
            return None

        length = buf[PACKET_LENGTH]
        frame_size = length + PACKET_ETX + 1
        if len(buf) < frame_size:
            # Header present but the rest of the frame has not arrived.
            self._timeout += 1
            return None

        received_crc = ((buf[length + PACKET_CRC1] << 8)
                        | buf[length + PACKET_CRC2])
        etx_ok = buf[length + PACKET_ETX] == 0x03
        if not etx_ok or crc16.crc16xmodem(
                bytes(buf[PACKET_ADDR_TO:PACKET_MESSAGE + length])
        ) != received_crc:
            # Malformed frame: count it so the timeout eventually skips it.
            self._timeout += 1
            return None

        packet = Packet(buf[PACKET_ADDR_TO], buf[PACKET_ADDR_FROM],
                        buf[PACKET_FLAGS], length,
                        buf[PACKET_MESSAGE:PACKET_MESSAGE + length],
                        received_crc)
        # Remove the complete frame, including its trailing ETX byte.
        del buf[:frame_size]
        # A frame was consumed, so the next one starts with a fresh count.
        self._timeout = 0
        return packet
class A1(Device):
    """
    Control Panel

    On its first tick and on every ADDR_NEW_INTERVAL-th tick it sends a
    FLAG_ADDR_NEW | FLAG_ACK_REQ packet to target 0x00 whose message is the
    next free address written as decimal ASCII digits.
    """
    _address = 0x0

    def tick(self):
        """Run the base tick, report a received packet, then maybe send an offer."""
        super().tick()
        packet = self.getPacket()
        if packet is not None:
            self.log('Got a packet with flags ' + hex(packet.flags))
            if packet.flags & FLAG_ADDR_CONFIRM == FLAG_ADDR_CONFIRM:
                # Got response packet
                self.log("Got response. Confirming address...")

        if self._tickcounter % ADDR_NEW_INTERVAL == 0 or self._tickcounter == 1:
            # Look the address up once so the log line, the payload and the
            # CRC all describe the same value.
            free_address = getFreeAddress()
            if free_address is None:
                self.log('No free address available, not sending an offer')
                return
            message = ascii(free_address).encode('ascii')
            flags = FLAG_ADDR_NEW | FLAG_ACK_REQ
            # The length byte must match the payload: addresses >= 10 take
            # more than one ASCII digit.
            content = struct.pack('BBBB', 0x00, self._address, flags,
                                  len(message)) + message
            self.log('Sending packet with free addres, ' + hex(free_address))
            sendPacket(Packet(0x00, self._address, flags, len(message),
                              message, crc16.crc16xmodem(content)))
            # We now wait for a response
class AT1(Device):
    """
    Temperature probe.

    While its own address is still 0x00 it answers a FLAG_ADDR_NEW packet
    with a FLAG_ADDR_CONFIRM packet sent back to the sender.
    """
    awaiting_new_addr_response = 0

    def tick(self):
        """Run the base tick, then react to at most one received packet."""
        super().tick()
        packet = self.getPacket()
        if packet is None:
            return
        # Only handle packets sent to 0x00 or to this device's own address.
        if packet.target not in (0x00, self._address):
            return
        is_address_offer = (packet.flags & FLAG_ADDR_NEW) == FLAG_ADDR_NEW
        if not is_address_offer or self._address != 0x00:
            return
        # Got a packet with an address!
        self.log('Got packet! Sending FLAG_ADDR_CONFIRM back.')
        # The offered address is not adopted here; the assignment
        # "self._address = int(packet.message)" is still disabled.
        reply = generatePacket(packet.sender, self._address,
                               FLAG_ADDR_CONFIRM,
                               ascii(self._address).encode('ascii'))
        sendPacket(reply)
        # Now we wait until timeout or response
class Packet(namedtuple("Packet", ['target', 'sender', 'flags', 'length',
                                   'message', 'crc'])):
    """
    A packet.

    Immutable record of one frame's fields: target and sender addresses,
    the flags byte, the message length, the message bytes and the 16-bit
    CRC.  ``format()`` turns it into the bytes that travel on the bus.
    """

    # Keep instances as lean as a plain namedtuple (no per-instance __dict__).
    __slots__ = ()

    def format(self):
        """
        Return the wire encoding of this packet as ``bytes``.

        Layout: STX (0x02), target, sender, flags, length (one unsigned byte
        each), the message as given, the CRC as a big-endian unsigned 16-bit
        value, ETX (0x03).  ``struct.error`` is raised when a header field
        does not fit in one byte or the CRC does not fit in two.
        """
        header = struct.pack('!5B', 0x02, self.target, self.sender,
                             self.flags, self.length)
        trailer = struct.pack('!HB', self.crc, 0x03)
        return header + self.message + trailer
def sendPacket(packet):
    """Append the wire encoding of *packet* to the buffer of every device in ``devices``."""
    for receiver in devices:
        # Encoded once per receiver; nothing is encoded when there are none.
        frame = packet.format()
        receiver._buf += frame
def getFreeAddress():
    """
    Return the lowest address that no device in ``devices`` currently holds.

    Address 0x00 is never returned: devices treat it as "no address assigned
    yet" and accept packets targeted at it regardless of their own address.
    Candidates are 1..254; ``None`` is returned when all of them are taken.
    """
    taken = {device._address for device in devices}
    for candidate in range(1, 255):
        if candidate not in taken:
            return candidate
    return None
def generatePacket(target, sender, flags, message):
    """
    Build a ``Packet`` with a length field and CRC that match its payload.

    *message* may be ``bytes``/``bytearray`` (used as-is), ``str`` (encoded
    as ASCII, non-ASCII characters backslash-escaped) or any other object
    (its ``ascii()`` text is encoded, e.g. an int becomes its decimal
    digits).  The CRC is ``crc16.crc16xmodem`` over target, sender, flags,
    length and payload.  ``struct.error`` is raised when a header field or
    the payload length does not fit in one unsigned byte.
    """
    # Normalise to bytes once, so the length byte, the CRC and the stored
    # message all describe the same payload.
    if isinstance(message, (bytes, bytearray)):
        payload = bytes(message)
    elif isinstance(message, str):
        payload = message.encode('ascii', 'backslashreplace')
    else:
        payload = ascii(message).encode('ascii')

    header = struct.pack('BBBB', target, sender, flags, len(payload))
    return Packet(target, sender, flags, len(payload), payload,
                  crc16.crc16xmodem(header + payload))
# Put one control panel and one temperature probe on the bus, in that order.
for device_class in (A1, AT1):
    devices.append(device_class())
# Run the simulation forever, pausing half a second before each device's tick.
while True:
    for device in devices:
        sleep(0.5)
        device.tick()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment