Skip to content

Instantly share code, notes, and snippets.

@rlkelly
Last active October 26, 2022 05:58
Show Gist options
  • Save rlkelly/b56e23e02564728510464c186a56cc50 to your computer and use it in GitHub Desktop.
Save rlkelly/b56e23e02564728510464c186a56cc50 to your computer and use it in GitHub Desktop.
import random
import time
from uuid import uuid4
# Module-level dict used as the state machine's "database": maps an Effect's
# uuid to the cached result of running that effect (None until it has run).
DB = {}
def fish(max_wait=3, night_threshold=0.5):
    """Simulate one fishing attempt and report the outcome.

    Sleeps for a random duration in ``[0, max_wait)`` seconds, then draws a
    second random number to decide the outcome.

    Args:
        max_wait: Upper bound, in seconds, of the simulated wait. Must be
            non-negative (``time.sleep`` rejects negative durations). The
            default of 3 keeps the original behaviour.
        night_threshold: The attempt yields 'CATCH_FISH' only when the random
            draw is strictly greater than this value; otherwise it yields
            'IS_NIGHT'. The default of 0.5 keeps the original behaviour.

    Returns:
        The string 'CATCH_FISH' or 'IS_NIGHT'.
    """
    # First draw: how long the attempt takes.
    time.sleep(random.random() * max_wait)
    # Second draw: what the attempt produced.
    return 'CATCH_FISH' if random.random() > night_threshold else 'IS_NIGHT'
def weigh_fish():
    """Draw a random number and report whether it exceeds 10.

    NOTE(review): ``random.random()`` yields values in [0, 1), so this
    comparison is always False as written -- confirm the intended threshold.
    """
    draw = random.random()
    return draw > 10
class RESTART:
    """Step that asks the machine to start over, with a bounded retry budget.

    Attributes:
        retries: Limit that ``iterations`` is compared against.
        iterations: How many times this step has been hit so far (starts at 0).
    """

    def __init__(self, retries):
        # The counter always starts from zero; the budget comes from the caller.
        self.retries, self.iterations = retries, 0
class Execute:
    """Step that pairs a callable with the state label to adopt after it runs.

    Attributes:
        state: Label stored for this step.
        func: Callable stored for this step.
    """

    def __init__(self, state, func):
        self.state, self.func = state, func
class Effect:
    """A named step wrapping a callable, identified by a unique id.

    Attributes:
        name: Human-readable label for the effect.
        func: Callable stored for this effect.
        uuid: Unique string id for this effect instance.
        retries: Retry budget supplied by the caller (defaults to 0).
        iteration: Counter initialised to 0.
    """

    def __init__(self, name, func, retries=0):
        self.name = name
        self.func = func
        self.uuid = self._assign_uuid()
        self.retries = retries
        self.iteration = 0

    def _assign_uuid(self):
        """Generate a fresh uuid4 string, store it on ``self.uuid`` and return it.

        The value must be returned: ``__init__`` assigns this method's result
        to ``self.uuid``, so returning nothing would overwrite the freshly
        generated id with None.
        """
        self.uuid = str(uuid4())
        return self.uuid
class Branch:
    """Conditional step holding a predicate and two alternative step lists.

    Attributes:
        cond: Predicate stored for this branch.
        left: Steps stored as the first alternative.
        right: Steps stored as the second alternative.
    """

    def __init__(self, cond, left, right):
        self.cond, self.left, self.right = cond, left, right
class WaitFor:
    """Step that names a value to wait for, plus an associated callable.

    Attributes:
        state: The value this step waits for.
        do: Callable stored alongside the awaited value.
    """

    def __init__(self, state, do):
        self.state, self.do = state, do
class GOTO:
    """Step record holding a state label and an index.

    Attributes:
        state: Label stored for this step.
        index: Index stored for this step.
    """

    def __init__(self, state, index):
        self.state, self.index = state, index
class Final:
    """Terminal step carrying the label of the final state.

    Attributes:
        state: Label of the final state.
    """

    def __init__(self, state):
        # Written via setattr purely as a stylistic variation; same effect as
        # a plain attribute assignment.
        setattr(self, 'state', state)
class StateMachine:
    """Base class for a small step-driven state machine.

    Subclasses set ``self.steps`` (a list of Effect / Branch / WaitFor /
    Execute / RESTART / Final objects) in their ``__init__``. Effect results
    are cached in the module-level ``DB`` dict, keyed by the effect's uuid, so
    re-running the machine replays cached results instead of calling the
    effect again.

    Attributes:
        steps: The top-level list of steps.
        state: The most recent state value produced by a step.
        wait_val: Value compared against ``WaitFor.state``; set from outside
            to let a waiting machine proceed.
    """

    steps = []
    state = None
    wait_val = None

    def __init_subclass__(cls, **kwargs):
        """Wrap each subclass's ``__init__`` so ``__post_init__`` runs after it."""
        def init_decorator(previous_init):
            def new_init(self, *args, **kwargs):
                previous_init(self, *args, **kwargs)
                # Exact type match (not isinstance) on purpose: with a chain
                # of subclasses every level is wrapped, and only the wrapper
                # belonging to the instance's own class may trigger
                # __post_init__, so that it runs exactly once.
                if type(self) is cls:
                    self.__post_init__()
            return new_init

        cls.__init__ = init_decorator(cls.__init__)

    def __post_init__(self):
        """Normalise ``self.steps`` once construction has finished."""
        self.steps = self.process(self.steps)

    def process(self, steps):
        """Return a processed copy of ``steps``.

        Every Effect gets a ``DB`` slot initialised to None; every Branch is
        rebuilt with its ``left`` and ``right`` lists processed recursively;
        all other steps are passed through unchanged.
        """
        processed_steps = []
        for step in steps:
            if isinstance(step, Effect):
                DB[step.uuid] = None
                processed_steps.append(step)
            elif isinstance(step, Branch):
                processed_steps.append(
                    Branch(
                        step.cond,
                        self.process(step.left),
                        self.process(step.right),
                    )
                )
            else:
                processed_steps.append(step)
        return processed_steps

    def run(self):
        """Execute the machine until it terminates or has to wait.

        Returns:
            'FAILURE', 'WAITING' or 'DONE', or the Final / WaitFor step that
            stopped execution. Any other response (None, or 'RESTART') makes
            the loop execute the steps again from the top.
        """
        while True:
            response = self.execute()
            print('run response', response)
            if response is None:
                continue
            if response in ['FAILURE', 'WAITING', 'DONE']:
                return response
            if isinstance(response, Final):
                # print final state
                print(response.state)
                return response
            if isinstance(response, WaitFor):
                return response
            # Anything else (notably 'RESTART') falls through: run again.

    def execute(self, steps=None):
        """Run ``steps`` (default: the machine's top-level steps) in order.

        Returns:
            * 'DONE', 'RESTART', 'FAILURE' or 'WAITING' as soon as a step
              reports one of them;
            * the Final step itself when one is reached, including one reached
              inside a nested Branch;
            * None when every step ran without producing a terminal response,
              which lets the caller carry on with whatever comes next.
        """
        # `steps or self.steps` would mistake an empty branch for "no
        # argument" and recursively re-run the whole machine.
        current = self.steps if steps is None else steps
        while True:
            for step in current:
                response = self.execute_step(step)
                if response in ['DONE', 'RESTART', 'FAILURE', 'WAITING']:
                    return response
                if isinstance(response, RESTART):
                    # execute_step reports restarts with the string 'RESTART';
                    # this path only triggers when a step's result is itself a
                    # RESTART instance.
                    # For the sake of simplicity, this fully reboots the
                    # database state. Clear in place: rebinding a local named
                    # DB would leave the module-level dict untouched.
                    DB.clear()
                    if response.iterations == response.retries:
                        return 'FAILURE'
                    self.state = None
                    break  # restart this step list from its first step
                elif isinstance(response, Final):
                    # Return the Final itself, not `step`: when the Final was
                    # reached through a nested Branch, `step` is that Branch
                    # and the final state would be lost.
                    return response
                elif isinstance(response, WaitFor):
                    if self.wait_val != response.state:
                        return 'WAITING'
            else:
                # Step list exhausted without a terminal response.
                return None

    def execute_step(self, step):
        """Execute a single step and return its response.

        * Effect: replay the cached result from ``DB`` if there is one,
          otherwise call ``step.func()`` and cache the result; the result
          becomes the current state and is returned.
        * Branch: execute ``left`` if ``step.cond(self.state)`` is truthy,
          otherwise ``right``, and return that result.
        * Final: set the state to 'DONE' and return the step.
        * WaitFor: return the step (the caller decides whether to wait).
        * Execute: call ``step.func()``, adopt ``step.state`` and return the
          step.
        * RESTART: count the hit; return 'FAILURE' once the retry budget is
          used up, otherwise clear ``DB`` and return 'RESTART'.

        Once the state is 'DONE' or 'FAILURE' no step is executed any more
        and that state is returned directly. Unknown step types yield None.
        """
        print('execute', step, self.state)
        if self.state in ['DONE', 'FAILURE']:
            return self.state
        if isinstance(step, Effect):
            cached = DB.get(step.uuid)
            if cached is not None:
                self.state = cached
            else:
                self.state = step.func()
                step.iteration += 1
                DB[step.uuid] = self.state
            return self.state
        if isinstance(step, Branch):
            chosen = step.left if step.cond(self.state) else step.right
            return self.execute(chosen)
        if isinstance(step, Final):
            self.state = 'DONE'
            return step
        if isinstance(step, WaitFor):
            return step
        if isinstance(step, Execute):
            step.func()
            self.state = step.state
            return step
        if isinstance(step, RESTART):
            step.iterations += 1
            # `>=` rather than `==`: with retries=0 the counter is already
            # past the limit on the first hit and `==` would never fail.
            if step.iterations >= step.retries:
                self.state = 'FAILURE'
                return 'FAILURE'
            # Forget all cached effect results so they run again.
            DB.clear()
            return 'RESTART'
        return None
class FishingStateMachine(StateMachine):
    """Example machine: fish; at night sleep and restart, otherwise wait for
    the catch and finish by either eating the fish or giving up."""

    def __init__(self):
        super().__init__()

        def is_night(state):
            return state == 'IS_NIGHT'

        def nap():
            return time.sleep(3)

        def coin_flip(state):
            # The current state is ignored; the outcome is purely random.
            return random.random() * 10 > 5

        cast_line = Effect('FISH', fish, retries=3)

        # Night: sleep, then ask the machine to start over.
        night_steps = [
            Execute('SLEEP', nap),
            RESTART(retries=3),
        ]

        # Day: wait for the catch, then pick one of the two endings.
        ending = Branch(
            cond=coin_flip,
            left=[Final('EAT_FISH')],
            right=[Final('GIVE_UP')],
        )
        day_steps = [
            WaitFor('CATCH_FISH', weigh_fish),
            ending,
        ]

        self.steps = [
            cast_line,
            Branch(cond=is_night, left=night_steps, right=day_steps),
        ]
if __name__ == '__main__':
    # Demo driver: run the fishing machine once and, if it parks itself on a
    # WaitFor step, simulate the awaited event and resume.
    machine = FishingStateMachine()
    outcome = machine.run()
    if outcome == 'WAITING':
        # Stand-in for an external actor that modifies the machine while it
        # is waiting.
        print('waiting...')
        time.sleep(1)
        machine.wait_val = 'CATCH_FISH'
        machine.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment