Skip to content

Instantly share code, notes, and snippets.

@xujiboy
Last active May 22, 2020 21:22
Show Gist options
  • Save xujiboy/08224c8a6e5803a5c8faf8888471cc75 to your computer and use it in GitHub Desktop.
Save xujiboy/08224c8a6e5803a5c8faf8888471cc75 to your computer and use it in GitHub Desktop.
A foreach metaflow to test
from metaflow import FlowSpec, step, Parameter
def script_path(filename):
"""
A convenience function to get the absolute path to a file in this
tutorial's directory. This allows the tutorial to be launched from any
directory.
"""
import os
filepath = os.path.join(os.path.dirname(__file__))
return os.path.join(filepath, filename)
class TestForeachFlow(FlowSpec):
"""
A flow to test foreach flow.
The flow performs the following steps:
1) Setup parameters.
2) Fork to foreach step.
3) Alter the value of input c.
4) Joins all the altered c as a list.
5) End.
"""
# initial attributes
init_a = Parameter('init_a',
help = 'the initial attribute a',
default = 100)
@step
def start(self):
"""
Setup parameters.
"""
self.param = dict(foo = 2)
self.next(self.fork)
@step
def fork(self):
"""
Fork to foreach step.
"""
self.c_list = [1,2,3,4]
self.next(self.alter_c, foreach='c_list')
@step
def alter_c(self):
"""
This step alters the value of input c.
"""
self.c = self.input
self.altered_c = self.c + 1000
self.next(self.join)
@step
def join(self, inputs):
"""
This step joins all the altered c as a list
"""
self.altered_c_list = [input.altered_c for input in inputs]
self.merge_artifacts(inputs, exclude=['c', 'altered_c'])
self.foo = self.param.get('foo')
self.result = [i+ self.foo for i in self.altered_c_list]
self.next(self.end)
@step
def end(self):
"""
end.
"""
pass
if __name__ == '__main__':
TestForeachFlow()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment