Skip to content

Instantly share code, notes, and snippets.

@noteed
Last active July 27, 2020 22:11
Show Gist options
  • Save noteed/9374057 to your computer and use it in GitHub Desktop.
Save noteed/9374057 to your computer and use it in GitHub Desktop.
Dumb workflow engine (in Python)
"""
Simple workflow engine.
A workflow is a directed graph where nodes are activities. The returned
value of an activity is used to dispatch on its outgoing transitions.
Activities are functions, accepting a `state` argument.
    def ACTIVITY(state):
        pass
The graph is defined as a dictionary where keys are pairs of (activity,
transition).
    transitions = {
        (ACTIVITY, token): OTHER_ACTIVITY,
        ...
    }
A workflow is created by passing the transitions, an initial activity, and a
list of possible final activities.
Running a workflow is done by using the method `run()`, passing the state to
manipulate.
Usually finite-state machines are fed tokens (or signals) that are mapped to
transitions. Here the activities can either generate (return) the tokens
directly or defer to the workflow engine the responsibility of generating them
(i.e. spawn an asynchronous task and wait for its result). The transitions show
the possible outcomes of each activity (instead of the possible signals that
are accepted after each activity).
If you need to wait for a signal, you write an activity that does the waiting,
or use the asynchronous tasks offered by the workflow engine. In the
`example_workflow` below, the `SECOND` activity waits for some user input.
"""
class STATE(object):
    """Empty attribute bag: set arbitrary attributes on it instead of using a dict."""
# Wildcard token: used as the token part of a transition key, it matches any
# token generated by the activity.
DEFAULT = object()
class TOKEN(object):
    """
    Base class for tokens that carry data.

    When a plain string is not expressive enough, return an instance of this
    class (or of a subclass) to hand additional data to the next activity.
    """
class TASK(object):
    """
    Marker for work an activity hands back to the workflow engine.

    An activity may return a task instead of a token. The engine is then
    expected to run the task and to treat its result (which should be a token)
    exactly as if the activity had returned it, selecting the next activity as
    usual. This is how workflows get to do some of their work asynchronously.
    """
class Workflow(object):
    """
    Workflow engine walking a state object through a graph of activities.

    `transitions` maps `(activity, token)` pairs to the next activity,
    `initial` is the activity a run starts with and `finals` lists the
    activities that end a run. Activities are looked up by their `__name__`,
    which is what gets stored on the state as `current_activity`.
    """

    def __init__(self, name, transitions, initial, finals, state=None, verbose=False):
        """
        Store the workflow definition.

        `name` and `state` are kept on the instance for the caller's benefit
        (they used to be silently discarded); the engine itself does not read
        them. When `verbose` is true, each activity is announced on stdout
        before it runs.
        """
        self.name = name
        self.state = state
        self.transitions = transitions
        self.initial = initial
        self.finals = finals
        self.verbose = verbose

    def run_activity(self, a, s, r):
        """
        Call activity `a` on state `s` and return whatever it returns.

        `r` is the result (token) that led to this activity; it is passed as
        a second argument only to activities declaring exactly two
        parameters. Any exception raised by the activity terminates the
        program through `self.exit()`.
        """
        import sys
        if self.verbose:
            if a.__doc__:
                print('{} -- {}{}{}'.format(
                    a.__name__,
                    s.name + ' ' if hasattr(s, 'name') else '',
                    a.__doc__,
                    ' from ' + str(r) if r else ''))
            else:
                # No newline: whatever the activity prints follows on the
                # same line.
                sys.stdout.write(a.__name__ + ' ')
        try:
            # `__code__` is available on Python 2.6+ and Python 3, unlike the
            # Python 2-only `func_code`.
            if a.__code__.co_argcount == 2:
                return a(s, r)
            else:
                return a(s)
        except Exception as e:
            self.exit('Exception running activity {}:\n{}'.format(a.__name__, str(e)))

    def next_activity(self, a, r):
        """
        Select the next activity to run, based on the result of the current
        activity. That result must match a transition. A transition matches if
        it has the same value, or if it is the DEFAULT transition.

        TOKEN instances are matched by their class name. When nothing
        matches, the program is terminated through `self.exit()`.
        """
        r = r.__class__.__name__ if isinstance(r, TOKEN) else r
        n = self.transitions.get((a, r)) or \
            self.transitions.get((a, DEFAULT))
        if n is None:
            self.exit('No transition found for {}/{}.'.format(a.__name__, r))
        return n

    def run(self, state):
        """
        Run the workflow on a state. The state must accept that its attributes
        `current_activity` and `waiting_task` are set by the workflow.

        Return None or a task if the workflow waits for it to complete.
        """
        state.current_activity = self.initial.__name__
        # state.waiting_task is set when the current activity waits for a
        # task's result.
        state.waiting_task = False
        return self.step(state)

    def step(self, state, token=None):
        """
        Run the workflow, possibly resuming after a task has completed. In
        such a case, pass the task result as the `token` kwarg.

        Return None or a task if the workflow waits for it to complete.
        """
        activity = self.get_activity(state)
        result = None
        while activity not in self.finals:
            if state.waiting_task:
                # Resuming: the token stands in for the result of the
                # activity that spawned the task, which is not run again.
                state.waiting_task = False
                result = token
            else:
                result = self.run_activity(activity, state, result)
            if isinstance(result, TASK):
                if self.verbose:
                    print('{} -- Spawning task...'.format(state.current_activity))
                state.waiting_task = True
                return result
            else:
                state.current_activity = self.next_activity(activity, result).__name__
                activity = self.get_activity(state)
        # Run the final activity. TODO Support TASKs.
        result = self.run_activity(activity, state, result)
        if self.verbose and result is not None:
            print('(FINAL activity has a not-none result: {}.)'.format(result))

    def get_activity(self, state):
        """
        Return the activity whose `__name__` equals `state.current_activity`.

        The initial activity and every transition target are searched; None
        is returned when no activity has that name.
        """
        # list(): on Python 3 a dict view cannot be concatenated to a list.
        for a in [self.initial] + list(self.transitions.values()):
            if a.__name__ == state.current_activity:
                return a

    def exit(self, msg):
        """Terminate the program by raising SystemExit with `msg`."""
        # sys.exit rather than the builtin `exit`, which is only installed by
        # the `site` module and is meant for interactive use.
        import sys
        sys.exit(msg)
def INIT(state):
    """Initialization..."""
    # The docstring above is what the engine prints in verbose mode.
    # Start the input counter from zero; no token is returned.
    setattr(state, 'count', 0)
def SECOND(state):
    """Get input (`bye` to exit)..."""
    # The docstring above is what the engine prints in verbose mode.
    # Reads one line from stdin and counts it. Returns 'BYE' when the line is
    # `bye` or when stdin is exhausted; otherwise echoes the line and returns
    # None.
    import sys
    state.count += 1
    raw = sys.stdin.readline()
    # readline() returns '' only at end of file. Without this check a closed
    # stdin would yield None forever and the activity would never return
    # 'BYE'.
    if not raw:
        return 'BYE'
    line = raw.strip()
    if line == 'bye':
        return 'BYE'
    print(line)
def THIRD(state):
    """Will wait for a task completion..."""
    # The docstring above is what the engine prints in verbose mode.
    # Hand a fresh task back to the caller instead of a token.
    pending = TASK()
    return pending
# Final activity of the example: reports how many lines were read.
# It deliberately has no docstring: in verbose mode the engine prints only the
# name of docstring-less activities, so this output lands on the same line.
def FINAL(state):
    # print() with a single argument behaves the same on Python 2 and 3.
    print('Count: ' + str(state.count))
    return 19137927
# Graph of the example: INIT leads to SECOND, which repeats until it returns
# 'BYE'; THIRD then returns a task and FINAL follows once 'TASK_RESULT' is
# supplied.
transitions = dict([
    ((INIT, None), SECOND),
    ((SECOND, None), SECOND),
    ((SECOND, 'BYE'), THIRD),
    ((THIRD, 'TASK_RESULT'), FINAL),
])
example_workflow = Workflow(
    "example", transitions, INIT, [FINAL], verbose=True)

if __name__ == '__main__':
    state = STATE()
    # run() comes back once THIRD has returned its task ...
    example_workflow.run(state)
    # ... and the workflow is resumed by supplying that task's result.
    example_workflow.step(state, token='TASK_RESULT')
"""
Example code using goto.py.
"""
import goto
import random
import sys
class SHOOT(goto.TASK):
    """Task describing a shot: a tick count, the target's name and the damage."""

    def __init__(self, ticks, target, damage):
        self.ticks, self.target, self.damage = ticks, target, damage
class HIT(goto.TOKEN):
    """Token carrying the amount of damage that was dealt."""

    def __init__(self, damage):
        self.damage = damage

    def __str__(self):
        template = 'HIT {}'
        return template.format(self.damage)
def INIT(marine):
    """Initializing marine..."""
    # The docstring above is what the engine prints in verbose mode.
    # Every marine starts at full life, then waits for a token.
    setattr(marine, 'life', 100)
    waiting = goto.TASK()
    return waiting
def THINK(marine):
    """Thinking..."""
    # The docstring above is what the engine prints in verbose mode.
    # Per-marine profile: target, upper bound of the shoot-or-not draw (a draw
    # of 0 means "hold fire"), damage range and the shot's tick count.
    if marine.name == 'Sigourney':
        target, top_roll, low, high, delay = 'Kurt', 3, 1, 26, 3
    else:
        target, top_roll, low, high, delay = 'Sigourney', 1, 15, 35, 4
    if not random.randint(0, top_roll):
        # Not shooting this time: just wait for the next token.
        return goto.TASK()
    return SHOOT(delay, target, random.randint(low, high))
def DAMAGE(marine, projectile):
    """Getting damage..."""
    # The docstring above is what the engine prints in verbose mode.
    # Subtract the projectile's damage, never letting life drop below zero.
    remaining = marine.life - projectile.damage
    marine.life = 0 if remaining < 0 else remaining
    return goto.TASK()
# Final activity: reports the marine's name and remaining life.
# It deliberately has no docstring: in verbose mode the engine prints only the
# name of docstring-less activities, so this output lands on the same line.
def FINAL(marine):
    # print() with a single argument behaves the same on Python 2 and 3.
    print('Marine `{}` is {}'.format(marine.name, marine.life))
# Graph of a marine: it keeps thinking on 'THINK', takes damage on 'HIT' and
# reaches FINAL on 'END', whether it was thinking or taking damage.
transitions = dict([
    ((INIT, 'THINK'), THINK),
    ((THINK, 'THINK'), THINK),
    ((THINK, 'HIT'), DAMAGE),
    ((THINK, 'END'), FINAL),
    ((DAMAGE, 'THINK'), THINK),
    ((DAMAGE, 'END'), FINAL),
])
marine_workflow = goto.Workflow(
    "marines", transitions, INIT, [FINAL], verbose=True)
class Marine(object):
    """State object of one marine; only its name is set at construction."""

    def __init__(self, name):
        setattr(self, 'name', name)
if __name__ == '__main__':
    # Start one workflow per marine. INIT returns a task, so every run()
    # comes back right away with the marine waiting for its first token.
    marines = []
    for marine_name in ('Kurt', 'Sigourney'):
        recruit = Marine(marine_name)
        marine_workflow.run(recruit)
        marines.append(recruit)

    def get_marine(name):
        # First marine bearing that name, or None when there is none.
        return next((m for m in marines if m.name == name), None)

    ticks = 0
    projectiles = []
    # As long as every marine has some life left: let each of them think,
    # then deliver the projectiles that are due at the current tick.
    while all(m.life for m in marines):
        for shooter in marines:
            order = marine_workflow.step(shooter, 'THINK')
            if isinstance(order, SHOOT):
                # Turn the shot's relative tick count into an absolute tick.
                order.ticks += ticks
                projectiles.append(order)
        in_flight = []
        for shot in projectiles:
            if shot.ticks != ticks:
                in_flight.append(shot)
                continue
            marine_workflow.step(get_marine(shot.target), HIT(shot.damage))
        projectiles = in_flight
        ticks += 1
    # At least one marine is dead: bring every workflow to its final activity.
    for fighter in marines:
        marine_workflow.step(fighter, 'END')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment