# Source: GitHub Gist knabben/e706a32bd82e87dfd774114444231f29 by @knabben
# (last active April 26, 2021 01:08). GitHub page navigation text that was
# captured together with the gist is not part of the manifest and is omitted.
# Argo Workflow carrying Kubeflow Pipelines SDK (kfp 1.0.4) labels for the
# "Conditional execution pipeline with exit handler" sample.
#
# Flow, as encoded in the templates below:
#   exit-handler-1 flips a coin (flip-coin-op), then
#     heads -> condition-2: draw an int in [0, 9]   and print "> 5" or "<= 5"
#     tails -> condition-5: draw an int in [10, 19] and print "> 15" or "<= 15"
#     tails -> condition-8: run fail-op, which exits with status 1
#   print-op is registered as the workflow's onExit handler.
#
# NOTE(review): the pipelines.kubeflow.org/component_spec annotations are
# carried over verbatim. The Python source embedded in them appears to have had
# its indentation collapsed to single spaces; recompile the pipeline with the
# KFP SDK if those annotations must match the container scripts exactly.
---
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: conditional-execution-pipeline-with-exit-handler-
  labels:
    pipelines.kubeflow.org/kfp_sdk_version: '1.0.4'
  annotations:
    pipelines.kubeflow.org/kfp_sdk_version: '1.0.4'
    pipelines.kubeflow.org/pipeline_compilation_time: '2020-10-22T10:08:22.179176'
    pipelines.kubeflow.org/pipeline_spec: '{"description": "Shows how to use dsl.Condition()
      and dsl.ExitHandler().", "name": "Conditional execution pipeline with exit handler"}'
spec:
  entrypoint: conditional-execution-pipeline-with-exit-handler
  # print-op is the exit handler for the whole workflow.
  onExit: print-op
  serviceAccountName: pipeline-runner
  arguments:
    parameters: []
  templates:
  # "heads" branch: draw an int with get-random-int-op, then route on "> 5" / "<= 5".
  - name: condition-2
    dag:
      tasks:
      - name: condition-3
        template: condition-3
        dependencies: [get-random-int-op]
        when: '{{tasks.get-random-int-op.outputs.parameters.get-random-int-op-Output}} > 5'
        arguments:
          parameters:
          - name: get-random-int-op-Output
            value: '{{tasks.get-random-int-op.outputs.parameters.get-random-int-op-Output}}'
      - name: condition-4
        template: condition-4
        dependencies: [get-random-int-op]
        when: '{{tasks.get-random-int-op.outputs.parameters.get-random-int-op-Output}} <= 5'
        arguments:
          parameters:
          - name: get-random-int-op-Output
            value: '{{tasks.get-random-int-op.outputs.parameters.get-random-int-op-Output}}'
      - name: get-random-int-op
        template: get-random-int-op

  # heads and > 5: forward the drawn value to print-op-2.
  - name: condition-3
    inputs:
      parameters:
      - name: get-random-int-op-Output
    dag:
      tasks:
      - name: print-op-2
        template: print-op-2
        arguments:
          parameters:
          - name: get-random-int-op-Output
            value: '{{inputs.parameters.get-random-int-op-Output}}'

  # heads and <= 5: forward the drawn value to print-op-3.
  - name: condition-4
    inputs:
      parameters:
      - name: get-random-int-op-Output
    dag:
      tasks:
      - name: print-op-3
        template: print-op-3
        arguments:
          parameters:
          - name: get-random-int-op-Output
            value: '{{inputs.parameters.get-random-int-op-Output}}'

  # "tails" branch: draw an int with get-random-int-op-2, then route on "> 15" / "<= 15".
  - name: condition-5
    dag:
      tasks:
      - name: condition-6
        template: condition-6
        dependencies: [get-random-int-op-2]
        when: '{{tasks.get-random-int-op-2.outputs.parameters.get-random-int-op-2-Output}} > 15'
        arguments:
          parameters:
          - name: get-random-int-op-2-Output
            value: '{{tasks.get-random-int-op-2.outputs.parameters.get-random-int-op-2-Output}}'
      - name: condition-7
        template: condition-7
        dependencies: [get-random-int-op-2]
        when: '{{tasks.get-random-int-op-2.outputs.parameters.get-random-int-op-2-Output}} <= 15'
        arguments:
          parameters:
          - name: get-random-int-op-2-Output
            value: '{{tasks.get-random-int-op-2.outputs.parameters.get-random-int-op-2-Output}}'
      - name: get-random-int-op-2
        template: get-random-int-op-2

  # tails and > 15: forward the drawn value to print-op-4.
  - name: condition-6
    inputs:
      parameters:
      - name: get-random-int-op-2-Output
    dag:
      tasks:
      - name: print-op-4
        template: print-op-4
        arguments:
          parameters:
          - name: get-random-int-op-2-Output
            value: '{{inputs.parameters.get-random-int-op-2-Output}}'

  # tails and <= 15: forward the drawn value to print-op-5.
  - name: condition-7
    inputs:
      parameters:
      - name: get-random-int-op-2-Output
    dag:
      tasks:
      - name: print-op-5
        template: print-op-5
        arguments:
          parameters:
          - name: get-random-int-op-2-Output
            value: '{{inputs.parameters.get-random-int-op-2-Output}}'

  # Second "tails" branch: runs fail-op.
  - name: condition-8
    dag:
      tasks:
      - name: fail-op
        template: fail-op

  # Workflow entrypoint: a DAG whose only task is exit-handler-1.
  - name: conditional-execution-pipeline-with-exit-handler
    dag:
      tasks:
      - name: exit-handler-1
        template: exit-handler-1

  # Flips the coin, then gates condition-2 / condition-5 / condition-8 on the result.
  - name: exit-handler-1
    dag:
      tasks:
      - name: condition-2
        template: condition-2
        dependencies: [flip-coin-op]
        when: '"{{tasks.flip-coin-op.outputs.parameters.flip-coin-op-Output}}" == "heads"'
      - name: condition-5
        template: condition-5
        dependencies: [flip-coin-op]
        when: '"{{tasks.flip-coin-op.outputs.parameters.flip-coin-op-Output}}" == "tails"'
      - name: condition-8
        template: condition-8
        dependencies: [flip-coin-op]
        when: '"{{tasks.flip-coin-op.outputs.parameters.flip-coin-op-Output}}" == "tails"'
      - name: flip-coin-op
        template: flip-coin-op

  # Prints --message and then calls sys.exit(1).
  - name: fail-op
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def fail_op(message):
            """Fails."""
            import sys
            print(message)
            sys.exit(1)

        import argparse
        _parser = argparse.ArgumentParser(prog='Fail op', description='Fails.')
        _parser.add_argument("--message", dest="message", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = fail_op(**_parsed_args)
      args:
      - --message
      - Failing the run to demonstrate that exit handler still gets executed.
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Fails.", "implementation": {"container": {"args": ["--message", {"inputValue":
          "message"}], "command": ["python3", "-u", "-c", "def fail_op(message):\n \"\"\"Fails.\"\"\"\n import
          sys\n print(message) \n sys.exit(1)\n\nimport argparse\n_parser
          = argparse.ArgumentParser(prog=''Fail op'', description=''Fails.'')\n_parser.add_argument(\"--message\",
          dest=\"message\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
          = vars(_parser.parse_args())\n\n_outputs = fail_op(**_parsed_args)\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "message"}], "name": "Fail
          op"}'

  # Picks 'heads' or 'tails' at random and writes it to /tmp/outputs/Output/data,
  # which is exposed as the flip-coin-op-Output parameter and artifact.
  - name: flip-coin-op
    outputs:
      parameters:
      - name: flip-coin-op-Output
        valueFrom:
          path: /tmp/outputs/Output/data
      artifacts:
      - name: flip-coin-op-Output
        path: /tmp/outputs/Output/data
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def flip_coin_op():
            """Flip a coin and output heads or tails randomly."""
            import random
            result = random.choice(['heads', 'tails'])
            print(result)
            return result

        def _serialize_str(str_value: str) -> str:
            if not isinstance(str_value, str):
                raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
            return str_value

        import argparse
        _parser = argparse.ArgumentParser(prog='Flip coin op', description='Flip a coin and output heads or tails randomly.')
        _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
        _parsed_args = vars(_parser.parse_args())
        _output_files = _parsed_args.pop("_output_paths", [])

        _outputs = flip_coin_op(**_parsed_args)

        _outputs = [_outputs]

        _output_serializers = [
            _serialize_str,

        ]

        import os
        for idx, output_file in enumerate(_output_files):
            try:
                os.makedirs(os.path.dirname(output_file))
            except OSError:
                pass
            with open(output_file, 'w') as f:
                f.write(_output_serializers[idx](_outputs[idx]))
      args: ['----output-paths', /tmp/outputs/Output/data]
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Flip a coin and output heads or tails randomly.", "implementation": {"container":
          {"args": ["----output-paths", {"outputPath": "Output"}], "command": ["python3",
          "-u", "-c", "def flip_coin_op():\n \"\"\"Flip a coin and output heads
          or tails randomly.\"\"\"\n import random\n result = random.choice([''heads'',
          ''tails''])\n print(result)\n return result\n\ndef _serialize_str(str_value:
          str) -> str:\n if not isinstance(str_value, str):\n raise TypeError(''Value
          \"{}\" has type \"{}\" instead of str.''.format(str(str_value), str(type(str_value))))\n return
          str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Flip
          coin op'', description=''Flip a coin and output heads or tails randomly.'')\n_parser.add_argument(\"----output-paths\",
          dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
          = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = flip_coin_op(**_parsed_args)\n\n_outputs
          = [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport
          os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
          OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
          "image": "python:3.7"}}, "name": "Flip coin op", "outputs": [{"name": "Output",
          "type": "String"}]}'

  # Draws random.randint(0, 9) and writes it to /tmp/outputs/Output/data,
  # exposed as get-random-int-op-Output.
  - name: get-random-int-op
    outputs:
      parameters:
      - name: get-random-int-op-Output
        valueFrom:
          path: /tmp/outputs/Output/data
      artifacts:
      - name: get-random-int-op-Output
        path: /tmp/outputs/Output/data
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def get_random_int_op(minimum, maximum):
            """Generate a random number between minimum and maximum (inclusive)."""
            import random
            result = random.randint(minimum, maximum)
            print(result)
            return result

        def _serialize_int(int_value: int) -> str:
            if isinstance(int_value, str):
                return int_value
            if not isinstance(int_value, int):
                raise TypeError('Value "{}" has type "{}" instead of int.'.format(str(int_value), str(type(int_value))))
            return str(int_value)

        import argparse
        _parser = argparse.ArgumentParser(prog='Get random int op', description='Generate a random number between minimum and maximum (inclusive).')
        _parser.add_argument("--minimum", dest="minimum", type=int, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--maximum", dest="maximum", type=int, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
        _parsed_args = vars(_parser.parse_args())
        _output_files = _parsed_args.pop("_output_paths", [])

        _outputs = get_random_int_op(**_parsed_args)

        _outputs = [_outputs]

        _output_serializers = [
            _serialize_int,

        ]

        import os
        for idx, output_file in enumerate(_output_files):
            try:
                os.makedirs(os.path.dirname(output_file))
            except OSError:
                pass
            with open(output_file, 'w') as f:
                f.write(_output_serializers[idx](_outputs[idx]))
      args: [--minimum, '0', --maximum, '9', '----output-paths', /tmp/outputs/Output/data]
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Generate a random number between minimum and maximum (inclusive).", "implementation":
          {"container": {"args": ["--minimum", {"inputValue": "minimum"}, "--maximum",
          {"inputValue": "maximum"}, "----output-paths", {"outputPath": "Output"}],
          "command": ["python3", "-u", "-c", "def get_random_int_op(minimum, maximum):\n \"\"\"Generate
          a random number between minimum and maximum (inclusive).\"\"\"\n import
          random\n result = random.randint(minimum, maximum)\n print(result)\n return
          result\n\ndef _serialize_int(int_value: int) -> str:\n if isinstance(int_value,
          str):\n return int_value\n if not isinstance(int_value, int):\n raise
          TypeError(''Value \"{}\" has type \"{}\" instead of int.''.format(str(int_value),
          str(type(int_value))))\n return str(int_value)\n\nimport argparse\n_parser
          = argparse.ArgumentParser(prog=''Get random int op'', description=''Generate
          a random number between minimum and maximum (inclusive).'')\n_parser.add_argument(\"--minimum\",
          dest=\"minimum\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--maximum\",
          dest=\"maximum\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\",
          dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
          = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = get_random_int_op(**_parsed_args)\n\n_outputs
          = [_outputs]\n\n_output_serializers = [\n _serialize_int,\n\n]\n\nimport
          os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
          OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "minimum", "type": "Integer"},
          {"name": "maximum", "type": "Integer"}], "name": "Get random int op", "outputs":
          [{"name": "Output", "type": "Integer"}]}'

  # Same script as get-random-int-op, invoked with the range 10..19;
  # the result is exposed as get-random-int-op-2-Output.
  - name: get-random-int-op-2
    outputs:
      parameters:
      - name: get-random-int-op-2-Output
        valueFrom:
          path: /tmp/outputs/Output/data
      artifacts:
      - name: get-random-int-op-2-Output
        path: /tmp/outputs/Output/data
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def get_random_int_op(minimum, maximum):
            """Generate a random number between minimum and maximum (inclusive)."""
            import random
            result = random.randint(minimum, maximum)
            print(result)
            return result

        def _serialize_int(int_value: int) -> str:
            if isinstance(int_value, str):
                return int_value
            if not isinstance(int_value, int):
                raise TypeError('Value "{}" has type "{}" instead of int.'.format(str(int_value), str(type(int_value))))
            return str(int_value)

        import argparse
        _parser = argparse.ArgumentParser(prog='Get random int op', description='Generate a random number between minimum and maximum (inclusive).')
        _parser.add_argument("--minimum", dest="minimum", type=int, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("--maximum", dest="maximum", type=int, required=True, default=argparse.SUPPRESS)
        _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
        _parsed_args = vars(_parser.parse_args())
        _output_files = _parsed_args.pop("_output_paths", [])

        _outputs = get_random_int_op(**_parsed_args)

        _outputs = [_outputs]

        _output_serializers = [
            _serialize_int,

        ]

        import os
        for idx, output_file in enumerate(_output_files):
            try:
                os.makedirs(os.path.dirname(output_file))
            except OSError:
                pass
            with open(output_file, 'w') as f:
                f.write(_output_serializers[idx](_outputs[idx]))
      args: [--minimum, '10', --maximum, '19', '----output-paths', /tmp/outputs/Output/data]
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Generate a random number between minimum and maximum (inclusive).", "implementation":
          {"container": {"args": ["--minimum", {"inputValue": "minimum"}, "--maximum",
          {"inputValue": "maximum"}, "----output-paths", {"outputPath": "Output"}],
          "command": ["python3", "-u", "-c", "def get_random_int_op(minimum, maximum):\n \"\"\"Generate
          a random number between minimum and maximum (inclusive).\"\"\"\n import
          random\n result = random.randint(minimum, maximum)\n print(result)\n return
          result\n\ndef _serialize_int(int_value: int) -> str:\n if isinstance(int_value,
          str):\n return int_value\n if not isinstance(int_value, int):\n raise
          TypeError(''Value \"{}\" has type \"{}\" instead of int.''.format(str(int_value),
          str(type(int_value))))\n return str(int_value)\n\nimport argparse\n_parser
          = argparse.ArgumentParser(prog=''Get random int op'', description=''Generate
          a random number between minimum and maximum (inclusive).'')\n_parser.add_argument(\"--minimum\",
          dest=\"minimum\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--maximum\",
          dest=\"maximum\", type=int, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\",
          dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
          = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = get_random_int_op(**_parsed_args)\n\n_outputs
          = [_outputs]\n\n_output_serializers = [\n _serialize_int,\n\n]\n\nimport
          os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
          OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "minimum", "type": "Integer"},
          {"name": "maximum", "type": "Integer"}], "name": "Get random int op", "outputs":
          [{"name": "Output", "type": "Integer"}]}'

  # Exit handler (referenced by spec.onExit): prints a fixed message.
  - name: print-op
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def print_op(message):
            """Print a message."""
            print(message)

        import argparse
        _parser = argparse.ArgumentParser(prog='Print op', description='Print a message.')
        _parser.add_argument("--message", dest="message", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = print_op(**_parsed_args)
      args:
      - --message
      - 'Exit handler has worked!'
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Print a message.", "implementation": {"container": {"args": ["--message",
          {"inputValue": "message"}], "command": ["python3", "-u", "-c", "def print_op(message):\n \"\"\"Print
          a message.\"\"\"\n print(message)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Print
          op'', description=''Print a message.'')\n_parser.add_argument(\"--message\",
          dest=\"message\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
          = vars(_parser.parse_args())\n\n_outputs = print_op(**_parsed_args)\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "message", "type": "String"}],
          "name": "Print op"}'

  # Prints "heads and <value> > 5!".
  - name: print-op-2
    inputs:
      parameters:
      - name: get-random-int-op-Output
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def print_op(message):
            """Print a message."""
            print(message)

        import argparse
        _parser = argparse.ArgumentParser(prog='Print op', description='Print a message.')
        _parser.add_argument("--message", dest="message", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = print_op(**_parsed_args)
      args:
      - --message
      - 'heads and {{inputs.parameters.get-random-int-op-Output}} > 5!'
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Print a message.", "implementation": {"container": {"args": ["--message",
          {"inputValue": "message"}], "command": ["python3", "-u", "-c", "def print_op(message):\n \"\"\"Print
          a message.\"\"\"\n print(message)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Print
          op'', description=''Print a message.'')\n_parser.add_argument(\"--message\",
          dest=\"message\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
          = vars(_parser.parse_args())\n\n_outputs = print_op(**_parsed_args)\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "message", "type": "String"}],
          "name": "Print op"}'

  # Prints "heads and <value> <= 5!".
  - name: print-op-3
    inputs:
      parameters:
      - name: get-random-int-op-Output
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def print_op(message):
            """Print a message."""
            print(message)

        import argparse
        _parser = argparse.ArgumentParser(prog='Print op', description='Print a message.')
        _parser.add_argument("--message", dest="message", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = print_op(**_parsed_args)
      args:
      - --message
      - 'heads and {{inputs.parameters.get-random-int-op-Output}} <= 5!'
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Print a message.", "implementation": {"container": {"args": ["--message",
          {"inputValue": "message"}], "command": ["python3", "-u", "-c", "def print_op(message):\n \"\"\"Print
          a message.\"\"\"\n print(message)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Print
          op'', description=''Print a message.'')\n_parser.add_argument(\"--message\",
          dest=\"message\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
          = vars(_parser.parse_args())\n\n_outputs = print_op(**_parsed_args)\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "message", "type": "String"}],
          "name": "Print op"}'

  # Prints "tails and <value> > 15!".
  - name: print-op-4
    inputs:
      parameters:
      - name: get-random-int-op-2-Output
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def print_op(message):
            """Print a message."""
            print(message)

        import argparse
        _parser = argparse.ArgumentParser(prog='Print op', description='Print a message.')
        _parser.add_argument("--message", dest="message", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = print_op(**_parsed_args)
      args:
      - --message
      - 'tails and {{inputs.parameters.get-random-int-op-2-Output}} > 15!'
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Print a message.", "implementation": {"container": {"args": ["--message",
          {"inputValue": "message"}], "command": ["python3", "-u", "-c", "def print_op(message):\n \"\"\"Print
          a message.\"\"\"\n print(message)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Print
          op'', description=''Print a message.'')\n_parser.add_argument(\"--message\",
          dest=\"message\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
          = vars(_parser.parse_args())\n\n_outputs = print_op(**_parsed_args)\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "message", "type": "String"}],
          "name": "Print op"}'

  # Prints "tails and <value> <= 15!".
  - name: print-op-5
    inputs:
      parameters:
      - name: get-random-int-op-2-Output
    container:
      image: python:3.7
      command:
      - python3
      - -u
      - -c
      - |
        def print_op(message):
            """Print a message."""
            print(message)

        import argparse
        _parser = argparse.ArgumentParser(prog='Print op', description='Print a message.')
        _parser.add_argument("--message", dest="message", type=str, required=True, default=argparse.SUPPRESS)
        _parsed_args = vars(_parser.parse_args())

        _outputs = print_op(**_parsed_args)
      args:
      - --message
      - 'tails and {{inputs.parameters.get-random-int-op-2-Output}} <= 15!'
    metadata:
      annotations:
        pipelines.kubeflow.org/component_ref: '{}'
        pipelines.kubeflow.org/component_spec: '{"description":
          "Print a message.", "implementation": {"container": {"args": ["--message",
          {"inputValue": "message"}], "command": ["python3", "-u", "-c", "def print_op(message):\n \"\"\"Print
          a message.\"\"\"\n print(message)\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Print
          op'', description=''Print a message.'')\n_parser.add_argument(\"--message\",
          dest=\"message\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
          = vars(_parser.parse_args())\n\n_outputs = print_op(**_parsed_args)\n"],
          "image": "python:3.7"}}, "inputs": [{"name": "message", "type": "String"}],
          "name": "Print op"}'
# End of gist. (GitHub's sign-up / sign-in prompt for commenting is not part of the manifest.)