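# Argo Workflow manifest compiled from a SAME / Kubeflow Pipelines pipeline
# ("root_pipeline_compilation", KFP SDK 1.8.12). The run-info-fn template
# captures metadata about the current run; the three same-step templates then
# chain user code through dill session-context artifacts.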
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: root-pipeline-compilation-
annotations:
{
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12,
pipelines.kubeflow.org/pipeline_compilation_time: "2022-04-29T19:09:18.818069",
pipelines.kubeflow.org/pipeline_spec:
'{"inputs": [{"default": "", "name": "context",
"optional": true, "type": "String"}, {"default": "", "name": "metadata_url",
"optional": true, "type": "String"}], "name": "root_pipeline_compilation"}',
}
labels: { pipelines.kubeflow.org/kfp_sdk_version: 1.8.12 }
spec:
entrypoint: root-pipeline-compilation
templates:
- name: root-pipeline-compilation
inputs:
artifacts:
- { name: metadata_url }
dag:
tasks:
- { name: run-info-fn, template: run-info-fn }
- name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn
template: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn
dependencies: [run-info-fn]
arguments:
artifacts:
- {
name: metadata_url,
from: "{{inputs.artifacts.metadata_url}}",
}
- {
name: run-info-fn-run_info,
from: "{{tasks.run-info-fn.outputs.artifacts.run-info-fn-run_info}}",
}
- name: same-step-001-45bda231c8c94938af131969af2dced5-fn
template: same-step-001-45bda231c8c94938af131969af2dced5-fn
dependencies:
[run-info-fn, same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn]
arguments:
artifacts:
- {
name: metadata_url,
from: "{{inputs.artifacts.metadata_url}}",
}
- {
name: run-info-fn-run_info,
from: "{{tasks.run-info-fn.outputs.artifacts.run-info-fn-run_info}}",
}
- {
name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path,
from: "{{tasks.same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn.outputs.artifacts.same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path}}",
}
- name: same-step-002-cbffced023ac4096a647fa26bca93da5-fn
template: same-step-002-cbffced023ac4096a647fa26bca93da5-fn
dependencies:
[run-info-fn, same-step-001-45bda231c8c94938af131969af2dced5-fn]
arguments:
artifacts:
- {
name: metadata_url,
from: "{{inputs.artifacts.metadata_url}}",
}
- {
name: run-info-fn-run_info,
from: "{{tasks.run-info-fn.outputs.artifacts.run-info-fn-run_info}}",
}
- {
name: same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path,
from: "{{tasks.same-step-001-45bda231c8c94938af131969af2dced5-fn.outputs.artifacts.same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path}}",
}
- name: run-info-fn
container:
args: [--executor_input, "{{$}}", --function_to_execute, run_info_fn]
command:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp' 'dill' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def run_info_fn(
run_id: str,
) -> NamedTuple("RunInfoOutput", [("run_info", str),]):
from base64 import urlsafe_b64encode
from collections import namedtuple
import datetime
import base64
import dill
import kfp
client = kfp.Client(host="http://ml-pipeline:8888")
run_info = client.get_run(run_id=run_id)
run_info_dict = {
"run_id": run_info.run.id,
"name": run_info.run.name,
"created_at": run_info.run.created_at.isoformat(),
"pipeline_id": run_info.run.pipeline_spec.pipeline_id,
}
# Track Kubernetes resources associated with the run.
for r in run_info.run.resource_references:
run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id
# Base64-encoded as the value is visible in the Kubeflow UI.
output = urlsafe_b64encode(dill.dumps(run_info_dict))
return namedtuple("RunInfoOutput", ["run_info"])(
str(output, encoding="ascii")
)
image: python:3.7
inputs:
artifacts:
- name: run_id
path: /tmp/inputs/run_id/data
raw: { data: "{{workflow.uid}}" }
outputs:
artifacts:
- { name: run-info-fn-run_info, path: /tmp/outputs/run_info/data }
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations:
{
pipelines.kubeflow.org/component_spec:
'{"implementation": {"container":
{"args": ["--executor_input", {"executorInput": null}, "--function_to_execute",
"run_info_fn"], "command": ["sh", "-c", "\nif ! [ -x \"$(command -v pip)\"
]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get
install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
install --quiet --no-warn-script-location ''kfp'' ''dill'' ''kfp==1.8.12''
&& \"$0\" \"$@\"\n", "sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\"
\"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing
import *\n\ndef run_info_fn(\n run_id: str,\n) -> NamedTuple(\"RunInfoOutput\",
[(\"run_info\", str),]):\n from base64 import urlsafe_b64encode\n from
collections import namedtuple\n import datetime\n import base64\n import
dill\n import kfp\n\n client = kfp.Client(host=\"http://ml-pipeline:8888\")\n run_info
= client.get_run(run_id=run_id)\n\n run_info_dict = {\n \"run_id\":
run_info.run.id,\n \"name\": run_info.run.name,\n \"created_at\":
run_info.run.created_at.isoformat(),\n \"pipeline_id\": run_info.run.pipeline_spec.pipeline_id,\n }\n\n #
Track Kubernetes resources associated with the run.\n for r in run_info.run.resource_references:\n run_info_dict[f\"{r.key.type.lower()}_id\"]
= r.key.id\n\n # Base64-encoded as the value is visible in the Kubeflow UI.\n output
= urlsafe_b64encode(dill.dumps(run_info_dict))\n\n return namedtuple(\"RunInfoOutput\",
[\"run_info\"])(\n str(output, encoding=\"ascii\")\n )\n\n"],
"image": "python:3.7"}}, "inputs": [{"name": "run_id", "type": "String"}],
"name": "Run info fn", "outputs": [{"name": "run_info", "type": "String"}]}',
pipelines.kubeflow.org/component_ref: "{}",
}
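# The run-info-fn-run_info artifact above is a dill-pickled dict, urlsafe
# base64-encoded so the value stays printable in the Kubeflow UI. A minimal
# sketch of decoding it offline (assumes dill is installed and `encoded`
# holds the artifact contents):
#
#   from base64 import urlsafe_b64decode
#   import dill
#
#   run_info = dill.loads(urlsafe_b64decode(encoded))
#   print(run_info["run_id"], run_info["name"], run_info["created_at"])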
- name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn
container:
args:
[
--executor_input,
"{{$}}",
--function_to_execute,
same_step_000_7495a05863f8450a9b26a0d2da4042eb_fn,
]
command:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def same_step_000_7495a05863f8450a9b26a0d2da4042eb_fn(
input_context_path: InputPath(str),
output_context_path: OutputPath(str),
run_info: str = "gAR9lC4=",
metadata_url: str = "",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
import dill
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
{dill.loads(urlsafe_b64decode("gASVTQIAAAAAAABYRgIAAGRhdGFzZXQgPSAnc2FtcGxlX2RhdGEnCmdwdV90eXBlID0gJ0ExMDAnCmltcG9ydCB0ZW5zb3JmbG93CmltcG9ydCBkYXRldGltZQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSAxMApiID0gYSArIDUgIzE1CmZyb20gSVB5dGhvbi5kaXNwbGF5IGltcG9ydCBJbWFnZQoKdXJsID0gJ2h0dHBzOi8vcmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbS9TQU1FLVByb2plY3QvU0FNRS1zYW1wbGVzL21haW4vdGVzdC1hcnRpZmFjdHMvRmFyb2VJc2xhbmRzLmpwZWcnCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIzI1Cgpmcm9tIElQeXRob24gaW1wb3J0IGRpc3BsYXkKZGlzcGxheS5JbWFnZSh1cmwpCgppbXBvcnQgcGxvdGx5CgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKZGVmIHNvbWVfbWF0aCh4LCB6KSAtPiB0dXBsZToKICAgIHJldHVybiAocm91bmQoeCArIHosIDIpLCByb3VuZCh4IC8geiwgMikpCgphID0gYSAqIDIwCmIgPSBiICogMTAwICMyNTAwCgpwcmludChmIkIgPSB7Yn0iKZQu"))}
# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
try:
dill.dumps(globals()[k])
except TypeError:
_bad_keys.append(k)
for k in _bad_keys:
del globals()[k]
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
image: library/python:3.9-slim-buster
inputs:
artifacts:
- name: input_context_path
path: /tmp/inputs/input_context_path/data
raw: { data: "" }
- { name: metadata_url, path: /tmp/inputs/metadata_url/data }
- { name: run-info-fn-run_info, path: /tmp/inputs/run_info/data }
outputs:
artifacts:
- {
name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path,
path: /tmp/outputs/output_context_path/data,
}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations:
{
pipelines.kubeflow.org/component_spec:
'{"implementation": {"container":
{"args": ["--executor_input", {"executorInput": null}, "--function_to_execute",
"same_step_000_7495a05863f8450a9b26a0d2da4042eb_fn"], "command": ["sh",
"-c", "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip
|| python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' ''kfp==1.8.12'' && \"$0\" \"$@\"\n",
"sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3
-m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing
import *\n\ndef same_step_000_7495a05863f8450a9b26a0d2da4042eb_fn(\n input_context_path:
InputPath(str),\n output_context_path: OutputPath(str),\n run_info:
str = \"gAR9lC4=\",\n metadata_url: str = \"\",\n):\n from base64
import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import Path\n import
datetime\n import requests\n import tempfile\n import dill\n import
os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_000\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nimport dill\n\n# Load session context into global namespace:\nif
{ len(input_context) } > 0:\n dill.load_session(\"{ input_context_path
}\")\n\n{dill.loads(urlsafe_b64decode(\"gASVTQIAAAAAAABYRgIAAGRhdGFzZXQgPSAnc2FtcGxlX2RhdGEnCmdwdV90eXBlID0gJ0ExMDAnCmltcG9ydCB0ZW5zb3JmbG93CmltcG9ydCBkYXRldGltZQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSAxMApiID0gYSArIDUgIzE1CmZyb20gSVB5dGhvbi5kaXNwbGF5IGltcG9ydCBJbWFnZQoKdXJsID0gJ2h0dHBzOi8vcmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbS9TQU1FLVByb2plY3QvU0FNRS1zYW1wbGVzL21haW4vdGVzdC1hcnRpZmFjdHMvRmFyb2VJc2xhbmRzLmpwZWcnCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIzI1Cgpmcm9tIElQeXRob24gaW1wb3J0IGRpc3BsYXkKZGlzcGxheS5JbWFnZSh1cmwpCgppbXBvcnQgcGxvdGx5CgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKZGVmIHNvbWVfbWF0aCh4LCB6KSAtPiB0dXBsZToKICAgIHJldHVybiAocm91bmQoeCArIHosIDIpLCByb3VuZCh4IC8geiwgMikpCgphID0gYSAqIDIwCmIgPSBiICogMTAwICMyNTAwCgpwcmludChmIkIgPSB7Yn0iKZQu\"))}\n\n#
Remove anything from the global namespace that cannot be serialised.\n#
TODO: this will include things like pandas dataframes, needs sdk support?\n_bad_keys
= []\n_all_keys = list(globals().keys())\nfor k in _all_keys:\n try:\n dill.dumps(globals()[k])\n except
TypeError:\n _bad_keys.append(k)\n\nfor k in _bad_keys:\n del
globals()[k]\n\n# Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_000\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context_path",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true, "type": "String"}, {"default": "", "name": "metadata_url", "optional":
true, "type": "String"}], "name": "Same step 000 7495a05863f8450a9b26a0d2da4042eb
fn", "outputs": [{"name": "output_context_path", "type": "String"}]}',
pipelines.kubeflow.org/component_ref: "{}",
pipelines.kubeflow.org/max_cache_staleness: P0D,
}
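# Each same-step template (000 through 002 follow the same pattern) threads
# state between containers via dill session files: the f-string above splices
# the decoded user source into user_code, exec() runs it against a single
# globals() namespace, and dill.dump_session() writes that namespace to
# output_context_path for the next step to restore. A standalone sketch of the
# handoff, using a hypothetical /tmp/ctx path:
#
#   import dill
#   a = 10                           # state produced by step N
#   dill.dump_session("/tmp/ctx")    # saved as step N's output artifact
#   # ...later, in step N+1's fresh interpreter:
#   dill.load_session("/tmp/ctx")    # restores a == 10 into globals()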
- name: same-step-001-45bda231c8c94938af131969af2dced5-fn
container:
args:
[
--executor_input,
"{{$}}",
--function_to_execute,
same_step_001_45bda231c8c94938af131969af2dced5_fn,
]
command:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def same_step_001_45bda231c8c94938af131969af2dced5_fn(
input_context_path: InputPath(str),
output_context_path: OutputPath(str),
run_info: str = "gAR9lC4=",
metadata_url: str = "",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
import dill
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
{dill.loads(urlsafe_b64decode("gASVVwIAAAAAAABYUAIAAGltcG9ydCBudW1weSBhcyBucAppbXBvcnQgbWF0cGxvdGxpYi5weXBsb3QgYXMgcGx0CmltcG9ydCBzY2lweS5zdGF0cyBhcyBzdGF0cwoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCm11ID0gMApzdGQgPSAxCgp4ID0gbnAubGluc3BhY2Uoc3RhcnQ9LTQsIHN0b3A9NCwgbnVtPTEwMCkKeSA9IHN0YXRzLm5vcm0ucGRmKHgsIG11LCBzdGQpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDI1MTUgCgpwbHQucGxvdCh4LCB5KQpwbHQuc2hvdygpCmltcG9ydCByZXF1ZXN0cwppbXBvcnQgcGFuZGFzIGFzIHBkCmltcG9ydCBwbG90bHkuZmlndXJlX2ZhY3RvcnkgYXMgZmYKaW1wb3J0IGNoYXJ0X3N0dWRpby5wbG90bHkgYXMgcHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy90ZXN0LmNzdicKZGYgPSBwZC5yZWFkX2Nzdih1cmwpCgphID0gYSAqIDEwMDAKYiA9IGIgLyA2NyAjIDM3LjUzNzMxMzQzMjgKCmRmLmRlc2NyaWJlKCmULg=="))}
# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
try:
dill.dumps(globals()[k])
except TypeError:
_bad_keys.append(k)
for k in _bad_keys:
del globals()[k]
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
image: library/python:3.9-slim-buster
inputs:
artifacts:
- {
name: same-step-000-7495a05863f8450a9b26a0d2da4042eb-fn-output_context_path,
path: /tmp/inputs/input_context_path/data,
}
- { name: metadata_url, path: /tmp/inputs/metadata_url/data }
- { name: run-info-fn-run_info, path: /tmp/inputs/run_info/data }
outputs:
artifacts:
- {
name: same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path,
path: /tmp/outputs/output_context_path/data,
}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations:
{
pipelines.kubeflow.org/component_spec:
'{"implementation": {"container":
{"args": ["--executor_input", {"executorInput": null}, "--function_to_execute",
"same_step_001_45bda231c8c94938af131969af2dced5_fn"], "command": ["sh",
"-c", "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip
|| python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' ''kfp==1.8.12'' && \"$0\" \"$@\"\n",
"sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3
-m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing
import *\n\ndef same_step_001_45bda231c8c94938af131969af2dced5_fn(\n input_context_path:
InputPath(str),\n output_context_path: OutputPath(str),\n run_info:
str = \"gAR9lC4=\",\n metadata_url: str = \"\",\n):\n from base64
import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import Path\n import
datetime\n import requests\n import tempfile\n import dill\n import
os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_001\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nimport dill\n\n# Load session context into global namespace:\nif
{ len(input_context) } > 0:\n dill.load_session(\"{ input_context_path
}\")\n\n{dill.loads(urlsafe_b64decode(\"gASVVwIAAAAAAABYUAIAAGltcG9ydCBudW1weSBhcyBucAppbXBvcnQgbWF0cGxvdGxpYi5weXBsb3QgYXMgcGx0CmltcG9ydCBzY2lweS5zdGF0cyBhcyBzdGF0cwoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCm11ID0gMApzdGQgPSAxCgp4ID0gbnAubGluc3BhY2Uoc3RhcnQ9LTQsIHN0b3A9NCwgbnVtPTEwMCkKeSA9IHN0YXRzLm5vcm0ucGRmKHgsIG11LCBzdGQpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDI1MTUgCgpwbHQucGxvdCh4LCB5KQpwbHQuc2hvdygpCmltcG9ydCByZXF1ZXN0cwppbXBvcnQgcGFuZGFzIGFzIHBkCmltcG9ydCBwbG90bHkuZmlndXJlX2ZhY3RvcnkgYXMgZmYKaW1wb3J0IGNoYXJ0X3N0dWRpby5wbG90bHkgYXMgcHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy90ZXN0LmNzdicKZGYgPSBwZC5yZWFkX2Nzdih1cmwpCgphID0gYSAqIDEwMDAKYiA9IGIgLyA2NyAjIDM3LjUzNzMxMzQzMjgKCmRmLmRlc2NyaWJlKCmULg==\"))}\n\n#
Remove anything from the global namespace that cannot be serialised.\n#
TODO: this will include things like pandas dataframes, needs sdk support?\n_bad_keys
= []\n_all_keys = list(globals().keys())\nfor k in _all_keys:\n try:\n dill.dumps(globals()[k])\n except
TypeError:\n _bad_keys.append(k)\n\nfor k in _bad_keys:\n del
globals()[k]\n\n# Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_001\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context_path",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true, "type": "String"}, {"default": "", "name": "metadata_url", "optional":
true, "type": "String"}], "name": "Same step 001 45bda231c8c94938af131969af2dced5
fn", "outputs": [{"name": "output_context_path", "type": "String"}]}',
pipelines.kubeflow.org/component_ref: "{}",
pipelines.kubeflow.org/max_cache_staleness: P0D,
}
- name: same-step-002-cbffced023ac4096a647fa26bca93da5-fn
container:
args:
[
--executor_input,
"{{$}}",
--function_to_execute,
same_step_002_cbffced023ac4096a647fa26bca93da5_fn,
]
command:
- sh
- -c
- |2
if ! [ -x "$(command -v pip)" ]; then
python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi
PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
python3 -m kfp.v2.components.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
- |2+
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import *
from typing import *
def same_step_002_cbffced023ac4096a647fa26bca93da5_fn(
input_context_path: InputPath(str),
output_context_path: OutputPath(str),
run_info: str = "gAR9lC4=",
metadata_url: str = "",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_002",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
import dill
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
{dill.loads(urlsafe_b64decode("gASV9wEAAAAAAABY8AEAAGEgPSBhICsgNQpiID0gYiArIDEwICMgNDcuNTM3MzEzNDMyOApwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpnID0gc29tZV9tYXRoKDgsIDIxKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpqID0gZ1swXQprID0gZ1sxXQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSBhICsgNQpiID0gYiArIDEwICMgNTcuNTM3MzEzNDMyOAoKcHJpbnQoZiJqOiB7an0iKQpwcmludChmIms6IHtrfSIpCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA2Ny41MzczMTM0MzI4CnByaW50KCIwLjAuMiIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCnByaW50KGYiQWNjZXNzaW5nIHRoZSB2YWx1ZSBvZiBCOiB7Yn0iKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQqULg=="))}
# Remove anything from the global namespace that cannot be serialised.
# TODO: this will include things like pandas dataframes, needs sdk support?
_bad_keys = []
_all_keys = list(globals().keys())
for k in _all_keys:
try:
dill.dumps(globals()[k])
except TypeError:
_bad_keys.append(k)
for k in _bad_keys:
del globals()[k]
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_002",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
image: library/python:3.9-slim-buster
inputs:
artifacts:
- {
name: same-step-001-45bda231c8c94938af131969af2dced5-fn-output_context_path,
path: /tmp/inputs/input_context_path/data,
}
- { name: metadata_url, path: /tmp/inputs/metadata_url/data }
- { name: run-info-fn-run_info, path: /tmp/inputs/run_info/data }
outputs:
artifacts:
- {
name: same-step-002-cbffced023ac4096a647fa26bca93da5-fn-output_context_path,
path: /tmp/outputs/output_context_path/data,
}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations:
{
pipelines.kubeflow.org/component_spec:
'{"implementation": {"container":
{"args": ["--executor_input", {"executorInput": null}, "--function_to_execute",
"same_step_002_cbffced023ac4096a647fa26bca93da5_fn"], "command": ["sh",
"-c", "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip
|| python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' ''kfp==1.8.12'' && \"$0\" \"$@\"\n",
"sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3
-m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing
import *\n\ndef same_step_002_cbffced023ac4096a647fa26bca93da5_fn(\n input_context_path:
InputPath(str),\n output_context_path: OutputPath(str),\n run_info:
str = \"gAR9lC4=\",\n metadata_url: str = \"\",\n):\n from base64
import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import Path\n import
datetime\n import requests\n import tempfile\n import dill\n import
os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_002\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nimport dill\n\n# Load session context into global namespace:\nif
{ len(input_context) } > 0:\n dill.load_session(\"{ input_context_path
}\")\n\n{dill.loads(urlsafe_b64decode(\"gASV9wEAAAAAAABY8AEAAGEgPSBhICsgNQpiID0gYiArIDEwICMgNDcuNTM3MzEzNDMyOApwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpnID0gc29tZV9tYXRoKDgsIDIxKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpqID0gZ1swXQprID0gZ1sxXQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSBhICsgNQpiID0gYiArIDEwICMgNTcuNTM3MzEzNDMyOAoKcHJpbnQoZiJqOiB7an0iKQpwcmludChmIms6IHtrfSIpCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA2Ny41MzczMTM0MzI4CnByaW50KCIwLjAuMiIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCnByaW50KGYiQWNjZXNzaW5nIHRoZSB2YWx1ZSBvZiBCOiB7Yn0iKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQqULg==\"))}\n\n#
Remove anything from the global namespace that cannot be serialised.\n#
TODO: this will include things like pandas dataframes, needs sdk support?\n_bad_keys
= []\n_all_keys = list(globals().keys())\nfor k in _all_keys:\n try:\n dill.dumps(globals()[k])\n except
TypeError:\n _bad_keys.append(k)\n\nfor k in _bad_keys:\n del
globals()[k]\n\n# Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_002\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context_path",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true, "type": "String"}, {"default": "", "name": "metadata_url", "optional":
true, "type": "String"}], "name": "Same step 002 cbffced023ac4096a647fa26bca93da5
fn", "outputs": [{"name": "output_context_path", "type": "String"}]}',
pipelines.kubeflow.org/component_ref: "{}",
pipelines.kubeflow.org/max_cache_staleness: P0D,
}
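# The long urlsafe_b64decode(...) literals inside the three step templates are
# dill pickles of the original notebook-cell source; each is unpickled at
# container runtime while the f-string assembles user_code. A sketch for
# inspecting one offline (assumes dill; `blob` stands in for one of the
# literals):
#
#   from base64 import urlsafe_b64decode
#   import dill
#
#   print(dill.loads(urlsafe_b64decode(blob)))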
arguments:
parameters:
- { name: context, value: "" }
- { name: metadata_url, value: "" }
artifacts:
- name: metadata_url
raw: { data: "{{workflow.parameters.metadata_url}}" }
serviceAccountName: pipeline-runner
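# This manifest would normally be submitted by the Kubeflow Pipelines backend;
# to run it directly against an Argo installation (assuming a kubeflow
# namespace where the pipeline-runner service account exists, and a
# hypothetical workflow.yaml filename), something like:
#
#   argo submit -n kubeflow workflow.yaml --watch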