# Source: GitHub gist autrilla/5ad0a9b63ad681559eca, created November 10, 2014 14:36.
"""
Simulation of an RS485 network and its devices, for the purpose of testing
and implementing a protocol.
"""
import logging
import random
import struct
from collections import namedtuple
from time import sleep

import crc16
# Frame layout on the wire:
#   STX | ADDR_TO | ADDR_FROM | FLAGS | LENGTH | MESSAGE... | CRC hi | CRC lo | ETX

# Offsets from the start of the frame (the STX byte).
PACKET_STX = 0
PACKET_ADDR_TO = 1
PACKET_ADDR_FROM = 2
PACKET_FLAGS = 3
PACKET_LENGTH = 4
PACKET_MESSAGE = 5

# Offsets that depend on the message length: add the value of the LENGTH
# byte to obtain the index inside the frame.
PACKET_CRC1 = 5
PACKET_CRC2 = 6
PACKET_ETX = 7

# Bits of the FLAGS byte.
FLAG_ACK = 0x01
FLAG_NACK = 0x02
FLAG_ACK_REQ = 0x04
FLAG_ID_REQ = 0x08
FLAG_ADDR_NEW = 0x10
FLAG_ADDR_CONFIRM = 0x20

# The master offers a free address every ADDR_NEW_INTERVAL ticks.
ADDR_NEW_INTERVAL = 50
# Number of ticks a device waits before it considers an exchange timed out.
PACKET_TIMEOUT = 5
# Maximum number of times an unanswered packet is sent again.
MAX_RESENDS = 5

# Every device attached to the simulated bus.
devices = []
class Device(object):
    """
    Base class for a device attached to the simulated bus.

    A device owns a receive buffer (``_buf``) holding the raw bytes seen on
    the bus, a bus address (``_address``, 0x00 until one is assigned) and a
    random serial number made of exactly eight hexadecimal digits.
    """

    def __init__(self):
        self._address = 0x00
        self._tickcounter = 0
        self._buf = bytearray()
        # Consecutive get_packet() calls that could not extract a frame.
        self._timeout = 0
        # '%08x' zero-pads; a plain '%8x' would pad small values with spaces
        # and produce serials such as '      1f'.
        self.serial_number = '%08x' % random.randrange(16**8)

    def tick(self):
        """
        Advances the device by one tick and logs a status line every tenth
        tick.
        """
        self._tickcounter += 1
        if self._tickcounter % 10 == 0:
            self.log_debug("Status update: [serial= " + self.serial_number
                           + ", address=" + hex(self._address) + "]")

    def log_debug(self, message):
        """
        Logs ``message`` at DEBUG level, prefixed with the class name and the
        serial number of the device.
        """
        logging.debug('"%s-%s": %s', self.__class__.__name__,
                      self.serial_number, message)

    def get_packet(self):
        """
        Returns a packet if there is one in the buffer, else returns None.

        A frame is ``STX(0x02) TO FROM FLAGS LENGTH MESSAGE CRC-hi CRC-lo
        ETX(0x03)``; the CRC (crc16 xmodem) covers TO up to the end of the
        message.  A complete, valid frame is removed from the buffer and
        returned as a ``Packet``.  Every call that cannot return a packet
        increments ``self._timeout``; once it reaches ``PACKET_TIMEOUT`` the
        leading byte is dropped so that parsing resumes at the next STX.
        """
        if self._timeout >= PACKET_TIMEOUT:
            # Timed out on the current candidate frame: reset the counter and
            # drop its STX so the resynchronisation below skips past it.
            self._timeout = 0
            if self._buf:
                del self._buf[0]

        # Resynchronise on the next STX.  Bytes in front of it can never
        # become part of a frame, and neither can a buffer without any STX.
        stx_index = self._buf.find(b'\x02')
        if stx_index < 0:
            del self._buf[:]
        elif stx_index > 0:
            del self._buf[:stx_index]

        # "<=" rather than "<": the LENGTH byte lives at index PACKET_LENGTH,
        # so the buffer must hold at least PACKET_LENGTH + 1 bytes to read it.
        if len(self._buf) <= PACKET_LENGTH:
            self._timeout += 1
            return None

        length = self._buf[PACKET_LENGTH]
        etx_index = length + PACKET_ETX
        if len(self._buf) > etx_index and self._buf[etx_index] == 0x03:
            # ETX OK
            received_crc = (self._buf[length + PACKET_CRC1] * 256
                            + self._buf[length + PACKET_CRC2])
            computed_crc = crc16.crc16xmodem(
                bytes(self._buf[PACKET_ADDR_TO:length + PACKET_MESSAGE]))
            if computed_crc == received_crc:
                # CRC OK
                packet = Packet(
                    self._buf[PACKET_ADDR_TO],
                    self._buf[PACKET_ADDR_FROM],
                    self._buf[PACKET_FLAGS],
                    length,
                    self._buf[PACKET_MESSAGE:PACKET_MESSAGE + length],
                    received_crc)
                # Remove the whole frame, ETX included, from the buffer.
                del self._buf[:etx_index + 1]
                return packet

        # Incomplete frame, wrong ETX or CRC mismatch: count it so that a
        # corrupt frame cannot block the buffer forever.
        self._timeout += 1
        return None
class A1(Device):
    """
    Control Panel. Master of the network.

    On its first tick and then every ``ADDR_NEW_INTERVAL`` ticks it
    broadcasts a free address (``FLAG_ADDR_NEW``) to address 0x00 and waits
    for a ``FLAG_ADDR_CONFIRM`` from that address.  The confirmation is
    answered with ``FLAG_ACK``, or with ``FLAG_NACK`` when the sender is
    already present in ``self.addresses``.
    """

    def __init__(self):
        super().__init__()
        self._address = 0x01
        self.awaiting_response_from = None
        self.resend_attempts = 0
        self.last_packet = None
        # Ticks spent waiting for the pending response.  Kept separate from
        # Device._timeout, which get_packet() resets on its own and which
        # therefore never exceeds PACKET_TIMEOUT.
        self._response_wait = 0
        self.addresses = {0x00: "0", self._address: self.serial_number}

    def tick(self):
        """
        Processes at most one received packet, handles response timeouts and
        periodically offers a free address.
        """
        super().tick()
        packet = self.get_packet()

        if self.awaiting_response_from is not None:
            if packet is None:
                self._wait_for_response()
            elif (packet.target == self._address
                  and packet.sender == self.awaiting_response_from):
                self._handle_response(packet)

        if (self._tickcounter % ADDR_NEW_INTERVAL == 0
                or self._tickcounter == 1):
            self._offer_address()

    def _wait_for_response(self):
        # Counts an idle tick; resends or gives up once the wait times out.
        self._response_wait += 1
        self.log_debug("Awaiting response...")
        if self._response_wait <= PACKET_TIMEOUT:
            return
        self.log_debug("Timed out")
        # Start a fresh waiting period for the resend (or the next exchange).
        self._response_wait = 0
        if (self.resend_attempts < MAX_RESENDS
                and self.last_packet is not None):
            self.resend_attempts += 1
            self.log_debug("Resending")
            send_packet(self.last_packet)
        else:
            # Too many resends. Give up.
            self.awaiting_response_from = None
            self.resend_attempts = 0
            self.log_debug("Too many resends for packet "
                           + str(self.last_packet))

    def _handle_response(self, packet):
        # Answers the awaited packet: ACK/NACK for an address confirmation.
        self.log_debug("Got response")
        self.awaiting_response_from = None
        self.resend_attempts = 0
        self._response_wait = 0
        if packet.flags & FLAG_ADDR_CONFIRM != FLAG_ADDR_CONFIRM:
            return
        if packet.sender in self.addresses:
            # Device is already registered.
            send_packet(generate_packet(packet.sender, self._address,
                                        FLAG_NACK,
                                        packet.message.decode('ascii')))
        else:
            # NOTE(review): the acknowledged device is not recorded in
            # self.addresses (the original had that line disabled) --
            # confirm whether registration is intended.
            send_packet(generate_packet(packet.sender, self._address,
                                        FLAG_ACK, packet.message))
        print("######################################")

    def _offer_address(self):
        # Broadcasts a free address and starts waiting for its confirmation.
        free_address = get_free_address()
        if free_address is None:
            self.log_debug("No free address available")
            return
        offer = generate_packet(0x00, self._address,
                                FLAG_ADDR_NEW | FLAG_ACK_REQ, free_address)
        send_packet(offer)
        self.last_packet = offer
        self.awaiting_response_from = free_address
        self.resend_attempts = 0
        self._response_wait = 0
class AT1(Device):
    """
    Temperature probe. Provides temperature data to the master.

    While it has no address (0x00) it claims the address offered in a
    ``FLAG_ADDR_NEW`` packet, answers with ``FLAG_ADDR_CONFIRM`` carrying its
    serial number and waits for the reply.  It keeps the address only when
    the reply is a ``FLAG_ACK`` that echoes its own serial number.
    """

    def __init__(self):
        super().__init__()
        self.awaiting_response_from = None
        self.resend_attempts = 0
        self.last_packet = None
        # Ticks spent waiting for the pending response.  Kept separate from
        # Device._timeout, which get_packet() resets on its own and which
        # therefore never exceeds PACKET_TIMEOUT.
        self._response_wait = 0

    def tick(self):
        """
        Processes at most one received packet: handles the awaited response
        (or its timeout) and claims an offered address when unassigned.
        """
        super().tick()
        packet = self.get_packet()

        if self.awaiting_response_from is not None:
            if packet is None:
                self._wait_for_response()
            elif (packet.target == self._address
                  and packet.sender == self.awaiting_response_from):
                self._handle_response(packet)

        if packet is not None:
            self._claim_offered_address(packet)

    def _sent_address_confirm(self):
        # True when the last packet sent was an address confirmation.
        return (self.last_packet is not None
                and self.last_packet.flags == FLAG_ADDR_CONFIRM)

    def _wait_for_response(self):
        # Counts an idle tick; resends or gives up once the wait times out.
        self._response_wait += 1
        self.log_debug("Awaiting response...")
        if self._response_wait <= PACKET_TIMEOUT:
            return
        self.log_debug("Timed out")
        # Start a fresh waiting period for the resend (or the next exchange).
        self._response_wait = 0
        if (self.resend_attempts < MAX_RESENDS
                and self.last_packet is not None):
            self.resend_attempts += 1
            self.log_debug("Resending")
            send_packet(self.last_packet)
        else:
            # Too many resends. Give up.
            self.awaiting_response_from = None
            self.resend_attempts = 0
            self.log_debug("Too many resends for packet "
                           + str(self.last_packet))
            if self._sent_address_confirm():
                # The claim was never answered: release the address.
                self._address = 0x00

    def _handle_response(self, packet):
        # Evaluates the reply to an address confirmation.
        self.log_debug("Got response")
        self.awaiting_response_from = None
        self.resend_attempts = 0
        self._response_wait = 0
        if not self._sent_address_confirm():
            return
        serial = packet.message.decode('ascii', 'replace')
        self.log_debug("Packet serial: " + serial)
        if serial == self.serial_number and packet.flags == FLAG_ACK:
            self.log_debug("Address confirmed!")
        else:
            # NACK, or a reply meant for another device that claimed the
            # same address: release the address.
            self._address = 0x00

    def _claim_offered_address(self, packet):
        # Takes the address offered by a FLAG_ADDR_NEW packet if unassigned.
        if packet.target != 0x00 and packet.target != self._address:
            return
        if (packet.flags & FLAG_ADDR_NEW != FLAG_ADDR_NEW
                or self._address != 0x00):
            return
        try:
            offered = int(packet.message)
        except ValueError:
            self.log_debug("Ignoring address offer with a non-numeric "
                           "message")
            return
        # Got a packet with an address!
        self.log_debug('Got packet! Sending FLAG_ADDR_CONFIRM back.')
        self._address = offered
        response = generate_packet(packet.sender, offered,
                                   FLAG_ADDR_CONFIRM, self.serial_number)
        send_packet(response)
        self.last_packet = response
        self.awaiting_response_from = packet.sender
        self.resend_attempts = 0
        self._response_wait = 0
class Packet(namedtuple("Packet", ['target', 'sender', 'flags', 'length',
                                   'message', 'crc'])):
    """
    A packet.

    This is some data that gets moved around: the target and sender
    addresses, the flag bits, the message length, the message bytes and the
    16-bit CRC.
    """

    # Keep instances as light as the underlying tuple (no per-instance dict).
    __slots__ = ()

    def format(self):
        """
        Returns the packet as the bytes sent on the bus:
        STX (0x02), target, sender, flags, length, message, CRC as a
        big-endian 16-bit value, ETX (0x03).
        """
        header = struct.pack('BBBBB', 0x02, self.target, self.sender,
                             self.flags, self.length)
        trailer = struct.pack('!HB', self.crc, 0x03)
        return header + self.message + trailer
def send_packet(packet):
    """
    Appends the packet's bytes to every device's buffer.

    The bus is shared, so every entry of ``devices`` -- the sender included --
    receives the frame.
    """
    logging.debug("%s", packet)
    # Serialise once instead of once per device.
    frame = packet.format()
    for device in devices:
        device._buf += frame
def get_free_address():
    """
    Returns a free address, or None when every address is taken.

    Address 0x00 is never returned: it is the address of devices that have
    not been assigned one yet and the target of broadcast offers, so handing
    it out would leave the claiming device unassigned.
    """
    used = {device._address for device in devices}
    for candidate in range(1, 255):
        if candidate not in used:
            return candidate
    return None
def generate_packet(target, sender, flags, message):
    """
    Method for easier creation of packets.

    ``message`` may be bytes/bytearray (used as-is), a str (encoded as ASCII,
    non-ASCII characters backslash-escaped) or any other object (its
    ``ascii()`` representation, e.g. ``5`` -> ``b'5'``).  The CRC (crc16
    xmodem) is computed over target, sender, flags, length and message.

    Raises ``struct.error`` when an address, the flags or the message length
    does not fit in one byte.
    """
    # Passing str/bytes through ascii() would wrap them in quotes (or a
    # bytes repr), so a serial number echoed back by the receiver could never
    # compare equal to the original.
    if isinstance(message, (bytes, bytearray)):
        payload = bytes(message)
    elif isinstance(message, str):
        payload = message.encode('ascii', 'backslashreplace')
    else:
        payload = ascii(message).encode('ascii')

    header = struct.pack('BBBB', target, sender, flags, len(payload))
    return Packet(target, sender, flags, len(payload), payload,
                  crc16.crc16xmodem(header + payload))
def main():
    """
    Runs the simulation: one master (A1) and two temperature probes (AT1)
    are ticked in turn, half a second apart, until interrupted.
    """
    logging.basicConfig(level=logging.DEBUG)
    logging.info("Starting..")
    # Add the test devices: one master and two probes.
    devices.append(A1())
    devices.append(AT1())
    devices.append(AT1())
    # Tick all devices
    try:
        while True:
            for device in devices:
                sleep(0.5)
                device.tick()
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the loop; leave without a traceback.
        logging.info("Stopping..")


# Only start the endless loop when executed as a script, so that importing
# the module does not block.
if __name__ == "__main__":
    main()
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment