Created
November 9, 2014 13:26
-
-
Save autrilla/89a6df8cdbc95df3b77c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import crc16 | |
from collections import namedtuple | |
from time import sleep | |
import struct | |
#### Offsets from STX start | |
PACKET_STX = 0 | |
PACKET_ADDR_TO = 1 | |
PACKET_ADDR_FROM = 2 | |
PACKET_FLAGS = 3 | |
PACKET_LENGTH = 4 | |
PACKET_MESSAGE = 5 | |
### Offsets from STX end | |
### Offset from the message length start | |
PACKET_CRC1 = 5 | |
PACKET_CRC2 = 6 | |
PACKET_ETX = 7 | |
### Offset from the message length end | |
FLAG_ACK = 0x01 | |
FLAG_NACK = 0x02 | |
FLAG_ACK_REQ = 0x04 | |
FLAG_ID_REQ = 0x08 | |
FLAG_ADDR_NEW = 0x10 | |
FLAG_ADDR_CONFIRM = 0x20 | |
ADDR_NEW_INTERVAL = 50 | |
PACKET_TIMEOUT = 5 | |
devices = [] | |
class Device(object): | |
""" | |
A device. | |
""" | |
def __init__(self): | |
self._address = 0x00 | |
self._tickcounter = 0 | |
self._buf = bytearray() | |
self._timeout = 0 | |
def tick(self): | |
self._tickcounter += 1 | |
if self._tickcounter % 10 == 0: | |
self.log("Status update: [address=" + hex(self._address) + "]") | |
def log(self, message): | |
print('"' + self.__class__.__name__ + '": ' + message) | |
def getPacket(self): | |
# "del self._buf[0]" shifts the buffer by one. | |
# This way, the leading 0x02 will get removed, and | |
# on the next iteration it will get shifter to the next 0x02 | |
if self._timeout >= PACKET_TIMEOUT: | |
# Timed out. Reset the timeout counter | |
self._timeout = 0 | |
if len(self._buf) > 0: del self._buf[0] | |
# Shift to the next 0x02 | |
for i in range(len(self._buf)): | |
if self._buf[i] == 0x02: | |
self._buf = self._buf[i:] | |
break | |
if len(self._buf) < PACKET_LENGTH: | |
# Packet is too small, doesn't contain a length. | |
self._timeout += 1 | |
elif len(self._buf) >= self._buf[PACKET_LENGTH]+8: | |
if self._buf[PACKET_ETX+self._buf[PACKET_LENGTH]] == 0x03: | |
# ETX OK | |
crc = crc16.crc16xmodem(bytes(self._buf[PACKET_ADDR_TO:\ | |
self._buf[PACKET_LENGTH]+PACKET_MESSAGE])) | |
if (crc == (self._buf[self._buf[PACKET_LENGTH]+PACKET_CRC1]*256 | |
+ self._buf[self._buf[PACKET_LENGTH]+PACKET_CRC2])): | |
# CRC OK | |
packet = Packet(self._buf[PACKET_ADDR_TO], | |
self._buf[PACKET_ADDR_FROM], self._buf[PACKET_FLAGS], | |
self._buf[PACKET_LENGTH], | |
self._buf[PACKET_MESSAGE:PACKET_MESSAGE | |
+self._buf[PACKET_LENGTH]], | |
self._buf[self._buf[PACKET_LENGTH]+PACKET_CRC1]*16*16 | |
+ self._buf[self._buf[PACKET_LENGTH]+PACKET_CRC2]) | |
# Remove the packet from the buffer | |
del self._buf[0:PACKET_ETX+self._buf[PACKET_LENGTH]] | |
return packet | |
else: | |
self._timeout += 1 | |
return None; | |
class A1(Device): | |
""" | |
Control Panel | |
""" | |
_address = 0x0 | |
def tick(self): | |
super().tick() | |
packet = self.getPacket() | |
if (packet != None): | |
self.log('Got a packet with flags '+ hex(packet.flags)) | |
if packet.flags & FLAG_ADDR_CONFIRM == FLAG_ADDR_CONFIRM: | |
#Got response packet | |
self.log("Got response. Confirming address...") | |
if self._tickcounter % ADDR_NEW_INTERVAL == 0 or self._tickcounter == 1: | |
content = (struct.pack('B', 0x00) + struct.pack('B', self._address) | |
+ struct.pack('B', FLAG_ADDR_NEW | FLAG_ACK_REQ) | |
+ struct.pack('B', 0x01) + ascii(getFreeAddress()).encode('ascii')) | |
self.log('Sending packet with free addres, ' + hex(getFreeAddress())) | |
sendPacket(Packet(0x00, self._address, FLAG_ADDR_NEW | FLAG_ACK_REQ, | |
0x01, ascii(getFreeAddress()).encode('ascii'), | |
crc16.crc16xmodem(content))) | |
# We now wait for a response | |
class AT1(Device): | |
""" | |
Temperature probe() | |
""" | |
awaiting_new_addr_response = 0 | |
def tick(self): | |
super().tick() | |
packet = self.getPacket() | |
if (packet != None | |
and (packet.target == 0x00 or packet.target == self._address)): | |
if (packet.flags & FLAG_ADDR_NEW == FLAG_ADDR_NEW | |
and self._address == 0x00): | |
# Got a packet with an address! | |
self.log('Got packet! Sending FLAG_ADDR_CONFIRM back.') | |
# self._address = int(packet.message) | |
sendPacket(generatePacket(packet.sender, self._address, | |
FLAG_ADDR_CONFIRM, ascii(self._address).encode('ascii'))) | |
# Now we wait until timeout or response | |
class Packet(namedtuple("Packet", ['target', 'sender', 'flags', 'length', | |
'message', 'crc'])): | |
""" | |
A packet. | |
This is some data that gets moved around. | |
""" | |
def format(self): | |
return (b'\x02' + struct.pack('B', self.target) | |
+ struct.pack('B', self.sender) + struct.pack('B', self.flags) | |
+ struct.pack('B', self.length) + self.message | |
+ struct.pack('!H', self.crc) + b'\x03') | |
def sendPacket(packet): | |
for device in devices: | |
device._buf += packet.format() | |
def getFreeAddress(): | |
addresses = set(device._address for device in devices); | |
for x in range(255): | |
if x not in addresses: | |
return x | |
def generatePacket(target, sender, flags, message): | |
content = (struct.pack('B', target) + struct.pack('B', sender) | |
+ struct.pack('B', flags) | |
+ struct.pack('B', len(message)) + ascii(message).encode('ascii')) | |
return Packet(target, sender, flags, len(message), | |
ascii(message).encode('ascii'), | |
crc16.crc16xmodem(content)) | |
devices.append(A1()) | |
devices.append(AT1()) | |
while True: | |
for device in devices: | |
sleep(0.5) | |
device.tick() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment