Skip to content

Instantly share code, notes, and snippets.

@EngHabu
Created July 7, 2020 15:37
Show Gist options
  • Save EngHabu/d1faab2a9088434aec3ea467b5dcf690 to your computer and use it in GitHub Desktop.
Save EngHabu/d1faab2a9088434aec3ea467b5dcf690 to your computer and use it in GitHub Desktop.
@inputs(in_str=Types.String)
@dynamic_task
def dynamic_wf_task(wf_params, in_str):
wf_params.logging.info("Running dynamic task... yielding a code generated sub workflow")
res = []
nodes = {}
previous_node = None
i = 0
for s in in_str:
current_node = other_task(command=s)
nodes['node-{}'.format(i++)] = current_node
# This piece of code adds execution depedency between nodes to force them to run sequentially.
if previous_node is not None:
previous_node >> current_node
previous_node = current_node
MyUnregisteredWorkflow = workflow(
inputs={},
outputs={},
nodes=nodes,
)
# This is an unfortunate setting that will hopefully not be necessary in the future.
setattr(MyUnregisteredWorkflow, 'auto_assign_name', manual_assign_name)
MyUnregisteredWorkflow._platform_valid_name = 'unregistered'
yield MyUnregisteredWorkflow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment