Skip to content

Instantly share code, notes, and snippets.

@ChristianTremblay
Created October 3, 2016 00:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ChristianTremblay/56309026ce4e279fa9916978003d435a to your computer and use it in GitHub Desktop.
Save ChristianTremblay/56309026ce4e279fa9916978003d435a to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 16 18:13:39 2016
@author: CTremblay
"""
import fysom
from ..crc import calc_incr_header_crc, data_crc
class Receive_Frame_FSM(object):
def __init__(self, port):
self.port = port
# Flush buffer
self.port.datalink.uart.flush()
self.timeout_flag = False
self.state_machine = fysom.Fysom(initial='idle',
events= [
{'name': 'eat_an_octet', 'src': 'idle', 'dst': '='},
{'name': 'eat_an_error', 'src': 'idle', 'dst': '='},
{'name': 'repeat_idle', 'src': 'idle', 'dst': '='},
{'name': 'timeout', 'src': ['preamble, header, skip_data, data'], 'dst': 'idle'},
{'name': 'error', 'src': ['preamble, header, skip_data, data'], 'dst': 'idle'},
{'name': 'preamble_one', 'src': 'idle', 'dst': 'preamble'},
{'name': 'not_preamble', 'src': 'preamble', 'dst': 'idle'},
{'name': 'repeated_preamble_one', 'src': 'preamble', 'dst': '='},
{'name': 'preamble_two', 'src': 'preamble', 'dst': 'header'},
{'name': 'check_header_crc', 'src': 'header', 'dst': 'header_crc'},
{'name': 'was_frame_type', 'src': 'header', 'dst': '='},
{'name': 'was_destination', 'src': 'header', 'dst': '='},
{'name': 'was_source', 'src': 'header', 'dst': '='},
{'name': 'length_one', 'src': 'header', 'dst': '='},
{'name': 'length_two', 'src': 'header', 'dst': '='},
{'name': 'not_for_us', 'src': 'header_crc', 'dst': 'idle'},
{'name': 'no_data', 'src': 'header_crc', 'dst': 'idle'},
{'name': 'bad_crc', 'src': 'header_crc', 'dst': 'idle'},
{'name': 'data', 'src': 'header_crc', 'dst': 'data'},
{'name': 'data_not_for_us', 'src': 'header_crc', 'dst': 'skip_data'},
{'name': 'frame_too_long', 'src': 'header_crc', 'dst': 'skip_data'},
{'name': 'done', 'src': 'skip_data', 'dst': 'idle'},
{'name': 'data_octet', 'src': ['skip_data', 'data'], 'dst': '='},
{'name': 'crc_one', 'src': 'data', 'dst': '='},
{'name': 'crc_two', 'src': 'data', 'dst': 'data_crc'},
{'name': 'bad_crc', 'src': 'data_crc', 'dst': 'idle'},
{'name': 'good_crc', 'src': 'data_crc', 'dst': 'idle'}
],
callbacks= {
'onenteridle': self._do_idle,
'onreenteridle': self._do_idle,
'onleaveidle': self._reset_silence_and_increment_event_count,
'onenterpreamble': self._do_preamble,
'onreenterpreamble': self._do_preamble,
'onleavepreamble': self._leave_timeout,
'onenterheader': self._do_header,
'onreenterheader': self._do_header,
'onleaveheader': self._leave_timeout,
'onenterheader_crc': self._do_header_crc,
'onenterdata' : self._do_data,
'onreenterdata': self._do_data,
'onleavedata': self._leave_data,
'onenterdata_crc': self._do_data_crc,
'onenterskip_data': self._do_skip_data,
'onreenterskip_data': self._do_skip_data,
'onleaveskip_data': self._leave_timeout,
})
"""
Some general functions to be used in transitions
"""
def _before_state_read_byte(self, event):
"""
Will retreive a byte from UART buffer
"""
print('read called')
self.port.RS485_read_byte()
return
def _reset_silence_and_increment_event_count(self, event):
self.port.silence_timer.reset()
self.port.event_count += 1
return
def _leave_timeout(self, event):
"""
Typically, when leaving on timeout, don't touch to silence timer
nor event count
"""
if not self.timeout_flag:
self._reset_silence_and_increment_event_count()
else:
self.timeout_flag = False
return
"""
Receive FSM
"""
def _do_idle(self, event):
"""
In the IDLE state, the node waits for the beginning of a frame
"""
print('idle', event)
if self.port.receive_error:
self.port.receive_error = False
self.state_machine.eat_an_error()
return
if self.port.data_available and self.port.data_register == b'\x55':
self.port.data_available = False
self.state_machine.trigger('preamble_one')
return
elif self.port.data_available and self.port.data_register != b'\x55':
self.port.data_available = False
self.state_machine.eat_an_octet()
return
return
def _do_preamble(self, event):
"""
In the preamble state, the node waits for the second
octet of the preamble
"""
print('preamble')
self._before_state_read_byte()
# Timeout
if self.port.silence_timer.value < self.mstp_param.TFRAME_ABORT:
self.timeout_flag = True
self.state_machine.timeout()
return
# Error
if self.port.receive_error:
self.port.receive_error = False
self.state_machine.error()
return
# Repeated preamble 1
if self.port.data_available \
and self.port.data_register == b'\x55':
self.port.data_available = False
self.state_machine.repeated_preamble_one()
return
# Not preamble
if self.port.data_available \
and self.port.data_register not in b'\xff\x55':
self.port.data_available = False
self.state_machine.not_preamble()
return
# Preamble 2
if self.port.data_available \
and self.port.data_register == b'\xff':
self.port.data_available = False
self.port.index = 0
self.port.header_crc = b'\xff'
self.state_machine.preamble_two()
return
return
def _do_header(self, event):
"""
In the header state, the node waits for the fixed message header
"""
print('header')
self._before_state_read_byte()
#Timeout
if self.port.silence_timer.value < self.mstp_param.TFRAME_ABORT:
self.port.received_invalid_frame = True
self.timeout_flag = True
self.state_machine.timeout()
return
# Error
if self.port.receive_error:
self.port.receive_error = False
self.port.received_invalid_frame = True
self.state_machine.error()
return
# Frame Type
if self.port.data_available and self.port.index == 0:
self.port.data_available = False
self.port.frame_type = self.port.data_register
self.port.header_crc = calc_incr_header_crc(self.port.data_register, self.port.header_crc)
self.port.index = 1
self.state_machine.was_frame_type()
return
# Destination
if self.port.data_available and self.port.index == 1:
self.port.data_available = False
self.port.destination_address = self.port.data_register
self.port.header_crc = calc_incr_header_crc(self.port.data_register, self.port.header_crc)
self.port.index = 2
self.state_machine.was_destination()
return
# Source
if self.port.data_available and self.port.index == 2:
self.port.data_available = False
self.port.source_address = self.port.data_register
self.port.header_crc = calc_incr_header_crc(self.port.data_register, self.port.header_crc)
self.port.index = 3
self.state_machine.was_source()
return
# Length1
if self.port.data_available and self.port.index == 3:
self.port.data_available = False
self.port.data_length = self.port.data_register * 256
self.port.header_crc = calc_incr_header_crc(self.port.data_register, self.port.header_crc)
self.port.index = 4
self.state_machine.length_one()
return
# Length2
if self.port.data_available and self.port.index == 4:
self.port.data_available = False
self.port.data_length += self.port.data_register
self.port.header_crc = calc_incr_header_crc(self.port.data_register, self.port.header_crc)
self.port.index = 5
self.state_machine.length_two()
return
# Header_CRC
if self.port.data_available and self.port.index == 5:
self.port.data_available = False
self.port.header_crc = calc_incr_header_crc(self.port.data_register, self.port.header_crc)
self.port.header_crc_actual = self.port.data_register
self.state_machine.check_header_crc()
return
return
def _do_header_crc(self, event):
"""
In the header_crc state, the node validate the CRC
on the fixed message header
"""
print('header_crc')
#self._before_state_read_byte()
# Bad CRC
if self.port.header_crc != b'\x55':
self.port.received_invalid_frame = True
self.state_machine.bad_crc()
return
# Frame not for us
if self.port.data_length == 0 \
and self.port.destination_address != self.port.this_station \
and self.port.destination_address != b'\xff':
self.state_machine.not_for_us()
return
# Data not for us
if self.port.data_length > 0 \
and self.port.destination_address != self.port.this_station \
and self.port.destination_address != b'\xff':
self.state_machine.data_not_for_us()
return
# Frame too long
if self.port.data_length > len(self.port.inputbuffer) \
and (self.port.destination_address == self.port.this_station \
or self.port.destination_address == b'\xff'):
self.port.received_invalid_frame = True
self.port.index = 0
self.state_machine.frame_too_long()
return
# No Data
if self.port.data_length == 0 \
and (self.port.destination_address == self.port.this_station \
or self.port.destination_address == b'\xff'):
self.port.received_invalid_frame = True
self.port.index = 0
self.state_machine.no_data()
return
# Data
if self.port.data_length > 0 \
and self.port.data_length <= len(self.port.inputbuffer) \
and (self.port.destination_address == self.port.this_station \
or self.port.destination_address == b'\xff'):
self.port.index = 0
self.port.data_CRC = b'\xffff'
self.state_machine.data()
return
return
def _do_data(self, event):
"""
In the data state, the node waits for the data portion of a frame
"""
print('data')
self._before_state_read_byte()
# Timeout
if self.port.silence_timer.value < self.mstp_param.TFRAME_ABORT:
self.port.received_invalid_frame = True
self.timeout_flag = True
self.state_machine.timeout()
return
# Error
if self.port.receive_error:
self.port.receive_error = False
self.port.received_invalid_frame = True
self.state_machine.error()
return
# data_octet
if self.port.data_available \
and self.port.index < self.port.data_length:
self.port.data_available = False
self.port.data_crc = data_crc(self.port.data_register, self.port.data_CRC)
self.port.inputbuffer[self.port.index] = self.port.data_register
self.port.index += 1
self.state_machine.data_octet()
return
# crc_1
if self.port.data_available \
and self.port.index == self.port.data_length:
self.port.data_available = False
self.port.data_crc = crc.data_crc(self.port.data_register, self.port.data_CRC)
self.port.index += 1
self.state_machine.crc_one()
return
# crc_1
if self.port.data_available \
and self.port.index == (self.port.data_length + 1):
self.port.data_available = False
self.port.data_crc = data_crc(self.port.data_register, self.port.data_CRC)
self.state_machine.crc_two()
return
return
def _leave_data(self, event):
"""
When leaving data state, should reset silence_timer only
when no timeout is raised.
In that case, don't touch event count
"""
if not self.timeout_flag:
self.port.silence_timer.reset()
else:
self.timeout_flag = False
return
def _do_data_crc(self, event):
"""
In the Data_CRC state, the node validates the CRC of the message data.
"""
print('data_crc')
if self.port.data_crc != b'\xF0B8':
self.port.receive_invalid_frame = True
self.state_machine.bad_crc()
return
else:
self.port.receive_valid_frame = True
print('Frame : ', self.port.inputbuffer)
#Signal : Done : send frame to MSTP FSM
self.state_machine.good_crc()
return
return
def _do_skip_data(self, event):
"""
In the Skip_data state, the node waits for the data portion of a frame
to be received so that its content can be ignored.
"""
print('skip_data')
self._before_state_read_byte()
# Timeout
if self.port.silence_timer.value < self.mstp_param.TFRAME_ABORT:
self.port.received_invalid_frame = True
self.timeout_flag = True
self.state_machine.timeout()
return
# Error
if self.port.receive_error:
self.port.receive_error = False
self.port.received_invalid_frame = True
self.state_machine.error()
return
# data_octet
if self.port.data_available \
and self.port.index < (self.port.data_length + 1):
self.port.data_available = False
self.port.index += 1
self.state_machine.data_octet()
return
# done
if self.port.data_available \
and self.port.index == (self.port.data_length + 1):
self.port.data_available = False
self.state_machine.done()
return
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment