Skip to content

Instantly share code, notes, and snippets.

@aronchick
Created April 29, 2022 19:32
Show Gist options
  • Save aronchick/473060503ae189b360fbded04d802c80 to your computer and use it in GitHub Desktop.
# Argo Workflow emitted by the KFP SDK compiler (v1.8.12 per the annotations
# below). Entry point is the "root-pipeline-compilation" DAG template.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: root-pipeline-compilation-
  annotations:
    pipelines.kubeflow.org/kfp_sdk_version: "1.8.12"
    pipelines.kubeflow.org/pipeline_compilation_time: "2022-04-29T19:09:44.296461"
    # Serialized pipeline interface recorded by the KFP compiler.
    pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "", "name": "context", "optional": true, "type": "String"}, {"default": "", "name": "metadata_url", "optional": true, "type": "String"}, {"default": "", "name": "pipeline-root"}, {"default": "pipeline/root_pipeline_compilation", "name": "pipeline-name"}], "name": "root_pipeline_compilation"}'
    pipelines.kubeflow.org/v2_pipeline: "true"
  labels:
    pipelines.kubeflow.org/v2_pipeline: "true"
    pipelines.kubeflow.org/kfp_sdk_version: "1.8.12"
spec:
  entrypoint: root-pipeline-compilation
  templates:
- name: root-pipeline-compilation
inputs:
parameters:
- { name: metadata_url }
- { name: pipeline-name }
- { name: pipeline-root }
dag:
tasks:
- name: run-info-fn
template: run-info-fn
arguments:
parameters:
- {
name: pipeline-name,
value: "{{inputs.parameters.pipeline-name}}",
}
- {
name: pipeline-root,
value: "{{inputs.parameters.pipeline-root}}",
}
- name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn
template: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn
dependencies: [run-info-fn]
arguments:
parameters:
- {
name: metadata_url,
value: "{{inputs.parameters.metadata_url}}",
}
- {
name: pipeline-name,
value: "{{inputs.parameters.pipeline-name}}",
}
- {
name: pipeline-root,
value: "{{inputs.parameters.pipeline-root}}",
}
- {
name: run-info-fn-run_info,
value: "{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}",
}
- name: same-step-001-45bda231c8c94938af131969af2dced5-fn
template: same-step-001-45bda231c8c94938af131969af2dced5-fn
dependencies:
[run-info-fn, same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn]
arguments:
parameters:
- {
name: metadata_url,
value: "{{inputs.parameters.metadata_url}}",
}
- {
name: pipeline-name,
value: "{{inputs.parameters.pipeline-name}}",
}
- {
name: pipeline-root,
value: "{{inputs.parameters.pipeline-root}}",
}
- {
name: run-info-fn-run_info,
value: "{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}",
}
- {
name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path,
value: "{{tasks.same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn.outputs.parameters.same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path}}",
}
- name: same-step-002-cbffced023ac4096a647fa26bca93da5-fn
template: same-step-002-cbffced023ac4096a647fa26bca93da5-fn
dependencies:
[run-info-fn, same-step-001-45bda231c8c94938af131969af2dced5-fn]
arguments:
parameters:
- {
name: metadata_url,
value: "{{inputs.parameters.metadata_url}}",
}
- {
name: pipeline-name,
value: "{{inputs.parameters.pipeline-name}}",
}
- {
name: pipeline-root,
value: "{{inputs.parameters.pipeline-root}}",
}
- {
name: run-info-fn-run_info,
value: "{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}",
}
- {
name: same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path,
value: "{{tasks.same-step-001-45bda231c8c94938af131969af2dced5-fn.outputs.parameters.same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path}}",
}
- name: run-info-fn
container:
args:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp' 'dill' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def run_info_fn(
run_id: str,
) -> NamedTuple("RunInfoOutput", [("run_info", str),]):
from base64 import urlsafe_b64encode
from collections import namedtuple
import datetime
import base64
import dill
import kfp
client = kfp.Client(host="http://ml-pipeline:8888")
run_info = client.get_run(run_id=run_id)
run_info_dict = {
"run_id": run_info.run.id,
"name": run_info.run.name,
"created_at": run_info.run.created_at.isoformat(),
"pipeline_id": run_info.run.pipeline_spec.pipeline_id,
}
# Track kubernetes resources associated wth the run.
for r in run_info.run.resource_references:
run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id
# Base64-encoded as value is visible in kubeflow ui.
output = urlsafe_b64encode(dill.dumps(run_info_dict))
return namedtuple("RunInfoOutput", ["run_info"])(
str(output, encoding="ascii")
)
- --executor_input
- "{{$}}"
- --function_to_execute
- run_info_fn
command:
[
/kfp-launcher/launch,
--mlmd_server_address,
$(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port,
$(METADATA_GRPC_SERVICE_PORT),
--runtime_info_json,
$(KFP_V2_RUNTIME_INFO),
--container_image,
$(KFP_V2_IMAGE),
--task_name,
run-info-fn,
--pipeline_name,
"{{inputs.parameters.pipeline-name}}",
--run_id,
$(KFP_RUN_ID),
--run_resource,
workflows.argoproj.io/$(WORKFLOW_ID),
--namespace,
$(KFP_NAMESPACE),
--pod_name,
$(KFP_POD_NAME),
--pod_uid,
$(KFP_POD_UID),
--pipeline_root,
"{{inputs.parameters.pipeline-root}}",
--enable_caching,
$(ENABLE_CACHING),
--,
"run_id={{workflow.uid}}",
--,
]
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: { fieldPath: metadata.name }
- name: KFP_POD_UID
valueFrom:
fieldRef: { fieldPath: metadata.uid }
- name: KFP_NAMESPACE
valueFrom:
fieldRef: { fieldPath: metadata.namespace }
- name: WORKFLOW_ID
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['workflows.argoproj.io/workflow']",
}
- name: KFP_RUN_ID
valueFrom:
fieldRef: { fieldPath: "metadata.labels['pipeline/runid']" }
- name: ENABLE_CACHING
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['pipelines.kubeflow.org/enable_caching']",
}
- { name: KFP_V2_IMAGE, value: "python:3.7" }
- {
name: KFP_V2_RUNTIME_INFO,
value: '{"inputParameters": {"run_id": {"type":
"STRING"}}, "inputArtifacts": {}, "outputParameters": {"run_info": {"type":
"STRING", "path": "/tmp/outputs/run_info/data"}}, "outputArtifacts": {}}',
}
envFrom:
- configMapRef: { name: metadata-grpc-configmap, optional: true }
image: python:3.7
volumeMounts:
- { mountPath: /kfp-launcher, name: kfp-launcher }
inputs:
parameters:
- { name: pipeline-name }
- { name: pipeline-root }
outputs:
parameters:
- name: run-info-fn-run_info
valueFrom: { path: /tmp/outputs/run_info/data }
artifacts:
- { name: run-info-fn-run_info, path: /tmp/outputs/run_info/data }
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_ref: "{}"
pipelines.kubeflow.org/arguments.parameters: '{"run_id": "{{workflow.uid}}"}'
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/enable_caching: "true"
initContainers:
- command: [launcher, --copy, /kfp-launcher/launch]
image: gcr.io/ml-pipeline/kfp-launcher:1.8.7
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- { name: kfp-launcher }
- name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn
container:
args:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def same_step_000_7495a05863f8450a9b26a0d2da4042eb_fn(
input_context_path: InputPath(str),
output_context_path: OutputPath(str),
run_info: str = "gAR9lC4=",
metadata_url: str = "",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
import dill
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
{dill.loads(urlsafe_b64decode("gASVTQIAAAAAAABYRgIAAGRhdGFzZXQgPSAnc2FtcGxlX2RhdGEnCmdwdV90eXBlID0gJ0ExMDAnCmltcG9ydCB0ZW5zb3JmbG93CmltcG9ydCBkYXRldGltZQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSAxMApiID0gYSArIDUgIzE1CmZyb20gSVB5dGhvbi5kaXNwbGF5IGltcG9ydCBJbWFnZQoKdXJsID0gJ2h0dHBzOi8vcmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbS9TQU1FLVByb2plY3QvU0FNRS1zYW1wbGVzL21haW4vdGVzdC1hcnRpZmFjdHMvRmFyb2VJc2xhbmRzLmpwZWcnCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIzI1Cgpmcm9tIElQeXRob24gaW1wb3J0IGRpc3BsYXkKZGlzcGxheS5JbWFnZSh1cmwpCgppbXBvcnQgcGxvdGx5CgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKZGVmIHNvbWVfbWF0aCh4LCB6KSAtPiB0dXBsZToKICAgIHJldHVybiAocm91bmQoeCArIHosIDIpLCByb3VuZCh4IC8geiwgMikpCgphID0gYSAqIDIwCmIgPSBiICogMTAwICMyNTAwCgpwcmludChmIkIgPSB7Yn0iKZQu"))}
# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
try:
dill.dumps(globals()[k])
except TypeError:
_bad_keys.append(k)
for k in _bad_keys:
del globals()[k]
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
- --executor_input
- "{{$}}"
- --function_to_execute
- same_step_000_7495a05863f8450a9b26a0d2da4042eb_fn
command:
[
/kfp-launcher/launch,
--mlmd_server_address,
$(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port,
$(METADATA_GRPC_SERVICE_PORT),
--runtime_info_json,
$(KFP_V2_RUNTIME_INFO),
--container_image,
$(KFP_V2_IMAGE),
--task_name,
same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn,
--pipeline_name,
"{{inputs.parameters.pipeline-name}}",
--run_id,
$(KFP_RUN_ID),
--run_resource,
workflows.argoproj.io/$(WORKFLOW_ID),
--namespace,
$(KFP_NAMESPACE),
--pod_name,
$(KFP_POD_NAME),
--pod_uid,
$(KFP_POD_UID),
--pipeline_root,
"{{inputs.parameters.pipeline-root}}",
--enable_caching,
$(ENABLE_CACHING),
--,
input_context_path=,
"metadata_url={{inputs.parameters.metadata_url}}",
"run_info={{inputs.parameters.run-info-fn-run_info}}",
--,
]
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: { fieldPath: metadata.name }
- name: KFP_POD_UID
valueFrom:
fieldRef: { fieldPath: metadata.uid }
- name: KFP_NAMESPACE
valueFrom:
fieldRef: { fieldPath: metadata.namespace }
- name: WORKFLOW_ID
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['workflows.argoproj.io/workflow']",
}
- name: KFP_RUN_ID
valueFrom:
fieldRef: { fieldPath: "metadata.labels['pipeline/runid']" }
- name: ENABLE_CACHING
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['pipelines.kubeflow.org/enable_caching']",
}
- { name: KFP_V2_IMAGE, value: "library/python:3.9-slim-buster" }
- {
name: KFP_V2_RUNTIME_INFO,
value: '{"inputParameters": {"input_context_path":
{"type": "STRING"}, "metadata_url": {"type": "STRING"}, "run_info": {"type":
"STRING"}}, "inputArtifacts": {}, "outputParameters": {"output_context_path":
{"type": "STRING", "path": "/tmp/outputs/output_context_path/data"}}, "outputArtifacts":
{}}',
}
envFrom:
- configMapRef: { name: metadata-grpc-configmap, optional: true }
image: library/python:3.9-slim-buster
volumeMounts:
- { mountPath: /kfp-launcher, name: kfp-launcher }
inputs:
parameters:
- { name: metadata_url }
- { name: pipeline-name }
- { name: pipeline-root }
- { name: run-info-fn-run_info }
outputs:
parameters:
- name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path
valueFrom: { path: /tmp/outputs/output_context_path/data }
artifacts:
- {
name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path,
path: /tmp/outputs/output_context_path/data,
}
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_ref: "{}"
pipelines.kubeflow.org/arguments.parameters:
'{"input_context_path": "", "metadata_url":
"{{inputs.parameters.metadata_url}}", "run_info": "{{inputs.parameters.run-info-fn-run_info}}"}'
pipelines.kubeflow.org/max_cache_staleness: P0D
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/enable_caching: "true"
initContainers:
- command: [launcher, --copy, /kfp-launcher/launch]
image: gcr.io/ml-pipeline/kfp-launcher:1.8.7
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- { name: kfp-launcher }
- name: same-step-001-45bda231c8c94938af131969af2dced5-fn
container:
args:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def same_step_001_45bda231c8c94938af131969af2dced5_fn(
input_context_path: InputPath(str),
output_context_path: OutputPath(str),
run_info: str = "gAR9lC4=",
metadata_url: str = "",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
import dill
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
{dill.loads(urlsafe_b64decode("gASVVwIAAAAAAABYUAIAAGltcG9ydCBudW1weSBhcyBucAppbXBvcnQgbWF0cGxvdGxpYi5weXBsb3QgYXMgcGx0CmltcG9ydCBzY2lweS5zdGF0cyBhcyBzdGF0cwoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCm11ID0gMApzdGQgPSAxCgp4ID0gbnAubGluc3BhY2Uoc3RhcnQ9LTQsIHN0b3A9NCwgbnVtPTEwMCkKeSA9IHN0YXRzLm5vcm0ucGRmKHgsIG11LCBzdGQpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDI1MTUgCgpwbHQucGxvdCh4LCB5KQpwbHQuc2hvdygpCmltcG9ydCByZXF1ZXN0cwppbXBvcnQgcGFuZGFzIGFzIHBkCmltcG9ydCBwbG90bHkuZmlndXJlX2ZhY3RvcnkgYXMgZmYKaW1wb3J0IGNoYXJ0X3N0dWRpby5wbG90bHkgYXMgcHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy90ZXN0LmNzdicKZGYgPSBwZC5yZWFkX2Nzdih1cmwpCgphID0gYSAqIDEwMDAKYiA9IGIgLyA2NyAjIDM3LjUzNzMxMzQzMjgKCmRmLmRlc2NyaWJlKCmULg=="))}
# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
try:
dill.dumps(globals()[k])
except TypeError:
_bad_keys.append(k)
for k in _bad_keys:
del globals()[k]
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
- --executor_input
- "{{$}}"
- --function_to_execute
- same_step_001_45bda231c8c94938af131969af2dced5_fn
command:
[
/kfp-launcher/launch,
--mlmd_server_address,
$(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port,
$(METADATA_GRPC_SERVICE_PORT),
--runtime_info_json,
$(KFP_V2_RUNTIME_INFO),
--container_image,
$(KFP_V2_IMAGE),
--task_name,
same-step-001-45bda231c8c94938af131969af2dced5-fn,
--pipeline_name,
"{{inputs.parameters.pipeline-name}}",
--run_id,
$(KFP_RUN_ID),
--run_resource,
workflows.argoproj.io/$(WORKFLOW_ID),
--namespace,
$(KFP_NAMESPACE),
--pod_name,
$(KFP_POD_NAME),
--pod_uid,
$(KFP_POD_UID),
--pipeline_root,
"{{inputs.parameters.pipeline-root}}",
--enable_caching,
$(ENABLE_CACHING),
--,
"input_context_path={{inputs.parameters.same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path}}",
"metadata_url={{inputs.parameters.metadata_url}}",
"run_info={{inputs.parameters.run-info-fn-run_info}}",
--,
]
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: { fieldPath: metadata.name }
- name: KFP_POD_UID
valueFrom:
fieldRef: { fieldPath: metadata.uid }
- name: KFP_NAMESPACE
valueFrom:
fieldRef: { fieldPath: metadata.namespace }
- name: WORKFLOW_ID
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['workflows.argoproj.io/workflow']",
}
- name: KFP_RUN_ID
valueFrom:
fieldRef: { fieldPath: "metadata.labels['pipeline/runid']" }
- name: ENABLE_CACHING
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['pipelines.kubeflow.org/enable_caching']",
}
- { name: KFP_V2_IMAGE, value: "library/python:3.9-slim-buster" }
- {
name: KFP_V2_RUNTIME_INFO,
value: '{"inputParameters": {"input_context_path":
{"type": "STRING"}, "metadata_url": {"type": "STRING"}, "run_info": {"type":
"STRING"}}, "inputArtifacts": {}, "outputParameters": {"output_context_path":
{"type": "STRING", "path": "/tmp/outputs/output_context_path/data"}}, "outputArtifacts":
{}}',
}
envFrom:
- configMapRef: { name: metadata-grpc-configmap, optional: true }
image: library/python:3.9-slim-buster
volumeMounts:
- { mountPath: /kfp-launcher, name: kfp-launcher }
inputs:
parameters:
- { name: metadata_url }
- { name: pipeline-name }
- { name: pipeline-root }
- { name: run-info-fn-run_info }
- {
name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path,
}
outputs:
parameters:
- name: same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path
valueFrom: { path: /tmp/outputs/output_context_path/data }
artifacts:
- {
name: same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path,
path: /tmp/outputs/output_context_path/data,
}
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_ref: "{}"
pipelines.kubeflow.org/arguments.parameters:
'{"input_context_path": "{{inputs.parameters.same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path}}",
"metadata_url": "{{inputs.parameters.metadata_url}}", "run_info": "{{inputs.parameters.run-info-fn-run_info}}"}'
pipelines.kubeflow.org/max_cache_staleness: P0D
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/enable_caching: "true"
initContainers:
- command: [launcher, --copy, /kfp-launcher/launch]
image: gcr.io/ml-pipeline/kfp-launcher:1.8.7
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- { name: kfp-launcher }
- name: same-step-002-cbffced023ac4096a647fa26bca93da5-fn
container:
args:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def same_step_002_cbffced023ac4096a647fa26bca93da5_fn(
input_context_path: InputPath(str),
output_context_path: OutputPath(str),
run_info: str = "gAR9lC4=",
metadata_url: str = "",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_002",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
import dill
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
{dill.loads(urlsafe_b64decode("gASV9wEAAAAAAABY8AEAAGEgPSBhICsgNQpiID0gYiArIDEwICMgNDcuNTM3MzEzNDMyOApwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpnID0gc29tZV9tYXRoKDgsIDIxKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpqID0gZ1swXQprID0gZ1sxXQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSBhICsgNQpiID0gYiArIDEwICMgNTcuNTM3MzEzNDMyOAoKcHJpbnQoZiJqOiB7an0iKQpwcmludChmIms6IHtrfSIpCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA2Ny41MzczMTM0MzI4CnByaW50KCIwLjAuMiIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCnByaW50KGYiQWNjZXNzaW5nIHRoZSB2YWx1ZSBvZiBCOiB7Yn0iKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQqULg=="))}
# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
try:
dill.dumps(globals()[k])
except TypeError:
_bad_keys.append(k)
for k in _bad_keys:
del globals()[k]
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_002",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
- --executor_input
- "{{$}}"
- --function_to_execute
- same_step_002_cbffced023ac4096a647fa26bca93da5_fn
command:
[
/kfp-launcher/launch,
--mlmd_server_address,
$(METADATA_GRPC_SERVICE_HOST),
--mlmd_server_port,
$(METADATA_GRPC_SERVICE_PORT),
--runtime_info_json,
$(KFP_V2_RUNTIME_INFO),
--container_image,
$(KFP_V2_IMAGE),
--task_name,
same-step-002-cbffced023ac4096a647fa26bca93da5-fn,
--pipeline_name,
"{{inputs.parameters.pipeline-name}}",
--run_id,
$(KFP_RUN_ID),
--run_resource,
workflows.argoproj.io/$(WORKFLOW_ID),
--namespace,
$(KFP_NAMESPACE),
--pod_name,
$(KFP_POD_NAME),
--pod_uid,
$(KFP_POD_UID),
--pipeline_root,
"{{inputs.parameters.pipeline-root}}",
--enable_caching,
$(ENABLE_CACHING),
--,
"input_context_path={{inputs.parameters.same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path}}",
"metadata_url={{inputs.parameters.metadata_url}}",
"run_info={{inputs.parameters.run-info-fn-run_info}}",
--,
]
env:
- name: KFP_POD_NAME
valueFrom:
fieldRef: { fieldPath: metadata.name }
- name: KFP_POD_UID
valueFrom:
fieldRef: { fieldPath: metadata.uid }
- name: KFP_NAMESPACE
valueFrom:
fieldRef: { fieldPath: metadata.namespace }
- name: WORKFLOW_ID
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['workflows.argoproj.io/workflow']",
}
- name: KFP_RUN_ID
valueFrom:
fieldRef: { fieldPath: "metadata.labels['pipeline/runid']" }
- name: ENABLE_CACHING
valueFrom:
fieldRef:
{
fieldPath: "metadata.labels['pipelines.kubeflow.org/enable_caching']",
}
- { name: KFP_V2_IMAGE, value: "library/python:3.9-slim-buster" }
- {
name: KFP_V2_RUNTIME_INFO,
value: '{"inputParameters": {"input_context_path":
{"type": "STRING"}, "metadata_url": {"type": "STRING"}, "run_info": {"type":
"STRING"}}, "inputArtifacts": {}, "outputParameters": {"output_context_path":
{"type": "STRING", "path": "/tmp/outputs/output_context_path/data"}}, "outputArtifacts":
{}}',
}
envFrom:
- configMapRef: { name: metadata-grpc-configmap, optional: true }
image: library/python:3.9-slim-buster
volumeMounts:
- { mountPath: /kfp-launcher, name: kfp-launcher }
inputs:
parameters:
- { name: metadata_url }
- { name: pipeline-name }
- { name: pipeline-root }
- { name: run-info-fn-run_info }
- {
name: same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path,
}
outputs:
artifacts:
- {
name: same-step-002-cbffced023ac4096a647fa26bca93da5-fn-output_context_path,
path: /tmp/outputs/output_context_path/data,
}
metadata:
annotations:
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/component_ref: "{}"
pipelines.kubeflow.org/arguments.parameters:
'{"input_context_path": "{{inputs.parameters.same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path}}",
"metadata_url": "{{inputs.parameters.metadata_url}}", "run_info": "{{inputs.parameters.run-info-fn-run_info}}"}'
pipelines.kubeflow.org/max_cache_staleness: P0D
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/v2_component: "true"
pipelines.kubeflow.org/enable_caching: "true"
initContainers:
- command: [launcher, --copy, /kfp-launcher/launch]
image: gcr.io/ml-pipeline/kfp-launcher:1.8.7
name: kfp-launcher
mirrorVolumeMounts: true
volumes:
- { name: kfp-launcher }
arguments:
parameters:
- { name: context, value: "" }
- { name: metadata_url, value: "" }
- { name: pipeline-root, value: "" }
- { name: pipeline-name, value: pipeline/root_pipeline_compilation }
serviceAccountName: pipeline-runner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment