Skip to content

Instantly share code, notes, and snippets.

@zijianjoy
Last active October 6, 2022 17:33
Show Gist options
  • Save zijianjoy/57ac3e6a507f9664cbdc13d61d5f18da to your computer and use it in GitHub Desktop.
Save zijianjoy/57ac3e6a507f9664cbdc13d61d5f18da to your computer and use it in GitHub Desktop.
dag-pipeline.yaml
components:
comp-args-generator:
executorLabel: exec-args-generator
outputDefinitions:
parameters:
Output:
parameterType: LIST
comp-for-loop-1:
dag:
tasks:
print-text:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-text
inputs:
parameters:
msg:
componentInputParameter: pipelinechannel--loop_parameter-loop-item
taskInfo:
name: print-text
inputDefinitions:
parameters:
pipelinechannel--loop_parameter:
parameterType: LIST
pipelinechannel--loop_parameter-loop-item:
parameterType: STRING
comp-for-loop-2:
dag:
tasks:
print-struct:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-struct
inputs:
parameters:
struct:
componentInputParameter: pipelinechannel--args-generator-Output-loop-item
taskInfo:
name: print-struct
print-text-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-text-2
inputs:
parameters:
msg:
componentInputParameter: pipelinechannel--args-generator-Output-loop-item
parameterExpressionSelector: 'parseJson(string_value)["A_a"]'
taskInfo:
name: print-text-2
print-text-3:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-text-3
inputs:
parameters:
msg:
componentInputParameter: pipelinechannel--args-generator-Output-loop-item
parameterExpressionSelector: 'parseJson(string_value)["B_b"]'
taskInfo:
name: print-text-3
inputDefinitions:
parameters:
pipelinechannel--args-generator-Output:
parameterType: LIST
pipelinechannel--args-generator-Output-loop-item:
parameterType: STRUCT
comp-for-loop-4:
dag:
tasks:
print-struct-2:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-struct-2
inputs:
parameters:
struct:
componentInputParameter: pipelinechannel--loop-item-param-3
taskInfo:
name: print-struct-2
print-text-4:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-text-4
inputs:
parameters:
msg:
componentInputParameter: pipelinechannel--loop-item-param-3
parameterExpressionSelector: 'parseJson(string_value)["A_a"]'
taskInfo:
name: print-text-4
print-text-5:
cachingOptions:
enableCache: true
componentRef:
name: comp-print-text-5
inputs:
parameters:
msg:
componentInputParameter: pipelinechannel--loop-item-param-3
parameterExpressionSelector: 'parseJson(string_value)["B_b"]'
taskInfo:
name: print-text-5
inputDefinitions:
parameters:
pipelinechannel--loop-item-param-3:
parameterType: STRUCT
comp-print-struct:
executorLabel: exec-print-struct
inputDefinitions:
parameters:
struct:
parameterType: STRUCT
comp-print-struct-2:
executorLabel: exec-print-struct-2
inputDefinitions:
parameters:
struct:
parameterType: STRUCT
comp-print-text:
executorLabel: exec-print-text
inputDefinitions:
parameters:
msg:
parameterType: STRING
comp-print-text-2:
executorLabel: exec-print-text-2
inputDefinitions:
parameters:
msg:
parameterType: STRING
comp-print-text-3:
executorLabel: exec-print-text-3
inputDefinitions:
parameters:
msg:
parameterType: STRING
comp-print-text-4:
executorLabel: exec-print-text-4
inputDefinitions:
parameters:
msg:
parameterType: STRING
comp-print-text-5:
executorLabel: exec-print-text-5
inputDefinitions:
parameters:
msg:
parameterType: STRING
deploymentSpec:
executors:
exec-args-generator:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- args_generator
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def args_generator() -> List[Dict[str, str]]:
return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}]
image: 'python:3.7'
exec-print-struct:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- print_struct
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def print_struct(struct: Dict):
print(struct)
image: 'python:3.7'
exec-print-struct-2:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- print_struct
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def print_struct(struct: Dict):
print(struct)
image: 'python:3.7'
exec-print-text:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- print_text
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def print_text(msg: str):
print(msg)
image: 'python:3.7'
exec-print-text-2:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- print_text
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def print_text(msg: str):
print(msg)
image: 'python:3.7'
exec-print-text-3:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- print_text
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def print_text(msg: str):
print(msg)
image: 'python:3.7'
exec-print-text-4:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- print_text
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def print_text(msg: str):
print(msg)
image: 'python:3.7'
exec-print-text-5:
container:
args:
- '--executor_input'
- '{{$}}'
- '--function_to_execute'
- print_text
command:
- sh
- '-c'
- >
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
--no-warn-script-location 'kfp==2.0.0-beta.5' && "$0" "$@"
- sh
- '-ec'
- >
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.components.executor_main
--component_module_path
"$program_path/ephemeral_component.py" "$@"
- |+
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *
def print_text(msg: str):
print(msg)
image: 'python:3.7'
pipelineInfo:
name: pipeline/dag-pipeline
root:
dag:
tasks:
args-generator:
cachingOptions:
enableCache: true
componentRef:
name: comp-args-generator
taskInfo:
name: args-generator
for-loop-1:
componentRef:
name: comp-for-loop-1
inputs:
parameters:
pipelinechannel--loop_parameter:
componentInputParameter: loop_parameter
parameterIterator:
itemInput: pipelinechannel--loop_parameter-loop-item
items:
inputParameter: pipelinechannel--loop_parameter
taskInfo:
name: for-loop-1
for-loop-2:
componentRef:
name: comp-for-loop-2
dependentTasks:
- args-generator
inputs:
parameters:
pipelinechannel--args-generator-Output:
taskOutputParameter:
outputParameterKey: Output
producerTask: args-generator
parameterIterator:
itemInput: pipelinechannel--args-generator-Output-loop-item
items:
inputParameter: pipelinechannel--args-generator-Output
taskInfo:
name: for-loop-2
for-loop-4:
componentRef:
name: comp-for-loop-4
parameterIterator:
itemInput: pipelinechannel--loop-item-param-3
items:
raw: '[{"A_a": "1", "B_b": "2"}, {"A_a": "10", "B_b": "20"}]'
taskInfo:
name: for-loop-4
inputDefinitions:
artifacts:
input_data:
artifactType:
schemaTitle: system.Dataset
schemaVersion: 0.0.1
parameters:
loop_parameter:
parameterType: LIST
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment