Created
June 30, 2017 18:48
-
-
Save djnugent/527fccc02c23bf70c99eef8fe91ab8cf to your computer and use it in GitHub Desktop.
HSM in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from StateMachine import StateMachine, State, StateTransition

# Single machine instance used by this example; the states defined further
# down are registered on it with add_state().
hsm = StateMachine()
def count_up_enter(self, kargs):
    """Entry callback for the "countup" state: start the counter at zero.

    ``self`` is whatever object the callback is bound to (the ``State`` it is
    registered on via ``set_entry``). ``kargs`` is the entry payload supplied
    by the machine; it is not used here.
    """
    self.i = 0
def count_up_loop(self, kargs):
    """Loop callback for "countup": print the counter, then advance it by one.

    Returns a ``StateTransition`` to "countdown" once the counter has reached
    10, passing the counter value as the entry payload of the next state.
    Returns ``None`` (stay in this state) otherwise. ``kargs`` is unused.
    """
    print(self.i)
    self.i += 1
    # ">=" instead of "==": if the counter ever overshoots 10 the hand-off
    # must still happen rather than counting up forever.
    if self.i >= 10:
        return StateTransition("countdown", self.i)
    return None
def count_down_enter(self, i):
    """Entry callback for "countdown": seed the counter with the payload ``i``.

    ``i`` is the entry payload carried by the transition that led here (the
    final value of the up-counter in this example).
    """
    self.i = i
def count_down_loop(self, kargs):
    """Loop callback for "countdown": decrement the counter, then print it.

    Returns a ``StateTransition`` to "exit" (which stops the machine) once the
    counter has dropped to zero or below; returns ``None`` otherwise.
    ``kargs`` is unused.
    """
    self.i -= 1
    print(self.i)
    # "<=" instead of "==": a seed of 0 or less steps straight past zero, and
    # an equality test would then never fire.
    if self.i <= 0:
        return StateTransition("exit")
    return None
# "countup" runs its loop callback at most 5 times per second.
countup = State("countup", 5)
countup.set_entry(count_up_enter)
countup.set_loop(count_up_loop)

# "countdown" runs its loop callback at most 10 times per second.
countdown = State("countdown", 10)
countdown.set_entry(count_down_enter)
countdown.set_loop(count_down_loop)

# The first state added becomes the machine's start state.
hsm.add_state(countup)
hsm.add_state(countdown)

# Only drive the machine when executed as a script, so that importing this
# module does not block in the run loop.
if __name__ == "__main__":
    hsm.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import types | |
class StateMachine():
    """Name-addressed state machine that repeatedly runs the current state.

    States are any objects exposing ``name``, ``rate``, ``entry``, ``loop``
    and ``exit`` attributes. They are stored under their upper-cased name, so
    lookups are case-insensitive. A loop callback may return a transition
    object (``next_state_name``, ``entry_kwargs``, ``loop_kwargs``,
    ``exit_kwargs``) to move to another state; the target name "exit" (any
    case) stops the machine.

    Despite their names, the ``*_kwargs`` payloads are handed to the callbacks
    as a single positional argument.
    """

    def __init__(self):
        self.states = {}
        self.start_state = None
        self.current_state = None
        self.started = False
        self.running = True
        # State info
        self.state_entered = False
        # Monotonic timestamp of the last rate-limited loop call; -inf means
        # "never", so the first loop after entering a state runs immediately.
        self.last_loop = float("-inf")
        self.entry_kwargs = {}
        self.loop_kwargs = {}
        self.exit_kwargs = {}

    def add_state(self, state):
        """Register ``state``; the first state added becomes the start state.

        Raises:
            NotImplementedError: if the state has no loop callback.
        """
        if state.loop is None:
            raise NotImplementedError("Loop function must be implemented")
        self.states[state.name.upper()] = state
        if self.start_state is None:
            self.start_state = state

    def set_start_state(self, name):
        """Select an already-registered state (by name) as the start state.

        Raises:
            KeyError: if no state with that name has been added.
        """
        key = name.upper()
        if key not in self.states:
            raise KeyError("State does not exist. Please add all states before setting start state")
        self.start_state = self.states[key]

    def restart(self):
        """Rewind the machine so the next step re-enters the start state."""
        self.started = False
        self.state_entered = False
        self.running = True
        # Drop payloads left over from the previous run; otherwise the start
        # state would be re-entered with another state's arguments.
        self.entry_kwargs = {}
        self.loop_kwargs = {}
        self.exit_kwargs = {}

    def step(self):
        """Advance the machine by one non-blocking iteration.

        Enters the current state if needed, calls its loop callback (subject
        to the state's rate limit) and applies any transition it returns.

        Raises:
            RuntimeError: if the machine is stepped before any state exists.
            KeyError: if a transition names an unknown state.
        """
        # Initialize entry point of state machine
        if not self.started:
            if self.start_state is None:
                raise RuntimeError("Cannot step a state machine that has no states")
            self.current_state = self.start_state
            self.started = True

        state = self.current_state

        # Enter new state
        if not self.state_entered:
            if state.entry is not None:
                state.entry(self.entry_kwargs)
            self.state_entered = True
            self.last_loop = float("-inf")

        # Loop state: as fast as possible when no rate is set, otherwise only
        # once more than 1/rate seconds have passed since the previous call.
        transition = None
        if state.rate is None:
            transition = state.loop(self.loop_kwargs)
        elif time.monotonic() - self.last_loop > 1.0 / state.rate:
            transition = state.loop(self.loop_kwargs)
            self.last_loop = time.monotonic()

        if transition is None:
            return

        target = transition.next_state_name.upper()
        leaving_machine = target == 'EXIT'
        if leaving_machine:
            self.running = False
        elif target not in self.states:
            # Checked before the exit callback so a bad name has no side effects.
            raise KeyError("State does not exist")

        # exit our current_state
        if state.exit is not None:
            state.exit(self.exit_kwargs)

        if not leaving_machine:
            # transition to next state
            self.current_state = self.states[target]
            self.entry_kwargs = transition.entry_kwargs
            self.loop_kwargs = transition.loop_kwargs
            self.exit_kwargs = transition.exit_kwargs
            self.state_entered = False

    def _seconds_until_due(self):
        """Return how long the current state's next loop call is still away."""
        state = self.current_state
        if state is None or not self.state_entered or state.rate is None:
            return 0.0
        remaining = 1.0 / state.rate - (time.monotonic() - self.last_loop)
        return max(remaining, 0.0)

    def run(self):
        """Step the machine until a transition to "exit" stops it."""
        while self.running:
            self.step()
            # Sleep through the idle part of a rate-limited period instead of
            # spinning on step() at full CPU.
            pause = self._seconds_until_due()
            if self.running and pause > 0:
                time.sleep(pause)
class State():
    """A named state with optional entry/loop/exit callbacks.

    Callbacks are bound to the state instance, so they are written like
    methods: ``def callback(self, payload)``.

    Args:
        name: Name the state is registered and addressed under.
        rate: Maximum number of loop calls per second, or ``None`` to loop as
            fast as the machine is stepped.

    Raises:
        ValueError: if ``rate`` is zero or negative (the loop period is
            computed as ``1.0 / rate``).
    """

    def __init__(self, name, rate=None):
        if rate is not None and rate <= 0:
            raise ValueError("rate must be a positive number or None")
        self.name = name
        self.rate = rate
        self.entry = None
        self.loop = None
        self.exit = None

    def _bind(self, func):
        """Return ``func`` bound to this state so it receives it as ``self``."""
        if not callable(func):
            raise TypeError("State callbacks must be callable")
        binder = getattr(func, '__get__', None)
        if binder is not None:
            # Plain functions (and anything else implementing the descriptor
            # protocol) bind exactly as they would as class attributes.
            return binder(self, self.__class__)
        # Callable objects without __get__ (e.g. instances defining __call__)
        # are wrapped explicitly.
        return types.MethodType(func, self)

    def set_loop(self, func):
        """Install ``func`` as the loop callback."""
        self.loop = self._bind(func)

    def set_entry(self, func):
        """Install ``func`` as the entry callback."""
        self.entry = self._bind(func)

    def set_exit(self, func):
        """Install ``func`` as the exit callback."""
        self.exit = self._bind(func)
class StateTransition():
    """Request, returned from a loop callback, to move to another state.

    Args:
        next_state: Name of the state to enter ("exit" stops the machine).
        entry_kwargs: Payload passed to the next state's entry callback.
        loop_kwargs: Payload passed to the next state's loop callback.
        exit_kwargs: Payload passed to the next state's exit callback.

    Each payload defaults to a fresh empty dict. Any other value, including
    falsy ones such as ``0``, is stored unchanged.
    """

    def __init__(self, next_state, entry_kwargs=None, loop_kwargs=None, exit_kwargs=None):
        self.next_state_name = next_state
        # "is None" (not truthiness): mutable defaults in the signature would
        # be shared by every transition, and payloads like 0 must survive.
        self.entry_kwargs = {} if entry_kwargs is None else entry_kwargs
        self.loop_kwargs = {} if loop_kwargs is None else loop_kwargs
        self.exit_kwargs = {} if exit_kwargs is None else exit_kwargs
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment