Skip to content

Instantly share code, notes, and snippets.

@dankrause
Last active August 29, 2015 14:05
Show Gist options
  • Save dankrause/0309d3509e9cc74c3e31 to your computer and use it in GitHub Desktop.
Save dankrause/0309d3509e9cc74c3e31 to your computer and use it in GitHub Desktop.
Toy finite state machine implementation in Python
import itertools
class FSMException(Exception):
pass
class DuplicateRule(FSMException):
pass
class InvalidStateTransition(FSMException):
pass
class State(object):
def __init__(self, func):
self._func = func
def __call__(self, event, old_state):
return self._func(event, old_state)
@property
def name(self):
return self._func.__name__
class TerminalState(State):
pass
class Event(dict):
def __init__(self, name, *args, **kwargs):
self._name = name
super(Event, self).__init__(*args, **kwargs)
@property
def name(self):
return self._name
class FiniteStateMachine(object):
def __init__(self):
self._state = None
self._rules = {}
def add_rule(self, event_types, valid_states, end_state):
for trigger in list(itertools.product(event_types, valid_states)):
if trigger in self._rules:
raise DuplicateRule(*trigger)
self._rules[trigger] = end_state
def start(self, event):
while self._state is None or not isinstance(self._state, TerminalState):
trigger = (event.name, self.state)
if trigger not in self._rules:
raise InvalidStateTransition(*trigger)
old_state = self.state
self._state = self._rules[trigger]
event = self._state(event, old_state)
def reset(self):
self._state = None
@property
def state(self):
return self._state
from fsm import *
import time
@State
def starting(event, old_state):
print "Initializing... {}".format(event)
time.sleep(2)
return Event("run", **event)
@State
def running(event, old_state):
print "Running... {}".format(event)
time.sleep(1)
countdown = event["countdown"]
total = event.get("total", 0) + countdown
if countdown == 0:
return Event("stop", total=total)
else:
return Event("run", countdown=countdown-1, total=total)
@State
def stopping(event, old_state):
print "Shutting Down... {}".format(event)
time.sleep(1)
if event["total"] < 50:
return Event("error", reason="total is less than 50", total=event["total"])
else:
return Event("halt", **event)
@TerminalState
def stopped(event, old_state):
print "Stopped. {}".format(event)
@TerminalState
def error(event, old_state):
print "An error occurrred in {}: {}".format(old_state, event)
def build_fsm():
f = FiniteStateMachine()
f.add_rule(["start"], [None, error, stopped], starting)
f.add_rule(["run"], [starting, running], running)
f.add_rule(["error"], [None, starting, running, stopping], error)
f.add_rule(["stop"], [running], stopping)
f.add_rule(["halt"], [stopping], stopped)
return f
fsm = build_fsm()
fsm.start(Event("start", countdown=10))
fsm.reset()
fsm.start(Event("start", countdown=5))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment