Created
November 10, 2014 12:38
-
-
Save autrilla/1e4b847fa279c7c5395c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Simulation of an RS485 network and its devices, for the purpose of testing
and implementing a protocol.
"""
import crc16 | |
from collections import namedtuple | |
from time import sleep | |
import struct | |
# Frame layout, as produced by Packet.format():
#   STX | target | sender | flags | length | message... | CRC (2 bytes) | ETX

# Byte offsets of the fixed header fields, counted from the STX byte.
PACKET_STX = 0
PACKET_ADDR_TO = 1
PACKET_ADDR_FROM = 2
PACKET_FLAGS = 3
PACKET_LENGTH = 4
PACKET_MESSAGE = 5

# Offsets of the trailer fields. These are relative to the message length:
# add the value of the length byte to get the absolute offset from STX.
# NOTE(review): CRC1/CRC2 are presumably the high/low CRC bytes (the CRC is
# packed big-endian) -- confirm against the parser that uses them.
PACKET_CRC1 = 5
PACKET_CRC2 = 6
PACKET_ETX = 7

# Bit masks for the flags byte; combine with bitwise OR.
FLAG_ACK = 0x01
FLAG_NACK = 0x02
FLAG_ACK_REQ = 0x04
FLAG_ID_REQ = 0x08
FLAG_ADDR_NEW = 0x10
FLAG_ADDR_CONFIRM = 0x20

# The master offers a free address every ADDR_NEW_INTERVAL ticks.
ADDR_NEW_INTERVAL = 50
# Ticks without a packet the master tolerates before it resends.
PACKET_TIMEOUT = 5
# Resends the master attempts before giving up on a packet.
MAX_RESENDS = 5

# Every simulated device attached to the bus.
devices = []
class Device(object): | |
class A1(Device):
    """
    Control Panel. Master of the network.

    On tick 1 and on every tick whose counter is a multiple of
    ADDR_NEW_INTERVAL it broadcasts (target 0x00) a free address with
    FLAG_ADDR_NEW | FLAG_ACK_REQ and then waits for a packet sent from that
    address. While waiting it counts the ticks that bring no packet and
    resends the last packet up to MAX_RESENDS times before giving up.
    """

    def __init__(self):
        super().__init__()
        self._address = 0x01
        # Address a reply is expected from, or None when not waiting.
        self.awaiting_response_from = None
        # Most recent packet sent that may have to be resent.
        self.lastPacket = None

    def tick(self):
        """
        Run one simulation step.

        Handles timeouts and resends, checks the received packet (if any)
        for the awaited response, and offers a free address when due.
        """
        # NOTE(review): _timeout, _resendAttempts and _tickcounter are not
        # initialised in this class; presumably Device sets and advances
        # them -- confirm.
        super().tick()
        packet = self.getPacket()

        # Response processing start
        if self._timeout > PACKET_TIMEOUT:
            self.log("Timed out")
            if self._resendAttempts < MAX_RESENDS:
                self._resendAttempts += 1
                self.log("Resending")
                sendPacket(self.lastPacket)
                # Restart the wait so every resend gets a full timeout
                # window instead of all resends firing on consecutive ticks.
                self._timeout = 0
            else:
                # Too many resends. Give up.
                self._timeout = 0
                self.awaiting_response_from = None
                self._resendAttempts = 0
                self.log("Too many resends for packet " + str(self.lastPacket))
        elif self.awaiting_response_from is not None and packet is None:
            self._timeout += 1
            self.log("Awaiting response...")

        if packet is not None:
            self.log("Got a packet: " + str(packet))
            if (packet.target == self._address
                    and self.awaiting_response_from == packet.sender):
                self.log("Got response")
                self.awaiting_response_from = None
                # The exchange is complete: clear the retry state so a stale
                # timeout cannot trigger resends while idle.
                self._timeout = 0
                self._resendAttempts = 0
        # Response processing end

        if self._tickcounter % ADDR_NEW_INTERVAL == 0 or self._tickcounter == 1:
            # Look the address up once so the log line, the packet payload
            # and the awaited sender are guaranteed to agree.
            free_address = getFreeAddress()
            if free_address is None:
                # Every address is taken; hex(None) would raise TypeError.
                self.log('No free address available')
            else:
                self.log('Sending packet with free addres, '
                         + hex(free_address))
                response = generatePacket(0x00, self._address,
                                          FLAG_ADDR_NEW | FLAG_ACK_REQ,
                                          free_address)
                sendPacket(response)
                self.lastPacket = response
                self.awaiting_response_from = free_address
                # A new exchange starts with fresh retry counters.
                self._timeout = 0
                self._resendAttempts = 0
class AT1(Device):
    """
    Temperature probe. Provides temperature data to the master.

    While its address is 0x00 it answers an address offer (FLAG_ADDR_NEW)
    with a FLAG_ADDR_CONFIRM packet sent from the offered address.
    """

    def __init__(self):
        super().__init__()
        # Stored on the instance; a bare local would be discarded when
        # __init__ returns.
        self.awaiting_response_from = None

    def tick(self):
        """
        Run one simulation step: answer an address offer if one arrived.
        """
        super().tick()
        packet = self.getPacket()
        if packet is None:
            return
        # Only broadcasts (target 0x00) and packets for this device matter.
        if packet.target != 0x00 and packet.target != self._address:
            return

        is_address_offer = (packet.flags & FLAG_ADDR_NEW) == FLAG_ADDR_NEW
        if is_address_offer and self._address == 0x00:
            # Got a packet with an address!
            self.log('Got packet! Sending FLAG_ADDR_CONFIRM back.')
            # The offered address is used as the sender of the confirmation
            # but is not adopted as self._address here.
            # NOTE(review): assumes packet.message is the ASCII decimal
            # address produced by generatePacket -- confirm in getPacket.
            offered_address = int(packet.message)
            # generatePacket encodes the message itself, so pass the raw
            # value; pre-encoding it would put b"b'0'" on the wire.
            sendPacket(generatePacket(packet.sender, offered_address,
                                      FLAG_ADDR_CONFIRM, self._address))
            # Now we wait until timeout or response
            self.awaiting_response_from = packet.sender
class Packet(namedtuple("Packet", ['target', 'sender', 'flags', 'length',
                                   'message', 'crc'])):
    """
    A packet.

    This is some data that gets moved around. ``target``, ``sender``,
    ``flags`` and ``length`` are packed as one unsigned byte each,
    ``message`` is a bytes object and ``crc`` is packed as an unsigned
    16-bit integer.
    """

    # Keep instances as light as a plain namedtuple (no per-instance dict).
    __slots__ = ()

    def format(self):
        """
        Return the packet as the bytes that go on the wire.

        Layout: STX (0x02), target, sender, flags, length, the message bytes,
        the CRC in network (big-endian) byte order, ETX (0x03).

        Raises struct.error if a header field does not fit in one byte or the
        CRC does not fit in 16 bits.
        """
        header = struct.pack('!BBBBB', 0x02, self.target, self.sender,
                             self.flags, self.length)
        trailer = struct.pack('!HB', self.crc, 0x03)
        return header + self.message + trailer
def sendPacket(packet):
    """
    Appends the packet's bytes to every device's buffer.

    The bus is shared: every entry in ``devices`` receives the bytes,
    including the device that sent the packet.
    """
    # Serialise once; the bytes are identical for every receiver.
    frame = packet.format()
    for device in devices:
        device._buf += frame
def getFreeAddress():
    """
    Returns a free address.

    The result is the lowest address in 1..254 that no device in ``devices``
    currently holds, or None when all of them are taken. Address 0x00 is
    never returned: devices treat it as the broadcast target and as the
    "no address assigned yet" value.
    """
    addresses = {device._address for device in devices}
    for candidate in range(1, 255):
        if candidate not in addresses:
            return candidate
    return None
def generatePacket(target, sender, flags, message):
    """
    Method for easier creation of packets.

    ``message`` is converted with ``ascii()`` and encoded as ASCII, so pass
    the raw value: an int such as 2 becomes b'2'. The conversion is
    repr-like, so a str keeps its quotes and a bytes object keeps its b''
    wrapper.

    The CRC (crc16 XMODEM) covers target, sender, flags, length and the
    message; the STX and ETX framing bytes are not included.

    Returns a Packet. Raises struct.error if target, sender, flags or the
    encoded message length does not fit in one unsigned byte.
    """
    # Encode once and reuse for the length, the CRC input and the Packet.
    payload = ascii(message).encode('ascii')
    content = struct.pack('BBBB', target, sender, flags, len(payload)) + payload
    return Packet(target, sender, flags, len(payload), payload,
                  crc16.crc16xmodem(content))
# Add two test devices
devices.append(A1())
devices.append(AT1())

# Tick all devices. Guarded so that importing this module does not start
# the endless simulation loop.
if __name__ == "__main__":
    while True:
        for device in devices:
            # The pause is taken before every device's tick, so one full
            # round over the bus lasts 0.5 s times the number of devices.
            sleep(0.5)
            device.tick()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment