Skip to content

Instantly share code, notes, and snippets.

@autrilla
Created November 10, 2014 12:38
Show Gist options
  • Save autrilla/1e4b847fa279c7c5395c to your computer and use it in GitHub Desktop.
Save autrilla/1e4b847fa279c7c5395c to your computer and use it in GitHub Desktop.
"""
Simulation of an RS485 network and its devices, for the purpose of testing
and implementing a protocol.
"""
import crc16
from collections import namedtuple
from time import sleep
import struct
# --- Header field positions, counted in bytes from the STX byte -------------
# These match the order in which Packet.format() emits the header fields.
(PACKET_STX, PACKET_ADDR_TO, PACKET_ADDR_FROM,
 PACKET_FLAGS, PACKET_LENGTH, PACKET_MESSAGE) = range(6)

# --- Trailer field positions -------------------------------------------------
# Packet.format() places the two CRC bytes and the ETX byte after the
# variable-length message, so these values are relative to the message
# length (absolute offset = value + message length).
PACKET_CRC1, PACKET_CRC2, PACKET_ETX = 5, 6, 7

# --- Flag bits (combined with bitwise OR in the flags byte) -----------------
FLAG_ACK = 1 << 0           # presumably "acknowledged" -- not used in this file
FLAG_NACK = 1 << 1          # presumably "not acknowledged" -- not used in this file
FLAG_ACK_REQ = 1 << 2       # set by the master on its address announcements
FLAG_ID_REQ = 1 << 3        # presumably "identify yourself" -- not used in this file
FLAG_ADDR_NEW = 1 << 4      # message carries a free address being offered
FLAG_ADDR_CONFIRM = 1 << 5  # reply sent by a device that saw FLAG_ADDR_NEW

# --- Timing, all measured in device ticks ------------------------------------
ADDR_NEW_INTERVAL = 50  # the master announces a free address every N ticks
PACKET_TIMEOUT = 5      # ticks without a reply before the master times out
MAX_RESENDS = 5         # resend attempts before the master gives up

# Every simulated device on the bus; sendPacket() delivers to all of them.
devices = list()
class Device(object):
class A1(Device):
    """
    Control Panel. Master of the network.

    On every tick the master first deals with the reply it may be waiting
    for (timeout, resend, give up, or accept) and then, on the first tick
    and every ADDR_NEW_INTERVAL ticks, broadcasts a currently free address.

    NOTE(review): ``_timeout``, ``_resendAttempts``, ``_tickcounter``,
    ``getPacket`` and ``log`` are expected to come from ``Device``, whose
    body is not visible here -- confirm against the base class.
    """

    def __init__(self):
        super().__init__()
        self._address = 0x01
        # Address of the device whose reply is pending, or None when idle.
        self.awaiting_response_from = None

    def tick(self):
        """Advance the master by one simulation step."""
        super().tick()
        packet = self.getPacket()
        self._process_response(packet)
        if self._tickcounter % ADDR_NEW_INTERVAL == 0 or self._tickcounter == 1:
            self._announce_free_address()

    def _process_response(self, packet):
        """Handle timeout/resend bookkeeping and an incoming packet, if any."""
        if self._timeout > PACKET_TIMEOUT:
            self.log("Timed out")
            if self._resendAttempts < MAX_RESENDS:
                self._resendAttempts += 1
                self.log("Resending")
                sendPacket(self.lastPacket)
                # Restart the wait. Without this the counter stays above
                # PACKET_TIMEOUT, so every following tick resends at once
                # and a reply to the resend is never looked at.
                self._timeout = 0
            else:
                # Too many resends. Give up.
                self._timeout = 0
                self.awaiting_response_from = None
                self._resendAttempts = 0
                self.log("Too many resends for packet " + str(self.lastPacket))
            return

        if packet is None:
            # Nothing arrived; only count the tick while a reply is pending.
            if self.awaiting_response_from is not None:
                self._timeout += 1
                self.log("Awaiting response...")
            return

        self.log("Got a packet: " + str(packet))
        if (packet.target == self._address
                and self.awaiting_response_from == packet.sender):
            self.log("Got response")
            self.awaiting_response_from = None
            # The exchange is finished; do not let its counters leak into
            # the next one.
            self._timeout = 0
            self._resendAttempts = 0

    def _announce_free_address(self):
        """Broadcast a free address and start waiting for its confirmation."""
        # Look the address up once so the log line, the packet payload and
        # the expected sender are guaranteed to agree.
        free_address = getFreeAddress()
        self.log('Sending packet with free addres, ' + hex(free_address))
        response = generatePacket(0x00, self._address,
                                  FLAG_ADDR_NEW | FLAG_ACK_REQ, free_address)
        sendPacket(response)
        self.lastPacket = response
        self.awaiting_response_from = free_address
        # A fresh request starts with a clean timeout/resend budget.
        self._timeout = 0
        self._resendAttempts = 0
class AT1(Device):
    """
    Temperature probe. Provides temperature data to the master.

    In this file the probe only implements the address hand-out: while its
    own address is still 0x00 it answers a FLAG_ADDR_NEW packet with a
    FLAG_ADDR_CONFIRM packet sent from the offered address.

    NOTE(review): ``_address``, ``getPacket`` and ``log`` are expected to
    come from ``Device``, whose body is not visible here -- confirm.
    """

    def __init__(self):
        super().__init__()
        # Stored on the instance; a bare local here would be discarded.
        self.awaiting_response_from = None

    def tick(self):
        """Advance the probe by one simulation step."""
        super().tick()
        packet = self.getPacket()
        if packet is None:
            return
        # Only react to broadcasts (0x00) and packets addressed to us.
        if packet.target not in (0x00, self._address):
            return

        offers_address = packet.flags & FLAG_ADDR_NEW == FLAG_ADDR_NEW
        if offers_address and self._address == 0x00:
            # Got a packet with an address!
            self.log('Got packet! Sending FLAG_ADDR_CONFIRM back.')
            # The offered address is not adopted yet; the probe only
            # replies *from* it and then waits.
            # generatePacket() ASCII-encodes the message itself, so pass the
            # raw value; pre-encoding it put b"b'0'" on the wire.
            sendPacket(generatePacket(packet.sender, int(packet.message),
                                      FLAG_ADDR_CONFIRM, self._address))
            # Now we wait until timeout or response
            self.awaiting_response_from = packet.sender
class Packet(namedtuple("Packet", ['target', 'sender', 'flags', 'length',
                                   'message', 'crc'])):
    """
    A packet.

    This is some data that gets moved around.

    Fields: ``target`` and ``sender`` addresses, ``flags`` bit mask and
    ``length`` are each packed as one unsigned byte; ``message`` is the
    bytes payload; ``crc`` is packed as a 16-bit big-endian value.
    """

    def format(self):
        """
        Return the wire representation of the packet as bytes.

        Layout: STX (0x02), target, sender, flags, length, message,
        CRC (2 bytes, big-endian), ETX (0x03).
        """
        header = struct.pack('!BBBB', self.target, self.sender,
                             self.flags, self.length)
        trailer = struct.pack('!H', self.crc)
        return b''.join((b'\x02', header, self.message, trailer, b'\x03'))
def sendPacket(packet):
    """
    Append the packet's bytes to every device's buffer.

    The module-level ``devices`` list is the bus: every device on it,
    including the sender, gets the serialized packet added to its ``_buf``.
    """
    # Serialize once; the bytes are identical for every receiver.
    frame = packet.format()
    for device in devices:
        device._buf += frame
def getFreeAddress():
    """
    Return the lowest address not used by any device, or None if none is left.

    Address 0x00 is never handed out: the protocol uses it both as the
    broadcast target and as the "no address assigned yet" marker, so giving
    it to a device would make that device indistinguishable from an
    unconfigured one. The upper bound (254) is kept from the original
    search range.
    """
    addresses = {device._address for device in devices}
    return next((candidate for candidate in range(1, 255)
                 if candidate not in addresses), None)
def generatePacket(target, sender, flags, message):
    """
    Method for easier creation of packets.

    ``message`` is converted with ``ascii()`` and encoded as ASCII, so pass
    the raw value (e.g. an int), not pre-encoded bytes. The packet's length
    field is the size of that payload, and the CRC (crc16 XMODEM) is
    computed over target, sender, flags, length and payload -- the STX/ETX
    framing bytes are not included.

    Raises ``struct.error`` if any header value, including the payload
    length, does not fit in one unsigned byte.
    """
    # Encode once and reuse for the length, the packet and the checksum.
    payload = ascii(message).encode('ascii')
    header = struct.pack('BBBB', target, sender, flags, len(payload))
    return Packet(target, sender, flags, len(payload), payload,
                  crc16.crc16xmodem(header + payload))
# Add two test devices
devices.append(A1())
devices.append(AT1())
# Tick all devices, forever. The pause is taken before each device's tick,
# so one full round over the bus takes 0.5 s per registered device.
while True:
    for device in devices:
        sleep(0.5)
        device.tick()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment