# Captured from GitHub Gist aronchick/25d6fca71df0ef86846c40bec5cbc2c3
# (author: @aronchick, created May 7, 2022 00:40).
---
# Argo Workflow compiled by the Kubeflow Pipelines SDK 1.8.12 (see the
# kfp_sdk_version label/annotation) at 2022-05-06T23:57:02 from the
# "Compilation of pipelines" pipeline. This is generated output: change the
# pipeline source and recompile rather than hand-editing it.
# NOTE(review): the indentation of this copy appears to have been stripped
# (generateName/annotations/labels belong under metadata, entrypoint/templates
# under spec); re-indent before submitting it to a cluster.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: compilation-of-pipelines-
# pipeline_spec records the pipeline's declared inputs and their defaults.
# NOTE(review): AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY default to literal
# credentials (minio / minio123); prefer sourcing them from a Kubernetes
# Secret over baking defaults into the compiled pipeline.
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12, pipelines.kubeflow.org/pipeline_compilation_time: '2022-05-06T23:57:02.426196',
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "", "name": "context",
"optional": true}, {"default": "", "name": "metadata_url", "optional": true},
{"default": "minio", "name": "AWS_ACCESS_KEY_ID", "optional": true, "type":
"String"}, {"default": "minio123", "name": "AWS_SECRET_ACCESS_KEY", "optional":
true, "type": "String"}, {"default": "http://combinator-minio.mlflow.svc.cluster.local:9000",
"name": "MLFLOW_S3_ENDPOINT_URL", "optional": true, "type": "String"}, {"default":
"http://combinator-mlflow.mlflow.svc.cluster.local:5000", "name": "MLFLOW_TRACKING_URI",
"optional": true, "type": "String"}], "name": "Compilation of pipelines"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12}
# Execution starts at the compilation-of-pipelines DAG template listed first
# under templates.
spec:
entrypoint: compilation-of-pipelines
templates:
- name: compilation-of-pipelines
  # Entrypoint DAG. run-info-fn runs first; the same-step-* tasks then run one
  # after another. Each later step receives the previous step's
  # output_context artifact, and every step receives the AWS/MLflow settings,
  # metadata_url and the run-info-fn-run_info parameter.
  # NOTE(review): the AWS credentials are handled as ordinary plain-text
  # workflow parameters here; consider a Kubernetes Secret instead.
  inputs:
    parameters:
    - name: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
    - name: MLFLOW_S3_ENDPOINT_URL
    - name: MLFLOW_TRACKING_URI
    - name: metadata_url
  dag:
    tasks:
    - name: run-info-fn
      template: run-info-fn
    - name: same-step-000-a29af0569ab04006b678357f767987db-fn
      template: same-step-000-a29af0569ab04006b678357f767987db-fn
      dependencies:
      - run-info-fn
      arguments:
        parameters:
        - name: AWS_ACCESS_KEY_ID
          value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'
        - name: AWS_SECRET_ACCESS_KEY
          value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'
        - name: MLFLOW_S3_ENDPOINT_URL
          value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'
        - name: MLFLOW_TRACKING_URI
          value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'
        - name: metadata_url
          value: '{{inputs.parameters.metadata_url}}'
        - name: run-info-fn-run_info
          value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'
    - name: same-step-001-60ad85fb69284d98992da58abd28344c-fn
      template: same-step-001-60ad85fb69284d98992da58abd28344c-fn
      dependencies:
      - run-info-fn
      - same-step-000-a29af0569ab04006b678357f767987db-fn
      arguments:
        parameters:
        - name: AWS_ACCESS_KEY_ID
          value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'
        - name: AWS_SECRET_ACCESS_KEY
          value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'
        - name: MLFLOW_S3_ENDPOINT_URL
          value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'
        - name: MLFLOW_TRACKING_URI
          value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'
        - name: metadata_url
          value: '{{inputs.parameters.metadata_url}}'
        - name: run-info-fn-run_info
          value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'
        artifacts:
        - name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context
          from: '{{tasks.same-step-000-a29af0569ab04006b678357f767987db-fn.outputs.artifacts.same-step-000-a29af0569ab04006b678357f767987db-fn-output_context}}'
    - name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn
      template: same-step-002-b7dbb05511774bd7bab385301d43b388-fn
      dependencies:
      - run-info-fn
      - same-step-001-60ad85fb69284d98992da58abd28344c-fn
      arguments:
        parameters:
        - name: AWS_ACCESS_KEY_ID
          value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'
        - name: AWS_SECRET_ACCESS_KEY
          value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'
        - name: MLFLOW_S3_ENDPOINT_URL
          value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'
        - name: MLFLOW_TRACKING_URI
          value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'
        - name: metadata_url
          value: '{{inputs.parameters.metadata_url}}'
        - name: run-info-fn-run_info
          value: '{{tasks.run-info-fn.outputs.parameters.run-info-fn-run_info}}'
        artifacts:
        - name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context
          from: '{{tasks.same-step-001-60ad85fb69284d98992da58abd28344c-fn.outputs.artifacts.same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context}}'
- name: run-info-fn
  # Fetches this workflow's KFP run through kfp.Client (host ml-pipeline:8888,
  # run id = {{workflow.uid}}), collects the run/pipeline ids plus one
  # "<type>_id" entry per resource reference, and writes that dict,
  # dill-pickled and urlsafe-base64-encoded, to /tmp/outputs/run_info/data,
  # which is exported as the run-info-fn-run_info output parameter.
  container:
    args: [--run-id, '{{workflow.uid}}', '----output-paths', /tmp/outputs/run_info/data]
    command:
    - sh
    - -c
    # kfp is pinned to the SDK version recorded in the kfp_sdk_version label:
    # run_info_fn reads client.get_run(...).run, the v1 client's response
    # shape, so an unpinned install that resolves to kfp 2.x breaks this step.
    - (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'kfp==1.8.12' 'dill' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
      --no-warn-script-location 'kfp==1.8.12' 'dill' --user) && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def run_info_fn(
          run_id,
      ):
          from base64 import urlsafe_b64encode
          from collections import namedtuple
          import dill
          import kfp

          client = kfp.Client(host="http://ml-pipeline:8888")
          run_info = client.get_run(run_id=run_id)

          run_info_dict = {
              "run_id": run_info.run.id,
              "name": run_info.run.name,
              "created_at": run_info.run.created_at.isoformat(),
              "pipeline_id": run_info.run.pipeline_spec.pipeline_id,
          }

          # Track kubernetes resources associated with the run.
          for r in run_info.run.resource_references:
              run_info_dict[f"{r.key.type.lower()}_id"] = r.key.id

          # Base64-encoded as value is visible in kubeflow ui.
          output = urlsafe_b64encode(dill.dumps(run_info_dict))

          return namedtuple("RunInfoOutput", ["run_info"])(
              str(output, encoding="ascii")
          )

      def _serialize_str(str_value: str) -> str:
          if not isinstance(str_value, str):
              raise TypeError('Value "{}" has type "{}" instead of str.'.format(
                  str(str_value), str(type(str_value))))
          return str_value

      import argparse
      _parser = argparse.ArgumentParser(prog='Run info fn', description='')
      _parser.add_argument("--run-id", dest="run_id", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = run_info_fn(**_parsed_args)

      _output_serializers = [
          _serialize_str,

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    image: python:3.7
  outputs:
    parameters:
    - name: run-info-fn-run_info
      valueFrom: {path: /tmp/outputs/run_info/data}
    artifacts:
    - {name: run-info-fn-run_info, path: /tmp/outputs/run_info/data}
  metadata:
    labels:
      pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
      pipelines.kubeflow.org/pipeline-sdk-type: kfp
      pipelines.kubeflow.org/enable_caching: "true"
    # component_spec duplicates the container command/args/image above; keep
    # the two in sync (including the kfp pin).
    annotations:
      pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
        {"args": ["--run-id", {"inputValue": "run_id"}, "----output-paths", {"outputPath":
        "run_info"}], "command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1
        python3 -m pip install --quiet --no-warn-script-location ''kfp==1.8.12'' ''dill''
        || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
        ''kfp==1.8.12'' ''dill'' --user) && \"$0\" \"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf
        \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
        "def run_info_fn(\n    run_id,\n):\n    from base64 import urlsafe_b64encode\n    from
        collections import namedtuple\n    import dill\n    import kfp\n\n    client =
        kfp.Client(host=\"http://ml-pipeline:8888\")\n    run_info = client.get_run(run_id=run_id)\n\n    run_info_dict
        = {\n        \"run_id\": run_info.run.id,\n        \"name\": run_info.run.name,\n        \"created_at\":
        run_info.run.created_at.isoformat(),\n        \"pipeline_id\": run_info.run.pipeline_spec.pipeline_id,\n    }\n\n    #
        Track kubernetes resources associated with the run.\n    for r in run_info.run.resource_references:\n        run_info_dict[f\"{r.key.type.lower()}_id\"]
        = r.key.id\n\n    # Base64-encoded as value is visible in kubeflow ui.\n    output
        = urlsafe_b64encode(dill.dumps(run_info_dict))\n\n    return namedtuple(\"RunInfoOutput\",
        [\"run_info\"])(\n        str(output, encoding=\"ascii\")\n    )\n\ndef
        _serialize_str(str_value: str) -> str:\n    if not isinstance(str_value,
        str):\n        raise TypeError(''Value \"{}\" has type \"{}\" instead of
        str.''.format(\n            str(str_value), str(type(str_value))))\n    return
        str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Run
        info fn'', description='''')\n_parser.add_argument(\"--run-id\", dest=\"run_id\",
        type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\",
        dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
        = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = run_info_fn(**_parsed_args)\n\n_output_serializers
        = [\n    _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n    try:\n        os.makedirs(os.path.dirname(output_file))\n    except
        OSError:\n        pass\n    with open(output_file, ''w'') as f:\n        f.write(_output_serializers[idx](_outputs[idx]))\n"],
        "image": "python:3.7"}}, "inputs": [{"name": "run_id", "type": "String"}],
        "name": "Run info fn", "outputs": [{"name": "run_info", "type": "String"}]}'
      pipelines.kubeflow.org/component_ref: '{}'
      pipelines.kubeflow.org/arguments.parameters: '{"run_id": "{{workflow.uid}}"}'
# same-step-000: first notebook step. The container pip-installs its
# dependencies, then runs the embedded Python, which:
#   - reads the input_context artifact and decodes run_info (the
#     run-info-fn-run_info parameter, urlsafe-base64 + dill);
#   - exec()s a generated script that installs a requirements file, optionally
#     loads the previous dill session, runs the base64-encoded notebook code,
#     and dumps the resulting session to output_context;
#   - posts input/output context metadata to metadata_url when it is non-empty.
# The component_spec annotation repeats this command/script; keep them in sync.
# This step is generated, so the notes below should be fixed in the generator.
# NOTE(review): issues visible in the embedded script:
#   - urlsafe_b64encode(...) returns bytes, and those bytes are sent as
#     "metadata_value" via requests.post(json=...); with a non-empty
#     metadata_url this presumably fails because bytes are not
#     JSON-serialisable (confirm).
#   - run_info["experiment_id"] is evaluated while building the output
#     payload even when metadata_url is "", so a run_info without that key
#     raises KeyError (the default "gAR9lC4=" decodes to an empty dict).
#   - post_metadata only catches requests.exceptions.HTTPError; connection
#     errors propagate.
#   - the "exploding variables module" is decoded from an empty string, so
#     ExplodingVariable is not defined by this code; if any global fails
#     dill.pickles() or exceeds 52428800 bytes, the replacement line presumably
#     raises NameError (confirm).
#   - the os.system("pip install -r ...") exit status is ignored.
#   - run_info is dill-unpickled from a workflow parameter; only trusted
#     values should reach it.
#   - 'requests' and 'Requests' are both listed in the pip install.
- name: same-step-000-a29af0569ab04006b678357f767987db-fn
container:
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}',
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data]
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy'
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests'
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly'
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def same_step_000_a29af0569ab04006b678357f767987db_fn(
input_context_path,
output_context_path,
run_info="gAR9lC4=",
metadata_url="",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
from base64 import urlsafe_b64decode
from pympler import asizeof
from tempfile import mktemp
import dill
import os
# Install dependencies from requirements file.
_req_file = mktemp()
with open(_req_file, "w") as _file:
_file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode())
os.system("pip install -r " + _req_file)
# Load the exploding variables module:
{urlsafe_b64decode("").decode()}
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
# Run the user's notebook code:
{urlsafe_b64decode("ZGF0YXNldCA9ICdzYW1wbGVfZGF0YScKZ3B1X3R5cGUgPSAnQTEwMCcKaW1wb3J0IHRlbnNvcmZsb3cKaW1wb3J0IGRhdGV0aW1lCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IDEwCmIgPSBhICsgNSAjMTUKZnJvbSBJUHl0aG9uLmRpc3BsYXkgaW1wb3J0IEltYWdlCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy9GYXJvZUlzbGFuZHMuanBlZycKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjMjUKCmZyb20gSVB5dGhvbiBpbXBvcnQgZGlzcGxheQpkaXNwbGF5LkltYWdlKHVybCkKCmltcG9ydCBwbG90bHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgpkZWYgc29tZV9tYXRoKHgsIHopIC0-IHR1cGxlOgogICAgcmV0dXJuIChyb3VuZCh4ICsgeiwgMiksIHJvdW5kKHggLyB6LCAyKSkKCmEgPSBhICogMjAKYiA9IGIgKiAxMDAgIzI1MDAKCnByaW50KGYiQiA9IHtifSIp").decode()}
# Various types of serialisation failures we'd like to inform the user of:
def _cannot_pickle_msg(k):
return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'."
def _too_large_msg(k, mem):
return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)."
# Replace unserialisable variables with exploding variables so that the user
# knows why they cannot use them in future steps:
_all_keys = list(globals().keys())
for _k in _all_keys:
if not dill.pickles(globals()[_k], safe=True):
globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))
continue
_mem = asizeof.asizeof(globals()[_k])
if _mem >= 52428800:
globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))
continue
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_000",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
import argparse
_parser = argparse.ArgumentParser(prog='Same step 000 a29af0569ab04006b678357f767987db fn', description='')
_parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = same_step_000_a29af0569ab04006b678357f767987db_fn(**_parsed_args)
# NOTE(review): AWS credentials are injected as plain env values from
# workflow parameters; a secretKeyRef would keep them out of the spec.
env:
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
image: library/python:3.9-slim-buster
inputs:
parameters:
- {name: AWS_ACCESS_KEY_ID}
- {name: AWS_SECRET_ACCESS_KEY}
- {name: MLFLOW_S3_ENDPOINT_URL}
- {name: MLFLOW_TRACKING_URI}
- {name: metadata_url}
- {name: run-info-fn-run_info}
# As the first step, input_context is an empty raw artifact, so the script
# sees len(input_context) == 0: no session is loaded and no input metadata
# is posted.
artifacts:
- name: input_context
path: /tmp/inputs/input_context/data
raw: {data: ''}
outputs:
artifacts:
- {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context, path: /tmp/outputs/output_context/data}
# NOTE(review): max_cache_staleness P0D (a zero ISO-8601 duration) presumably
# prevents reuse of cached results for this step even though enable_caching
# is "true" - confirm against the KFP caching docs.
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond":
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}},
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url",
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}],
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy''
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\"
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path:
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return
file_path\n\ndef same_step_000_a29af0569ab04006b678357f767987db_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import
Path\n import datetime\n import requests\n import tempfile\n import
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_000\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\")
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"\").decode()}\n\n#
Load session context into global namespace:\nif { len(input_context) } >
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s
notebook code:\n{urlsafe_b64decode(\"ZGF0YXNldCA9ICdzYW1wbGVfZGF0YScKZ3B1X3R5cGUgPSAnQTEwMCcKaW1wb3J0IHRlbnNvcmZsb3cKaW1wb3J0IGRhdGV0aW1lCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IDEwCmIgPSBhICsgNSAjMTUKZnJvbSBJUHl0aG9uLmRpc3BsYXkgaW1wb3J0IEltYWdlCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy9GYXJvZUlzbGFuZHMuanBlZycKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjMjUKCmZyb20gSVB5dGhvbiBpbXBvcnQgZGlzcGxheQpkaXNwbGF5LkltYWdlKHVybCkKCmltcG9ydCBwbG90bHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgpkZWYgc29tZV9tYXRoKHgsIHopIC0-IHR1cGxlOgogICAgcmV0dXJuIChyb3VuZCh4ICsgeiwgMiksIHJvdW5kKHggLyB6LCAyKSkKCmEgPSBhICogMjAKYiA9IGIgKiAxMDAgIzI1MDAKCnByaW50KGYiQiA9IHtifSIp\").decode()}\n\n#
Various types of serialisation failures we''d like to inform the user of:\ndef
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined
in previous steps, but could not be serialised as it was not supported by
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" +
k + \"'' was defined in previous steps, but could not be serialised as it
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable
variables with exploding variables so that the user\n# knows why they cannot
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k]
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k]
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n#
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_000\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 000 a29af0569ab04006b678357f767987db
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\",
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\",
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\",
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\",
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
= same_step_000_a29af0569ab04006b678357f767987db_fn(**_parsed_args)\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true}, {"default": "", "name": "metadata_url", "optional": true}], "name":
"Same step 000 a29af0569ab04006b678357f767987db fn", "outputs": [{"name":
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}',
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}",
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D}
- name: same-step-001-60ad85fb69284d98992da58abd28344c-fn
container:
args: [--input-context, /tmp/inputs/input_context/data, --run-info, '{{inputs.parameters.run-info-fn-run_info}}',
--metadata-url, '{{inputs.parameters.metadata_url}}', --output-context, /tmp/outputs/output_context/data]
command:
- sh
- -c
- (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy'
'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location 'dill' 'requests'
'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly'
'Requests' 'scipy' 'tensorflow' --user) && "$0" "$@"
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
def _make_parent_dirs_and_return_path(file_path: str):
import os
os.makedirs(os.path.dirname(file_path), exist_ok=True)
return file_path
def same_step_001_60ad85fb69284d98992da58abd28344c_fn(
input_context_path,
output_context_path,
run_info="gAR9lC4=",
metadata_url="",
):
from base64 import urlsafe_b64encode, urlsafe_b64decode
from pathlib import Path
import datetime
import requests
import tempfile
import dill
import os
input_context = None
with Path(input_context_path).open("rb") as reader:
input_context = reader.read()
# Helper function for posting metadata to mlflow.
def post_metadata(json):
if metadata_url == "":
return
try:
req = requests.post(metadata_url, json=json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print(f"Error posting metadata: {err}")
# Move to writable directory as user might want to do file IO.
# TODO: won't persist across steps, might need support in SDK?
os.chdir(tempfile.mkdtemp())
# Load information about the current experiment run:
run_info = dill.loads(urlsafe_b64decode(run_info))
# Post session context to mlflow.
if len(input_context) > 0:
input_context_str = urlsafe_b64encode(input_context)
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "input",
"metadata_value": input_context_str,
"metadata_time": datetime.datetime.now().isoformat(),
})
# User code for step, which we run in its own execution frame.
user_code = f"""
from base64 import urlsafe_b64decode
from pympler import asizeof
from tempfile import mktemp
import dill
import os
# Install dependencies from requirements file.
_req_file = mktemp()
with open(_req_file, "w") as _file:
_file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode())
os.system("pip install -r " + _req_file)
# Load the exploding variables module:
{urlsafe_b64decode("").decode()}
# Load session context into global namespace:
if { len(input_context) } > 0:
dill.load_session("{ input_context_path }")
# Run the user's notebook code:
{urlsafe_b64decode("aW1wb3J0IG51bXB5IGFzIG5wCmltcG9ydCBtYXRwbG90bGliLnB5cGxvdCBhcyBwbHQKaW1wb3J0IHNjaXB5LnN0YXRzIGFzIHN0YXRzCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKbXUgPSAwCnN0ZCA9IDEKCnggPSBucC5saW5zcGFjZShzdGFydD0tNCwgc3RvcD00LCBudW09MTAwKQp5ID0gc3RhdHMubm9ybS5wZGYoeCwgbXUsIHN0ZCkKCmEgPSBhICsgNQpiID0gYiArIDEwICMgMjUxNSAKCnBsdC5wbG90KHgsIHkpCnBsdC5zaG93KCkKaW1wb3J0IHJlcXVlc3RzCmltcG9ydCBwYW5kYXMgYXMgcGQKaW1wb3J0IHBsb3RseS5maWd1cmVfZmFjdG9yeSBhcyBmZgppbXBvcnQgY2hhcnRfc3R1ZGlvLnBsb3RseSBhcyBweQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCnVybCA9ICdodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vU0FNRS1Qcm9qZWN0L1NBTUUtc2FtcGxlcy9tYWluL3Rlc3QtYXJ0aWZhY3RzL3Rlc3QuY3N2JwpkZiA9IHBkLnJlYWRfY3N2KHVybCkKCmEgPSBhICogMTAwMApiID0gYiAvIDY3ICMgMzcuNTM3MzEzNDMyOAoKZGYuZGVzY3JpYmUoKQ==").decode()}
# Various types of serialisation failures we'd like to inform the user of:
def _cannot_pickle_msg(k):
return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'."
def _too_large_msg(k, mem):
return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)."
# Replace unserialisable variables with exploding variables so that the user
# knows why they cannot use them in future steps:
_all_keys = list(globals().keys())
for _k in _all_keys:
if not dill.pickles(globals()[_k], safe=True):
globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))
continue
_mem = asizeof.asizeof(globals()[_k])
if _mem >= 52428800:
globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))
continue
# Save new session context to disk for the next component:
dill.dump_session("{output_context_path}")
"""
# Runs the user code in a new execution frame. Context from the previous
# component in the run is loaded into the session dynamically, and we run
# with a single globals() namespace to simulate top-level execution.
exec(user_code, globals(), globals())
# Post new session context to mlflow:
with Path(output_context_path).open("rb") as reader:
context = urlsafe_b64encode(reader.read())
post_metadata({
"experiment_id": run_info["experiment_id"],
"run_id": run_info["run_id"],
"step_id": "same_step_001",
"metadata_type": "output",
"metadata_value": context,
"metadata_time": datetime.datetime.now().isoformat(),
})
import argparse
_parser = argparse.ArgumentParser(prog='Same step 001 60ad85fb69284d98992da58abd28344c fn', description='')
_parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS)
_parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
_parsed_args = vars(_parser.parse_args())
_outputs = same_step_001_60ad85fb69284d98992da58abd28344c_fn(**_parsed_args)
env:
- {name: AWS_ACCESS_KEY_ID, value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'}
- {name: AWS_SECRET_ACCESS_KEY, value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'}
- {name: MLFLOW_S3_ENDPOINT_URL, value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'}
- {name: MLFLOW_TRACKING_URI, value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'}
image: library/python:3.9-slim-buster
inputs:
parameters:
- {name: AWS_ACCESS_KEY_ID}
- {name: AWS_SECRET_ACCESS_KEY}
- {name: MLFLOW_S3_ENDPOINT_URL}
- {name: MLFLOW_TRACKING_URI}
- {name: metadata_url}
- {name: run-info-fn-run_info}
artifacts:
- {name: same-step-000-a29af0569ab04006b678357f767987db-fn-output_context, path: /tmp/inputs/input_context/data}
outputs:
artifacts:
- {name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context, path: /tmp/outputs/output_context/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["--input-context", {"inputPath": "input_context"}, {"if": {"cond":
{"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}},
{"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url",
{"inputValue": "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}],
"command": ["sh", "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
install --quiet --no-warn-script-location ''dill'' ''requests'' ''pympler==1.0.1''
''chart_studio'' ''ipython'' ''matplotlib'' ''numpy'' ''pandas'' ''plotly''
''Requests'' ''scipy'' ''tensorflow'' || PIP_DISABLE_PIP_VERSION_CHECK=1
python3 -m pip install --quiet --no-warn-script-location ''dill'' ''requests''
''pympler==1.0.1'' ''chart_studio'' ''ipython'' ''matplotlib'' ''numpy''
''pandas'' ''plotly'' ''Requests'' ''scipy'' ''tensorflow'' --user) && \"$0\"
\"$@\"", "sh", "-ec", "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
-u \"$program_path\" \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path:
str):\n import os\n os.makedirs(os.path.dirname(file_path), exist_ok=True)\n return
file_path\n\ndef same_step_001_60ad85fb69284d98992da58abd28344c_fn(\n input_context_path,\n output_context_path,\n run_info=\"gAR9lC4=\",\n metadata_url=\"\",\n):\n from
base64 import urlsafe_b64encode, urlsafe_b64decode\n from pathlib import
Path\n import datetime\n import requests\n import tempfile\n import
dill\n import os\n\n input_context = None\n with Path(input_context_path).open(\"rb\")
as reader:\n input_context = reader.read()\n\n # Helper function
for posting metadata to mlflow.\n def post_metadata(json):\n if
metadata_url == \"\":\n return\n\n try:\n req
= requests.post(metadata_url, json=json)\n req.raise_for_status()\n except
requests.exceptions.HTTPError as err:\n print(f\"Error posting
metadata: {err}\")\n\n # Move to writable directory as user might want
to do file IO.\n # TODO: won''t persist across steps, might need support
in SDK?\n os.chdir(tempfile.mkdtemp())\n\n # Load information about
the current experiment run:\n run_info = dill.loads(urlsafe_b64decode(run_info))\n\n #
Post session context to mlflow.\n if len(input_context) > 0:\n input_context_str
= urlsafe_b64encode(input_context)\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_001\",\n \"metadata_type\": \"input\",\n \"metadata_value\":
input_context_str,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\n #
User code for step, which we run in its own execution frame.\n user_code
= f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom
tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies
from requirements file.\n_req_file = mktemp()\nwith open(_req_file, \"w\")
as _file:\n _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip
install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"\").decode()}\n\n#
Load session context into global namespace:\nif { len(input_context) } >
0:\n dill.load_session(\"{ input_context_path }\")\n\n# Run the user''s
notebook code:\n{urlsafe_b64decode(\"aW1wb3J0IG51bXB5IGFzIG5wCmltcG9ydCBtYXRwbG90bGliLnB5cGxvdCBhcyBwbHQKaW1wb3J0IHNjaXB5LnN0YXRzIGFzIHN0YXRzCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKbXUgPSAwCnN0ZCA9IDEKCnggPSBucC5saW5zcGFjZShzdGFydD0tNCwgc3RvcD00LCBudW09MTAwKQp5ID0gc3RhdHMubm9ybS5wZGYoeCwgbXUsIHN0ZCkKCmEgPSBhICsgNQpiID0gYiArIDEwICMgMjUxNSAKCnBsdC5wbG90KHgsIHkpCnBsdC5zaG93KCkKaW1wb3J0IHJlcXVlc3RzCmltcG9ydCBwYW5kYXMgYXMgcGQKaW1wb3J0IHBsb3RseS5maWd1cmVfZmFjdG9yeSBhcyBmZgppbXBvcnQgY2hhcnRfc3R1ZGlvLnBsb3RseSBhcyBweQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCnVybCA9ICdodHRwczovL3Jhdy5naXRodWJ1c2VyY29udGVudC5jb20vU0FNRS1Qcm9qZWN0L1NBTUUtc2FtcGxlcy9tYWluL3Rlc3QtYXJ0aWZhY3RzL3Rlc3QuY3N2JwpkZiA9IHBkLnJlYWRfY3N2KHVybCkKCmEgPSBhICogMTAwMApiID0gYiAvIDY3ICMgMzcuNTM3MzEzNDMyOAoKZGYuZGVzY3JpYmUoKQ==\").decode()}\n\n#
Various types of serialisation failures we''d like to inform the user of:\ndef
_cannot_pickle_msg(k):\n return \"Variable ''\" + k +\"'' was defined
in previous steps, but could not be serialised as it was not supported by
''dill''.\"\n\ndef _too_large_msg(k, mem):\n return \"Variable ''\" +
k + \"'' was defined in previous steps, but could not be serialised as it
used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable
variables with exploding variables so that the user\n# knows why they cannot
use them in future steps:\n_all_keys = list(globals().keys())\nfor _k in
_all_keys:\n if not dill.pickles(globals()[_k], safe=True):\n globals()[_k]
= ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n continue\n\n _mem
= asizeof.asizeof(globals()[_k])\n if _mem >= 52428800:\n globals()[_k]
= ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n continue\n\n#
Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n #
Runs the user code in a new execution frame. Context from the previous\n #
component in the run is loaded into the session dynamically, and we run\n #
with a single globals() namespace to simulate top-level execution.\n exec(user_code,
globals(), globals())\n\n # Post new session context to mlflow:\n with
Path(output_context_path).open(\"rb\") as reader:\n context = urlsafe_b64encode(reader.read())\n post_metadata({\n \"experiment_id\":
run_info[\"experiment_id\"],\n \"run_id\": run_info[\"run_id\"],\n \"step_id\":
\"same_step_001\",\n \"metadata_type\": \"output\",\n \"metadata_value\":
context,\n \"metadata_time\": datetime.datetime.now().isoformat(),\n })\n\nimport
argparse\n_parser = argparse.ArgumentParser(prog=''Same step 001 60ad85fb69284d98992da58abd28344c
fn'', description='''')\n_parser.add_argument(\"--input-context\", dest=\"input_context_path\",
type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\",
dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\",
dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\",
dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True,
default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
= same_step_001_60ad85fb69284d98992da58abd28344c_fn(**_parsed_args)\n"],
"image": "library/python:3.9-slim-buster"}}, "inputs": [{"name": "input_context",
"type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
true}, {"default": "", "name": "metadata_url", "optional": true}], "name":
"Same step 001 60ad85fb69284d98992da58abd28344c fn", "outputs": [{"name":
"output_context", "type": "String"}]}', pipelines.kubeflow.org/component_ref: '{}',
pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}",
"run_info": "{{inputs.parameters.run-info-fn-run_info}}"}', pipelines.kubeflow.org/max_cache_staleness: P0D}
- name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn
  # Notebook step 002: loads the dill session written by step 001 (input
  # artifact below), executes this step's user code, then dumps the new session
  # to /tmp/outputs/output_context/data for the next step.
  container:
    args:
    - --input-context
    - /tmp/inputs/input_context/data
    - --run-info
    - '{{inputs.parameters.run-info-fn-run_info}}'
    - --metadata-url
    - '{{inputs.parameters.metadata_url}}'
    - --output-context
    - /tmp/outputs/output_context/data
    command:
    - sh
    - -c
    # Install runtime deps (falling back to a --user install), then exec the
    # remaining command items with "$0"/"$@".
    - >-
      (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
      'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas'
      'plotly' 'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip
      install --quiet --no-warn-script-location 'dill' 'requests' 'pympler==1.0.1' 'chart_studio'
      'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' --user)
      && "$0" "$@"
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def _make_parent_dirs_and_return_path(file_path: str):
          import os
          os.makedirs(os.path.dirname(file_path), exist_ok=True)
          return file_path

      def same_step_002_b7dbb05511774bd7bab385301d43b388_fn(
          input_context_path,
          output_context_path,
          run_info="gAR9lC4=",
          metadata_url="",
      ):
          from base64 import urlsafe_b64encode, urlsafe_b64decode
          from pathlib import Path
          import datetime
          import requests
          import tempfile
          import dill
          import os

          input_context = None
          with Path(input_context_path).open("rb") as reader:
              input_context = reader.read()

          # Helper function for posting metadata to mlflow. Posting is best-effort:
          # HTTP errors, connection failures and timeouts are reported, not raised.
          def post_metadata(json):
              if metadata_url == "":
                  return

              try:
                  req = requests.post(metadata_url, json=json, timeout=30)
                  req.raise_for_status()
              except requests.exceptions.RequestException as err:
                  print(f"Error posting metadata: {err}")

          # Move to writable directory as user might want to do file IO.
          # TODO: won't persist across steps, might need support in SDK?
          os.chdir(tempfile.mkdtemp())

          # Load information about the current experiment run:
          run_info = dill.loads(urlsafe_b64decode(run_info))

          # Post session context to mlflow. urlsafe_b64encode returns bytes, which
          # are not JSON serialisable, so decode them to an ASCII str first.
          if len(input_context) > 0:
              input_context_str = urlsafe_b64encode(input_context).decode("ascii")
              post_metadata({
                  "experiment_id": run_info["experiment_id"],
                  "run_id": run_info["run_id"],
                  "step_id": "same_step_002",
                  "metadata_type": "input",
                  "metadata_value": input_context_str,
                  "metadata_time": datetime.datetime.now().isoformat(),
              })

          # User code for step, which we run in its own execution frame.
          # NOTE(review): the exploding-variables module below decodes from an empty
          # string, so ExplodingVariable exists only if the loaded session or the
          # notebook code defines it; otherwise an unpicklable or oversized global
          # makes the loop raise NameError. Fix in the generator.
          user_code = f"""
      from base64 import urlsafe_b64decode
      from pympler import asizeof
      from tempfile import mktemp
      import dill
      import os

      # Install dependencies from requirements file.
      _req_file = mktemp()
      with open(_req_file, "w") as _file:
          _file.write(urlsafe_b64decode("bnVtcHkgPT0gMS4yMS4yCg==").decode())
      os.system("pip install -r " + _req_file)

      # Load the exploding variables module:
      {urlsafe_b64decode("").decode()}

      # Load session context into global namespace:
      if { len(input_context) } > 0:
          dill.load_session("{ input_context_path }")

      # Run the user's notebook code:
      {urlsafe_b64decode("YSA9IGEgKyA1CmIgPSBiICsgMTAgIyA0Ny41MzczMTM0MzI4CnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmcgPSBzb21lX21hdGgoOCwgMjEpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmogPSBnWzBdCmsgPSBnWzFdCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA1Ny41MzczMTM0MzI4CgpwcmludChmImo6IHtqfSIpCnByaW50KGYiazoge2t9IikKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDY3LjUzNzMxMzQzMjgKcHJpbnQoIjAuMC4yIikKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKcHJpbnQoZiJBY2Nlc3NpbmcgdGhlIHZhbHVlIG9mIEI6IHtifSIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCg==").decode()}

      # Various types of serialisation failures we'd like to inform the user of:
      def _cannot_pickle_msg(k):
          return "Variable '" + k +"' was defined in previous steps, but could not be serialised as it was not supported by 'dill'."

      def _too_large_msg(k, mem):
          return "Variable '" + k + "' was defined in previous steps, but could not be serialised as it used too much memory (" + str(mem) + "B)."

      # Replace unserialisable variables with exploding variables so that the user
      # knows why they cannot use them in future steps:
      _all_keys = list(globals().keys())
      for _k in _all_keys:
          if not dill.pickles(globals()[_k], safe=True):
              globals()[_k] = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))
              continue

          _mem = asizeof.asizeof(globals()[_k])
          if _mem >= 52428800:
              globals()[_k] = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))
              continue

      # Save new session context to disk for the next component:
      dill.dump_session("{output_context_path}")
      """

          # Runs the user code in a new execution frame. Context from the previous
          # component in the run is loaded into the session dynamically, and we run
          # with a single globals() namespace to simulate top-level execution.
          exec(user_code, globals(), globals())

          # Post new session context to mlflow (base64 decoded to str, as above):
          with Path(output_context_path).open("rb") as reader:
              context = urlsafe_b64encode(reader.read()).decode("ascii")
          post_metadata({
              "experiment_id": run_info["experiment_id"],
              "run_id": run_info["run_id"],
              "step_id": "same_step_002",
              "metadata_type": "output",
              "metadata_value": context,
              "metadata_time": datetime.datetime.now().isoformat(),
          })

      import argparse
      _parser = argparse.ArgumentParser(prog='Same step 002 b7dbb05511774bd7bab385301d43b388 fn', description='')
      _parser.add_argument("--input-context", dest="input_context_path", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--run-info", dest="run_info", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--metadata-url", dest="metadata_url", type=str, required=False, default=argparse.SUPPRESS)
      _parser.add_argument("--output-context", dest="output_context_path", type=_make_parent_dirs_and_return_path, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())

      _outputs = same_step_002_b7dbb05511774bd7bab385301d43b388_fn(**_parsed_args)
    # NOTE(review): AWS_SECRET_ACCESS_KEY is injected from a plain workflow
    # parameter; consider env[].valueFrom.secretKeyRef with a Kubernetes Secret.
    env:
    - name: AWS_ACCESS_KEY_ID
      value: '{{inputs.parameters.AWS_ACCESS_KEY_ID}}'
    - name: AWS_SECRET_ACCESS_KEY
      value: '{{inputs.parameters.AWS_SECRET_ACCESS_KEY}}'
    - name: MLFLOW_S3_ENDPOINT_URL
      value: '{{inputs.parameters.MLFLOW_S3_ENDPOINT_URL}}'
    - name: MLFLOW_TRACKING_URI
      value: '{{inputs.parameters.MLFLOW_TRACKING_URI}}'
    image: library/python:3.9-slim-buster
  inputs:
    parameters:
    - name: AWS_ACCESS_KEY_ID
    - name: AWS_SECRET_ACCESS_KEY
    - name: MLFLOW_S3_ENDPOINT_URL
    - name: MLFLOW_TRACKING_URI
    - name: metadata_url
    - name: run-info-fn-run_info
    artifacts:
    # Session context produced by step 001.
    - name: same-step-001-60ad85fb69284d98992da58abd28344c-fn-output_context
      path: /tmp/inputs/input_context/data
  outputs:
    artifacts:
    - name: same-step-002-b7dbb05511774bd7bab385301d43b388-fn-output_context
      path: /tmp/outputs/output_context/data
  metadata:
    labels:
      pipelines.kubeflow.org/kfp_sdk_version: '1.8.12'
      pipelines.kubeflow.org/pipeline-sdk-type: kfp
      pipelines.kubeflow.org/enable_caching: "true"
    annotations:
      # Must mirror container.command above (same embedded program text).
      pipelines.kubeflow.org/component_spec: >-
        {"implementation": {"container": {"args": ["--input-context", {"inputPath": "input_context"},
        {"if": {"cond": {"isPresent": "run_info"}, "then": ["--run-info", {"inputValue": "run_info"}]}},
        {"if": {"cond": {"isPresent": "metadata_url"}, "then": ["--metadata-url", {"inputValue":
        "metadata_url"}]}}, "--output-context", {"outputPath": "output_context"}], "command": ["sh",
        "-c", "(PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location
        'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly'
        'Requests' 'scipy' 'tensorflow' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
        --no-warn-script-location 'dill' 'requests' 'pympler==1.0.1' 'chart_studio' 'ipython' 'matplotlib'
        'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' --user) && \"$0\" \"$@\"", "sh", "-ec",
        "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\"
        \"$@\"\n", "def _make_parent_dirs_and_return_path(file_path: str):\n    import os\n    os.makedirs(os.path.dirname(file_path),
        exist_ok=True)\n    return file_path\n\ndef same_step_002_b7dbb05511774bd7bab385301d43b388_fn(\n    input_context_path,\n    output_context_path,\n    run_info=\"gAR9lC4=\",\n    metadata_url=\"\",\n):\n    from
        base64 import urlsafe_b64encode, urlsafe_b64decode\n    from pathlib import Path\n    import datetime\n    import
        requests\n    import tempfile\n    import dill\n    import os\n\n    input_context = None\n    with
        Path(input_context_path).open(\"rb\") as reader:\n        input_context = reader.read()\n\n    # Helper
        function for posting metadata to mlflow. Posting is best-effort:\n    # HTTP errors, connection failures and
        timeouts are reported, not raised.\n    def post_metadata(json):\n        if metadata_url == \"\":\n            return\n\n        try:\n            req
        = requests.post(metadata_url, json=json, timeout=30)\n            req.raise_for_status()\n        except
        requests.exceptions.RequestException as err:\n            print(f\"Error posting metadata: {err}\")\n\n    # Move
        to writable directory as user might want to do file IO.\n    # TODO: won't persist across steps, might need
        support in SDK?\n    os.chdir(tempfile.mkdtemp())\n\n    # Load information about the current experiment
        run:\n    run_info = dill.loads(urlsafe_b64decode(run_info))\n\n    # Post session context to mlflow.
        urlsafe_b64encode returns bytes, which\n    # are not JSON serialisable, so decode them to an ASCII str
        first.\n    if len(input_context) > 0:\n        input_context_str = urlsafe_b64encode(input_context).decode(\"ascii\")\n        post_metadata({\n            \"experiment_id\":
        run_info[\"experiment_id\"],\n            \"run_id\": run_info[\"run_id\"],\n            \"step_id\":
        \"same_step_002\",\n            \"metadata_type\": \"input\",\n            \"metadata_value\":
        input_context_str,\n            \"metadata_time\": datetime.datetime.now().isoformat(),\n        })\n\n    # User
        code for step, which we run in its own execution frame.\n    # NOTE(review): the exploding-variables module
        below decodes from an empty\n    # string, so ExplodingVariable exists only if the loaded session or the\n    # notebook
        code defines it; otherwise an unpicklable or oversized global\n    # makes the loop raise NameError. Fix in the
        generator.\n    user_code = f\"\"\"\nfrom base64 import urlsafe_b64decode\nfrom pympler import asizeof\nfrom
        tempfile import mktemp\nimport dill\nimport os\n\n# Install dependencies from requirements file.\n_req_file
        = mktemp()\nwith open(_req_file, \"w\") as _file:\n    _file.write(urlsafe_b64decode(\"bnVtcHkgPT0gMS4yMS4yCg==\").decode())\nos.system(\"pip
        install -r \" + _req_file)\n\n# Load the exploding variables module:\n{urlsafe_b64decode(\"\").decode()}\n\n# Load
        session context into global namespace:\nif { len(input_context) } > 0:\n    dill.load_session(\"{ input_context_path }\")\n\n# Run
        the user's notebook code:\n{urlsafe_b64decode(\"YSA9IGEgKyA1CmIgPSBiICsgMTAgIyA0Ny41MzczMTM0MzI4CnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmcgPSBzb21lX21hdGgoOCwgMjEpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCmogPSBnWzBdCmsgPSBnWzFdCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA1Ny41MzczMTM0MzI4CgpwcmludChmImo6IHtqfSIpCnByaW50KGYiazoge2t9IikKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDY3LjUzNzMxMzQzMjgKcHJpbnQoIjAuMC4yIikKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKcHJpbnQoZiJBY2Nlc3NpbmcgdGhlIHZhbHVlIG9mIEI6IHtifSIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCg==\").decode()}\n\n# Various
        types of serialisation failures we'd like to inform the user of:\ndef _cannot_pickle_msg(k):\n    return
        \"Variable '\" + k +\"' was defined in previous steps, but could not be serialised as it was not supported by
        'dill'.\"\n\ndef _too_large_msg(k, mem):\n    return \"Variable '\" + k + \"' was defined in previous steps, but
        could not be serialised as it used too much memory (\" + str(mem) + \"B).\"\n\n# Replace unserialisable
        variables with exploding variables so that the user\n# knows why they cannot use them in future steps:\n_all_keys
        = list(globals().keys())\nfor _k in _all_keys:\n    if not dill.pickles(globals()[_k], safe=True):\n        globals()[_k]
        = ExplodingVariable(Exception(_cannot_pickle_msg(_k)))\n        continue\n\n    _mem
        = asizeof.asizeof(globals()[_k])\n    if _mem >= 52428800:\n        globals()[_k]
        = ExplodingVariable(Exception(_too_large_msg(_k, _mem)))\n        continue\n\n# Save new session context to
        disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n    # Runs the user code
        in a new execution frame. Context from the previous\n    # component in the run is loaded into the session
        dynamically, and we run\n    # with a single globals() namespace to simulate top-level execution.\n    exec(user_code,
        globals(), globals())\n\n    # Post new session context to mlflow (base64 decoded to str, as above):\n    with
        Path(output_context_path).open(\"rb\") as reader:\n        context = urlsafe_b64encode(reader.read()).decode(\"ascii\")\n    post_metadata({\n        \"experiment_id\":
        run_info[\"experiment_id\"],\n        \"run_id\": run_info[\"run_id\"],\n        \"step_id\":
        \"same_step_002\",\n        \"metadata_type\": \"output\",\n        \"metadata_value\": context,\n        \"metadata_time\":
        datetime.datetime.now().isoformat(),\n    })\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Same
        step 002 b7dbb05511774bd7bab385301d43b388 fn', description='')\n_parser.add_argument(\"--input-context\",
        dest=\"input_context_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--run-info\",
        dest=\"run_info\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--metadata-url\",
        dest=\"metadata_url\", type=str, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"--output-context\",
        dest=\"output_context_path\", type=_make_parent_dirs_and_return_path, required=True,
        default=argparse.SUPPRESS)\n_parsed_args = vars(_parser.parse_args())\n\n_outputs
        = same_step_002_b7dbb05511774bd7bab385301d43b388_fn(**_parsed_args)\n"], "image": "library/python:3.9-slim-buster"}},
        "inputs": [{"name": "input_context", "type": "String"}, {"default": "gAR9lC4=", "name": "run_info", "optional":
        true}, {"default": "", "name": "metadata_url", "optional": true}], "name": "Same step 002 b7dbb05511774bd7bab385301d43b388
        fn", "outputs": [{"name": "output_context", "type": "String"}]}
      pipelines.kubeflow.org/component_ref: '{}'
      pipelines.kubeflow.org/arguments.parameters: '{"metadata_url": "{{inputs.parameters.metadata_url}}", "run_info": "{{inputs.parameters.run-info-fn-run_info}}"}'
      pipelines.kubeflow.org/max_cache_staleness: P0D
arguments:
  # Values bound to the workflow's top-level input parameters.
  parameters:
  - name: context
    value: ''
  - name: metadata_url
    value: ''
  # NOTE(review): plaintext credential defaults, presumably for the in-cluster
  # MinIO; confirm these are dev-only and never real secrets.
  - name: AWS_ACCESS_KEY_ID
    value: minio
  - name: AWS_SECRET_ACCESS_KEY
    value: minio123
  - name: MLFLOW_S3_ENDPOINT_URL
    value: 'http://combinator-minio.mlflow.svc.cluster.local:9000'
  - name: MLFLOW_TRACKING_URI
    value: 'http://combinator-mlflow.mlflow.svc.cluster.local:5000'
serviceAccountName: pipeline-runner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment