Created
December 13, 2015 21:59
-
-
Save johnsom/4878395e92a82e8c11a6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# decider subflow accessing upstream storage example
#
# Using taskflow 1.25.0
#
# Output:
# MyID
# In decider subflow
# Took branch A
# MyID
from taskflow import engines | |
from taskflow import task | |
from taskflow.patterns import linear_flow as lf | |
from taskflow.patterns import graph_flow as gf | |
import logging | |
###########################
# Create some example tasks
###########################
class StoreID(task.Task):
    """Example task that produces an ID value for later tasks to consume."""

    def execute(self):
        """Return the constant example ID string ``"MyID"``."""
        return "MyID"
class PrintID(task.Task):
    """Example task that prints the ID value handed to it."""

    def execute(self, TheID):
        """Print ``TheID`` to stdout.

        The argument name ``TheID`` must stay as-is: the task instances
        created from this class declare it in ``requires`` and ``rebind``
        by that exact name.
        """
        print(TheID)
class PrintA(task.Task):
    """Example task that announces that branch A was executed."""

    def execute(self):
        """Print the branch A marker message to stdout."""
        print('Took branch A')
class PrintB(task.Task):
    """Example task that announces that branch B was executed."""

    def execute(self):
        """Print the branch B marker message to stdout."""
        print('Took branch B')
class PrintInDecider(task.Task):
    """Example task that marks entry into the decider subflow."""

    def execute(self):
        """Print the decider subflow marker message to stdout."""
        print('In decider subflow')
###############################
# Define my tasks for the flows
###############################
# The value returned by StoreID.execute() is published under the name 'AnID'.
storetask = StoreID(provides='AnID')

# All tasks in a flow must have unique names, so each reuse of PrintID gets
# its own explicitly named instance. Every instance maps its 'TheID'
# argument onto the stored 'AnID' value via rebind.
printtask = PrintID(name="printtask",
                    requires='TheID',
                    rebind={'TheID': 'AnID'})
printtask2 = PrintID(name="printtask2",
                     requires='TheID',
                     rebind={'TheID': 'AnID'})
printtask3 = PrintID(name="printtask3",
                     requires='TheID',
                     rebind={'TheID': 'AnID'})

# Marker tasks; each is used once, so no explicit name is given.
print_a_task = PrintA()
print_b_task = PrintB()
print_in_decider_task = PrintInDecider()
##################
# Define the flows
##################
MainFlow = lf.Flow("main")
# Task that stores data in the flow as 'AnID'
MainFlow.add(storetask)
# Prints the stored ID from the main flow itself
MainFlow.add(printtask)

# Subflow for branch A
BranchAFlow = lf.Flow("branchA")
BranchAFlow.add(print_a_task)
BranchAFlow.add(printtask2)

# Subflow for branch B
BranchBFlow = lf.Flow("branchB")
BranchBFlow.add(print_b_task)
BranchBFlow.add(printtask3)

# Graph flow that chooses between the two branch subflows
DeciderFlow = gf.Flow("decider")
DeciderFlow.add(print_in_decider_task, BranchAFlow, BranchBFlow)
DeciderFlow.link(print_in_decider_task,  # upstream task to link from
                 BranchAFlow,  # downstream task or subflow
                 decider=lambda history: True)  # if true, take the branch
DeciderFlow.link(print_in_decider_task,
                 BranchBFlow,
                 decider=lambda history: False)  # always false: branch B is not taken

# The decider flow is added to the main flow after the store/print tasks.
MainFlow.add(DeciderFlow)
########
# Run it
########
# Load the main flow into an engine, then execute it.
e = engines.load(MainFlow)
e.run()
Author
johnsom
commented
Dec 13, 2015
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment