Last active
July 27, 2020 22:11
-
-
Save noteed/9374057 to your computer and use it in GitHub Desktop.
Dumb workflow engine (in Python)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Simple workflow engine. | |
A workflow is a directed graph where nodes are activities. The returned | |
value of an activity is used to dispatch on its outgoing transitions. | |
Activities are functions, accepting a `state` argument. | |
def ACTIVITY(state): | |
pass | |
The graph is defined as a dictionary where keys are pairs of (activity, | |
transition). | |
transitions = { | |
(ACTIVITY, token): OTHER_ACTIVITY, | |
... | |
} | |
A workflow is created by passing the transitions, an initial activity, and a | |
list of possible final activities. | |
Running a workflow is done by using the method `run()`, passing the state to | |
manipulate. | |
Usually finite-state machines are fed tokens (or signals), that are mapped to | |
transitions. Here the activities can generate (return) directly the tokens or | |
defer to the workflow engine the responsibility to generate tokens (i.e. spawn | |
an asynchronous task and wait for its result). The transitions show the | |
possible outcome of each activity (instead of the possible signals that are | |
accepted after each activity). | |
If you need to wait for a signal, you write an activity that does the waiting, | |
or use the asynchronous tasks offered by the workflow engine. In the | |
`example_workflow` below, the `SECOND` activity waits for some user input. | |
""" | |
class STATE(object):
    """
    Empty attribute bag: a workflow and its activities set whatever
    attributes they need on an instance, instead of using keys of a dict.
    """
""" | |
A special token used in transition to match every generated tokens. | |
""" | |
DEFAULT = object() | |
class TOKEN(object):
    """
    Base class for tokens that must carry more than a plain string.

    Subclass it and store the extra data on the instance. Transitions are
    matched against the name of the instance's class, and the instance itself
    is handed to the next activity when that activity takes two arguments.
    """
class TASK(object):
    """
    Marker for work that completes outside of the current `step()` call.

    When an activity returns a TASK instance, the workflow flags the state as
    waiting and hands the task back to the caller of `step()`. Once the task
    has produced its result (which should be a token), the caller resumes the
    workflow by calling `step()` again with that token; the token is then
    dispatched as if the activity had returned it directly.

    Tasks are what allows a workflow to do some work asynchronously.
    """
class Workflow(object):
    """
    Directed graph of activities, walked by dispatching on what they return.

    The workflow keeps no per-run data: everything that changes while running
    (`current_activity`, `waiting_task`, and whatever the activities store)
    lives on the `state` object given to `run()` and `step()`.
    """

    def __init__(self, name, transitions, initial, finals, state=None, verbose=False):
        """
        name -- label of the workflow, kept as `self.name`.
        transitions -- dict mapping `(activity, token)` to the next activity.
        initial -- activity a run starts with.
        finals -- collection of activities that terminate a run.
        state -- accepted for backward compatibility; not used.
        verbose -- when true, trace each activity on standard output.
        """
        self.name = name
        self.transitions = transitions
        self.initial = initial
        self.finals = finals
        self.verbose = verbose

    def run_activity(self, a, s, r):
        """
        Call activity `a` on state `s` and return its result.

        `r` is the result of the previous activity (or the token a task
        completed with). It is passed along only to activities declaring
        exactly two positional parameters.

        If the activity raises, the program is terminated via `self.exit()`.
        """
        if self.verbose:
            if a.__doc__:
                print('{} -- {}{}{}'.format(
                    a.__name__,
                    s.name + ' ' if hasattr(s, 'name') else '',
                    a.__doc__,
                    ' from ' + str(r) if r else ''))
            else:
                # Undocumented activity: emit only its name and stay on the
                # same line, so what the activity prints follows it.
                import sys
                sys.stdout.write(a.__name__ + ' ')
        try:
            # `__code__` is available on Python 2.6+ and 3 (`func_code` is
            # Python 2 only).
            if a.__code__.co_argcount == 2:
                return a(s, r)
            return a(s)
        except Exception as e:
            self.exit('Exception running activity {}:\n{}'.format(a.__name__, str(e)))

    def next_activity(self, a, r):
        """
        Select the next activity to run, based on the result `r` of the
        current activity `a`. That result must match a transition. A
        transition matches if it has the same value, or if it is the DEFAULT
        transition. TOKEN instances are matched by the name of their class.

        Terminates the program via `self.exit()` when nothing matches.
        """
        r = r.__class__.__name__ if isinstance(r, TOKEN) else r
        n = self.transitions.get((a, r))
        if n is None:
            n = self.transitions.get((a, DEFAULT))
        if n is None:
            self.exit('No transition found for {}/{}.'.format(a.__name__, r))
        return n

    def run(self, state):
        """
        Run the workflow on a state. The state must accept that its
        attributes `current_activity` and `waiting_task` are set by the
        workflow.

        Return None, or a task if the workflow waits for it to complete.
        """
        state.current_activity = self.initial.__name__
        # state.waiting_task is set when the current activity waits for a
        # task's result.
        state.waiting_task = False
        return self.step(state)

    def step(self, state, token=None):
        """
        Run the workflow, possibly resuming after a task has completed. In
        such a case, pass the task result as the `token` kwarg.

        Return the task the workflow is now waiting on, or None once the
        final activity has run.
        """
        activity = self.get_activity(state)
        if activity is None:
            # Fail with a readable message instead of an AttributeError on
            # None further down.
            self.exit('Unknown activity {}.'.format(state.current_activity))
        result = None
        while activity not in self.finals:
            if state.waiting_task:
                # Resuming: the token stands in for the result the waiting
                # activity would have returned.
                state.waiting_task = False
                result = token
            else:
                result = self.run_activity(activity, state, result)
            if isinstance(result, TASK):
                if self.verbose:
                    print('{} -- Spawning task...'.format(state.current_activity))
                state.waiting_task = True
                return result
            state.current_activity = self.next_activity(activity, result).__name__
            activity = self.get_activity(state)
        # Run the final activity. TODO Support TASKs.
        result = self.run_activity(activity, state, result)
        if self.verbose and result is not None:
            print('(FINAL activity has a not-none result: {}.)'.format(result))

    def get_activity(self, state):
        """
        Return the activity whose `__name__` equals `state.current_activity`,
        searching the initial activity and every transition target. Return
        None when there is no such activity.
        """
        wanted = state.current_activity
        # list(...) is required on Python 3, where values() is a view that
        # cannot be concatenated to a list.
        for a in [self.initial] + list(self.transitions.values()):
            if a.__name__ == wanted:
                return a
        return None

    def exit(self, msg):
        """
        Terminate the program, reporting `msg`.

        Uses `sys.exit` rather than the `exit` builtin, which is only
        injected by the `site` module.
        """
        import sys
        sys.exit(msg)
def INIT(state):
    """Initialization..."""
    # The docstring above is runtime text: verbose workflows print it.
    state.count = 0
    # No token is produced; the `(INIT, None)` transition is followed.
    return None
def SECOND(state):
    """Get input (`bye` to exit)..."""
    # The docstring above is runtime text: verbose workflows print it.
    #
    # Reads one line from standard input and counts the visit in
    # `state.count`. Returns 'BYE' when the user typed `bye` or when the
    # input is exhausted; otherwise echoes the line and returns None.
    import sys
    state.count += 1
    raw = sys.stdin.readline()
    # readline() returns '' only at end-of-file (an empty line is '\n').
    # Without this check a closed stdin would yield None forever and the
    # (SECOND, None) transition would never stop looping.
    if not raw or raw.strip() == 'bye':
        return 'BYE'
    print(raw.strip())
    return None
def THIRD(state):
    """Will wait for a task completion..."""
    # The docstring above is runtime text: verbose workflows print it.
    # Returning a task suspends the workflow until a token is supplied.
    pending = TASK()
    return pending
def FINAL(state):
    # Intentionally has no docstring: a verbose workflow then traces this
    # activity by its bare name instead of a description.
    # Reports how many times input was requested, then returns a non-None
    # value.
    print(' '.join(('Count:', str(state.count))))
    return 19137927
# Graph of the example: (activity, token it produced) -> next activity.
transitions = {
    (INIT, None): SECOND,
    (SECOND, None): SECOND,            # keep asking for input
    (SECOND, 'BYE'): THIRD,
    (THIRD, 'TASK_RESULT'): FINAL,     # followed once the task completes
}

example_workflow = Workflow(
    name="example",
    transitions=transitions,
    initial=INIT,
    finals=[FINAL],
    verbose=True,
)

if __name__ == '__main__':
    state = STATE()
    # Runs until THIRD hands back its task...
    example_workflow.run(state)
    # ...then resumes as if that task had completed with 'TASK_RESULT'.
    example_workflow.step(state, token='TASK_RESULT')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example code using goto.py. | |
""" | |
import goto | |
import random | |
import sys | |
class SHOOT(goto.TASK):
    """
    Task describing a shot: `damage` to apply to the marine named `target`,
    with `ticks` as its timing value.
    """

    def __init__(self, ticks, target, damage):
        self.ticks, self.target, self.damage = ticks, target, damage
class HIT(goto.TOKEN):
    """Token carrying the amount of damage a projectile delivers."""

    def __init__(self, damage):
        self.damage = damage

    def __str__(self):
        # Text displayed when the token is formatted, e.g. `HIT 12`.
        label = 'HIT {}'
        return label.format(self.damage)
def INIT(marine):
    """Initializing marine..."""
    # The docstring above is runtime text: verbose workflows print it.
    # Every marine starts with 100 life points, then waits for a token.
    marine.life = 100
    waiting = goto.TASK()
    return waiting
def THINK(marine):
    """Thinking..."""
    # The docstring above is runtime text: verbose workflows print it.
    #
    # Per-marine settings: who is targeted, the highest value of the
    # "shoot?" roll, the damage range, and the `ticks` value of the shot.
    if marine.name == 'Sigourney':
        target, top_roll, weakest, strongest, delay = 'Kurt', 3, 1, 26, 3
    else:
        target, top_roll, weakest, strongest, delay = 'Sigourney', 1, 15, 35, 4

    # A roll of 0 means holding fire: just wait for the next token.
    if not random.randint(0, top_roll):
        return goto.TASK()

    # The damage is drawn only once the marine has decided to shoot.
    return SHOOT(delay, target, random.randint(weakest, strongest))
def DAMAGE(marine, projectile):
    """Getting damage..."""
    # The docstring above is runtime text: verbose workflows print it.
    # Subtract the projectile's damage, never letting life drop below zero,
    # then wait for the next token.
    remaining = marine.life - projectile.damage
    marine.life = max(remaining, 0)
    return goto.TASK()
def FINAL(marine):
    # Intentionally has no docstring: a verbose workflow then traces this
    # activity by its bare name instead of a description.
    # Reports the marine's remaining life.
    report = 'Marine `{}` is {}'.format(marine.name, marine.life)
    print(report)
# Graph of a marine: (activity, token received) -> next activity.
transitions = {
    (INIT, 'THINK'): THINK,
    (THINK, 'THINK'): THINK,
    (THINK, 'HIT'): DAMAGE,     # 'HIT' is the class name of HIT tokens
    (THINK, 'END'): FINAL,
    (DAMAGE, 'THINK'): THINK,
    (DAMAGE, 'END'): FINAL,
}

marine_workflow = goto.Workflow(
    name="marines",
    transitions=transitions,
    initial=INIT,
    finals=[FINAL],
    verbose=True,
)
class Marine(object):
    """Workflow state of one marine, identified by its `name`."""

    def __init__(self, name):
        self.name = name
if __name__ == '__main__':
    # Initialize all marines.
    marines = []
    for name in ['Kurt', 'Sigourney']:
        marine = Marine(name)
        marine_workflow.run(marine)
        marines.append(marine)

    def get_marine(name):
        # First marine with that name, or None when there is none.
        return next((m for m in marines if m.name == name), None)

    ticks = 0
    projectiles = []

    # While they're all alive, let them think, then give them some damage.
    while all(m.life for m in marines):
        for marine in marines:
            task = marine_workflow.step(marine, 'THINK')
            if isinstance(task, SHOOT):
                # Turn the relative delay into the absolute tick of impact.
                task.ticks = ticks + task.ticks
                projectiles.append(task)

        # Projectiles due on this tick hit their target, in firing order;
        # the others stay in flight.
        landing = [p for p in projectiles if p.ticks == ticks]
        in_flight = [p for p in projectiles if p.ticks != ticks]
        for projectile in landing:
            marine_workflow.step(get_marine(projectile.target), HIT(projectile.damage))

        ticks += 1
        projectiles = in_flight

    # At least one marine is dead. It's the end.
    for marine in marines:
        marine_workflow.step(marine, 'END')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment