Created
December 14, 2015 00:16
-
-
Save johnsom/5ac6a9bd7d814743eab7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# decider subflow accessing upstraem storage example | |
# | |
# Using taskflow 1.25.0 | |
# | |
# Output: | |
# MyID | |
# In decider subflow | |
# Took branch A | |
# MyID | |
from taskflow import engines | |
from taskflow import task | |
from taskflow.patterns import linear_flow as lf | |
from taskflow.patterns import graph_flow as gf | |
import logging | |
########################### | |
# Create some example tasks | |
########################### | |
class StoreID(task.Task): | |
def execute(self): | |
return "MyID" | |
class PrintID(task.Task): | |
def execute(self, AnID): | |
print(AnID) | |
class PrintA(task.Task): | |
def execute(self): | |
print('Took branch A') | |
class PrintB(task.Task): | |
def execute(self): | |
print('Took branch B') | |
class PrintInDecider(task.Task): | |
def execute(self): | |
print('In decider subflow') | |
############################### | |
# define my tasks for the flows | |
############################### | |
storetask = StoreID(provides='AnID') | |
# These need to be named different as all of the tasks in a flow must | |
# have unique names, but I want to reuse them | |
printtask = PrintID(name="printtask", | |
requires='AnID') | |
printtask2 = PrintID(name="printtask2", | |
requires='AnID') | |
printtask3 = PrintID(name="printtask3", | |
requires='AnID') | |
print_a_task = PrintA() | |
print_b_task = PrintB() | |
print_in_decider_task = PrintInDecider() | |
################## | |
# define the flows | |
################## | |
MainFlow = lf.Flow("main") | |
# Task that stores data in the flow as 'AnID' | |
MainFlow.add(storetask) | |
MainFlow.add(printtask) | |
# Subflow for branch A | |
BranchAFlow = lf.Flow("branchA") | |
BranchAFlow.add(print_a_task) | |
BranchAFlow.add(printtask2) | |
# Subflow for branch B | |
BranchBFlow = lf.Flow("branchB") | |
BranchBFlow.add(print_b_task) | |
BranchBFlow.add(printtask3) | |
DeciderFlow = gf.Flow("decider") | |
DeciderFlow.add(print_in_decider_task, BranchAFlow, BranchBFlow) | |
DeciderFlow.link(print_in_decider_task, # upstraem task to link to | |
BranchAFlow, # downstream task or subflow | |
decider=lambda history: True) # if true, take the branch | |
DeciderFlow.link(print_in_decider_task, | |
BranchBFlow, | |
decider=lambda history: False) | |
MainFlow.add(DeciderFlow) | |
######## | |
# Run it | |
######## | |
e = engines.load(MainFlow) | |
e.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment