Skip to content

Instantly share code, notes, and snippets.

@johnsom
Created December 14, 2015 00:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johnsom/5ac6a9bd7d814743eab7 to your computer and use it in GitHub Desktop.
Save johnsom/5ac6a9bd7d814743eab7 to your computer and use it in GitHub Desktop.
# decider subflow accessing upstraem storage example
#
# Using taskflow 1.25.0
#
# Output:
# MyID
# In decider subflow
# Took branch A
# MyID
from taskflow import engines
from taskflow import task
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import graph_flow as gf
import logging
###########################
# Create some example tasks
###########################
class StoreID(task.Task):
def execute(self):
return "MyID"
class PrintID(task.Task):
def execute(self, AnID):
print(AnID)
class PrintA(task.Task):
def execute(self):
print('Took branch A')
class PrintB(task.Task):
def execute(self):
print('Took branch B')
class PrintInDecider(task.Task):
def execute(self):
print('In decider subflow')
###############################
# define my tasks for the flows
###############################
storetask = StoreID(provides='AnID')
# These need to be named different as all of the tasks in a flow must
# have unique names, but I want to reuse them
printtask = PrintID(name="printtask",
requires='AnID')
printtask2 = PrintID(name="printtask2",
requires='AnID')
printtask3 = PrintID(name="printtask3",
requires='AnID')
print_a_task = PrintA()
print_b_task = PrintB()
print_in_decider_task = PrintInDecider()
##################
# define the flows
##################
MainFlow = lf.Flow("main")
# Task that stores data in the flow as 'AnID'
MainFlow.add(storetask)
MainFlow.add(printtask)
# Subflow for branch A
BranchAFlow = lf.Flow("branchA")
BranchAFlow.add(print_a_task)
BranchAFlow.add(printtask2)
# Subflow for branch B
BranchBFlow = lf.Flow("branchB")
BranchBFlow.add(print_b_task)
BranchBFlow.add(printtask3)
DeciderFlow = gf.Flow("decider")
DeciderFlow.add(print_in_decider_task, BranchAFlow, BranchBFlow)
DeciderFlow.link(print_in_decider_task, # upstraem task to link to
BranchAFlow, # downstream task or subflow
decider=lambda history: True) # if true, take the branch
DeciderFlow.link(print_in_decider_task,
BranchBFlow,
decider=lambda history: False)
MainFlow.add(DeciderFlow)
########
# Run it
########
e = engines.load(MainFlow)
e.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment