
@binarycrayon
Last active March 9, 2020 18:02
argo dsl foreach proposal

foreach macro proposal

I am proposing a new module that provides a foreach macro. The foreach terminology is frequently used in the Hadoop/Spark/MapReduce pipeline world, so the concept should translate quickly for those who already have experience there.

The goal of the foreach macro is to allow users to generate tasks dynamically from a provided list of parameters.

Syntax

foreach takes kwargs in the form foreach(param=[values...], param2=[values...], ...), which provide the sets of parameter values to generate splits from. Then, for each combination of parameters in the splits, we generate models using closures.
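To make the split semantics concrete, here is a minimal sketch (plain Python, not the dsl itself) of the Cartesian-product expansion that foreach(param=[...], param2=[...]) implies; the parameter names and values are illustrative:

```python
import itertools

# Two illustrative parameter lists; foreach(param_1=[1, 2], param_2=["a", "b"])
# would expand into one split per combination.
params = {"param_1": [1, 2], "param_2": ["a", "b"]}

# Each split is one dict pairing every parameter name with one of its values.
splits = [dict(zip(params.keys(), combo))
          for combo in itertools.product(*params.values())]

print(splits)
# [{'param_1': 1, 'param_2': 'a'}, {'param_1': 1, 'param_2': 'b'},
#  {'param_1': 2, 'param_2': 'a'}, {'param_1': 2, 'param_2': 'b'}]
```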

We use a closure function to receive the parameters for a single split, then define the actual model (such as a task) in a nested function.
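As a plain-Python illustration of that closure pattern (the names here are made up for the sketch, not the dsl API):

```python
def tasks(message=None):
    """Outer closure: receives one split's parameter values."""
    def C():
        # Nested function: defines the actual model/task body using the
        # closed-over parameter value.
        return "echo {}".format(message)
    return C

hello_task = tasks(message="hello")
print(hello_task())  # echo hello
```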

An example DAG, foreach_dag.py, is provided below to showcase the foreach decorator. The actual usage is the @foreach-decorated tasks closure.

The benefit of this syntax is that it allows users to consistently specify how tasks/models are generated while preserving the way tasks are specified in the Argo dsl.

In summary, the foreach macro and closure should only relay the parameter splits to populate the multiple Argo models generated by the nested function.

In addition, I have a design in mind that lets users join tasks by specifying dynamic dependencies against the Argo models generated by foreach macros.

The syntax looks like @foreach.dependencies(name=closure_function_name); see the commented-out decorator on task D in foreach_dag.py.
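A rough sketch of how that dynamic-dependency lookup could work (the registry and helper below are hypothetical, not existing dsl code): the macro would record the concrete task names it generated per closure, and @foreach.dependencies(name=...) would resolve to that list at compile time.

```python
# Hypothetical registry the foreach macro would populate as it generates tasks,
# keyed by the closure function's name.
generated_tasks = {"tasks": ["C_MESSAGE_HELLO", "C_MESSAGE_WORLD"]}

def resolve_foreach_dependencies(closure_name, registry):
    """Expand a closure name into the concrete names of its generated tasks."""
    return list(registry.get(closure_name, []))

deps = resolve_foreach_dependencies("tasks", generated_tasks)
print(deps)  # ['C_MESSAGE_HELLO', 'C_MESSAGE_WORLD']
```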

Current State and Challenges

I am still in the process of understanding the metaprogramming model implemented in the Argo dsl.

A work-in-progress, proof-of-concept implementation is in macro.py. I am struggling to figure out how to relay the "models" generated from nested @task-decorated functions to the parent Workflow spec. Maybe setting __props__ on the closure function would let them get picked up by _compile in the Workflow class? I am also not sure how that could be done at the moment.
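One possible direction, as a hedged sketch not verified against the dsl internals: if Workflow's _compile discovers tasks by scanning class attributes, the foreach macro could attach each generated function to the owning class with setattr under its generated name. The class and helper below are stand-ins invented for this sketch:

```python
class FakeWorkflow:
    """Stand-in for the dsl Workflow class, used only for this sketch."""
    pass

def register_generated_task(klass, name, fn):
    # Attach the generated function to the class so an attribute-scanning
    # compile step could discover it like a hand-written task.
    setattr(klass, name, fn)

register_generated_task(FakeWorkflow, "C_MESSAGE_HELLO", lambda: "echo hello")
print(FakeWorkflow.C_MESSAGE_HELLO())  # echo hello
```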

Please let me know if this is a feasible design (whether I am on the right path). I'd like to get your feedback and help before going further. Thank you!

# foreach_dag.py
from kubernetes.client.models.v1_container import V1Container

from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.macros import foreach
from argo.workflows.dsl.templates import *

load_kube_config()
api = V1alpha1Api()


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="abc", value="haha")
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, abc: V1alpha1Parameter, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @foreach(message=["hello", "world"])
    def tasks(message=None):
        """
        Should populate one task per value of message:

        {'arguments': {'parameters': [{'name': 'message', 'value': 'hello'}]},
         'dependencies': ['A'],
         'name': 'C_MESSAGE_HELLO',
         'template': 'echo'},
        {'arguments': {'parameters': [{'name': 'message', 'value': 'world'}]},
         'dependencies': ['A'],
         'name': 'C_MESSAGE_WORLD',
         'template': 'echo'},
        """
        @task
        @parameter(name="message", value=message)
        @dependencies(["A"])
        def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
            return self.echo(message=message)

        return C

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    # @foreach.dependencies(name='tasks')
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )
        return container


wf = DagDiamond()
wf.name = ''
print(wf.to_yaml())

# result = api.create_namespaced_workflow('mwong', wf)
# print(result.metadata.name)
# macro.py (work-in-progress proof of concept)
import itertools


def split_fn(**kwargs):
    """
    Generate splits from the product of multiple parameter value lists.

    e.g. for the kwargs specified below:
        {
            param_1: [1, 2],
            param_2: [2, 3],
        }
    generate entries for all combinations of param_1 x param_2:
        [{param_1: 1, param_2: 2}, {param_1: 2, param_2: 2}, ...]
    """
    all_param_values = []
    for param, values in kwargs.items():
        param_values = []
        for val in values:
            param_values.append((param, val))
        all_param_values.append(param_values)
    return [dict(param_pairs)
            for param_pairs in itertools.product(*all_param_values)]


class foreach:
    """Decorator example defined entirely as a class."""

    def __init__(self, *args, **kwargs):
        # Allow a custom split function as the first positional argument,
        # falling back to the default Cartesian-product split_fn.
        _split_fn = args[0] if args else split_fn
        self.split_functions = _split_fn(**kwargs)

    def __call__(self, *args, **kw):
        func = args[0]
        # klass_name = func.__qualname__.split('.')[0]
        # klass = globals()[klass_name]
        # print(id(klass))
        for i, kwargs in enumerate(self.split_functions):
            function = func(**kwargs)
            # param_name_value combinations, e.g. param_1_1_param_2_2_param_3_4
            name = '_'.join('{0}_{1}'.format(*item) for item in kwargs.items())
            # C => C_param_1
            function.name = name
            # setattr(Animal, f"{function.__name__}_{i}", function)
            # setattr(Animal, name, function)
            # function.__name__ = name
            function.__doc__ = 'generated'
        return func