Skip to content

Instantly share code, notes, and snippets.

@autrilla
Created November 10, 2014 14:36
Show Gist options
  • Save autrilla/5ad0a9b63ad681559eca to your computer and use it in GitHub Desktop.
Save autrilla/5ad0a9b63ad681559eca to your computer and use it in GitHub Desktop.
"""
Simulation of a RS485 network and its devices, for the purpose of testing
and implementing a protocol.
"""
import crc16
from collections import namedtuple
from time import sleep
import struct
import random
import logging
# --- Header layout: byte offsets counted from the STX byte of a frame ---
(PACKET_STX,
 PACKET_ADDR_TO,
 PACKET_ADDR_FROM,
 PACKET_FLAGS,
 PACKET_LENGTH,
 PACKET_MESSAGE) = range(6)

# --- Trailer layout: add the frame's message length to these to get the
# --- byte index of the two CRC bytes and of the closing ETX byte.
PACKET_CRC1, PACKET_CRC2, PACKET_ETX = range(5, 8)

# --- Bit flags carried in the flags byte of a packet ---
FLAG_ACK = 1 << 0
FLAG_NACK = 1 << 1
FLAG_ACK_REQ = 1 << 2
FLAG_ID_REQ = 1 << 3
FLAG_ADDR_NEW = 1 << 4
FLAG_ADDR_CONFIRM = 1 << 5

# --- Timing and retry limits, all measured in device ticks ---
# The master offers a fresh address every ADDR_NEW_INTERVAL ticks.
ADDR_NEW_INTERVAL = 50
# Timeout counter value at which a device gives up waiting and resyncs.
PACKET_TIMEOUT = 5
# Upper bound on how many times an unanswered packet is re-sent.
MAX_RESENDS = 5

# Every device attached to the simulated bus.
devices = list()
class Device(object):
    """
    Base class for a simulated device attached to the shared bus.

    Each device owns a receive buffer that ``send_packet`` appends raw frame
    bytes to, a tick counter advanced by ``tick``, and a timeout counter
    that ``get_packet`` uses to resynchronise on a corrupt or stalled frame.
    """

    def __init__(self):
        # 0x00 is the "no address assigned yet" value.
        self._address = 0x00
        self._tickcounter = 0
        # Raw bytes received from the bus and not yet consumed.
        self._buf = bytearray()
        # Consecutive get_packet() calls that produced no packet.
        self._timeout = 0
        # Zero-padded so the serial is always exactly eight hex digits
        # (a plain '%8x' pads short values with spaces).
        self.serial_number = '%08x' % random.randrange(16**8)

    def tick(self):
        """
        Advance the device by one tick and log its status every tenth tick.
        """
        self._tickcounter += 1
        if self._tickcounter % 10 == 0:
            self.log_debug("Status update: [serial= " + self.serial_number
                           + ", address=" + hex(self._address) + "]")

    def log_debug(self, message):
        """
        Log ``message`` at DEBUG level, prefixed with class name and serial.
        """
        logging.debug('"' + self.__class__.__name__ + '-' +
                      self.serial_number + '": ' + message)

    def get_packet(self):
        """
        Returns a packet if there is one in the buffer, else returns None.

        Every call that yields no packet increments the timeout counter.
        Once the counter has reached PACKET_TIMEOUT it is reset and the
        first buffered byte is dropped, so that the resync step below moves
        on to the next candidate start-of-frame byte.
        """
        if self._timeout >= PACKET_TIMEOUT:
            # Timed out: reset the counter and discard the leading byte
            # (normally a stale STX) so the scan below skips past it.
            self._timeout = 0
            if self._buf:
                del self._buf[0]

        # Resynchronise: a frame always starts with STX (0x02).
        start = self._buf.find(b'\x02')
        if start > 0:
            del self._buf[:start]
        elif start < 0:
            # No STX anywhere, so nothing buffered can begin a frame.
            self._buf.clear()

        packet = self._parse_frame()
        if packet is None:
            # Incomplete, or complete but invalid (bad ETX/CRC): count it so
            # that the timeout path above eventually skips the bad frame.
            self._timeout += 1
        return packet

    def _parse_frame(self):
        """
        Decode and consume one frame at the head of the buffer, or return
        None (leaving the buffer untouched) if no valid frame is there yet.
        """
        buf = self._buf
        if len(buf) <= PACKET_LENGTH:
            # Too short to even contain the length byte.
            return None
        length = buf[PACKET_LENGTH]
        etx_index = PACKET_ETX + length
        if len(buf) <= etx_index:
            # The length byte promises more data than has arrived so far.
            return None
        if buf[etx_index] != 0x03:
            return None

        # The CRC is transmitted big-endian right after the message.
        received_crc = (buf[length + PACKET_CRC1] * 256
                        + buf[length + PACKET_CRC2])
        # It covers everything between STX and the CRC itself.
        computed_crc = crc16.crc16xmodem(
            bytes(buf[PACKET_ADDR_TO:PACKET_MESSAGE + length]))
        if computed_crc != received_crc:
            return None

        packet = Packet(buf[PACKET_ADDR_TO],
                        buf[PACKET_ADDR_FROM],
                        buf[PACKET_FLAGS],
                        length,
                        buf[PACKET_MESSAGE:PACKET_MESSAGE + length],
                        received_crc)
        # Remove the whole frame from the buffer, ETX byte included.
        del buf[:etx_index + 1]
        return packet
class A1(Device):
    """
    Control Panel. Master of the network.

    Periodically broadcasts a free address and answers the address
    confirmations that other devices send back.
    """

    def __init__(self):
        super().__init__()
        self._address = 0x01
        # Address of the device a reply is expected from, or None.
        self.awaiting_response_from = None
        self.resend_attempts = 0
        # Last packet sent that expects a reply (kept for re-sending).
        self.last_packet = None
        # Known addresses mapped to serial numbers; 0x00 and the master's
        # own address are pre-registered.
        self.addresses = dict()
        self.addresses[0x00] = "0"
        self.addresses[self._address] = self.serial_number

    def tick(self):
        """
        Run one master tick: read a packet, handle a pending response or
        its timeout, then broadcast a new-address offer when one is due.
        """
        super().tick()
        packet = self.get_packet()

        # Response processing start
        # NOTE(review): get_packet() resets self._timeout once it reaches
        # PACKET_TIMEOUT, so this branch looks unreachable as written --
        # confirm whether a separate response counter was intended.
        if self._timeout > PACKET_TIMEOUT:
            self.log_debug("Timed out")
            if self.resend_attempts < MAX_RESENDS:
                self.resend_attempts += 1
                self.log_debug("Resending")
                # Nothing to resend if no packet has been sent yet.
                if self.last_packet is not None:
                    send_packet(self.last_packet)
            else:
                # Too many resends. Give up.
                self._timeout = 0
                self.awaiting_response_from = None
                self.resend_attempts = 0
                self.log_debug("Too many resends for packet "
                               + str(self.last_packet))
        else:
            if self.awaiting_response_from is not None and packet is None:
                self._timeout += 1
                self.log_debug("Awaiting response...")
            if (packet is not None
                    and packet.target == self._address
                    and self.awaiting_response_from == packet.sender):
                self.log_debug("Got response")
                self.awaiting_response_from = None
                if (packet.flags & FLAG_ADDR_CONFIRM) == FLAG_ADDR_CONFIRM:
                    if packet.sender in self.addresses:
                        # Device is already registered.
                        send_packet(generate_packet(
                            packet.sender, self._address, FLAG_NACK,
                            packet.message.decode('ascii')))
                        print("######################################")
                    else:
                        send_packet(generate_packet(
                            packet.sender, self._address, FLAG_ACK,
                            packet.message))
                        # TODO: registration is still disabled upstream:
                        # self.addresses[packet.sender] = str(packet.message)
                        print("######################################")
        # Response processing end

        # Offer a free address on the first tick and then periodically.
        if (self._tickcounter % ADDR_NEW_INTERVAL == 0
                or self._tickcounter == 1):
            free_address = get_free_address()
            response = generate_packet(0x00, self._address,
                                       FLAG_ADDR_NEW | FLAG_ACK_REQ,
                                       free_address)
            send_packet(response)
            self.last_packet = response
            # The reply will come from whichever device takes the address.
            self.awaiting_response_from = free_address
class AT1(Device):
    """
    Temperature probe. Provides temperature data to the master.

    In this simulation it only implements address acquisition: it claims
    an offered address and waits for the master to confirm it.
    """

    def __init__(self):
        super().__init__()
        # Address of the device a reply is expected from, or None.
        self.awaiting_response_from = None
        self.resend_attempts = 0
        # Last packet sent that expects a reply (kept for re-sending).
        self.last_packet = None

    def tick(self):
        """
        Run one probe tick: read a packet, handle a pending response or its
        timeout, then claim an offered address if still unaddressed.
        """
        super().tick()
        packet = self.get_packet()

        # Response processing start
        # NOTE(review): get_packet() resets self._timeout once it reaches
        # PACKET_TIMEOUT, so this branch looks unreachable as written --
        # confirm whether a separate response counter was intended.
        if self._timeout > PACKET_TIMEOUT:
            self.log_debug("Timed out")
            if self.resend_attempts < MAX_RESENDS:
                self.resend_attempts += 1
                self.log_debug("Resending")
                # Nothing to resend if no packet has been sent yet.
                if self.last_packet is not None:
                    send_packet(self.last_packet)
            else:
                # Too many resends. Give up.
                self._timeout = 0
                self.awaiting_response_from = None
                self.resend_attempts = 0
                self.log_debug("Too many resends for packet "
                               + str(self.last_packet))
                # An unconfirmed address claim is abandoned.
                if (self.last_packet is not None
                        and self.last_packet.flags == FLAG_ADDR_CONFIRM):
                    self._address = 0x00
        else:
            if self.awaiting_response_from is not None and packet is None:
                self._timeout += 1
                self.log_debug("Awaiting response...")
            if (packet is not None
                    and packet.target == self._address
                    and self.awaiting_response_from == packet.sender):
                self.log_debug("Got response")
                self.awaiting_response_from = None
                if self.last_packet.flags == FLAG_ADDR_CONFIRM:
                    echoed_serial = packet.message.decode('ascii')
                    self.log_debug("Packet serial: " + echoed_serial)
                    if (echoed_serial == self.serial_number
                            and packet.flags == FLAG_ACK):
                        self.log_debug("Address confirmed!")
                    else:
                        # Reply was for another serial, or was not an ACK:
                        # give the claimed address back.
                        self._address = 0x00
        # Response processing end

        if (packet is not None
                and packet.target in (0x00, self._address)
                and (packet.flags & FLAG_ADDR_NEW) == FLAG_ADDR_NEW
                and self._address == 0x00):
            # Got a packet with an address!
            self.log_debug('Got packet! Sending FLAG_ADDR_CONFIRM back.')
            offered_address = int(packet.message)
            self._address = offered_address
            response = generate_packet(packet.sender,
                                       offered_address,
                                       FLAG_ADDR_CONFIRM,
                                       self.serial_number)
            send_packet(response)
            self.last_packet = response
            self.awaiting_response_from = packet.sender
class Packet(namedtuple("Packet", ['target', 'sender', 'flags', 'length',
                                   'message', 'crc'])):
    """
    A packet.
    This is some data that gets moved around.

    Fields: ``target`` and ``sender`` are one-byte addresses, ``flags`` is
    the flags byte, ``length`` is the message length in bytes, ``message``
    is the payload (bytes or bytearray) and ``crc`` is a 16-bit checksum.
    """

    # Keep instances as lightweight as the underlying tuple.
    __slots__ = ()

    def format(self):
        """
        Return the packet serialised as a wire frame (bytes):
        STX, target, sender, flags, length, message, CRC (big-endian), ETX.
        """
        header = struct.pack('!BBBBB', 0x02, self.target, self.sender,
                             self.flags, self.length)
        trailer = struct.pack('!H', self.crc) + b'\x03'
        return header + bytes(self.message) + trailer
def send_packet(packet):
    """
    Appends the packet's bytes to every device's buffer.

    The bus is shared, so every device (the sender included) receives the
    frame; each device decides for itself whether the packet concerns it.
    """
    logging.debug(str(packet))
    # Serialise once instead of once per device.
    frame = packet.format()
    for device in devices:
        device._buf += frame
def get_free_address():
    """
    Returns the lowest address not used by any device, or None if every
    address in the searched range is taken.

    Address 0x00 is never returned: devices use it to mean "no address
    assigned" and packets use it as the broadcast target.
    """
    taken = {device._address for device in devices}
    # The upper bound keeps the original search range (0xFF excluded).
    for candidate in range(1, 255):
        if candidate not in taken:
            return candidate
    return None
def generate_packet(target, sender, flags, message):
    """
    Method for easier creation of packets.

    ``message`` may be bytes/bytearray (used as-is), a str (encoded as
    ASCII, non-ASCII characters backslash-escaped) or any other object
    (its ``ascii()`` representation is used, e.g. ``2`` becomes ``b'2'``).
    Returns a Packet whose length and CRC fields match the payload.
    """
    if isinstance(message, (bytes, bytearray)):
        payload = bytes(message)
    elif isinstance(message, str):
        # Encode the text itself; ascii() would wrap it in quotes, so the
        # receiver could never match it against the original string.
        payload = message.encode('ascii', 'backslashreplace')
    else:
        payload = ascii(message).encode('ascii')

    # The CRC covers the four header bytes after STX plus the payload.
    header = struct.pack('BBBB', target, sender, flags, len(payload))
    return Packet(target, sender, flags, len(payload), payload,
                  crc16.crc16xmodem(header + payload))
logging.basicConfig(level=logging.DEBUG)
logging.info("Starting..")
# Populate the bus with one master and two temperature probes.
devices.extend([A1(), AT1(), AT1()])
# Run the simulation forever, pausing half a second before each device's
# turn.
while True:
    for current in devices:
        sleep(0.5)
        current.tick()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment