Skip to content

Instantly share code, notes, and snippets.

@johnsom
Created December 13, 2015 21:59
Show Gist options
  • Save johnsom/4878395e92a82e8c11a6 to your computer and use it in GitHub Desktop.
Save johnsom/4878395e92a82e8c11a6 to your computer and use it in GitHub Desktop.
# Decider subflow accessing upstream storage example
#
# Using taskflow 1.25.0
#
# Output:
# MyID
# In decider subflow
# Took branch A
# MyID
from taskflow import engines
from taskflow import task
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import graph_flow as gf
import logging
###########################
# Create some example tasks
###########################
class StoreID(task.Task):
    """Task that produces a fixed identifier string."""

    def execute(self):
        """Return the constant string "MyID" as this task's result."""
        return "MyID"
class PrintID(task.Task):
    """Task that prints the value passed in as ``TheID``."""

    def execute(self, TheID):
        """Print ``TheID`` to stdout.

        :param TheID: the value to print
        """
        print(TheID)
class PrintA(task.Task):
    """Marker task that reports branch A was executed."""

    def execute(self):
        """Print the fixed message 'Took branch A'."""
        print('Took branch A')
class PrintB(task.Task):
    """Marker task that reports branch B was executed."""

    def execute(self):
        """Print the fixed message 'Took branch B'."""
        print('Took branch B')
class PrintInDecider(task.Task):
    """Marker task that reports the decider subflow was entered."""

    def execute(self):
        """Print the fixed message 'In decider subflow'."""
        print('In decider subflow')
###############################
# define my tasks for the flows
###############################
# The result of StoreID.execute() is published under the name 'AnID'.
storetask = StoreID(provides='AnID')

# The same PrintID task is reused in three places. Every task in a flow
# must have a unique name, so each instance gets its own explicit name.
# PrintID.execute() takes 'TheID'; the rebind maps that argument onto the
# stored 'AnID' value. A fresh rebind dict is built for every instance.
printtask, printtask2, printtask3 = (
    PrintID(name=task_name, requires='TheID', rebind={'TheID': 'AnID'})
    for task_name in ("printtask", "printtask2", "printtask3")
)

# Marker tasks; these are each used once, so no explicit names are given.
print_a_task = PrintA()
print_b_task = PrintB()
print_in_decider_task = PrintInDecider()
##################
# define the flows
##################
def _take_branch(history):
    """Decider that always returns True, so the linked branch is taken."""
    return True


def _skip_branch(history):
    """Decider that always returns False, so the linked branch is not taken."""
    return False


MainFlow = lf.Flow("main")
# storetask stores data in the flow as 'AnID'; printtask then prints it.
MainFlow.add(storetask, printtask)

# Subflow for branch A
BranchAFlow = lf.Flow("branchA")
BranchAFlow.add(print_a_task, printtask2)

# Subflow for branch B
BranchBFlow = lf.Flow("branchB")
BranchBFlow.add(print_b_task, printtask3)

DeciderFlow = gf.Flow("decider")
DeciderFlow.add(print_in_decider_task, BranchAFlow, BranchBFlow)
# link(upstream task, downstream task or subflow, decider=...): the
# downstream branch is taken only if its decider returns true.
DeciderFlow.link(print_in_decider_task, BranchAFlow, decider=_take_branch)
DeciderFlow.link(print_in_decider_task, BranchBFlow, decider=_skip_branch)

# The decider graph runs after the two tasks already in the main flow.
MainFlow.add(DeciderFlow)
########
# Run it
########
# Load an engine for the assembled main flow, then execute it.
flow_engine = engines.load(MainFlow)
flow_engine.run()
@johnsom
Copy link
Author

johnsom commented Dec 13, 2015

decidersample

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment